php方案 PHP 实现分布式任务调度
一、分布式任务调度类XXL-Job composerrequireswoole/ide-helper predis/predis 架构[调度中心 Scheduler]→ Redis →[执行器节点 Worker xN]↑ ↓ 定时触发 执行任务上报结果 调度中心?php// scheduler.php - 单实例负责触发任务requirevendor/autoload.php;useSwoole\Timer;usePredis\Client;$redisnewClient();// 注册任务定义$jobs[order.timeout[cron*/5 * * * *,routeround_robin],report.daily[cron0 2 * * *,routerandom],cache.warmup[cron*/1 * * * *,routebroadcast],// 所有节点都执行];// 抢分布式锁只有一个调度中心实例工作functionacquireLock(Client$redis,string$key,int$ttl30):bool{return(bool)$redis-set($key,gethostname(),EX,$ttl,NX);}functionmatchCron(string$cron):bool{[$min,$hour,$dom,$mon,$dow]explode( ,$cron);$tgetdate();$matchfn($expr,$val)$expr*?true:(str_starts_with($expr,*/)?$val%(int)substr($expr,2)0:in_array($val,array_map(intval,explode(,,$expr))));return$match($min,$t[minutes])$match($hour,$t[hours])$match($dom,$t[mday])$match($mon,$t[mon])$match($dow,$t[wday]);}// 每分钟检查一次生产用每秒检查Timer::tick(60000,function()use($redis,$jobs){if(!acquireLock($redis,scheduler:lock))return;// 没抢到锁就跳过foreach($jobsas$jobName$job){if(!matchCron($job[cron]))continue;$taskIduniqid($jobName.:,true);$taskjson_encode([id$taskId,job$jobName,route$job[route],triggertime(),]);match($job[route]){broadcastbroadcastTask($redis,$jobName,$task),round_robin$redis-rpush(queue:worker:.nextWorker($redis),$task),default$redis-rpush(queue:worker:.randomWorker($redis),$task),};// 记录调度日志$redis-hset(job:log:$taskId,status,dispatched,time,time());echo[$taskId] 已调度$jobName\n;}});functionbroadcastTask(Client$redis,string$job,string$task):void{foreach($redis-smembers(workers:online)as$worker){$redis-rpush(queue:worker:$worker,$task);}}functionnextWorker(Client$redis):string{$workers$redis-smembers(workers:online);$idx(int)$redis-incr(scheduler:rr_index)%count($workers);return$workers[$idx];}functionrandomWorker(Client$redis):string{return$redis-srandmember(workers:online);}echo调度中心启动\n;\Swoole\Event::wait();执行器节点多实例?php// worker.php - 多实例部署requirevendor/autoload.php;useSwoole\Process;usePredis\Client;$redisnewClient();$workerIdgethostname().:.getmypid();// 注册自己上线$redis-sadd(workers:online,$workerId);$redis-expire(workers:online,30);// 注册任务处理器$handlers[order.timeoutfunction(array$task):array{// 实际业务逻辑echo处理订单超时任务\n;return[processed42,statusok];},report.dailyfunction(array$task):array{echo生成日报\n;return[report_iduniqid(),statusok];},cache.warmupfunction(array$task):array{echo预热缓存\n;return[keys1024,statusok];},];// 心跳保持在线状态\Swoole\Timer::tick(10000,fn()$redis-sadd(workers:online,$workerId)$redis-expire(workers:online,30));// 主循环阻塞拉取任务echoWorker$workerId启动\n;while(true){// blpop 阻塞等待超时2秒重试$item$redis-blpop(queue:worker:$workerId,2);if(!$item)continue;$taskjson_decode($item[1],true);$job$task[job];echo[{$task[id]}] 开始执行$job\n;$redis-hset(job:log:{$task[id]},status,running,worker,$workerId);try{$resultisset($handlers[$job])?($handlers[$job])($task):thrownew\RuntimeException(未知任务:$job);$redis-hset(job:log:{$task[id]},status,success,result,json_encode($result));echo[{$task[id]}] 完成\n;}catch(\Throwable$e){$redis-hset(job:log:{$task[id]},status,failed,error,$e-getMessage());// 失败重试最多3次$retries(int)$redis-hget(job:log:{$task[id]},retries);if($retries3){$redis-hset(job:log:{$task[id]},retries,$retries1);$redis-rpush(queue:worker:$workerId,json_encode($task));}echo[{$task[id]}] 失败:{$e-getMessage()}\n;}}查询任务状态?php// 查日志$log$redis-hgetall(job:log:$taskId);// [status success, result ..., worker host:pid]大白话解释 分布式任务调度 单机定时任务crontab 一台服务器挂了 → 任务不跑了 任务太多 → 一台跑不完 分布式任务调度 调度中心只管什么时候触发什么任务执行器只管怎么跑任务Redis中间传话的 流程 调度中心 → 到点了order.timeout该跑了 → 塞进Redis队列 Worker1 → 从队列取出来跑 → 跑完上报结果 Worker挂了 → 其他Worker继续跑不影响
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2413575.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!