智能日志管理平台

  • 智能日志管理平台 > 使用文档 > 工作流管理(pipeline) >工作流使用指南 >数据计算

    数据计算

    最近更新时间: 2018-07-17 17:51:36

    工作流提供两种计算模式供您选择:流式计算批量计算。流式计算即实时计算,批量计算即离线计算。

    计算的触发模式分为调度执行和手动执行:

    调度执行:将工作流任务启动后,系统自动将自动开始运行,并按照条件执行调度;
    手动执行:工作流运行一遍就停止。

    流式计算的计算任务属于调度执行模式,批量计算的计算任务可以是调度执行模式(定时执行循环执行)或者手动执行模式。

    注意1:每一个工作流任务都可以同时包含一个或多个同种模式的计算任务,并且计算之间可以串行。

    注意2:一个工作流任务的不能同时包含调度模式(流式计算/定时批量计算/循环批量计算)和手动执行模式(手动执行)两种计算模式。

    流式计算

    流式计算相关参数填写

    参数 必填 说明
    名称 计算任务名称
    容器类型 计算任务需要的物理资源,不同的计算任务之间的计算资源互相隔离,互不影响
    计算模式 SQL 语句计算/自定义计算,两种计算方式可以并存,并且优先执行自定义计算
    间隔时间 计算任务的运行时间间隔
    数据起始位置 从最早还是最新数据开始计算

    SQL 计算就是自己编写 SQL 语句,对数据源或消息队列中的数据进行分批处理,并且以一定的时间粒度(可以自行设置,最小的粒度为 5 秒)进行聚合,将结果自动发送到一个新的消息队列中。

    批量计算

    批量计算相关参数填写

    参数 必填 说明
    名称 计算任务名称
    容器类型 计算任务需要的资源
    数据源名称 当数据源是对象存储节点的时候,支持多个数据源 JOIN 运算,因此需要指定数据源
    数据库表名 执行 SQL 计算的数据库表名
    触发方式 设置计算的触发方式:定时启动/循环启动/手动执行

    流式计算中,您可能遇到 SQL 满足不了的计算需求,这时您可以通过自定义计算完成。在这里我们提供引用 Plugin 和 UDF 帮您完成自定义计算。

    自定义计算

    引用 UDF

    UDF(User-Defined Function):用户自定义函数。

    在工作流中,UDF 和 Plugin 的差别如下:

    UDF 注册的函数只适用于 SQL 语句中;

    工作流 提供了提供了大约 50 多种内置 UDF 函数;如果这些函数不能满足您的需求,那么您也可以自行编写 UDF,提交 Jar 包并添加到 UDF 函数管理,提交后即可使用。

    自定义 UDF 的过程与自定义 Plugin 类似,如下:

    UDF 模板下载链接--->>>点此跳转

    解压后,使用 Java IDE 导入 Pandora-UDF 项目。

    等待项目依赖加载完成后,可以在 src/main/java/com.pandora/ 目录下查看一个简单的示例。

    这个示例中包含了一个名为 SimpleUdf 的 Class,在这个 Class 中有 4 个方法:

    1. String parseTime(String t)
    将 Input RFC3339 格式转为 date time 时间格式
    @param input rfc3339 格式,形如 2017-04-05T16:41:42.651614Z
    @return 返回date time格式时间 形如 2017-04-05 16:41:42
    如 parseTime("2017-04-05T16:41:42.651614Z")
    
    2. String parseTime(long t)
    将时间戳转为 date time 时间格式
    @param input 时间戳,单位为毫秒
    @return 返回date time格式时间 形如 2017-04-05 16:41:42
    如 parseTime(1499324233000)
    
    3. String parseTime(long t, String unit)
    将 Input RFC3339 格式转为 date time 时间格式
    @param input 时间戳,单位为毫秒
    @param unit 指定时间戳的单位,支持 s (秒), ms(毫秒), us(微妙), ns (纳秒)
    @return 返回date time格式时间 形如 2017-04-05 16:41:42
    如 parseTime(1499324233000, "ms")
    
    4. String parseTime(long t, String unit)
    将 Input RFC3339 格式转为 date time 时间格式
    @param input 时间戳,单位为毫秒
    @param unit 指定时间精度,1毫秒等于多少该精度单位时间
    @return 返回date time格式时间 形如 2017-04-05 16:41:42
    如 parseTime(149932423300000000, 100000) 解析百纳秒时间戳
    
    

    您可以在 src/main/java/com.pandora/ 目录下新建 Class 和方法,并在方法中编写 UDF 逻辑,代码编写完成后,需要将这个工程打成 Jar 包并上传至工作流,新增到自定义函数里,然后就可以使用这个 UDF 了。

    !> 注意:Jar 包名称中不可包含-_号。

    在 SQL 中使用 UDF:

    SELECT
      parseTime(t1) t1,
      parseTime(t2) t2,
      parseTime(t3, "s") t3, 
      parseTime(t4, 100000) t4 
    from 
      stream
    

    引用 Plugin

    通过下载我们提供的代码模板,在此基础上编写您的代码(输入类、输出类、业务逻辑代码),打成 Jar 包。在工作流的工作界面,您可以通过 Plugin 管理 上传您的 Jar 包到工作流,之后您就可以在流式计算中引用您自定义的 Plugin 来实现更为复杂的数据计算啦!

    Plugin 模板下载链接--->>>点此跳转

    解压后,使用 Java IDE 导入 Pandora-Plugin 项目。

    等待项目依赖加载完成后,可以在 src/main/java/demo/ 目录下查看一个简单的示例。

    src/main/java/userWrite/ 目录下是用户自行编写代码的地方。

    编写输入&输出类

    首先,我们要定义我们编写代码的输入和输出。

    输入通常是工作流中的消息队列,那么我们只需要将消息队列中的字段信息原封不动的复制一份即可,而输出是由我们自己编写的代码决定的,所以也需要提前定义好我们的输出信息。

    负责数据源的类是 src/main/java/userWrite/bean/SourceData.java

    负责输出数据的类是 src/main/java/userWrite/bean/OutputData.java

    我们只需要根据 src/main/java/demo/ 目录下的示例进行编写即可,代码中也有很详细的注释。

    编写业务逻辑代码

    业务逻辑代码的编写在 src/main/java/userWrite/customLogic/customlogic.java 中的 parse 方法中。

    注意 1:customlogic.java 此类名称可自由更改;最终的返回必须是List<OutputData>

    打包上传

    代码编写完成后,需要打成 Jar 包,然后上传至工作流平台。

    注意 1:Jar 包名称必须和 src/main/java/userWrite/customLogic/ 目录下包含 parse 方法的类名称一致。

    注意 2:目前 Plugin 仅支持 Java 语言。

    注意 3:和 Plugin 相比,推荐用户使用 UDF 来计算。

    魔法变量

    魔法变量的概念类似于编程语言中的变量,即您可以定义一个变量,在数据计算中或者 CDN 日志数据源类型的文件过滤条件中引用。目前魔法变量仅支持时间类型的值。

    目前系统提供了 8 个内置变量,只能引用,不可修改和删除:

    变量名称 类型 格式
    date 时间表达式 $(date) yyyy-MM-dd
    day 时间表达式 $(day) dd
    hour 时间表达式 $(hour) HH
    min 时间表达式 $(min) mm
    mon 时间表达式 $(mon) MM
    now 时间表达式 $(now) yyyy-MM-dd HH:mm:ss
    sec 时间表达式 $(sec) ss
    year 时间表达式 $(year) yyyy

    用户也可以自行创建魔法变量,并且创建的魔法变量的值可以引用系统内置变量。

    当我们需要使用魔法变量的时候,只需要输入 $(变量名称) 即可,如:

    select * from stream where time = $(now)
    
    以上内容是否对您有帮助?
  • Qvm free helper
    Close