自定义算子开发流程
Pandora支持 SPL 高级搜索分析语法,以管道符的形式表达数据分析的过程,每一步使用对应的SPL搜索命令完成相应的数据分析。平台内置提供了 100 多种命令用于数据处理和建模过程,可以实现数据搜索、关联、分析、可视化等操作。
尽管如此,这些现有命令可能无法满足特定分析场景。自定义算子是用户定义的SPL搜索命令,用于扩展用户的特定分析需求。自定义算子SDK提供开发并将自定义算子加入到算子库的功能,您可以通过创建Python脚本来实现自定义搜索命令,实现数据的灵活分析。
自定义算子开发流程
自定义内置库中未实现的算子,主要流程如下:
- 新建一个App, 在App中开发算子逻辑。
- 完成开发后,将自定义算子App上传到应用平台。
- 在搜索界面中使用。
SDK自定义算子类说明
自定义算子提供三个基础类,可以通过继承的方式去实现自定义的命令:
- SplBaseCommand:Spl处理类的基类,实现了标准输入输出中传输的协议,类中的字段都非常重要,详细说明如下:
字段名称 | 描述 | 字段类型 |
---|---|---|
metainfo | 保存了spl搜索中使用到的所有元数据 | 字典 |
is_finish | 表示是否将所有的数据都已经发送到了该算子 | bool |
require_fields | 描述该算子需要的字段名称,支持通配符 | 字符数组 |
lines | 接收到的数据 | 字典型数组 |
其中metainfo字段中字典key为“searchinfo”的字段存储了本次执行的相关信息,详细说明如下:
字段名称 | 描述 | 字段类型 |
---|---|---|
args | 自定义算子的参数,比如自定义算子为 sample a=1 b, 那么args为 [‘a=1’, ‘b’] | 字符串数组 |
dispatch_dir | 一个属于该app存放临时数据的目录 | 字符串 |
owner | 请求资源的拥有者 | 字符串 |
command | 查询的完整spl语句 | 字符串 |
app | 算子所属的app名字 | 字符串 |
session_key | session字段,可以用该session访问pandora2.0的其他接口 | 字符串 |
username | 请求该算子的请求者名字 | 字符串 |
server_uri | pandora2.0的地址 | 字符串 |
- SplStreamingBatchCommand: 表示使用微批处理的方式,APP 每次会收到一批数据,默认大小为5000,完成后才会处理下一批数据,对内存占用较小
- SplStreamingChunkCommand: 针对APP 需要收到所有数据才可以处理的场景,当数据量特别大的时候,内存占用会很大导致搜索不稳定,请谨慎使用
自定义算子可扩展函数接口
自定义算子对外提供的接口,可以根据业务场景扩展其中适当接口:
-
streaming_handle()
算子逻辑函数,在该函数里实现对日志的处理,如果父类为SplStreamingBatchCommand,可能会被调用多次,如果父类为SplStreamingChunkCommand只会被调用一次
-
init_env_by_getinfo()
接收到getinfo请求后,初始化参数的接口
-
config_require_fields()
配置算子需要的字段,如果不明确可以填 “*” 表示需要所有字段
示例
示例一:通过扩展 streaming_handle() 函数,实现为每行数据增加一列的作用。
#!/usr/bin/env python
import random
import logging
from pdr_python_sdk.spl import *
from pdr_python_sdk.on_demand_action import run
class Random(SplStreamingBatchCommand):
"""
在生成的每一条日志中添加一个number和arg字段,前者字段值为0~100之间的随机小数,后者为算子输入的参数.
"""
def streaming_handle(self, lines):
"""
业务逻辑处理接口,实现了添加随机字段的逻辑
1. 如果父类是SplStreamingBatchCommand,数组最大值默认为5000,如果总日志数超过5000,streaming_handle会被调用多次
2. 如果父类是SplStreamingChunkCommand,数组大小为总日志条数,streaming_handle只会被调用一次
@param lines: 日志参数,是一个数组,里面的每一个元素为一条日志是字典类型,
@return 返回值是要返回给页面的数据,可以是传进来的lines,也可以是重新生成的一个字典型数组
"""
arg = get_arguments(self.metainfo['searchinfo'])
for line in lines:
line['number'] = random.uniform(0.0, 100.0)
line['arg'] = arg
return lines
def get_arguments(self, meta_info):
"""
处理请求的参数
@return 返回算子的第一个参数
"""
if len(meta_info['args']) == 0:
raise RuntimeError('must have a args.')
return meta_info['args'][0]
def init_env_by_getinfo(self):
"""
环境初始化接口,,如果没有需要初始化的逻辑可以在该类中删掉这个函数
"""
logging.info('receive meta info is %s', self.metaInfo)
def config_require_fields(self):
"""
配置所需字段的接口,如果该算子只需要确定的部分字段,可以实现该接口,加快算子性能,如果需要所有字段可以在该类中删掉这个函数
@param lines: 日志参数,是一个数组,里面的每一个元素为一条日志是字典类型
@return 返回值是要返回给页面的数据,可以是传进来的lines,也可以是重新生成的一个字典型数组
"""
require_fields = ['_time', 'name']
return require_fields
if __name__ == '__main__':
run(Random, sys.argv, sys.stdin.buffer, sys.stdout.buffer)
文档反馈
(如有产品使用问题,请 提交工单)