平台支持用户通过插件的形式实现定时调度任务的业务逻辑,下面以Python SDK为例说明如何进行自定义调度任务开发。
安装 Python 环境
自定义API 需要Python 3.8+ 以上的环境,以及 pip 包管理工具。非常建议使用 conda 或者 pipenv 等工具来管理你的Python 环境。
安装Pandora Python SDK
确认安装完整的Python 环境,并且安装好 pip 包管理工具。执行如下命令
sudo pip install pdr_python_sdk
创建模板应用,执行如下命令,注意填写你的APP英文名字和标题,假设app英文名为sched_task_test。
create_pandora_app --name <APP英文名字> --title <APP的标题>
编写自定义调度任务脚本
详细内容参考SDK帮助手册 获取Python SDK,然后在bins目录下可以查看 sched_task_example.py 脚本。注意我们继承了 OnDemandSchedTask 类型,并且覆盖了 do_handle_task 接口,在接口中实现了我们调度任务的业务逻辑:
import sys
import os
# add library to python path , don't forget it
lib_name = 'libs'
sys.path.insert(0, os.path.sep.join([os.path.dirname(os.path.abspath(__file__)), lib_name]))
import logging
import json
from pdr_python_sdk.sched_task.on_demand_sched_task import OnDemandSchedTask
from pdr_python_sdk.on_demand_action import run
class SchedTaskExample(OnDemandSchedTask):
def do_handle_init(self, packet):
if packet.body().contains_metadata():
metadata = packet.body().metadata()
logging.info("metadata = %s", metadata)
def do_handle_task(self, params:dict):
logging.info(params)
return json.dumps({"status": "success", "data": {}, "message": "successful"})
if __name__ == '__main__':
run(SchedTaskExample, sys.argv, sys.stdin.buffer, sys.stdout.buffer)
运行单元测试
在项目中运行命令,可以执行在 bins/tests 目录下的单元测试代码,保障我们APP的质量。
./run.sh unittest
打包并安装APP
将我们的应用项目打包,如果运行成功的话,应用的包将会出现在 dist 目录下:
./run.sh package
$ ls dist
sched_task_test.tar.gz
去pandora 2.0的应用商店的“应用管理”界面上传APP 的程序包,或者可以执行以下命令,自动上传:
export PANDORA_PORT=<PANDORA端口>
export PANDORA_SCHEME=<https|http>
export PANDORA_TOKEN=<PANDORA访问Token>
export PANDORA_HOST=<PANDORA服务IP或者域名>
./run.sh upload
向调度平台注册任务
示例代码: sched_task_example.py
1、添加一个Job任务
方法签名: create_schedule(job: Job)
job = Job('demoJobName')
job.set_app_name('demoAppName')
job.set_parameter('action', 'run')
job.set_crontab('0 0/2 * * * ?')
job.set_status(True)
job.set_execute_batch(2)
task_1a = Task('customHandler')
task_1a.set_parameter('file', 'send_msg.py')
task_1a.set_parameter('env', 'user=jack')
task_1a.set_parameter('params', {'money': 100, 'type': 'RMB'})
task_2a = Task('customHandler')
task_2a.set_parameter('file', 'putin_agree.py')
task_2a.set_parameter('env', 'user=Putin')
task_2a.set_parameter('params', {'action': 'agree'})
task_2a.add_task(task_1a)
task_3a = Task('customHandler')
task_3a.set_parameter('file', 'trump_agree.py')
task_3a.set_parameter('env', 'user=Trump')
task_3a.set_parameter('params', {'action': 'agree'})
task_3a.add_task(task_1a)
job.add_task(task_2a)
job.add_task(task_3a)
job_id = conn.create_schedule(job)
2、更新一个Job信息
方法签名: update_schedule(job: Job)
job.set_parameter('action', 'stop')
conn.update_schedule(job)
3、查询一个Job信息
方法签名:get_schedule(<String: jobId>)
job = conn.get_schedule(job_id)
4、删除一个Job信息
方法签名:delete_schedule(<String: jobId>)
conn.delete_schedule(job_id)
文档反馈
(如有产品使用问题,请 提交工单)