机器数据分析平台

  • 机器数据分析平台 > SDK 下载 > 服务端SDK >pdr-python-sdk > 自定义算子开发流程

    自定义算子开发流程

    最近更新时间: 2021-03-03 15:37:34

    Pandora支持 SPL 高级搜索分析语法,以管道符的形式表达数据分析的过程,每一步使用对应的SPL搜索命令完成相应的数据分析。平台内置提供了 100 多种命令用于数据处理和建模过程,可以实现数据搜索、关联、分析、可视化等操作。
    尽管如此,这些现有命令可能无法满足特定分析场景。自定义算子是用户定义的SPL搜索命令,用于扩展用户的特定分析需求。自定义算子SDK提供开发并将自定义算子加入到算子库的功能,您可以通过创建Python脚本来实现自定义搜索命令,实现数据的灵活分析。


    自定义算子开发流程

    自定义内置库中未实现的算子,主要流程如下:

    1. 新建一个App, 在App中开发算子逻辑。
    2. 完成开发后,将自定义算子App上传到应用平台。
    3. 在搜索界面中使用。

    SDK自定义算子类说明

    自定义算子提供三个基础类,可以通过继承的方式去实现自定义的命令:

    • SplBaseCommand:Spl处理类的基类,实现了标准输入输出中传输的协议,类中的字段都非常重要,详细说明如下:
    字段名称 描述 字段类型
    metainfo 保存了spl搜索中使用到的所有元数据 字典
    is_finish 表示是否将所有的数据都已经发送到了该算子 bool
    require_fields 描述该算子需要的字段名称,支持通配符 字符数组
    lines 接收到的数据 字典型数组

    其中metainfo字段中字典key为“searchinfo”的字段存储了本次执行的相关信息,详细说明如下:

    字段名称 描述 字段类型
    args 自定义算子的参数,比如自定义算子为 sample a=1 b, 那么args为 [‘a=1’, ‘b’] 字符串数组
    dispatch_dir 一个属于该app存放临时数据的目录 字符串
    owner 请求资源的拥有者 字符串
    command 查询的完整spl语句 字符串
    app 算子所属的app名字 字符串
    session_key session字段,可以用该session访问pandora2.0的其他接口 字符串
    username 请求该算子的请求者名字 字符串
    server_uri pandora2.0的地址 字符串
    • SplStreamingBatchCommand: 表示使用微批处理的方式,APP 每次会收到一批数据,默认大小为5000,完成后才会处理下一批数据,对内存占用较小
    • SplStreamingChunkCommand: 针对APP 需要收到所有数据才可以处理的场景,当数据量特别大的时候,内存占用会很大导致搜索不稳定,请谨慎使用

    自定义算子可扩展函数接口

    自定义算子对外提供的接口,可以根据业务场景扩展其中适当接口:

    • streaming_handle()

      算子逻辑函数,在该函数里实现对日志的处理,如果父类为SplStreamingBatchCommand,可能会被调用多次,如果父类为SplStreamingChunkCommand只会被调用一次

    • init_env_by_getinfo()

      接收到getinfo请求后,初始化参数的接口

    • config_require_fields()

      配置算子需要的字段,如果不明确可以填 “*” 表示需要所有字段

    示例

    示例一:通过扩展 streaming_handle() 函数,实现为每行数据增加一列的作用。

    #!/usr/bin/env python
    
    import random
    import logging
    
    from pdr_python_sdk.spl import *
    from pdr_python_sdk.on_demand_action import run
    
    class Random(SplStreamingBatchCommand):
        """
        在生成的每一条日志中添加一个number和arg字段,前者字段值为0~100之间的随机小数,后者为算子输入的参数.
        """
    
        def streaming_handle(self, lines):
            """
            业务逻辑处理接口,实现了添加随机字段的逻辑
            1. 如果父类是SplStreamingBatchCommand,数组最大值默认为5000,如果总日志数超过5000,streaming_handle会被调用多次
            2. 如果父类是SplStreamingChunkCommand,数组大小为总日志条数,streaming_handle只会被调用一次
    
            @param lines: 日志参数,是一个数组,里面的每一个元素为一条日志是字典类型,
            @return 返回值是要返回给页面的数据,可以是传进来的lines,也可以是重新生成的一个字典型数组

            """
    				
            arg = get_arguments(self.metainfo['searchinfo'])
            for line in lines:
                line['number'] = random.uniform(0.0, 100.0)
                line['arg'] = arg
            return lines
    				
        def get_arguments(self, meta_info):
            """
            处理请求的参数
            @return 返回算子的第一个参数
            """
            if len(meta_info['args']) == 0:
                raise RuntimeError('must have a args.')
    
            return meta_info['args'][0]
    
    
        def init_env_by_getinfo(self):
            """
    
            环境初始化接口,,如果没有需要初始化的逻辑可以在该类中删掉这个函数
            """
            logging.info('receive meta info is %s', self.metaInfo)
    
    
        def config_require_fields(self):
            """
            配置所需字段的接口,如果该算子只需要确定的部分字段,可以实现该接口,加快算子性能,如果需要所有字段可以在该类中删掉这个函数
            @param lines: 日志参数,是一个数组,里面的每一个元素为一条日志是字典类型
            @return 返回值是要返回给页面的数据,可以是传进来的lines,也可以是重新生成的一个字典型数组

            """
            require_fields = ['_time', 'name']
            return require_fields
    
    
    if __name__ == '__main__':
        run(Random, sys.argv, sys.stdin.buffer, sys.stdout.buffer)
    
    以上内容是否对您有帮助?
  • Qvm free helper
    Close