RocketMQ 提供了多种消息类型,以满足不同业务场景对 顺序性、事务性、时效性 的要求。其核心设计思想是通过解耦 “消息传递模式” 与 “业务逻辑”,实现高性能、高可靠的分布式通信。
一、主要类型包括
- 普通消息(基础类型)
- 顺序消息(保证消费顺序)
- 定时 / 延迟消息(控制投递时间)
- 事务消息(分布式事务最终一致性)
- 批量消息(提升吞吐量)
二、消息类型及代码示例
1. 普通消息(Normal Message)
描述:最基础的消息类型,支持异步发送、批量发送等模式,适用于无需严格顺序和事务保证的场景(如日志收集、通知推送)。
核心优势:高吞吐量、低延迟,生产端通过负载均衡自动选择 Broker。
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
public class NormalMessageExample {
public static void main(String[] args) throws ClientException, InterruptedException {
// 1. 加载服务提供者(支持SPI扩展)
ClientServiceProvider provider = ClientServiceProvider.loadService();
// 2. 配置认证信息(密钥管理)
String accessKey = "yourAccessKey";
String secretKey = "yourSecretKey";
StaticSessionCredentialsProvider credentialsProvider =
new StaticSessionCredentialsProvider(accessKey, secretKey);
// 3. 构建客户端配置(支持多协议、TLS加密)
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081") // 支持域名或IP:端口列表
.setCredentialProvider(credentialsProvider)
.setRequestTimeout(Duration.ofSeconds(3)) // 请求超时时间
.build();
// 4. 创建生产者(支持自动重试、批量发送)
String topic = "normal-message-topic";
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(clientConfiguration)
.setTopics(topic)
.setMaxAttempts(3) // 发送失败最大重试次数
.build();
// 5. 构建并发送消息
Message message = provider.newMessageBuilder()
.setTopic(topic)
.setBody("Hello RocketMQ 5.0!".getBytes())
.setTag("order") // 可选标签,用于消息过滤
.setKeys("key123") // 消息业务键,用于查询
.build();
// 同步发送(阻塞当前线程直到返回结果)
SendReceipt receipt = producer.send(message);
System.out.println("消息发送成功: " + receipt.getMessageId());
// 异步发送示例
/*
producer.sendAsync(message).thenAccept(sendReceipt -> {
System.out.println("异步发送成功: " + sendReceipt.getMessageId());
}).exceptionally(throwable -> {
System.out.println("异步发送失败: " + throwable.getMessage());
return null;
});
*/
// 6. 关闭资源(重要!避免内存泄漏)
producer.close();
}
}
2. 顺序消息(Ordered Message)
描述:通过将同一业务主键的消息路由到相同队列,保证消息消费顺序与发送顺序一致。
应用场景:金融交易流水、订单状态变更、时序数据处理。
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import java.util.List;
public class OrderedMessageExample {
public static void main(String[] args) throws ClientException, InterruptedException {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();
String topic = "ordered-message-topic";
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(config)
.setTopics(topic)
.build();
// 模拟订单状态变更(同一订单ID的消息必须顺序处理)
String[] orderIds = {"order1001", "order1002", "order1001"};
String[] orderStatus = {"CREATED", "PAYED", "SHIPPED"};
for (int i = 0; i < orderIds.length; i++) {
String orderId = orderIds[i];
String status = orderStatus[i % orderStatus.length];
// 关键:通过MessageGroup确保相同订单的消息发送到同一队列
Message message = provider.newMessageBuilder()
.setTopic(topic)
.setBody(("订单[" + orderId + "]状态变更为: " + status).getBytes())
.setMessageGroup(orderId) // 消息组决定消息路由的队列
.setKeys(orderId) // 设置业务键便于查询
.build();
SendReceipt receipt = producer.send(message);
System.out.println("发送顺序消息: " + receipt.getMessageId()
+ ", 订单ID: " + orderId + ", 状态: " + status);
}
producer.close();
}
}
3. 定时 / 延迟消息(Scheduled/Delay Message)
描述:消息发送后,需等待指定时间(或到达指定时间点)才会被消费者可见。
应用场景:订单超时自动关闭、任务调度、延迟重试。
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import java.time.Duration;
import java.time.Instant;
public class DelayMessageExample {
public static void main(String[] args) throws ClientException, InterruptedException {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();
String topic = "delay-message-topic";
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(config)
.setTopics(topic)
.build();
// 方式一:使用绝对时间戳(精确到毫秒)
long timestamp = Instant.now().plus(Duration.ofMinutes(5)).toEpochMilli();
Message messageByTimestamp = provider.newMessageBuilder()
.setTopic(topic)
.setBody("5分钟后执行的定时消息".getBytes())
.setDeliveryTimestamp(timestamp) // 设置投递时间戳
.build();
// 方式二:使用预定义延迟级别(需Broker配置支持)
Message messageByLevel = provider.newMessageBuilder()
.setTopic(topic)
.setBody("延迟30秒的消息".getBytes())
.addProperty("DELAY", "3") // 假设3对应30秒(需Broker配置)
.build();
// 发送延迟消息
SendReceipt receipt = producer.send(messageByTimestamp);
System.out.println("延迟消息发送成功: " + receipt.getMessageId()
+ ", 将于 " + Instant.ofEpochMilli(timestamp) + " 可见");
producer.close();
}
}
4. 事务消息(Transactional Message)
描述:
通过两阶段提交机制,保证本地事务与消息发送的最终一致性。
核心流程:
发送半消息(对消费者不可见)
- 执行本地事务
- 根据事务结果提交或回滚半消息
- 支持事务状态回查(处理超时情况)
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.TransactionResolution;
import org.apache.rocketmq.client.apis.producer.TransactionalProducer;
public class TransactionalMessageExample {
public static void main(String[] args) throws ClientException, InterruptedException {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();
String topic = "transactional-message-topic";
TransactionalProducer producer = provider.newTransactionalProducerBuilder()
.setClientConfiguration(config)
.setTopics(topic)
// 关键:设置事务状态回查处理器(当Broker长时间未收到事务状态时触发)
.setTransactionChecker(messageView -> {
System.out.println("回查事务状态: " + messageView.getBodyAsString());
// 根据业务ID查询本地事务状态
String bizId = messageView.getKeys().iterator().next();
boolean transactionStatus = checkLocalTransactionStatus(bizId);
return transactionStatus ?
TransactionResolution.COMMIT : TransactionResolution.ROLLBACK;
})
.build();
try {
// 1. 开启事务上下文
producer.beginTransaction();
// 2. 发送半消息(未提交状态)
Message message = provider.newMessageBuilder()
.setTopic(topic)
.setBody("用户账户扣款成功,通知库存系统扣减".getBytes())
.setKeys("order_12345") // 设置业务键,用于回查
.build();
producer.send(message);
// 3. 执行本地事务(如数据库操作)
boolean localTransactionResult = executeLocalTransaction();
// 4. 根据本地事务结果提交或回滚
if (localTransactionResult) {
producer.commit(); // 提交事务,消息对消费者可见
System.out.println("本地事务执行成功,消息提交");
} else {
producer.rollback(); // 回滚事务,消息被丢弃
System.out.println("本地事务执行失败,消息回滚");
}
} catch (Exception e) {
producer.rollback(); // 异常时回滚
e.printStackTrace();
} finally {
producer.close();
}
}
private static boolean executeLocalTransaction() {
// 模拟本地事务:如用户账户扣款
System.out.println("执行本地事务...");
return true; // 返回事务执行结果
}
private static boolean checkLocalTransactionStatus(String bizId) {
// 模拟查询本地事务状态(如查询数据库订单状态)
System.out.println("查询本地事务状态: " + bizId);
return true; // 实际应根据业务ID查询真实状态
}
}
5. 批量消息(Batch Message)
描述:将多条消息打包为一个批次发送,减少网络开销,提升吞吐量。
注意事项:
- 所有消息必须属于同一 Topic
- 总大小不能超过 4MB(默认限制,可配置)
- 不支持事务和延迟属性
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import java.util.ArrayList;
import java.util.List;
public class BatchMessageExample {
public static void main(String[] args) throws ClientException, InterruptedException {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();
String topic = "batch-message-topic";
Producer producer = provider.newProducerBuilder()
.setClientConfiguration(config)
.setTopics(topic)
.build();
// 创建批量消息集合
List<Message> messages = new ArrayList<>();
for (int i = 0; i < 100; i++) { // 示例:批量发送100条消息
Message message = provider.newMessageBuilder()
.setTopic(topic)
.setBody(("批量消息-" + i).getBytes())
.setKeys("key-" + i)
.build();
messages.add(message);
}
// 智能拆分大批次(避免超过4MB限制)
List<List<Message>> batches = splitMessages(messages);
// 发送所有批次
for (List<Message> batch : batches) {
List<SendReceipt> receipts = producer.send(batch);
System.out.println("批量发送成功,共" + receipts.size() + "条消息");
}
producer.close();
}
// 智能拆分大批次消息(实际生产中建议实现)
private static List<List<Message>> splitMessages(List<Message> messages) {
// 简单实现:实际应根据消息大小动态拆分
List<List<Message>> result = new ArrayList<>();
result.add(messages);
return result;
}
}
6. 消费者示例(通用)
描述:RocketMQ 5.0 支持 Push 和 Pull 两种消费模式,以下是基于长轮询的 PushConsumer 示例。
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import java.time.Duration;
public class ConsumerExample {
public static void main(String[] args) throws ClientException, InterruptedException {
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfiguration config = ClientConfiguration.newBuilder()
.setEndpoints("localhost:8081")
.build();
// 订阅主题和过滤表达式(支持SQL92语法)
String topic = "normal-message-topic";
FilterExpression filterExpression = new FilterExpression(
"TAG = 'order' AND age > 18", // 示例SQL过滤条件
FilterExpressionType.SQL92
);
// 创建PushConsumer(基于长轮询的"伪推"模式)
PushConsumer consumer = provider.newPushConsumerBuilder()
.setClientConfiguration(config)
.setConsumerGroup("my-consumer-group") // 消费组决定消息负载方式
.setSubscriptionExpressions(Map.of(topic, filterExpression))
.setMaxPollInterval(Duration.ofSeconds(30)) // 长轮询超时时间
.setConsumptionThreadCount(10) // 消费线程数
.setMessageListener(messageView -> {
try {
// 处理消息逻辑(业务代码)
System.out.println("接收到消息: " + messageView.getBodyAsString());
System.out.println("消息属性: " + messageView.getProperties());
// 模拟业务处理耗时
Thread.sleep(100);
// 返回消费结果(成功/失败)
return ConsumeResult.SUCCESS;
} catch (Exception e) {
// 消费失败时返回RETRY,消息将重试消费
System.out.println("消息消费失败: " + e.getMessage());
return ConsumeResult.FAILURE;
}
})
.build();
// 保持主线程运行,避免消费者立即关闭
System.out.println("消费者已启动,按Ctrl+C退出...");
Thread.sleep(Long.MAX_VALUE);
}
}
关键依赖配置(Maven)
<dependencies>
<!-- RocketMQ 5.0 Java客户端 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client-java</artifactId>
<version>5.1.0</version>
</dependency>
<!-- gRPC依赖 -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>1.54.0</version>
</dependency>
<!-- 序列化依赖 -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.21.9</version>
</dependency>
</dependencies>
三、最佳实践建议
-
连接配置:
- 生产环境建议使用域名而非 IP,支持动态扩容
- 开启 TLS 加密(通过
ClientConfiguration.setSslTrustStorePath
)
-
消息大小:
- 单条消息建议不超过 1MB
- 批量消息总大小不超过 4MB(可通过
producer.setMaxMessageSize
调整)
-
异常处理:
- 生产者需捕获
ClientException
并实现重试逻辑 - 消费者应避免长时间阻塞,建议使用异步处理
- 生产者需捕获
-
性能调优:
- 生产者:调整
sendMsgTimeout
和maxAttempts
参数 - 消费者:根据业务吞吐量调整
consumptionThreadCount
- 生产者:调整
-
监控告警:
- 监控 Topic 的 TPS、RT、堆积量等指标
- 配置告警阈值(如单队列堆积超过 10 万条)
四、总结
RocketMQ支持多种消息类型以满足不同业务需求:普通消息适用于高吞吐场景;顺序消息保证消费顺序;定时/延迟消息控制投递时间;事务消息确保分布式事务一致性;批量消息提升吞吐量。每种类型都提供了对应的Java代码示例,包括生产者配置、消息构建和发送逻辑。最佳实践建议包括合理配置连接、控制消息大小、完善异常处理、性能调优和监控告警。通过解耦消息传递与业务逻辑,RocketMQ实现了高性能、高可靠的分布式通信能力。