RabbitMQ—消息元数据解析指南
本文介绍了RabbitMQ的Java客户端实现包含生产者和消费者代码示例。生产者通过建立连接、创建信道、声明队列循环发送10条消息到hello队列消费者同样建立连接后订阅该队列通过DefaultConsumer接收并打印消息。文章重点解析了RabbitMQ的两个核心元数据Envelope包含投递标签、重投标志、交换机和路由键信息和AMQP.BasicProperties消息属性头如内容类型、优先级等详细说明了各参数含义及实际应用场景。这些元数据对消息路由、业务追踪和RPC通信等高级功能至关重要。⽣产者代码package rabbitmq.simple; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ProducerDemo { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory new ConnectionFactory(); connectionFactory.setHost(自己的服务器地址); connectionFactory.setPort(5672); //需要提前开放端口号 connectionFactory.setUsername(账号名字);//账号 connectionFactory.setPassword(密码); //密码 connectionFactory.setVirtualHost(主机名); //虚拟主机 Connection connection connectionFactory.newConnection(); //2. 开启信道 Channel channel connection.createChannel(); //3. 声明交换机 使用内置的交换机 //4. 声明队列 /** * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, * MapString, Object arguments) * 参数说明: * queue: 队列名称 * durable: 可持久化 * exclusive: 是否独占 * autoDelete: 是否自动删除 * arguments: 参数 */ channel.queueDeclare(hello, true, false, false, null); //5. 发送消息 /** * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) * 参数说明: * exchange: 交换机名称 * routingKey: 内置交换机, routingkey和队列名称保持一致 * props: 属性配置 * body: 消息 */ for (int i 0; i 10; i) { String msg hello rabbitmq~i; channel.basicPublish(,hello, null, msg.getBytes()); } System.out.println(消息发送成功~); //6. 资源释放 channel.close(); connection.close(); } }消费者代码package rabbitmq.simple; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.TimeoutException; public class ConsumerDemo { public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { //1. 创建连接 ConnectionFactory connectionFactory new ConnectionFactory(); connectionFactory.setHost(自己的服务器地址); connectionFactory.setPort(5672); //需要提前开放端口号 connectionFactory.setUsername(账号名字);//账号 connectionFactory.setPassword(密码); //密码 connectionFactory.setVirtualHost(主机名); //虚拟主机 Connection connection connectionFactory.newConnection(); //2. 创建Channel Channel channel connection.createChannel(); //3. 声明队列(可以省略) channel.queueDeclare(hello,true, false, false, null); //4. 消费消息 /** * basicConsume(String queue, boolean autoAck, Consumer callback) * 参数说明: * queue: 队列名称 * autoAck: 是否自动确认 * callback: 接收到消息后, 执行的逻辑 */ DefaultConsumer consumer new DefaultConsumer(channel){ //从队列中收到消息, 就会执行的方法 Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(接收到消息: new String(body)); // System.out.println(接收到消息: consumerTag); // System.out.println(接收到消息: (String.valueOf(envelope))); // System.out.println(接收到消息: properties ); } }; channel.basicConsume(hello, true, consumer); //等待程序执行完成 Thread.sleep(2000); //5. 释放资源 channel.close(); connection.close(); } }接收到消息:hello rabbitmq~9接收到消息:amq.ctag-eK9h2CCuGOsETN7QBqO2xw接收到消息:Envelope(deliveryTag10, redeliverfalse, exchange, routingKeyhello)接收到消息:#contentHeaderbasic(content-typenull, content-encodingnull, headersnull, delivery-modenull, prioritynull, correlation-idnull, reply-tonull, expirationnull, message-idnull, timestampnull, typenull, user-idnull, app-idnull, cluster-idnull)RabbitMQ 消息元数据详解本文档详细介绍了 RabbitMQ 中两个核心的元数据对象Envelope物流信息和AMQP.BasicProperties消息属性头。一、Envelope信封对象Envelope包含了 RabbitMQ 消息的“物流信息”用于追踪消息的投递状态和路径。参数值含义解释deliveryTag10消息编号这是第 10 条投递给你的消息。就像快递单号用于确认收货ACK。redeliverfalse是否重投false 这是第一次投递给你true 这条消息之前给过别的消费者或之前给你但你没确认现在重新投递exchange空交换机名称空字符串表示使用的是内置默认交换机AMQP default。生产者代码里basicPublish(, hello, ...)第一个参数就是空字符串。routingKeyhello路由键决定消息要送到哪个队列。在简单模式下routingKey通常等于队列名hello。就像写地址“送到 hello 队列”。 详细解读1. deliveryTag 10作用RabbitMQ 给这条消息打的序号从 1 开始累加。应用场景当你使用手动确认autoAckfalse时需要用这个编号告诉服务器“第 10 条消息我处理完了可以删了”。// 手动确认示例 channel.basicAck(envelope.getDeliveryTag(), false);2. redeliver false意味着什么这条消息是“新鲜”的第一次发给你。如果为 true说明这条消息被“退件重发”了。可能原因之前的消费者处理超时/崩溃没有发送确认ACK。RabbitMQ 把消息重新发给其他消费者或再次发给你。3. exchange 空字符串表示默认交换机Default Exchange。这是 RabbitMQ 内置的直连交换机direct exchange。规则routingKey必须完全匹配队列名才能路由。4. routingKey hello路由键决定消息走哪条路。在默认交换机下routingKey 队列名。生产者代码里basicPublish(, hello, ...)的第二个参数hello就是routingKey。 一句话总结Envelope就是消息的“快递面单”告诉你这是第 10 个包裹deliveryTag10是第一次派送redeliverfalse从默认站点发出exchange目的地是 hello 队列routingKeyhello。而真正的“包裹内容”在body字节数组里二、AMQP.BasicProperties消息属性头AMQP.BasicProperties包含了消息的元数据信息关于消息的数据而非消息本身内容。如果在发送消息时第 3 个参数传了nullchannel.basicPublish(, hello, null, msg.getBytes());那么这些属性都会是空的null。 每个参数详解参数名当前值含义实际应用场景content-typenull内容类型MIME类型如application/json、text/plain。消费者收到后知道用什么格式解析content-encodingnull内容编码如gzip压缩、UTF-8。告诉消费者如何解码headersnull自定义头信息Map类型存自定义键值对用于业务路由或过滤delivery-modenull投递模式1非持久化重启丢失2持久化存磁盘重启还在prioritynull优先级数字越大越优先0-9。VIP客户的消息插队处理correlation-idnull关联ID用于 RPC 模式。客户端发请求时带个唯一ID服务端回复时带同样IDreply-tonull回复队列名RPC场景中告诉服务端“处理完后把结果发到这个队列”expirationnull过期时间毫秒如60000表示 1 分钟后若还没被消费自动删除message-idnull消息唯一ID业务层面的消息编号用于幂等性防止重复消费timestampnull时间戳消息发送时间用于日志追踪或延时计算typenull消息类型业务类型标识如order.created、user.registereduser-idnull发送者ID验证消息发送者的身份app-idnull应用ID标识哪个应用发送的如order-servicecluster-idnull集群ID已废弃基本不用 实际使用示例场景 1发送 JSON 数据 自定义业务头// 生产者 AMQP.BasicProperties props new AMQP.BasicProperties.Builder() .contentType(application/json) // 内容是 JSON .deliveryMode(2) // 持久化消息重启不丢 .priority(5) // 优先级 5较高 .messageId(ORDER-20240315-001) // 业务消息ID .expiration(300000) // 5 分钟后过期 .headers(Map.of(userId, 1001, // 自定义头 orderType, vip)) .build(); channel.basicPublish(, hello, props, jsonBytes);场景 2消费者读取这些属性Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { // 1. 判断内容类型 if (application/json.equals(properties.getContentType())) { // 按 JSON 解析 } // 2. 获取业务 ID幂等性校验 String msgId properties.getMessageId(); // 3. 获取自定义头 MapString, Object headers properties.getHeaders(); if (headers ! null) { String orderType (String) headers.get(orderType); } } 总结这些属性都是消息的“标签”主要用于路由决策headers、type消息特性priority、expiration、delivery-mode业务追踪message-id、correlation-id、timestampRPC 通信reply-to、correlation-id入门示例传null是因为简单场景不需要高级特性但在分布式微服务系统中这些属性至关重要
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2416595.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!