代码中包含了PHP在一般情况下使用方法和RabbitMQ开启了SSL验证后的使用方法(我这边消费队列是使用接口请求的方式,每次只从中取出一条)
安装amqp扩展
PHP使用RabbitMQ前,需要安装amqp扩展,之前文章中介绍了Windows环境PHP安装amqp扩展的方法:windows环境PHP使用RabbitMq安装amqp扩展_windows mq扩展安装-CSDN博客
Linux中安装amqp扩展:
### 先进入/usr/local目录下,下载两个文件到此目录(我的PHP版本是7.2):
wget -c https://github.com/alanxz/rabbitmq-c/releases/download/v0.8.0/rabbitmq-c-0.8.0.tar.gz
wget -c http://pecl.php.net/get/amqp-1.9.3.tgz
### 若使用的docker,将上面下载的两个包 拷贝到容器内【 docker cp ./文件 dockerID:/usr/local】,然后执行下面命令即可
### 解压rabbitmq-c-0.8.0.tar.gz
tar zxf rabbitmq-c-0.8.0.tar.gz
cd /usr/local/rabbitmq-c-0.8.0
./configure --prefix=/usr/local/rabbitmq-c-0.8.0
make && make install
### 然后解压 amqp-1.9.3.tgz 解压后amqp-1.9.3文件下内还有个amqp-1.9.3文件夹,将内部的amqp-1.9.3目录拷贝到/usr/local/下,执行下列命令:
cd /usr/local/amqp-1.9.3
/usr/local/bin/phpize
./configure --with-php-config=/usr/local/bin/php-config --with-amqp --with-librabbitmq-dir=/usr/local/rabbitmq-c-0.8.0
cp /usr/local/rabbitmq-c-0.8.0/librabbitmq/amqp_ssl_socket.h /usr/local/amqp-1.9.3/
make && make install
### 最后修改php.ini    加上配置:
extension = amqp.so 安装后,执行php -m 显示amqp  即表明扩展安装成功!
 即表明扩展安装成功!
加载PHP代码的扩展包
然后需要加载代码的扩展包,比较方便快捷的方法是使用composer 加载扩展包
composer require php-amqplib/php-amqplib
若想指定版本:composer require php-amqplib/php-amqplib:版本具体使用哪个版本可以在此链接内查询:https://packagist.org/packages/php-amqplib/php-amqplib
示例代码(包含开启了SSL的连接方式)
<?php
namespace common\helpers;
use models\setting\Log;
use PhpAmqpLib\Connection\AMQPSSLConnection;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class AmqpHelper
{
    /**
     * rabbitMq 未开启ssl验证 消费者
     * @return false|string|void
     * @throws \AMQPChannelException
     * @throws \AMQPConnectionException
     * @throws \AMQPQueueException
     * @time 2024/12/2 13:43
     * @author zsh
     */
    public static function consumerResult()
    {
        //队列配置信息
        $configParams = array(
            'host' => \Yii::$app->params['cotaTct']['queueHost'],
            'port' => \Yii::$app->params['cotaTct']['queuePort'],
            'login' => \Yii::$app->params['cotaTct']['queueLogin'],
            'password' => \Yii::$app->params['cotaTct']['queuePassword'],
            'vhost' => \Yii::$app->params['cotaTct']['queueVhost']
        );
        $conn = new \AMQPConnection($configParams);
        if (!$conn->connect()) {
            die("连接rabbitmq失败!\n");
        }
        //建立信道
        $channel = new \AMQPChannel($conn);
        // 创建队列
        $q = new \AMQPQueue($channel);
        $queueName = \Yii::$app->params['cotaTct']['queueName']; //队列名
        $q->setName($queueName);
        $q->setFlags(AMQP_DURABLE);             // 持久化
        // 绑定交换机与队列,并指定路由键
        $q->bind(\Yii::$app->params['cotaTct']['exchange'], \Yii::$app->params['cotaTct']['routingKey']);
        // 消息获取
        $ret = $q->get(AMQP_AUTOACK);
        if ($ret) {
//            echo "\nget data:\n";
//            var_dump($ret->getBody());
//            var_dump(json_decode($ret->getBody(), true));
            $conn->disconnect();
            return $ret->getBody();
        }else{
            $conn->disconnect();
            return false;
        }
    }
    /**
     * rabbitMq 开启ssl了验证 消费者
     * @return mixed|string|void
     * @throws \ErrorException
     * @time 2024/12/2 13:44
     * @author zsh
     */
    public static function sslConsumerResult()
    {
        $configParams = array(
            'host' => \Yii::$app->params['cotaTct']['prodQueueHost'],
            'port' => \Yii::$app->params['cotaTct']['prodQueuePort'],
            'login' => \Yii::$app->params['cotaTct']['prodQueueLogin'],
            'password' => \Yii::$app->params['cotaTct']['prodQueuePassword'],
            'vhost' => \Yii::$app->params['cotaTct']['queueVhost']
        );
        // 创建SSL连接时忽略证书验证
        $ssl_options = array(
            'verify_peer' => false,
            'verify_peer_name' => false,
        );
        $connection = new AMQPSSLConnection(
            $configParams['host'],
            $configParams['port'],
            $configParams['login'],
            $configParams['password'],
            $configParams['vhost'],
            $ssl_options);
        if (!$connection->isConnected()) {
            die("连接rabbitmq失败!\n");
        }
//        echo '链接成功...';
        $queueName = \Yii::$app->params['cotaTct']['queueName']; //队列名
        $exchange = \Yii::$app->params['cotaTct']['exchange'];
        $routingKey = \Yii::$app->params['cotaTct']['routingKey'];
        $channel = $connection->channel();
        // 声明交换器
        $channel->exchange_declare($exchange, 'topic', false, true, false);
        // 获取系统生成的消息队列名称,这里也可以指定一个队列名称
        $channel->queue_declare($queueName, false, true, false, false);
        // 将队列名与交换器名进行绑定,并指定routing_key(路由键值)
        $channel->queue_bind($queueName,$exchange,$routingKey);
        $message = '';
        // 定义收到消息回调函数
        $callback = function ($msg) use (&$message) {
//            echo 'Message:'.$msg->body;
            $message = $msg->body;
            // 手动确认消息是否正常消费
            $msg->delivery_info['channel']->basic_Ack($msg->delivery_info['delivery_tag']);
        };
        // 设置消费成功后才能继续进行下一个消费
        $channel->basic_qos(null, 1, null);
        // 开启消费no_ack=false,设置为手动应答
        $channel->basic_consume($queueName, '', false, false, false, false, $callback);
        // 循环进行消费
//        while ($channel->is_consuming()) {
//            try {
//                $channel->wait(null, false, $timeout = 10);
//            }catch (AMQPTimeoutException $ex){
//                // 没有消息可处理,退出循环
//                echo $ex->getMessage();
//                break;
//            }
//        }
        if ($channel->is_consuming()) {
            try {
                $channel->wait(null, false, $timeout = 5);
            }catch (AMQPTimeoutException $ex){
                // 没有消息可处理,退出循环
                echo $ex->getMessage();
            }
        }
        //关闭连接
        $channel->close();
        $connection->close();
        $return = $message;
        unset($message);
        $message = null;
        return $return;
    }
    /**
     * rabbitMq 未开启ssl验证 生产者
     * @return mixed|string|void
     * @throws \ErrorException
     * @time 2024/12/2 13:44
     * @author zsh
     */
    public static function producer($message)
    {
        $configParams = array(
            'host' => \Yii::$app->params['cotaTct']['queueHost'],
            'port' => \Yii::$app->params['cotaTct']['queuePort'],
            'login' => \Yii::$app->params['cotaTct']['queueLogin'],
            'password' => \Yii::$app->params['cotaTct']['queuePassword'],
            'vhost' => \Yii::$app->params['cotaTct']['queueVhost']
        );
        $exchangeName = \Yii::$app->params['cotaTct']['producerExchange'];
        try {
            $conn = new AMQPStreamConnection($configParams['host'], $configParams['port'], $configParams['login'], $configParams['password']);
            //创建channel
            $channel = $conn->channel();
            $channel->exchange_declare($exchangeName,'fanout',false,true,false);
            $messageData = new AMQPMessage($message);
            $channel->basic_publish($messageData, $exchangeName);
            $channel->close();
            $conn->close();
            return true;
        }catch (\Exception $e){
            Log::error('AMQP队列错误:'.$e,'AMQP');
            return false;
        }
    }
    /**
     * rabbitMq 开启了ssl验证 生产者
     * @return mixed|string|void
     * @throws \ErrorException
     * @time 2024/12/2 13:44
     * @author zsh
     */
    public static function sslProducer($message)
    {
        $configParams = array(
            'host' => \Yii::$app->params['cotaTct']['prodQueueHost'],
            'port' => \Yii::$app->params['cotaTct']['prodQueuePort'],
            'login' => \Yii::$app->params['cotaTct']['prodQueueLogin'],
            'password' => \Yii::$app->params['cotaTct']['prodQueuePassword'],
            'vhost' => \Yii::$app->params['cotaTct']['queueVhost']
        );
        $exchangeName = \Yii::$app->params['cotaTct']['producerExchange'];
        // 创建SSL连接时忽略证书验证
        $ssl_options = array(
            'verify_peer' => false,
            'verify_peer_name' => false,
        );
        try {
            $conn = new AMQPSSLConnection(
                $configParams['host'],
                $configParams['port'],
                $configParams['login'],
                $configParams['password'],
                $configParams['vhost'],
                $ssl_options);
            if (!$conn->isConnected()) {
                die("连接rabbitmq失败!\n");
            }
            //创建channel
            $channel = $conn->channel();
            $channel->exchange_declare($exchangeName,'fanout',false,true,false);
            $messageData = new AMQPMessage($message);
            $channel->basic_publish($messageData, $exchangeName);
            $channel->close();
            $conn->close();
            return true;
        }catch (\Exception $e){
            Log::error('AMQP队列错误:'.$e,'AMQP');
            return false;
        }
    }
}


















