机器数据分析平台

  • 机器数据分析平台 > 应用开发者文档 > APP开发手册 > APP开发说明 > 自定义后端功能 >自定义算子

    自定义算子

    最近更新时间:2021-01-29 20:14:00

    Pandora支持 SPL 高级搜索分析语法,以管道符的形式表达数据分析的过程,每一步使用对应的SPL搜索命令完成相应的数据分析。平台内置提供了 100 多种命令用于数据处理和建模过程,可以实现数据搜索、关联、分析、可视化等操作。
    尽管如此,这些现有命令可能无法满足特定分析场景。为了拓展SPL的能力,除了平台自带的SPL算子外,可以通过插件的形式实现自定义算子的逻辑。自定义算子是用户定义的SPL搜索命令,用于扩展用户的特定分析需求。自定义算子SDK提供开发并将自定义算子加入到算子库的功能,下面以Python SDK为例说明如何进行自定义算子开发,实现数据的灵活分析。


    自定义算子类型

    用户可以自定义的算子类型如下:

    算子类型 描述 示例 计算方式
    流式命令 对搜索返回的事件数据逐个应用SPL命令进行处理 eval,fields,rex,rename,replace,where,search 集中式/分布式
    转换统计命令 将搜索返回的事件转换为用于统计目的的数值 chart,stats,rare,timechart,top 集中式/分布式
    搜索导出 将搜索分析结果导出到第三方平台 export 集中式/分布式

    安装 Python 环境

    自定义API 需要Python 3.8+ 以上的环境,以及 pip 包管理工具。非常建议使用 conda 或者 pipenv 等工具来管理你的Python 环境。

    安装Pandora Python SDK

    确认安装完整的Python 环境,并且安装好 pip 包管理工具。执行如下命令

    sudo pip install pdr_python_sdk
    

    创建模板应用,执行如下命令,注意填写你的APP英文名字和标题。

    create_pandora_app --name <APP英文名字> --title <APP的标题>
    

    配置自定义算子的配置项

    在 resources/customoperators 目录下可以看到 spl.json文件,文件配置如下:

    {
        "name":"sample",           // 算子的名字
        "type":"centralized",      // 算子的类型,支持 streaming/centralized/generating
        "env":"python3",           // 算子实现的语言
        "filename":"sample.py"     // 算子实现的文件名,该文件存储在bins目录下
    }
    
    算子类型 描述
    streaming 对数据进行分布式流式计算,适合于无状态的计算
    centralized 中心化算子,会将结果汇聚到一个节点后进行集中计算
    generating 算子本身不参与计算,只是负责生成数据,主站不会传递任何数据给该算子

    编写自定义算子脚本

    详细内容参考SDK帮助手册 获取 Python SDK,然后在bins目录下创建spl_example.py 脚本放置在 bins 目录下,脚本内容为:

    #!/usr/bin/env python
    import sys
    import os
    
    # add library to python path , don't forget it
    lib_name = 'libs'
    sys.path.insert(0, os.path.sep.join([os.path.dirname(os.path.abspath(__file__)), lib_name]))
    
    from pdr_python_sdk.spl import *
    from pdr_python_sdk.on_demand_action import run
    
    
    class SplExample(SplStreamingBatchCommand):
    
        def streaming_handle(self, lines):
            """
            TODO: implement your own business logic
    
            :param lines: list of key-value dict
            :return:
            """
            return self.handle_lines(lines)
    
        @staticmethod
        def handle_lines(lines):
            for line in lines:
                line['number'] = 100
            return lines
    
    
    if __name__ == '__main__':
        run(SplExample, sys.argv, sys.stdin.buffer, sys.stdout.buffer)
    

    注意,我们继承了SplStreamingBatchCommand类型,并且覆盖了streaming_handle的方法,去完成我们对于每行数据的计算逻辑。

    打包并安装APP

    将我们的应用项目打包,如果运行成功的话,应用的包将会出现在 dist 目录下:

    ./run.sh package 
    
    $ ls dist 
    scientific_app.tar.gz
    

    去pandora 2.0的应用商店的“应用管理”界面上传APP 的程序包,或者可以执行以下命令,自动上传:

    export PANDORA_PORT=<PANDORA端口>
    export PANDORA_SCHEME=<https|http>
    export PANDORA_TOKEN=<PANDORA访问Token>
    export PANDORA_HOST=<PANDORA服务IP或者域名>
    
    ./run.sh upload
    

    执行命令

    去搜索分析界面执行“* | sample” 命令,能够看到每条日志都会新增"number": 100 这样的字段

    以上内容是否对您有帮助?
  • Qvm free helper
    Close