RocketMQ消息发送基本示例(推送消费者)-CSDN博客
RocketMQ消费者主动拉取消息示例-CSDN博客
RocketMQ顺序消息-CSDN博客
RocketMQ广播消息-CSDN博客
RocketMQ延时消息-CSDN博客
批量消息
批量消息是指将多条消息合并成一个批量消息,一次发送出去,原先的都是一次发一条.批量消息的好处是减少网络IO,提高吞吐量.
批量消息的使用限制:
消息大小不能超过4M,虽然源码注释不能超过1M,实际使用不超过4M即可.平衡整体性能,建议保持1M
相同的Topic
相同的waitStoreMsgOK
不能是延迟,不能是事务消息
注意:::::::批量发送主要是为了优化生产者发送消息的效率,但在消费者端,消息仍然是逐条处理的。
package com.example.rocketmqdemo.batch;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
/**
 * @author hrui
 * @date 2024/8/1 12:16
 */
public class BatchProducer {
    public static void main(String[] args) {
        //创建一个DefaultMQProducer实例,指定生产者组名为"group1"
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");//生产者组和消费者组是不同概念  不需要相同
        //设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息
        producer.setNamesrvAddr("xxx.xxx.xxx:9876");
        try {
            //启动生产者实例
            producer.start();
            //批量发送用一个Message数组或集合
            List<Message> messages = new ArrayList<>();
            //发送10条消息
            for (int i = 0; i < 2; i++) {
                //创建消息实例,指定主题为"Topic1",标签为"Tag1",消息内容为"Hello World"加上编号
                Message message = new Message("Batch", "Tag1", ("Hello World" + i).getBytes(StandardCharsets.UTF_8));
                messages.add(message);
            }
            //批量发送消息
            SendResult sendResult = producer.send(messages);
            //打印消息发送结果
            System.out.println("批量消息发送成功:返回---->" + sendResult);
        } catch (Exception e) {
            //捕获并打印异常信息
            e.printStackTrace();
        } finally {
            //关闭生产者实例,释放资源
            producer.shutdown();
        }
    }
}
package com.example.rocketmqdemo.batch;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
 * @author hrui
 * @date 2024/8/1 12:20
 */
public class BathConsumer {
    public static void main(String[] args) {
        //创建一个DefaultMQPushConsumer实例,指定消费者组名为"group1"
        //采用长轮询机制,模拟推送效果,但本质上是主动拉取。适合低延迟、高实时性的场景。
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
        //设置NameServer地址,RocketMQ客户端通过NameServer获取Broker的路由信息
        consumer.setNamesrvAddr("xxx.xxx.xxx:9876");
        try {
            //订阅主题"Topic1",过滤标签为"*",表示接收所有消息
            consumer.subscribe("Batch", "*");
            //设置消息监听器,处理接收到的消息
            //可以传入两种类型的监听器:
            //1. MessageListenerOrderly(顺序消费):保证消息按顺序处理
            //2. MessageListenerConcurrently(并发消费):消息并发处理,不保证顺序
            consumer.setMessageListener(new MessageListenerConcurrently() {
                //consumeMessage方法用于处理接收到的消息列表
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    System.out.println(Thread.currentThread().getName());
                    for (int i=0;i<list.size();i++){
                        System.out.println(i+"_消息消费成功_"+new String(list.get(i).getBody()));broker是将两条消息分别发送的
                    }
                    //返回消费状态,CONSUME_SUCCESS表示消息消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //启动消费者实例,开始接收消息
            consumer.start();
        } catch (Exception e) {
            //捕获并打印异常信息
            e.printStackTrace();
        }
    }
}
如果List<Message> 超过4M

如果超过4M 可以用分批次处理 可以自定义实现迭代器去实现





![[Docker][Docker Volume]详细讲解](https://i-blog.csdnimg.cn/direct/8a1a340f6ef446b6bfbca9f170111789.png)













