消息发送
发送同步消息
public class SyncProducer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer=new DefaultMQProducer(/*please_rename_unique_group_name*/"group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i=0;i<100;i++){
Message msg=new Message("base"
/*Topic*/
,"Tag1"
/*TagA*/
,("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
/*Message*/
SendResult sendResult=producer.send(msg);
System.out.println("消息ID"+sendResult.getMessageQueue().getQueueId());
System.out.printf("%s%n",sendResult);
// TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
}
}
发送异步消息
DefaultMQProducer producer=new DefaultMQProducer("group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
for (int i=0;i<100;i++){
Message msg=new Message("base"
/*Topic*/
,"Tag2"
/*TagA*/
,("Hello RocketMQ"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
/*Message*/
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送结果"+sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送异常:"+throwable.getMessage());
}
});
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
单向发送消息
DefaultMQProducer producer=new DefaultMQProducer(/*please_rename_unique_group_name*/"group1");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i=0;i<5;i++){
Message msg=new Message("base"
/*Topic*/
,"Tag3"
/*TagA*/
,("Hello RocketMQ单向消息"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
/*Message*/
// SendResult sendResult=producer.send(msg);
producer.sendOneway(msg);
// System.out.println("消息ID"+sendResult.getMessageQueue().getQueueId());
// System.out.printf("%s%n",sendResult);
TimeUnit.SECONDS.sleep(1);
}
producer.shutdown();
消费信息
负载均衡模式(默认模式)
consumer.setMessageModel(MessageModel.CLUSTERING);
广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
顺序消息
消息顺序
全局消息顺序
局部消息顺序
例如:订单的顺序流程:创建、付款、推送、完成
延时消息
批量消息
过滤消息
事务消息
事务流程

事务消息发送及提交
-
发送消息(half消息)
-
服务端响应消息写入结果
-
根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)
-
根据本地事务状态执行Commit或Rollback(Commit操作生成消息索引,消费对消费者可见)
事务补偿
-
对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次回查
-
Producer收到回查消息,检查回查消息对应的本地事务状态
-
根据本地事务状态,重新提交Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况
事务消息状态
三个状态:提交状态、回滚状态、中间状态



















