机器数据分析平台

  • 机器数据分析平台 > 应用开发者文档 > APP开发手册 > APP开发说明 > 自定义后端功能 >自定义调度任务

    自定义调度任务

    最近更新时间:2021-08-05 15:46:53

    平台支持用户通过插件的形式实现定时调度任务的业务逻辑,下面以Python SDK为例说明如何进行自定义调度任务开发。

    安装 Python 环境

    自定义API 需要Python 3.8+ 以上的环境,以及 pip 包管理工具。非常建议使用 conda 或者 pipenv 等工具来管理你的Python 环境。

    安装Pandora Python SDK

    确认安装完整的Python 环境,并且安装好 pip 包管理工具。执行如下命令

    sudo pip install pdr_python_sdk
    

    创建模板应用,执行如下命令,注意填写你的APP英文名字和标题,假设app英文名为sched_task_test。

    create_pandora_app --name <APP英文名字> --title <APP的标题>
    

    编写自定义调度任务脚本

    详细内容参考SDK帮助手册 获取Python SDK,然后在bins目录下可以查看 sched_task_example.py 脚本。注意我们继承了 OnDemandSchedTask 类型,并且覆盖了 do_handle_task 接口,在接口中实现了我们调度任务的业务逻辑:

    import sys
    import os
    
    # add library to python path , don't forget it
    lib_name = 'libs'
    sys.path.insert(0, os.path.sep.join([os.path.dirname(os.path.abspath(__file__)), lib_name]))
    
    import logging
    import json
    
    from pdr_python_sdk.sched_task.on_demand_sched_task import OnDemandSchedTask
    from pdr_python_sdk.on_demand_action import run
    
    
    class SchedTaskExample(OnDemandSchedTask):
    
        def do_handle_init(self, packet):
            if packet.body().contains_metadata():
                metadata = packet.body().metadata()
                logging.info("metadata = %s", metadata)
    
        def do_handle_task(self, params:dict):
            logging.info(params)
            return json.dumps({"status": "success", "data": {}, "message": "successful"})
    
    
    if __name__ == '__main__':
        run(SchedTaskExample, sys.argv, sys.stdin.buffer, sys.stdout.buffer)
    
    

    运行单元测试

    在项目中运行命令,可以执行在 bins/tests 目录下的单元测试代码,保障我们APP的质量。

    ./run.sh unittest
    

    打包并安装APP

    将我们的应用项目打包,如果运行成功的话,应用的包将会出现在 dist 目录下:

    ./run.sh package 
    
    $ ls dist 
    sched_task_test.tar.gz
    

    去pandora 2.0的应用商店的“应用管理”界面上传APP 的程序包,或者可以执行以下命令,自动上传:

    export PANDORA_PORT=<PANDORA端口>
    export PANDORA_SCHEME=<https|http>
    export PANDORA_TOKEN=<PANDORA访问Token>
    export PANDORA_HOST=<PANDORA服务IP或者域名>
    
    ./run.sh upload
    

    向调度平台注册任务

    示例代码: sched_task_example.py

    1、添加一个Job任务

    方法签名: create_schedule(job: Job)

    job = Job('demoJobName')
    job.set_app_name('demoAppName')
    job.set_parameter('action', 'run')
    job.set_crontab('0 0/2 * * * ?')
    job.set_status(True)
    job.set_execute_batch(2)
    
    task_1a = Task('customHandler')
    task_1a.set_parameter('file', 'send_msg.py')
    task_1a.set_parameter('env', 'user=jack')
    task_1a.set_parameter('params', {'money': 100, 'type': 'RMB'})
    
    task_2a = Task('customHandler')
    task_2a.set_parameter('file', 'putin_agree.py')
    task_2a.set_parameter('env', 'user=Putin')
    task_2a.set_parameter('params', {'action': 'agree'})
    task_2a.add_task(task_1a)
    
    task_3a = Task('customHandler')
    task_3a.set_parameter('file', 'trump_agree.py')
    task_3a.set_parameter('env', 'user=Trump')
    task_3a.set_parameter('params', {'action': 'agree'})
    task_3a.add_task(task_1a)
    
    job.add_task(task_2a)
    job.add_task(task_3a)
    
    job_id = conn.create_schedule(job)
    

    2、更新一个Job信息

    方法签名: update_schedule(job: Job)

    job.set_parameter('action', 'stop')
    conn.update_schedule(job)
    

    3、查询一个Job信息

    方法签名:get_schedule(<String: jobId>)

    job = conn.get_schedule(job_id)
    

    4、删除一个Job信息

    方法签名:delete_schedule(<String: jobId>)

    conn.delete_schedule(job_id)
    
    以上内容是否对您有帮助?
  • Qvm free helper
    Close