文章目录
- 定时任务库对比
- 简介
- 与其余框架的区别
- 安装
- 初试
- 调度器基础
- 测试方法
- 字符串格式
- 具体时间间隔
- 周期
- 某时间段
 
- 条件 API
- 条件逻辑
- 方法对比
 
- 执行选项
- 在主进程和线程中执行
- 进程
- 线程
- 异步
- 设置默认选项
 
- 日志
- 流水线
- 在一个任务后执行
- 输入作为输出
- 会话级参数
- 函数参数
- TODO:元参数
 
- 自定义条件
 
- 元任务
- 遇到的坑
- 参考文献
定时任务库对比
推荐阅读 Python timing task - schedule vs. Celery vs. APScheduler
| 库 | 大小 | 优点 | 缺点 | 适用场景 | 
|---|---|---|---|---|
| Schedule | 轻量级 | 易用无配置 | 不能动态添加任务或持久化任务 | 简单任务 | 
| Celery | 重量级 | ①任务队列 ②分布式 | ①不能动态添加定时任务到系统中,如Flask(Django可以) ②设置起来较累赘 | 任务队列 | 
| APScheduler | 相对重量级 | ①灵活,可动态增删定时任务并持久化 ②支持多种存储后端 ③集成框架多,用户广 | 重量级,学习成本大 | 通用 | 
| Rocketry | 轻量级 | 易用功能强 | 尚未成熟,文档不清晰 | 通用 | 
简介

Rocketry 是 Python 的任务调度框架,易用、简洁、强大。可通过 Python 装饰器语法进行任务调度,支持定时、并发(异步、多线程、多进程)、条件触发等。
感觉没有 Celery 和 APScheduler 成熟
与其余框架的区别
常见任务调度框架有:
- Crontab
- APScheduler
- Airflow
Rocketry 的调度程序基于语句,有相同的调度策略,也可以使用自定义调度语句进行扩展。
此外,Rocketry 非常易用,无需复杂配置,但可用于大型应用程序。
安装
pip install rocketry
初试
import datetime
from rocketry import Rocketry
app = Rocketry()
@app.task('every 5 seconds')
def do_things():
    print(datetime.datetime.now())
if __name__ == "__main__":
    app.run()
调度器基础
创建调度器规则的方式,可通过与、或、非组合,还可用于任务的开始、结束、终止:
- 字符串格式
- 条件 API
- 条件类
测试方法
判断当前时间是否在 10:00 到 14:00 之间
from rocketry.conds import time_of_day
condition = time_of_day.between('10:00', '14:00')
print(condition.observe())
字符串格式
简单易写,但静态代码分析器无法检查语句是否正确
具体时间间隔
import sys
from rocketry import Rocketry
app = Rocketry()
@app.task('every 10 seconds')
def do_constantly():
    """每10秒执行"""
    print(sys._getframe().f_code.co_name)
@app.task('every 1 minute')
def do_minutely():
    """每1分钟执行"""
    print(sys._getframe().f_code.co_name)
@app.task('every 1 hour')
def do_hourly():
    """每1小时执行"""
    print(sys._getframe().f_code.co_name)
@app.task('every 1 day')
def do_daily():
    """每1天执行"""
    print(sys._getframe().f_code.co_name)
@app.task('every 2 days 2 hours 20 seconds')
def do_custom():
    """每2天2小时20秒执行"""
    print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
    app.run()
周期
import sys
from rocketry import Rocketry
app = Rocketry()
@app.task('secondly')
def do_secondly():
    """每1秒钟执行"""
    print(sys._getframe().f_code.co_name)
@app.task('minutely')
def do_minutely():
    """每1分钟执行"""
    print(sys._getframe().f_code.co_name)
@app.task('hourly')
def do_hourly():
    """每1小时执行"""
    print(sys._getframe().f_code.co_name)
@app.task('daily')
def do_daily():
    """每1天执行"""
    print(sys._getframe().f_code.co_name)
@app.task('weekly')
def do_weekly():
    """每1周执行"""
    print(sys._getframe().f_code.co_name)
@app.task('monthly')
def do_monthly():
    """每1个月执行"""
    print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
    app.run()
某时间段
- before:之前
- after:之后
- between:之间
- starting:开始
import sys
from rocketry import Rocketry
app = Rocketry()
@app.task('minutely before 45')
def do_minutely():
    """每分钟的45秒前执行"""
    print(sys._getframe().f_code.co_name)
@app.task('hourly after 45:00')
def do_hourly():
    """每小时的45分后执行"""
    print(sys._getframe().f_code.co_name)
@app.task('daily between 08:00 and 14:00')
def do_daily():
    """每天的08:00到14:00这段时间内执行"""
    print(sys._getframe().f_code.co_name)
@app.task('weekly on Monday')
def do_weekly():
    """每周的周一执行"""
    print(sys._getframe().f_code.co_name)
@app.task('monthly starting 3rd')
def do_monthly():
    """每个月的3号开始执行"""
    print(sys._getframe().f_code.co_name)
@app.task('time of day between 10:00 and 18:00')
def do_constantly_during_day():
    """每天的10:00到18:00这段时间内执行"""
    print(sys._getframe().f_code.co_name)
@app.task('time of week between Saturday and Sunday')
def do_constantly_during_weekend():
    """每周的周六到周日这段时间内执行"""
    print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
    app.run()
条件 API
import sys
from rocketry import Rocketry
from rocketry.conds import every, hourly, daily, after_success, true, false
app = Rocketry()
@app.task(every('10 seconds'))
def do_constantly():
    """每10秒执行"""
    print(sys._getframe().f_code.co_name)
@app.task(hourly)
def do_hourly():
    """每1小时执行"""
    print(sys._getframe().f_code.co_name)
@app.task(daily.between('08:00', '14:00'))
def do_daily():
    """每天08:00到14:00执行一次"""
    print(sys._getframe().f_code.co_name)
@app.task(after_success(do_daily))
def do_after():
    """do_daily成功后执行"""
    print(sys._getframe().f_code.co_name)
@app.task(true & false & ~(true | false))
def do_logic():
    """逻辑执行"""
    print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
    app.run()
条件逻辑
- &:与
- |:或
- ~:非
import sys
from rocketry import Rocketry
from rocketry.conds import true, false
app = Rocketry()
@app.task(true)
def do_constantly():
    print(sys._getframe().f_code.co_name)
@app.task(false)
def do_never():
    print(sys._getframe().f_code.co_name)
@app.task(true & false)
def do_and():
    """与"""
    print(sys._getframe().f_code.co_name)
@app.task(true | false)
def do_or():
    """或"""
    print(sys._getframe().f_code.co_name)
@app.task(~false)
def do_not():
    """非"""
    print(sys._getframe().f_code.co_name)
@app.task((true | false) & ~(true | false))
def do_nested():
    print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
    app.run()
方法对比
执行选项
- main:在主进程和线程中执行(默认)
- process:在单独进程中执行
- thread:在单独线程中执行
- async:异步执行
| 执行选项 | 是否并发 | 能否被终止 | 能否修改 session | 
|---|---|---|---|
| main | ✖ | ✖ | ✔ | 
| process | ✔ | ✔ | ✖ | 
| thread | 部分 | ✔ | ✔ | 
| async | 部分 | ✔ | ✔ | 
threading.current_thread():获取当前线程
os.getpid():获取当前进程 ID
在主进程和线程中执行
import os
import sys
import threading
from rocketry import Rocketry
app = Rocketry()
@app.task('secondly', execution='main')
def do_main():
    """在主进程和线程中执行"""
    print(sys._getframe().f_code.co_name, threading.current_thread(), os.getpid())
if __name__ == '__main__':
    app.run()
    # do_main <_MainThread(MainThread, started 15676)> 23448
    # do_main <_MainThread(MainThread, started 15676)> 23448
    # do_main <_MainThread(MainThread, started 15676)> 23448
进程
import os
import sys
import threading
from rocketry import Rocketry
app = Rocketry()
@app.task('secondly', execution='process')
def do_process():
    """在单独进程中执行"""
    print(sys._getframe().f_code.co_name, threading.current_thread(), os.getpid())
if __name__ == '__main__':
    app.run()
    # do_process <_MainThread(MainThread, started 20364)> 14612
    # do_process <_MainThread(MainThread, started 25636)> 25996
    # do_process <_MainThread(MainThread, started 27000)> 18504
线程
import os
import sys
import threading
from rocketry import Rocketry
app = Rocketry()
@app.task('secondly', execution='thread')
def do_thread():
    """"""
    print(sys._getframe().f_code.co_name, threading.currentThread(), os.getpid())
if __name__ == '__main__':
    app.run()
    # do_thread <Thread(Thread-1, started 28064)> 26768
    # do_thread <Thread(Thread-2, started 3916)> 26768
    # do_thread <Thread(Thread-3, started 17328)> 26768
异步
import os
import sys
import asyncio
import threading
from rocketry import Rocketry
app = Rocketry()
@app.task('secondly', execution='async')
async def do_async():
    """异步执行"""
    print(sys._getframe().f_code.co_name, threading.currentThread(), os.getpid())
async def main():
    rocketry_task = asyncio.create_task(app.serve())
    await rocketry_task
if __name__ == '__main__':
    asyncio.run(main())
    # do_async <_MainThread(MainThread, started 22752)> 24976
    # do_async <_MainThread(MainThread, started 22752)> 24976
    # do_async <_MainThread(MainThread, started 22752)> 24976
设置默认选项
import os
import sys
import threading
from rocketry import Rocketry
app = Rocketry(config={'task_execution': 'thread'})
@app.task('secondly')
def do_thread():
    print(sys._getframe().f_code.co_name, threading.currentThread(), os.getpid())
if __name__ == '__main__':
    app.run()
    # do_thread <Thread(Thread-1, started 28064)> 26768
    # do_thread <Thread(Thread-2, started 3916)> 26768
    # do_thread <Thread(Thread-3, started 17328)> 26768
日志
内置日志格式有:
- rocketry.log.MinimalRecord: 最简略的日志
- rocketry.log.LogRecord: 经典的日志元素
- rocketry.log.TaskLogRecord: 类似- LogRecord,同时包含开始、结束、运行次数
import os
import datetime
from rocketry import Rocketry
from redbird.repos import CSVFileRepo
from rocketry.log import MinimalRecord, LogRecord, TaskLogRecord
filename = 'logs.csv'
if os.path.exists(filename):
    os.remove(filename)
app = Rocketry(logger_repo=CSVFileRepo(filename=filename, model=MinimalRecord))
@app.task('secondly')
def do_things():
    print(datetime.datetime.now())
if __name__ == '__main__':
    app.run()
流水线
- 在一个任务执行后、成功后、失败后,执行任务
- 将一个任务的输出作为另一个任务的输入
在一个任务后执行
- after_success:成功后
- after_fail:失败后
- after_finish:完成后
import sys
import random
from rocketry import Rocketry
from rocketry.conds import after_success, after_fail, after_finish
app = Rocketry(execution='main')
@app.task('every 3 seconds')
def do_things():
    if random.randint(0, 10) % 2 == 0:
        print(sys._getframe().f_code.co_name, '\tfail!')
        raise Exception
    print(sys._getframe().f_code.co_name, '\tsuccess!')
@app.task(after_success(do_things))
def do_after_success():
    """成功后执行"""
    print(sys._getframe().f_code.co_name)
@app.task(after_fail(do_things))
def do_after_fail():
    """失败后执行"""
    print(sys._getframe().f_code.co_name)
@app.task(after_finish(do_things))
def do_after_fail_or_success():
    """完成后执行"""
    print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
    app.run()
输入作为输出
import sys
from rocketry import Rocketry
from rocketry.args import Return
from rocketry.conds import after_success
app = Rocketry(execution='main')
@app.task('every 3 seconds')
def do_first():
    print(sys._getframe().f_code.co_name)
    return 'Hello World'
@app.task(after_success(do_first))
def do_second(arg=Return(do_first)):
    print(sys._getframe().f_code.co_name, arg)
if __name__ == '__main__':
    app.run()
会话级参数
有两种参数:
- 任务级别
- 会话级别(多数情况下使用) 
  - Arg:
- SimpleArg:只传递值
- FuncArg:会话级函数实参
 
import sys
from rocketry import Rocketry
from rocketry.args import Arg, SimpleArg
app = Rocketry(execution='main')
app.params(
    my_arg='Hello world'
)
@app.task('every 3 seconds')
def do_arg(item=Arg('my_arg')):
    print(sys._getframe().f_code.co_name, item)
@app.task('every 3 seconds')
def do_simple_arg(item=SimpleArg('Hello world')):
    print(sys._getframe().f_code.co_name, item)
if __name__ == '__main__':
    app.run()
函数参数
会话级别
import sys
import datetime
from rocketry import Rocketry
from rocketry.args import Arg
app = Rocketry(execution='main')
@app.param('my_arg')
def get_item():
    return datetime.datetime.now()
@app.task('every 3 seconds')
def do_func_arg(item=Arg('my_arg')):
    print(sys._getframe().f_code.co_name, item)
if __name__ == '__main__':
    app.run()
任务级别
import sys
import datetime
from rocketry import Rocketry
from rocketry.args import FuncArg
app = Rocketry(execution='main')
def get_item():
    return datetime.datetime.now()
@app.task('every 3 seconds')
def do_func_arg(item=FuncArg(get_item)):
    print(sys._getframe().f_code.co_name, item)
if __name__ == '__main__':
    app.run()
TODO:元参数
元参数包含调度系统组件的参数,用于任务操作会话,可关闭调度器或添加、删除任务等
会话参数
from rocketry import Rocketry
from rocketry.args import Session
app = Rocketry(execution='main')
@app.task('every 3 seconds')
def manipulate_session(session=Session()):
    print(session)
if __name__ == '__main__':
    app.run()
任务参数
from rocketry import Rocketry
from rocketry.args import Task
app = Rocketry(execution='main')
@app.task()
def do_things():
    ...
@app.task('every 3 seconds')
def manipulate_task(this_task=Task(), another_task=Task('do_things')):
    print(this_task)
    print(another_task)
if __name__ == '__main__':
    app.run()
自定义条件
import sys
from rocketry import Rocketry
from rocketry.conds import daily
app = Rocketry(execution='main')
@app.cond()
def things_ready():
    return True or False
@app.task(daily & things_ready)
def do_things():
    print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
    app.run()
传参,判断文件是否存在
import sys
from pathlib import Path
from rocketry import Rocketry
from rocketry.conds import daily
app = Rocketry(execution='main')
@app.cond()
def file_exists(file):
    return Path(file).is_file()
@app.task(daily & file_exists(__file__))
def do_things():
    print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
    app.run()
传参,判断任务名
import sys
from pathlib import Path
from rocketry import Rocketry
from rocketry.args import Task
from rocketry.conds import daily
app = Rocketry(execution='main')
@app.cond()
def is_right_task(this_task=Task()):
    return this_task.name.startswith('do_')
@app.task(daily & is_right_task)
def do_things():
    print(sys._getframe().f_code.co_name)
if __name__ == '__main__':
    app.run()
元任务
可以在运行时:
- 终止调度器
- 重启调度器
- 强制任务运行
- 禁用任务
- 创建、更新、删除任务
遇到的坑
FutureWarning: Default execution will be changed to ‘async’. To suppress this warning, specify task_execution, ie. Rocketry(execution=‘async’)
实例化 Rocketry 对象时指定 execution,如
from rocketry import Rocketry
app = Rocketry(execution='main')
参考文献
- Rocketry Documentation
- Rocketry GitHub



















