安装
# 创建项目
composer create-project topthink/think 5.0.*
# 安装队列扩展
composer require topthink/think-queue
配置
// application/extra/queue.php
<?php
return [
'connector' => 'Redis', // Redis 驱动
'expire' => 0, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
'default' => 'default', // 默认的队列名称
'host' => '127.0.0.1', // redis 主机ip
'port' => 6379, // redis 端口
'password' => '', // redis 密码
'select' => 0, // 使用哪一个 db,默认为 db0
'timeout' => 0, // redis连接的超时时间
'persistent' => false,
];
数据库
CREATE TABLE `qf_test` (
`id` int(10) NOT NULL AUTO_INCREMENT,
`task_type` varchar(50) DEFAULT '' COMMENT '任务类型',
`data` text COMMENT '数据',
`pdate` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
创建队列(入队)-生产者
<?php
namespace app\api\controller;
use think\Controller;
class Index extends Controller {
// 生产者-入队
public function qf() {
// 1.当前任务将由哪个类来负责处理。
$jobHandlerClassName = 'app\api\job\QfDev';
// 2.队列名称,如果为新队列,会自动创建
$jobQueueName = "qfDevQueue";
// 3.当前任务所需的业务数据.
$jobData = [ 'ts' => time(), 'bizId' => uniqid() , 'data' => $_GET ];
// 4.将该任务推送到消息队列,等待对应的消费者去执行
$isPushed = Queue::push( $jobHandlerClassName , $jobData , $jobQueueName );
// 把任务分配到队列中,延迟10s后执行
// $isPushed = Queue::later(10,$jobHandlerClassName,$jobData,$jobQueueName);
// database驱动时,返回值:1|false;
// redis驱动时,返回值:随机字符串|false
if( $isPushed !== false ){
echo date('Y-m-d H:i:s') . " a new Hello Job is Pushed to the MQ"."<br>";
}else{
echo 'something went wrong.';
}
}
}
消费队列(出队)-消费者
<?php
namespace app\api\job;
use think\queue\Job;
class QfDev {
public function fire(Job $job,$data) {
// 检查数据【可省】
$flag = $this->checkJob($data);
if(!$flag){
$job->delete();
return;
}
$isJobDone = $this->doJob($data);
if ($isJobDone) {
// ...
// 执行完,删除任务
$job->delete();
}else{
// 检查方法执行次数
if ($job->attempts() > 3) {
$job->delete();
// 重新发布,延期2秒再次执行
//$job->release(2);
}
}
}
// 检查数据
private function checkJob($data){
// ... 数据检查
return true;
}
// 业务处理
private function doJob($data)
{
// ... 业务处理
return true;
}
}
访问
// 请求接口
http://localhost/api/index/qf
队列命令
# 单次执行
开始一个队列
php think queue:work
停止所有队列
php think queue:restart
重启所有消息队列
php think queue:restart
php think queue:work
# 多次执行
php think queue:work --daemon --queue helloJobQueue
宝塔任务进程管理器