mini-job极简分布式延迟任务队列 — 基于 Redis,支持 Cron 周期任务、异步协程和多执行器
mini-job极简分布式延迟任务队列 — 基于 Redis支持 Cron 周期任务、异步协程和多执行器。特性特性说明延迟任务设定延迟秒数到期自动执行Cron 周期调度支持标准 cron 表达式分 时 日 月 星期三种执行器async协程IO 密集、thread线程通用、process进程CPU 密集队列级执行器隔离Redis Key{ns}:ready:{executor}三队列隔离消费者只拉取专属队列零竞争死信队列失败的一次性任务自动进入死信队列可排查重试可见性超时回收消费者崩溃后任务自动回收重入队列不丢任务命名空间隔离多环境共用同一 Redis 实例Key 前缀隔离监控指标内置QueueMetrics统计各生命周期计数背压控制队列深度超阈值自动告警Pydantic 配置集中配置管理环境变量覆盖类型校验Lua 原子操作抢占和回收均为 Redis 端原子执行无竞态优雅关闭SIGTERM/SIGINT 信号处理等待任务完成安装pipinstallmini-job# 核心依赖pipinstallmini-job[script]# 含 pandas/numpy脚本执行模式依赖Python 3.12Redis 7.4croniterpydantic-settingspython-dotenv快速开始1. 确保 Redis 运行redis-cliping# PONG2. 生产者 — 发布任务frommini_jobimportDelayQueue dqDelayQueue(namespacemyapp)# 注册脚本动态执行模式dq.register_script(send_email, def handler(payload): to_email payload.get(to) print(f发送邮件到: {to_email}) return {status: sent, to: to_email} ,)# 发布任务 — executor 参数指定执行器类型dq.publish(send_email,{to:userexample.com,subject:欢迎,content:注册成功},executorasync,# async / thread / process)# 延迟 30 秒执行dq.publish(send_email,{...},delay_seconds30)# 每天凌晨 2 点执行cron 表达式分 时 日 月 星期dq.publish(send_email,{...},cron0 2 * * *)# 查询任务结果resultdq.get_task_result(task_id)3. 消费者 — 按类型独立启动frommini_jobimportDelayQueue# 注册本地函数defsend_sms(payload):print(f发送短信 -{payload[phone]})TASK_REGISTRY{send_sms:send_sms,}dqDelayQueue(namespacemyapp)dq.start(task_registryTASK_REGISTRY,executor_typeasync,# 本进程只消费 async 任务)启动不同执行器类型的消费者3 个终端python consumer.py async# 协程消费者 — IO 密集任务python consumer.py thread# 线程消费者 — 通用任务python consumer.py process# 进程消费者 — CPU 密集任务核心概念执行器类型类型适用场景实现推荐并发数asyncIO 密集发邮件、HTTP 请求、DB 操作asyncio协程100~500thread通用任务、阻塞操作ThreadPoolExecutor30~100processCPU 密集数据处理、报表生成ProcessPoolExecutorCPU 核数任务路由表TASK_REGISTRY{# 简单格式默认 async 执行器send_sms:send_sms,# 带配置格式指定执行器类型daily_report:(daily_report,{executor:thread}),}状态生命周期pending → running → completed ↘ failed → 死信队列一次性任务 下次重试周期任务Redis Key 设计{namespace}:ready:{executor} — 按执行器隔离的就绪 ZSetasync/thread/process {namespace}:processing:{id} — 消费者专属处理列表 {namespace}:processing:timeout — 全局超时追踪 ZSet {namespace}:dead_letter — 死信队列 {namespace}:dead_letter:detail — 死信详情 {namespace}:task:meta — 任务元数据 {namespace}:task:result:{id} — 任务结果独立 TTL {namespace}:scripts — 注册脚本API 参考DelayQueuedqDelayQueue(namespacemyapp)# 或使用配置对象frommini_jobimportQueueConfig dqDelayQueue(QueueConfig(namespacemyapp))生产者方法方法说明publish(func, payload, delay_seconds0, cronNone, executorasync)发布任务 → 返回 task_idregister_script(name, content, languagepython, use[])注册动态脚本get_script(name)获取脚本信息delete_script(name)删除脚本list_scripts()列出所有脚本get_task_result(task_id)查询任务状态和结果消费者方法方法说明start(task_registry, executor_typeasync, **kwargs)启动消费者stop()手动触发优雅关闭start()参数参数默认值说明task_registry(必填)任务路由表{name: func}executor_typeasync执行器类型async / thread / processpoll_interval0.5轮询间隔秒grab_limit80每次最多抢占任务数worker_threads50工作线程/协程/进程数task_timeout30单个任务超时秒visibility_timeout60可见性超时秒配置通过 Pydantic Settings 管理支持.env文件、环境变量覆盖、类型校验。队列配置DQ_*参数环境变量默认值类型说明namespaceDQ_NAMESPACEdqstrRedis Key 命名空间前缀多环境隔离consumer_idDQ_CONSUMER_ID自动生成str消费者唯一标识默认worker- 8 位 hexresult_ttlDQ_RESULT_TTL86400int任务结果保留时间秒默认 1 天reclaim_intervalDQ_RECLAIM_INTERVAL10int超时回收检查间隔轮询周期数每 N 轮检查一次Redis 连接配置DQ_REDIS_*参数环境变量默认值类型说明hostDQ_REDIS_HOSTlocalhoststrRedis 主机地址portDQ_REDIS_PORT6379intRedis 端口dbDQ_REDIS_DB0intRedis 数据库编号passwordDQ_REDIS_PASSWORDNonestrRedis 密码可选max_connectionsDQ_REDIS_MAX_CONNECTIONS50int连接池最大连接数socket_timeoutDQ_REDIS_SOCKET_TIMEOUT5.0float单次操作超时秒socket_connect_timeoutDQ_REDIS_SOCKET_CONNECT_TIMEOUT5.0float连接建立超时秒retry_on_timeoutDQ_REDIS_RETRY_ON_TIMEOUTTruebool超时是否自动重试health_check_intervalDQ_REDIS_HEALTH_CHECK_INTERVAL30int连接健康检查间隔秒消费者配置DQ_CONSUMER_*参数环境变量默认值类型说明poll_intervalDQ_CONSUMER_POLL_INTERVAL0.5float轮询间隔秒影响任务延迟精度grab_limitDQ_CONSUMER_GRAB_LIMIT80int每次最多抢占任务数建议 worker × 1.5~2worker_threadsDQ_CONSUMER_WORKER_THREADS50int工作协程/线程/进程数task_timeoutDQ_CONSUMER_TASK_TIMEOUT30int单个任务执行超时秒超时后标记失败visibility_timeoutDQ_CONSUMER_VISIBILITY_TIMEOUT60int可见性超时秒消费者需在此时间内完成任务shutdown_timeoutDQ_CONSUMER_SHUTDOWN_TIMEOUT30int优雅关闭最大等待时间秒max_queue_depthDQ_CONSUMER_MAX_QUEUE_DEPTH10000int队列深度告警阈值超阈值打印 WARNING示例.env# 队列DQ_NAMESPACEproductionDQ_CONSUMER_IDweb-server-01# RedisDQ_REDIS_HOSTredis.example.comDQ_REDIS_PORT6379DQ_REDIS_PASSWORDsecret# 消费者DQ_CONSUMER_POLL_INTERVAL0.3DQ_CONSUMER_GRAB_LIMIT100DQ_CONSUMER_WORKER_THREADS80DQ_CONSUMER_TASK_TIMEOUT60DQ_CONSUMER_VISIBILITY_TIMEOUT120监控# 获取监控指标快照snapshotdq.metrics.snapshot()# {published: 1000, completed: 980, failed: 15, timeout: 5, ...}指标说明指标含义published已发布任务总数completed成功完成数failed执行失败数timeout超时任务数dead_lettered进入死信队列数reclaimed超时回收重入队数项目结构mini_job/ ├── __init__.py # 公共导出 ├── config.py # Pydantic Settings 配置 ├── core/ │ ├── delay_queue.py # DelayQueue 核心 │ └── task.py # 任务模型 ├── executor/ │ ├── base.py # 执行器抽象基类 │ ├── async_io.py # 协程执行器 │ ├── thread.py # 线程执行器 │ └── process.py # 进程执行器 ├── redis/ │ ├── client.py # Redis 连接 Lua 脚本 │ └── scripts.lua # 原子 Lua 脚本 ├── utils/ │ ├── retry.py # 重试装饰器 │ ├── metrics.py # 监控指标 │ └── decorators.py # 任务装饰器 ├── consumer.py # 消费者示例 └── producer.py # 生产者示例LicenseMIT
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2564927.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!