函数定义(来自 github.com/streadway/amqp)
func (ch *Channel) Publish(
exchange string,
key string,
mandatory bool,
immediate bool,
msg Publishing,
) error
这个方法的作用是:向指定的交换机 exchange 发送一条消息 msg,带上路由键 key。
参数名 | 类型 | 含义 |
---|---|---|
exchange | string | 指定要将消息发布到哪个 交换机(exchange)。可以是 “” 表示默认交换机。 |
key | string | 路由键(routing key),根据交换机类型决定消息怎么路由。 |
mandatory | bool | 是否强制投递。若为 true 且无法路由到队列,则会触发 Basic.Return (需要监听返回)。 |
immediate | bool | 是否立即投递。很少使用,RabbitMQ 通常不支持,建议设为 false 。 |
msg | Publishing | 消息体及其元数据(headers、content-type、body等) |
📦 msg(Publishing)结构
type Publishing struct {
ContentType string
ContentEncoding string
DeliveryMode uint8 // 1=非持久化 2=持久化
Priority uint8
CorrelationId string
ReplyTo string
Expiration string
MessageId string
Timestamp time.Time
Type string
UserId string
AppId string
Body []byte
Headers Table
}
常用字段:
Body: 消息内容([]byte)
ContentType: 比如 “application/json”,“text/plain”
DeliveryMode: 2 表示持久化消息,1 表示不持久化(内存中)
Headers: 自定义属性(可以设置 key-value)
✅ 使用示例:基本用法
body := "Hello RabbitMQ!"
err := channel.Publish(
"my-exchange", // exchange 这表示你要将消息发布到一个叫 "my-exchange" 的交换机。
"my-key", // routing key 会被用于匹配绑定在交换机上的队列。
false, // mandatory 如果消息无法路由到队列,不返回任何信息。
false, // immediate 不要求立即投递(几乎总是 false)。
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body), //消息正文,以字节形式传递。
DeliveryMode: amqp.Persistent, // 2 = 持久化
},
)
if err != nil {
log.Fatalf("Publish failed: %s", err)
}
通俗的讲一下mandatory和immediate两个参数及其应用场景
参数 | 通俗解释 |
---|---|
mandatory | “找不到接收方要告诉我”(确保消息不被悄悄丢掉) |
immediate | “没有消费者就别投了”(对方不在线就别发) |
err := ch.Publish(
"logs", "debug.key",
true, // mandatory
false,
msg,
)
结果:
如果没有任何队列绑定了 “logs” 交换机并匹配 “debug.key”,这条消息就会被退回来,你可以通过监听 channel.NotifyReturn() 获取退回消息。
⚠️ 注意:RabbitMQ 早就不支持 immediate = true了!这个参数基本是“历史遗留”。几乎都设置为false
RabbitMQ 默认就不支持 immediate。 设置 immediate = true 会直接报错:“immediate=true” not supported
带 mandatory 回退处理机制的 RabbitMQ 生产者完整示例代码
✅ 功能概览
启动 RabbitMQ 连接与通道
使用 mandatory = true 发布消息
使用 NotifyReturn() 接收“退回的消息”
输出退回原因和消息内容
package main
import (
"log"
"time"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 连接 RabbitMQ
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "连接失败")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "打开通道失败")
defer ch.Close()
// 声明交换机(topic 类型)
err = ch.ExchangeDeclare(
"my-exchange", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "声明交换机失败")
// 设置 return 回退监听(必须在 Publish 之前设置)
returns := ch.NotifyReturn(make(chan amqp.Return))
// 模拟发送消息,但没有任何队列绑定这个 key → 消息将被退回
err = ch.Publish(
"my-exchange", // exchange
"unmatched.key", // routing key
true, // mandatory:要求通知投递失败
false, // immediate:RabbitMQ 不支持
amqp.Publishing{
ContentType: "text/plain",
Body: []byte("This message will be returned"),
DeliveryMode: amqp.Persistent,
},
)
if err != nil {
log.Printf("Publish error: %s", err)
}
// 检查是否被回退(注意:这是异步的)
select {
case ret := <-returns:
log.Println("❗ 消息被退回!")
log.Printf("原因:%s", ret.ReplyText)
log.Printf("交换机:%s", ret.Exchange)
log.Printf("路由键:%s", ret.RoutingKey)
log.Printf("内容:%s", string(ret.Body))
case <-time.After(2 * time.Second):
log.Println("✅ 消息已成功路由(没有被退回)")
}
}
🧪 测试说明
你不绑定任何队列到 my-exchange + unmatched.key,运行这段代码会看到:
❗ 消息被退回!
原因:NO_ROUTE
交换机:my-exchange
路由键:unmatched.key
内容:This message will be returned