# PHP 方案:Beanstalkd
## 安装

```bash
composer require pda/pheanstalk monolog/monolog
```

---

## 项目结构

```
src/
├── Queue/
│   ├── QueueManager.php    # 队列管理器
│   ├── JobDispatcher.php   # 任务派发
│   └── WorkerPool.php      # Worker 管理
├── Jobs/
│   ├── JobInterface.php    # 任务接口
│   ├── EmailJob.php        # 发邮件任务
│   ├── ImageResizeJob.php  # 图片处理任务
│   └── ReportJob.php       # 报表任务
└── Worker/
    └── Worker.php          # Worker 主进程
```

---

## 安装 Beanstalkd

```bash
# Ubuntu/Debian
apt install beanstalkd

# macOS
brew install beanstalkd

# 启动
beanstalkd -l 0.0.0.0 -p 11300 -z 1048576
# -z 最大 job 大小 1MB
```

---

## 1. 任务接口

```php
<?php
// src/Jobs/JobInterface.php
namespace App\Jobs;

interface JobInterface
{
    public function handle(): void;
    public function failed(\Throwable $e): void;
}
```

---

## 2. 具体任务

```php
<?php
// src/Jobs/EmailJob.php
namespace App\Jobs;

class EmailJob implements JobInterface
{
    public function __construct(
        public readonly string $to,
        public readonly string $subject,
        public readonly string $body,
    ) {}

    public function handle(): void
    {
        // 发送邮件逻辑
        mail($this->to, $this->subject, $this->body);
        echo "邮件已发送至: {$this->to}\n";
    }

    public function failed(\Throwable $e): void
    {
        echo "邮件发送失败: {$e->getMessage()}\n";
        // 记录到数据库或告警
    }
}
```

```php
<?php
// src/Jobs/ImageResizeJob.php
namespace App\Jobs;

class ImageResizeJob implements JobInterface
{
    public function __construct(
        public readonly string $sourcePath,
        public readonly int $width,
        public readonly int $height,
        public readonly string $outputPath,
    ) {}

    public function handle(): void
    {
        $image = imagecreatefromjpeg($this->sourcePath);
        $resized = imagescale($image, $this->width, $this->height);
        imagejpeg($resized, $this->outputPath, 90);
        imagedestroy($image);
        imagedestroy($resized);
        echo "图片已处理: {$this->outputPath}\n";
    }

    public function failed(\Throwable $e): void
    {
        echo "图片处理失败: {$e->getMessage()}\n";
    }
}
```

---

## 3. 队列管理器

```php
<?php
// src/Queue/QueueManager.php
namespace App\Queue;

use Pheanstalk\Pheanstalk;
use Pheanstalk\Values\TubeName;
use Pheanstalk\Values\Timeout;
use App\Jobs\JobInterface;

class QueueManager
{
    private Pheanstalk $conn;

    public function __construct(
        string $host = '127.0.0.1',
        int $port = 11300,
    ) {
        $this->conn = Pheanstalk::create($host, $port);
    }

    // ── 派发任务 ────────────────────────────────────────────
    public function dispatch(
        JobInterface $job,
        string $tube = 'default',
        int $priority = 1024, // 0 = 最高, 4294967295 = 最低
        int $delay = 0,       // 秒,延迟执行
        int $ttr = 60,        // 任务超时时间（秒）
    ): int {
        $payload = json_encode([
            'class' => get_class($job),
            'payload' => serialize($job),
            'attempts' => 0,
        ]);

        $jobId = $this->conn
            ->useTube(new TubeName($tube))
            ->put(
                data: $payload,
                priority: $priority,
                delay: $delay,
                ttr: $ttr,
            );

        echo "任务已入队 [{$tube}] ID: {$jobId->getId()}\n";
        return $jobId->getId();
    }

    // ── 延迟任务 ────────────────────────────────────────────
    public function dispatchDelayed(JobInterface $job, int $delaySeconds, string $tube = 'default'): int
    {
        return $this->dispatch($job, $tube, delay: $delaySeconds);
    }

    // ── 获取队列统计 ────────────────────────────────────────
    public function stats(string $tube = 'default'): array
    {
        $stats = $this->conn->statsTube(new TubeName($tube));
        return [
            'ready' => $stats['current-jobs-ready'],
            'delayed' => $stats['current-jobs-delayed'],
            'buried' => $stats['current-jobs-buried'],
            'reserved' => $stats['current-jobs-reserved'],
            'total' => $stats['total-jobs'],
        ];
    }

    // ── 清空管道 ────────────────────────────────────────────
    public function flush(string $tube = 'default'): int
    {
        $count = 0;
        $this->conn->watch(new TubeName($tube));
        while ($job = $this->conn->peekReady(new TubeName($tube))) {
            $this->conn->delete($job);
            $count++;
        }
        return $count;
    }

    public function getConnection(): Pheanstalk
    {
        return $this->conn;
    }
}
```

---

## 4. Worker 主进程

```php
<?php
// src/Worker/Worker.php
namespace App\Worker;

use Pheanstalk\Pheanstalk;
use Pheanstalk\Values\TubeName;
use Pheanstalk\Values\Timeout;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;

class Worker
{
    private Pheanstalk $conn;
    private Logger $log;
    private bool $running = true;
    private int $maxRetries = 3;

    public function __construct(
        private array $tubes = ['default'],
        string $host = '127.0.0.1',
        int $port = 11300,
    ) {
        $this->conn = Pheanstalk::create($host, $port);
        $this->log = new Logger('worker');
        $this->log->pushHandler(new StreamHandler('php://stdout'));
        $this->log->pushHandler(new StreamHandler('/var/log/worker.log'));
    }

    public function run(): void
    {
        // 监听多个管道
        foreach ($this->tubes as $tube) {
            $this->conn->watch(new TubeName($tube));
        }
        $this->conn->ignore(new TubeName('default')); // 只处理指定管道

        // 捕获信号,优雅停机
        if (function_exists('pcntl_signal')) {
            pcntl_signal(SIGTERM, fn() => $this->running = false);
            pcntl_signal(SIGINT, fn() => $this->running = false);
        }

        $this->log->info('Worker 启动', ['tubes' => $this->tubes]);

        while ($this->running) {
            if (function_exists('pcntl_signal_dispatch')) {
                pcntl_signal_dispatch();
            }

            // 阻塞等待任务（超时 5 秒重试）
            $job = $this->conn->reserveWithTimeout(5);

            if ($job === null) {
                continue;
            }

            $this->processJob($job);
        }

        $this->log->info('Worker 已停止');
    }

    private function processJob(\Pheanstalk\Contract\JobIdInterface $job): void
    {
        $raw = $this->conn->peek($job)->getData();
        $data = json_decode($raw, true);

        $this->log->info('处理任务', [
            'id' => $job->getId(),
            'class' => $data['class'] ?? 'unknown',
        ]);

        try {
            /** @var \App\Jobs\JobInterface $jobInstance */
            $jobInstance = unserialize($data['payload']);
            $jobInstance->handle();

            // 成功 → 删除任务
            $this->conn->delete($job);
            $this->log->info('任务完成', ['id' => $job->getId()]);
        } catch (\Throwable $e) {
            $attempts = ($data['attempts'] ?? 0) + 1;

            $this->log->error('任务失败', [
                'id' => $job->getId(),
                'attempts' => $attempts,
                'error' => $e->getMessage(),
            ]);

            if ($attempts >= $this->maxRetries) {
                // 超过重试次数 → bury（人工介入）
                $this->conn->bury($job);
                $this->log->warning('任务已 bury', ['id' => $job->getId()]);

                try {
                    $jobInstance = unserialize($data['payload']);
                    $jobInstance->failed($e);
                } catch (\Throwable) {}
            } else {
                // 指数退避重新入队:30s、60s、120s
                $delay = 30 * (2 ** ($attempts - 1));
                $data['attempts'] = $attempts;

                $this->conn->delete($job);
                $this->conn
                    ->useTube(new TubeName('default'))
                    ->put(
                        data: json_encode($data),
                        priority: 1024,
                        delay: $delay,
                        ttr: 60,
                    );

                $this->log->info("任务重试中,{$delay}s 后", ['id' => $job->getId()]);
            }
        }
    }
}
```

---

## 5. 多管道优先级派发示例

```php
<?php
// dispatch.php
require 'vendor/autoload.php';

use App\Queue\QueueManager;
use App\Jobs\EmailJob;
use App\Jobs\ImageResizeJob;
use App\Jobs\ReportJob;

$queue = new QueueManager('127.0.0.1', 11300);

// 普通邮件（默认优先级）
$queue->dispatch(
    new EmailJob('alice@example.com', '欢迎注册', '感谢您的注册'),
    tube: 'emails',
);

// 高优先级:验证码邮件（priority = 0 最高）
$queue->dispatch(
    new EmailJob('bob@example.com', '验证码123456', '您的验证码'),
    tube: 'emails',
    priority: 0,
    ttr: 30,
);

// 图片处理（延迟 5 秒）
$queue->dispatch(
    new ImageResizeJob('/uploads/photo.jpg', 800, 600, '/uploads/photo_800.jpg'),
    tube: 'images',
    delay: 5,
    ttr: 120,
);

// 报表任务:凌晨执行（延迟到明天 0 点）
$secondsUntilMidnight = strtotime('tomorrow') - time();
$queue->dispatchDelayed(
    new ReportJob('daily', date('Y-m-d')),
    $secondsUntilMidnight,
    tube: 'reports',
);

// 查看队列状态
print_r($queue->stats('emails'));
```

---

## 6. 启动多个专用 Worker

```php
<?php
// worker.php（命令行入口）
require 'vendor/autoload.php';

use App\Worker\Worker;

$tube = $argv[1] ?? 'default';
$worker = new Worker(tubes: [$tube]);
$worker->run();
```

```bash
# 启动多个 Worker 进程处理不同管道
php worker.php emails &
php worker.php emails &   # 并发处理邮件
php worker.php images &
php worker.php reports &

# 或用 Supervisor 管理
```

---

## 7. Supervisor 配置（生产环境）

```ini
; /etc/supervisor/conf.d/workers.conf

[program:worker-emails]
command=php /var/www/worker.php emails
numprocs=3
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/worker-emails.log

[program:worker-images]
command=php /var/www/worker.php images
numprocs=2
autostart=true
autorestart=true
stdout_logfile=/var/log/worker-images.log

[program:worker-reports]
command=php /var/www/worker.php reports
numprocs=1
autostart=true
autorestart=true
stdout_logfile=/var/log/worker-reports.log
```

```bash
supervisorctl reread
supervisorctl update
supervisorctl status
```

---

## 8. Buried 任务管理（监控面板）

```php
<?php
// src/Queue/BuriedManager.php
use Pheanstalk\Pheanstalk;
use Pheanstalk\Values\TubeName;

class BuriedManager
{
    public function __construct(private Pheanstalk $conn) {}

    // 查看 buried 任务
    public function inspect(string $tube): ?array
    {
        $job = $this->conn->peekBuried(new TubeName($tube));
        if (!$job) return null;

        return [
            'id' => $job->getId(),
            'data' => json_decode($job->getData(), true),
        ];
    }

    // 重新执行所有 buried 任务
    public function kickAll(string $tube, int $count = 100): int
    {
        return $this->conn->kick($count);
    }

    // 删除所有 buried 任务
    public function deleteAllBuried(string $tube): int
    {
        $count = 0;
        while ($job = $this->conn->peekBuried(new TubeName($tube))) {
            $this->conn->delete($job);
            $count++;
        }
        return $count;
    }
}
```

---

## 核心概念速查

```
tube     → 队列管道（按业务类型分）
ready    → 待处理
reserved → 处理中
delayed  → 延迟等待
buried   → 失败存档（人工介入）
ttr      → 任务执行超时时间
priority → 0 = 最高优先级
kick     → 将 buried 任务重新变为 ready
```
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2511634.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!