消息队列RabbitMQ的配置操作及使用
一、RabbitMQ的体系结构RabbitMQ是一个基于AMQPAdvanced Message Queuing Protocol高级消息队列协议实现的开源消息中间件主要用于在分布式系统中存储和转发消息。它由Erlang语言编写以高性能、高可用性以及高扩展性而著称。官网地址RabbitMQ: One broker to queue them all | RabbitMQ1.1 基本架构设计①生产者Producer消息的发送方负责产生并发送消息到 rabbitMQ 服务端②交换机Exchange消息的分发中心负责将接收到的消息路由到一个或多个队列直连交换机Direct Exchange将消息路由到与消息中的路由键完全匹配的队列主题交换机Topic Exchange根据通配符匹配路由键将消息路由到相应队列扇出交换机Fanout Exchange将消息广播到与交换机绑定的所有队列头部交换机Headers Exchange根据消息头中的属性匹配到相应队列③队列消息的存储区用于存储生产者发送的消息④消费者消息的接收方负责从队列中获取消息并进行处理⑤绑定交换机和队列之间的关联关系。生产者将消息发送给交换机队列通过绑定交换机从 而接收消息⑥虚拟主机RabbitMQ 的基本工作单元每个虚拟主机拥有独立的用户、队列、交换机等资源⑦连接连接是指生产者、消费者与 RabbitMQ 之间的网络连接。每个连接可以包含多个信道 (Channel)每个信道是一个独立的会话通道可以进行独立的消息传递1.2 消息中间件的作用解耦降低应用程序之间的直接依赖性从而实现独立开发、部署和升级的能力异步可以将长时间的处理任务放入消息队列中异步处理从而提升响应速度削峰通过平衡系统负载来减轻峰值压力和填充低谷时的资源利用率1.3 常用消息队列对比二、基于 docker 安装 RabbitMQ2.1 拉取镜像docker pull rabbitmq // 如果需要包含管理插件的镜像可以拉取带有-management标签的镜像 docker pull rabbitmq:3.8-management2.2 运行镜像docker run -d --name rabbitmq --restartalways \ -p 5672:5672 -p 15672:15672 \ rabbitmq:management2.3 查看安装结果浏览器中输入 http:///127.0.0.1:15672进入rabbitMQ 管理界面默认用户名和密码都是guest注意如果是在阿里云环境中运行需要将5672和15672端口添加到安全组三、rabbitMQ 工作模式官网介绍了7中工作模式如下3.1 Work Queues生产者将消息发送到默认交换机再推送到自定义队列多个消费者监听同一个队列时谁先抢到消息算谁的。当生产者只有一个且消费者只有一个时就是“hello world”模式即简单模式。3.2 发布订阅模式publish/Subscribe需要创建一个fanout类型的交换机无需设置路由键交换机将消息广播到与其绑定的所有队列中再由消费者进行消费。3.3 路由模式routing需要创建一个 direct 类型的交换机并设置路由键将消息发送到特定的队列中然后由消费者消费消息。3.4 主题模式topics需要创建一个 topic 类型的交换机并设置用通配符表示的路由键将消息发送到匹配的队列中。匹配规则* 表示匹配一个词# 表示匹配0个或多个词四、基于 golang 使用 RabbitMQ在go语言中使用 RabbitMQ需引用第三方包github.com/rabbitmq/amqp091-gogo get https://github.com/rabbitmq/amqp091-go4.1 连接 rabbitMQ 服务器// 连接 rabbitMQ 服务器延迟关闭 conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/) if err ! nil { panic(err) } defer conn.Close()4.2 创建连接管道 channel// 创建channel ch, err : conn.Channel() if err ! nil { panic(err) } defer ch.Close()4.3 创建交换机 exchange// 创建交换机 err ch.ExchangeDeclare( direct.test.go, // 交换机名称 direct, // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部交换机 false, // 是否等待 nil, // 其他参数 )4.4 创建队列 queue// 创建队列 q, err : ch.QueueDeclare( queue.direct.test.go, // 队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待 nil, // 其他参数 ) if err ! nil { panic(err) }4.5 绑定交换机与队列// 绑定交换机与队列 ch.QueueBind( q.Name, // 队列名称 test, // 路由键 direct.test.go, // 交换机名称 false, // 是否等待 nil, // 其他参数 )4.6 发布消息// 创建超时context ctx, cancel : context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 推送消息 body : Hello World!88888 ch.PublishWithContext(ctx, direct.test.go, // 交换机名称 test, // 路由键 false, // 是否等待 false, // 是否立即 amqp.Publishing{ ContentType: text/plain, Body: []byte(body), }, )4.7 消费消息// 消费消息 msgs, err : ch.Consume( queue.direct.test.go, // 队列名称 , // 消费者标识 false, // 是否自动确认 false, // 是否排他 false, // 是否等待 false, // 其他参数 nil, // 其他参数 ) // 取出管道中的消息并打印 for msg : range msgs { fmt.Printf(消费到消息内容%s \n, string(msg.Body)) msg.Ack(false) }生产者发送消息完整代码package main import ( context fmt time amqp github.com/rabbitmq/amqp091-go ) func main() { // 连接 rabbitMQ 服务器延迟关闭 conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/) if err ! nil { panic(err) } defer conn.Close() // 创建channel ch, err : conn.Channel() if err ! nil { panic(err) } defer ch.Close() // 创建交换机 err ch.ExchangeDeclare( direct.test.go, // 交换机名称 direct, // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部交换机 false, // 是否等待 nil, // 其他参数 ) // 创建队列 q, err : ch.QueueDeclare( queue.direct.test.go, // 队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待 nil, // 其他参数 ) if err ! nil { panic(err) } // 绑定交换机与队列 ch.QueueBind( q.Name, // 队列名称 test, // 路由键 direct.test.go, // 交换机名称 false, // 是否等待 nil, // 其他参数 ) // 创建超时context ctx, cancel : context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 推送消息 body : Hello World!88888 ch.PublishWithContext(ctx, direct.test.go, // 交换机名称 test, // 路由键 false, // 是否等待 false, // 是否立即 amqp.Publishing{ ContentType: text/plain, Body: []byte(body), }, ) fmt.Printf(推送消息%s, body) }消费者端接收消息完整代码package main import ( fmt amqp github.com/rabbitmq/amqp091-go ) func main() { // 连接 rabbitMQ 服务器延迟关闭 conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/) if err ! nil { panic(err) } defer conn.Close() // 创建channel ch, err : conn.Channel() if err ! nil { panic(err) } defer ch.Close() // 消费消息 如果生产者端或手动建立了交换机和队列及绑定关系则消费者端可直接根据队列名消费消息而无需再创建交换机、队列等 msgs, err : ch.Consume( queue.direct.test.go, // 队列名称 , // 消费者标识 false, // 是否自动确认 false, // 是否排他 false, // 是否等待 false, // 其他参数 nil, // 其他参数 ) // 取出管道中的消息并打印 for msg : range msgs { fmt.Printf(消费到消息内容%s \n, string(msg.Body)) msg.Ack(false) } }在队列 queque.direct.test.go 中可查看到消息内容五、RabbitMQ 进阶篇5.1 消息的可靠性投递三种消息丢失场景①生产者发送消息到rabbitMQ服务的过程中出现丢失②rabbitMQ服务器进行消息持久化的过程中出现丢失比如服务宕机重启③消费者拉取信息时存在网络波动等导致消息丢失或消息者处理消息异常导致丢失消息丢失的解决方案①消息确认机制针对生产者Confirm 模式是 RabbitMQ 提供的一种消息可靠性保障机制。当生产者通过 Confirm 模式发送消息时它会等待 RabbitMQ 的确认确保消息已经被正确地投递到了指定的 Exchange 中。消息正确投递到 queue 时会返回 ack。消息没有正确投递到 queue 时会返回 nack。如果 exchange 没有绑定 queue也会出现消息丢失使用方法:生产者通过 confirm.select 方法将 Channel 设置为 Confirm 模式发送消息后通过添加 add confirm listener 方法监听消息的确认状态。②消息持久化机制针对rabbitMQ服务器持久化机制是指将消息存储到磁盘以保证在 RabbitMQ 服务器宕机或重启时消息不会丢失。使用方法:生产者通过将消息的 delivery mode 属性设置为 2将消息标记为持久化。队列也需要进行持久化设置确保队列在 RabbitMQ 服务器重启后仍然存在。经典队列需要将durable属性设置为true。而仲裁队列和流式队列默认必须持久化保存。注意事项:持久化机制会影响性能因此在需要确保消息不丢失的场景下使用③ACK事务机制针对消费者ACK 事务机制用于确保消息被正确消费。当消息被消费者成功处理后消费者发送确认(ACK)给RabbitMQ告知消息可以被移除。这个过程是自动处理的也可以关闭进行手工发送 ACK。使用方法:在 RabbitMQ 中ACK 机制默认是开启的。当消息被消费者接收后会立即从队列中删除除非消费者发生异常。可以手动开启 ACK 机制通过将 auto_ack 参数设置为 False手动控制消息的 ACK注意事项:ACK 机制可以确保消息不会被重复处理但如果消费者发生异常或者未发送 ACK消息可能会被重复投递三种重复消费场景①生产者重复发送同一条消息②rabbit服务器消费者消费消息后没来得及发送ACK消息rabbit服务器挂掉MQ认为消息还未被消费当MQ重启后会继续推送这条消息③消费者消费者处理完消息没来得及发送ACK确认消息消费者挂掉。MQ认为消息还未被消费当消费者重启后会再次接收到这条消息重复消费的解决方案①使用数据库唯一约束局限性大②插入消费记录根据消息ID将消息先插入数据库插入成功后给rabbitMQ返回ACK确认消息。消费者处理完业务则增加标记表示消息已处理成功如果处理失败则记录失败次数及原因已提醒管理人员进行手动处理。消息堆积的原因消息堆积的解决方案①优化消费者性能增加消费者数量②增加队列的容量以存储更多的消息③将无法处理的消息转移到死信队列④将大消息分割为小消息提高处理速度⑤简化消费端业务处理逻辑⑥控制生产者发送消息的速度⑦设置消息优先级优先处理高优先级的消息5.2 消费端限流通过设置rabbitMQ的prefetch count参数可以控制服务器一次投递给消费者的消息数量以适应消费者处理消息的速率避免大量消息都投递到消费者。// 消费者端代码 //设置消费端限流 rabbitmq未收到ack消息时只投递1条消息到消费者 err ch.Qos( 1, // prefetch count 0, // prefetch size false, // global )5.3 消息超时在 RabbitMQ 中设置消息或队列的“超时”即自动过期/消失主要有两种方式都通过 x-message-ttl 参数来实现。当消息在队列中停留的时间超过这个值且未被消费者确认ACKRabbitMQ 会将该消息标记为“死信”Dead Letter或直接丢弃。优先级如果队列设置了 TTL消息也设置了 TTL取两者中较短的那个队列级 TTL该队列中的所有消息都需要相同的过期时间在声明队列 (QueueDeclare) 时通过 Arguments 参数设置。// 创建队列 q, err : ch.QueueDeclare( queue.direct.test.ttl2, // 队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待 map[string]any{ // 其他参数 amqp.QueueMessageTTLArg: 30000, // 队列中消息过期时间单位是毫秒 }, )消息级 TTL同一个队列中不同消息需要不同的过期时间例如VIP 用户消息保留 1 小时普通用户保留 5 分钟。在发布消息 (Publish) 时通过 Publishing 属性的 Expiration 字段设置。body : fmt.Sprintf(Hello World! %d, i) ch.PublishWithContext(ctx, direct.test.go, // 交换机名称 test, // 路由键 false, // 是否等待 false, // 是否立即 amqp.Publishing{ ContentType: text/plain, Body: []byte(body), Expiration: 5000, // 发布消息时可以设置该条消息的过期时间与队列的消息过期时间参数对比取较短的过期时间 }, )5.4 死信队列死信队列是 RabbitMQ 中一种特殊的机制用于接收那些无法正常被消费的消息。产生的原因消息被拒绝消息过期队列满了使用方法x-dead-letter-exchange指定死信消息要转发到的交换机名称x-dead-letter-routing-key指定死信消息转发时的路由键// 创建队列 q, err : ch.QueueDeclare( queue.direct.test.ttl2, // 队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待 map[string]any{ // 其他参数 amqp.QueueMessageTTLArg: 30000, // 队列中消息过期时间 x-dead-letter-exchange: exchange.direct.dead.leter, // 过期后发往死信队列绑定的交换机 x-dead-letter-routing-key: routing.key.dead.leter, // 指定路由键 }, )应用场景订单超时自动取消异常消息隔离与报警消息重试机制5.5 延迟队列延迟队列是消息中间件中一种特殊的场景指消息被发送后不会立即被消费者消费而是需要在指定的时间之后才能被消费。实现方式方案一正常队列 TTL 死信队列方案二rabbitmq_delayed_message_exchange 插件应用场景5.6 惰性队列尽可能将消息直接存储在磁盘上只在消费者请求时才将少量消息加载到 RAM 中。内存占用极低且稳定能轻松处理百万级甚至亿级的消息堆积不会因内存爆炸而宕机。使用方法创建队列时配置 x-queue-mode 参数为 lazy。// 创建队列 q, err : ch.QueueDeclare( queue.direct.test.ttl3, // 队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待 map[string]any{ // 其他参数 x-queue-mode: lazy, // 关键配置开启惰性模式 }, )使用场景5.7 优先级队列优先级队列允许你在发送消息时给每条消息分配一个优先级0-255。当消费者从队列中取消息时RabbitMQ 会优先投递优先级高的消息而不是严格按照“先进先出”FIFO的顺序。使用方法在 RabbitMQ 中优先级队列不是默认开启的需要在声明队列时设置最大优先级参数 x-max-priority。数值越大优先级越高。创建队列时配置 x-max-priority 参数发送消息时设置消息的优先级 Priority。// 创建队列 q, err : ch.QueueDeclare( queue.direct.test.ttl3, // 队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待 map[string]any{ // 其他参数 x-max-priority: int32(10), // 设置最大优先级为 10 }, ) // 发送消息 ch.PublishWithContext(ctx, direct.test.go, // 交换机名称 test, // 路由键 false, // 是否等待 false, // 是否立即 amqp.Publishing{ ContentType: text/plain, Body: []byte(body), Priority: 9, // 设置消息优先级值越大优先级越高 }, )应该场景优先级队列缺点内存消耗剧增只有“相对”优先没有“绝对”插队六、RabbitMQ 集群篇6.1 集群搭建环境说明 windows wsl 系统中运用 docker compose 搭建 rabbitmq 集群6.1.1 创建相关目录目录结构如下: rabbitmq-cluster/ ├── docker-compose.yml ├── entrypoint.sh └── data/ # (可选) 用于持久化数据脚本运行后会自动生成子文件夹6.1.2 创建自动加入集群的 shell 脚本 entrypoint.sh创建 entrypoint.sh 文件用于将 rabbitmq 节点自动加入到集群创建完该文件后需要确保脚本有执行权限执行命令 chmod x entrypoint.sh#!/bin/bash set -e # 配置变量 COOKIE_VAL${RABBITMQ_ERLANG_COOKIE:-secret_cookie_123} COOKIE_FILE/var/lib/rabbitmq/.erlang.cookie MY_HOST$(hostname) MASTER_NODErabbitrabbitmq1 echo [$(date)] 启动节点: $MY_HOST # 1. 统一 Erlang Cookie (集群通信的关键) if [ ! -f $COOKIE_FILE ]; then echo 写入 Erlang Cookie... echo $COOKIE_VAL $COOKIE_FILE chmod 600 $COOKIE_FILE chown rabbitmq:rabbitmq $COOKIE_FILE fi # 2. 判断是否为主节点 (rabbitmq1) if [ $MY_HOST rabbitmq1 ]; then echo [$(date)] 检测到是主节点 (rabbitmq1)直接启动服务... exec rabbitmq-server fi # 3. 非主节点逻辑后台启动 - 等待主节点 - 加入集群 - 前台运行 echo [$(date)] 检测到是从节点准备加入集群 $MASTER_NODE ... # 启动 RabbitMQ 为后台守护进程 rabbitmq-server -detached # 等待本地服务完全就绪 echo [$(date)] 等待本地服务就绪... until rabbitmqctl status /dev/null 21; do sleep 2 done # 等待主节点可连接 (防止主节点还没起好就尝试加入) echo [$(date)] 等待主节点 $MASTER_NODE 响应... while ! rabbitmqctl ping -n $MASTER_NODE /dev/null 21; do echo 主节点未就绪等待 2 秒... sleep 2 done # 执行集群加入操作 echo [$(date)] 执行加入集群操作... rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster $MASTER_NODE rabbitmqctl start_app echo [$(date)] 成功加入集群重启进程以前台模式运行... # 停止后台进程以便 exec 接管容器保持存活 rabbitmqctl stop # 4. 以前台模式正式运行 (此时已属于集群) exec rabbitmq-server6.1.3 创建 docker-compose.ymlservices: # --- 节点 1 (主节点/种子节点) --- rabbitmq1: image: rabbitmq:3.13-management-alpine hostname: rabbitmq1 container_name: rabbitmq1 environment: # 基础认证 - RABBITMQ_DEFAULT_USERadmin - RABBITMQ_DEFAULT_PASSadmin # 集群通信密钥 (所有节点必须一致) - RABBITMQ_ERLANG_COOKIEsecret_cookie_123 ports: - 15673:15672 # 管理界面 - 5673:5672 # AMQP 端口 volumes: - ./data/rabbitmq1:/var/lib/rabbitmq # 挂载脚本到容器内 - ./entrypoint.sh:/entrypoint.sh:ro entrypoint: [bash, /entrypoint.sh] networks: - rabbitmq_net healthcheck: test: [CMD, rabbitmq-diagnostics, ping] interval: 10s timeout: 5s retries: 5 restart: always # --- 节点 2 --- rabbitmq2: image: rabbitmq:3.13-management-alpine hostname: rabbitmq2 container_name: rabbitmq2 environment: - RABBITMQ_DEFAULT_USERadmin - RABBITMQ_DEFAULT_PASSadmin - RABBITMQ_ERLANG_COOKIEsecret_cookie_123 ports: - 15674:15672 - 5674:5672 volumes: - ./data/rabbitmq2:/var/lib/rabbitmq # 挂载脚本到容器内 - ./entrypoint.sh:/entrypoint.sh:ro entrypoint: [bash, /entrypoint.sh] depends_on: rabbitmq1: condition: service_healthy # 确保节点 1 健康后再启动节点 2 networks: - rabbitmq_net healthcheck: test: [CMD, rabbitmq-diagnostics, ping] interval: 10s timeout: 5s retries: 5 restart: always # --- 节点 3 --- rabbitmq3: image: rabbitmq:3.13-management-alpine hostname: rabbitmq3 container_name: rabbitmq3 environment: - RABBITMQ_DEFAULT_USERadmin - RABBITMQ_DEFAULT_PASSadmin - RABBITMQ_ERLANG_COOKIEsecret_cookie_123 ports: - 15675:15672 - 5675:5672 volumes: - ./data/rabbitmq3:/var/lib/rabbitmq # 挂载脚本到容器内 - ./entrypoint.sh:/entrypoint.sh:ro entrypoint: [bash, /entrypoint.sh] depends_on: rabbitmq1: condition: service_healthy rabbitmq2: condition: service_healthy networks: - rabbitmq_net healthcheck: test: [CMD, rabbitmq-diagnostics, ping] interval: 10s timeout: 5s retries: 5 restart: always networks: rabbitmq_net: driver: bridgeservices: # --- 节点 1 (主节点/种子节点) --- rabbitmq1: image: rabbitmq:3.13-management-alpine hostname: rabbitmq1 container_name: rabbitmq1 environment: # 基础认证 - RABBITMQ_DEFAULT_USERadmin - RABBITMQ_DEFAULT_PASSadmin # 集群通信密钥 (所有节点必须一致) - RABBITMQ_ERLANG_COOKIEsecret_cookie_123 ports: - 15673:15672 # 管理界面 - 5673:5672 # AMQP 端口 volumes: - ./data/rabbitmq1:/var/lib/rabbitmq # 挂载脚本到容器内 - ./entrypoint.sh:/entrypoint.sh:ro entrypoint: [bash, /entrypoint.sh] networks: - rabbitmq_net healthcheck: test: [CMD, rabbitmq-diagnostics, ping] interval: 10s timeout: 5s retries: 5 restart: always # --- 节点 2 --- rabbitmq2: image: rabbitmq:3.13-management-alpine hostname: rabbitmq2 container_name: rabbitmq2 environment: - RABBITMQ_DEFAULT_USERadmin - RABBITMQ_DEFAULT_PASSadmin - RABBITMQ_ERLANG_COOKIEsecret_cookie_123 ports: - 15674:15672 - 5674:5672 volumes: - ./data/rabbitmq2:/var/lib/rabbitmq # 挂载脚本到容器内 - ./entrypoint.sh:/entrypoint.sh:ro entrypoint: [bash, /entrypoint.sh] depends_on: rabbitmq1: condition: service_healthy # 确保节点 1 健康后再启动节点 2 networks: - rabbitmq_net healthcheck: test: [CMD, rabbitmq-diagnostics, ping] interval: 10s timeout: 5s retries: 5 restart: always # --- 节点 3 --- rabbitmq3: image: rabbitmq:3.13-management-alpine hostname: rabbitmq3 container_name: rabbitmq3 environment: - RABBITMQ_DEFAULT_USERadmin - RABBITMQ_DEFAULT_PASSadmin - RABBITMQ_ERLANG_COOKIEsecret_cookie_123 ports: - 15675:15672 - 5675:5672 volumes: - ./data/rabbitmq3:/var/lib/rabbitmq # 挂载脚本到容器内 - ./entrypoint.sh:/entrypoint.sh:ro entrypoint: [bash, /entrypoint.sh] depends_on: rabbitmq1: condition: service_healthy rabbitmq2: condition: service_healthy networks: - rabbitmq_net healthcheck: test: [CMD, rabbitmq-diagnostics, ping] interval: 10s timeout: 5s retries: 5 restart: always networks: rabbitmq_net: driver: bridge6.1.4 启动集群并查看相关日志# 启动集群 docker compose up -d # 查看日志 docker logs rabbitmq2或者查看节点的集群状态Running Nodes 显示三个节点表示创建集群成功# 查看集群状态 docker exec -it rabbitmq1 rabbitmqctl cluster_status6.1.5 访问网页管理端查看集群状态访问任意节点的管理端可以查看到三个节点如图所示# 访问管理端用户名及密码都是admindocker-compose.yml中配置的 http://127.0.0.1:156736.2 Quorum 队列6.2.1 概念Quorum Queue仲裁队列只能在 RabbitMQ 集群Cluster环境中创建和使用创建队列后会在集群中的所有节点都创建副本其中一个节点被选举为 Leader负责处理读写请求 另外的节点作为 Follower同步存储数据副本。采用 “多数派写入” 机制当消息写入时必须由集群中超过半数的节点确认成功后才视为写入成功并返回给生产者。当消费者成功处理消息并发送 ACK 确认后该消息会从集群中所有节点Leader 和所有 Follower上同时删除。6.2.2 特点6.3.3 使用方法在任意一个节点上创建队列type 选择 Quorum即可创建仲裁队列。6.3 Stream 队列RabbitMQ Stream Queue流队列是 RabbitMQ 从 3.9 版本引入的一种全新队列类型专为高吞吐、大数据量、日志类场景设计。 它的核心设计理念借鉴了 Apache Kafka将 RabbitMQ 从一个传统的“即时消费”消息代理扩展为支持“日志回放”和“无限存储”的流处理平台。机制消息像写日志一样按顺序追加到磁盘文件中。区别经典/仲裁队列消息被消费并 ACK 后立即删除。流队列消息被消费后不会删除而是永久保留直到达到配置的保留策略如时间或大小限制。优势支持历史消息回放。新消费者加入时可以从头开始读取或者从任意时间点Offset开始读取。6.4 基于 go 语言使用集群消息队列的方法6.4.1 创建连接func connectToCluster(nodes []string) (*amqp.Connection, error) { var lastErr error // 遍历所有节点尝试连接 for _, nodeUrl : range nodes { fmt.Printf(正在尝试连接节点: %s ...\n, nodeUrl) conn, err : amqp.Dial(nodeUrl) if err nil { // 连接成功 fmt.Printf(✅ 成功连接到节点: %s\n, nodeUrl) return conn, nil } lastErr err log.Printf(❌ 连接节点 %s 失败: %v, nodeUrl, err) // 可选可以在这里加一个极短的延时避免瞬间风暴 time.Sleep(100 * time.Millisecond) } // 如果所有节点都试过了还是失败返回最后一个错误 return nil, fmt.Errorf(无法连接到集群中的所有节点最后错误: %w, lastErr) }6.4.2 创建仲裁队列需要指定队列的类型为仲裁队列x-queue-type quorum// 创建仲裁队列 q, err : ch.QueueDeclare( queue.direct.test.cluster, // 队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待 amqp.Table{ amqp.QueueTypeArg: amqp.QueueTypeQuorum, // 重点集群使用仲裁队列 }, )6.4.3 完整代码示例package main import ( context fmt log time amqp github.com/rabbitmq/amqp091-go ) // 配置集群地址用逗号分隔多个节点 // 格式amqp://用户名:密码节点1:端口节点2:端口节点3:端口/虚拟主机 var clusterUrls []string{ amqp://admin:admin127.0.0.1:5673/, amqp://admin:admin127.0.0.1:5674/, amqp://admin:admin127.0.0.1:5675/, } func main() { // 启动重连循环 reconnectLoop() } func connectToCluster(nodes []string) (*amqp.Connection, error) { var lastErr error // 遍历所有节点尝试连接 for _, nodeUrl : range nodes { fmt.Printf(正在尝试连接节点: %s ...\n, nodeUrl) conn, err : amqp.Dial(nodeUrl) if err nil { // 连接成功 fmt.Printf(✅ 成功连接到节点: %s\n, nodeUrl) return conn, nil } lastErr err log.Printf(❌ 连接节点 %s 失败: %v, nodeUrl, err) // 可选可以在这里加一个极短的延时避免瞬间风暴 time.Sleep(100 * time.Millisecond) } // 如果所有节点都试过了还是失败返回最后一个错误 return nil, fmt.Errorf(无法连接到集群中的所有节点最后错误: %w, lastErr) } func reconnectLoop() { var conn *amqp.Connection var ch *amqp.Channel var notifyClose chan *amqp.Error // 重连间隔 reconnectInterval : 5 * time.Second for { // 1. 尝试连接集群 // Dial 会依次尝试 urls 中的地址直到成功或全部失败 // 注意amqp091-go 的 Dial 通常只接受一个 URL 字符串 fmt.Printf(正在尝试连接 RabbitMQ 集群: %s\n, clusterUrls) c, err : connectToCluster(clusterUrls) if err ! nil { log.Printf(连接集群失败: %v, %s 后重试..., err, reconnectInterval) time.Sleep(reconnectInterval) continue } conn c notifyClose make(chan *amqp.Error) conn.NotifyClose(notifyClose) // 2. 创建 Channel channel, err : conn.Channel() if err ! nil { log.Printf(创建 Channel 失败: %v, err) conn.Close() time.Sleep(reconnectInterval) continue } ch channel fmt.Println(✅ 成功连接到 RabbitMQ 集群并创建 Channel) // 3. 在这里执行业务逻辑 (发布/消费) // 建议将 ch 传递给具体的业务协程 doWork(ch) // 4. 等待连接关闭通知 (阻塞) err -notifyClose log.Printf(连接断开: %v, err) // 清理资源 if ch ! nil { ch.Close() } if conn ! nil { conn.Close() } // 5. 等待一段时间后重试 time.Sleep(reconnectInterval) } } func doWork(ch *amqp.Channel) { // 模拟业务运行 // 在实际应用中这里会启动消费者协程或生产者循环 // 它们会使用传入的 ch // 创建交换机 err : ch.ExchangeDeclare( direct.test.cluster, // 交换机名称 direct, // 交换机类型 true, // 是否持久化 false, // 是否自动删除 false, // 是否内部交换机 false, // 是否等待 nil, // 其他参数 ) if err ! nil { fmt.Println(err) } // 创建队列 q, err : ch.QueueDeclare( queue.direct.test.cluster, // 队列名称 true, // 是否持久化 false, // 是否自动删除 false, // 是否排他 false, // 是否等待 amqp.Table{ amqp.QueueTypeArg: amqp.QueueTypeQuorum, // 重点集群使用仲裁队列 }, ) if err ! nil { panic(err) } // 绑定交换机与队列 ch.QueueBind( q.Name, // 队列名称 test, // 路由键 direct.test.cluster, // 交换机名称 false, // 是否等待 nil, // 其他参数 ) // 创建超时context ctx, cancel : context.WithTimeout(context.Background(), 5*time.Second) defer cancel() // 推送消息 for i : range 30 { body : fmt.Sprintf(Hello World! rabbitmq cluster %d, i) ch.PublishWithContext(ctx, direct.test.cluster, // 交换机名称 test, // 路由键 false, // 是否等待 false, // 是否立即 amqp.Publishing{ ContentType: text/plain, Body: []byte(body), }, ) fmt.Printf(推送消息%s\n, body) time.Sleep(1 * time.Second) } }参考视频消息中间件夺命连环18问一口气刷完面试必问的消息中间件面试内容让你面试少走99%的弯路_哔哩哔哩_bilibili
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2410294.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!