消息中间件以前常用RabbitMQ和ActiveMQ,由于业务需要,后期业务偏向大数据,现着重学习一下RocketMQ(RocketqMQ原理同ctg-mq),后续更新Kafka
一、RocketMQ特性

-  Kafka特性 (高性能分布式) 
 吞吐量大,支持消息大量推挤,支持topic离线,支持分布式,使用ZooKeeper实现负载均衡,支持Hadoop数据并行加载
-  RocketMQ特性 
 (1)Broker 服务器:Broker 服务器是RocketMQ的核心,主要功能:消息的处理(接收生产者Producer发送过来的消息(持久化),推送消息给消费者Consumer),消息的存储。NameServer 服务器:记录Producert信息、Broker信息、Consumer信息、Topic主题信息,NameServer服务器在这里作为控制中心、注册中心、路由,
 服务启动顺序,先启动NameServer再启动Broker,将Broker服务器的ip注册到NameServer服务中
 业务流程:如果生产者Producer需要推送消息至Broker服务器中,需要先去NameServer服务器中查找到对应的Broker服务器,然后生产者端Producer与Broker服务器建立连接
 (2)能够保证严格的消息顺序(顺序消费、顺序拉取)
 丰富的消息拉取模式:push模式(等待Broker推送消息,推荐使用,),pull模式(主动向Broker主动拉取消息),pull与push模式同时可以满足使用需求的情况下,建议优先使用push模式
 (3)可以多节点生产和多节点消费
 (4)消息事务机制,目前只有RocketMQ支持,Kafka和RabbitMQ不支持
 (5)亿级消息堆积
 (6)吞吐量高,但比Kafka低
 (7)消息重推、死信队列
-  RabbitMQ特性 
 吞吐量比Kafka、RocketMQ低…
  
二、RocketMQ消费模式
1.Push推模式-DefaultMQPushConsumer原理
Consumer消费者向Broker服务器发送请求,Consumer通过请求与Broker服务器保持一种长连接的形式,Broker服务器每5s检查一次是否存在消息,如果有就推送给Consumer消费者
2.Pull拉模式-DefaultMQPullConsumer原理
Consumer消费者主动去Broker服务器拉取数据,一般使用本地定时任务去拉取,由于需要保证消息的及时性,一般推荐使用Push推模式订阅消息
3. 轮询监控机制

 RocketMQ默认将Producer生产者消息发送至4(不一定4个)个队列中进行存储,Consumer消费方通过轮询的方式去监控这个4个队列(轮询监控机制)
4.ack机制
LocalTransactionState标识消息的状态,通过判断返回的枚举值enum做出相应处理
- COMMIT_MESSAGE
 消息可见,目前事务消息分为提交不可见消息和可见消息
- ROLLBACK_MESSAGE
 消息需要回滚
- UNKNOW
 消息异常或超时时返回该枚举值,重复回查信息
  
三、RocketMQ实战
(1)消息发送实现流程
- 引入pom依赖,目前最新版本为5.3.0,推荐使用4.4.0
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>版本</version>
</dependency>
或
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
- 创建DefualtMQProducer实例对象
- 设置NaemServer地址
- 开启DefaultMQProducer
- 创建消息Message
- 发送消息
- 关闭DefaultMQProducer
(2)消息消费流程实现
目前业务需要实现消费业务,着重学习消费端逻辑
- 创建DefaultMQPushConsumer
- 设置NameServer地址
- 设置subscribe,这里是要读取的主题信息
- 创建监听器MessageListener
- 获取消息信息
- 返回消息读取状态
消费者流程代码,这里使用Push推模式实现,并设置了消息拉取最大上限setConsumeMessageBatchMaxSize(2)为2条消息
 监听器这么选择普通监听器MessageListenerConcurrently,如果需要实现顺序消费可以选用MessageListenerOrderly
 消息消费时如果有异常出现,注意这里不要抛出异常,打印异常日志即可,直接返回消息失败枚举值 RECONSUME_LATER 触发RocketMQ消息重推机制
 如果消息消费成功,只需返回枚举类enum ConsumeConcurrentlyStatus.CONSUME_SUCCESS即可表示消息推送成功
public class Consumer {
    public static void main(String[] args) throws MQClientException {
        //1. 创建DefaultMQPushConsumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        //2. 设置NameServer地址
        consumer.setNamesrvAddr("192.168.211.141:9876");
        //3. 设置subscribe,这里是要读取的主题信息
        consumer.subscribe("Topic_Name",//执行要消费的主题
                "Tags || TagsA || TagsB");//过滤规则  "*"则表示全部订阅
        //4. 创建监听器MessageListener
        //4.1 设置消息拉取最大数(上限)-最大拉取两条
        consumer.setConsumeMessageBatchMaxSize(2);
        consumer.setMessageListener(new MessageListenerConcurrently() {
            /*
                MessageListenerConcurrently 普通消息的接收
                MessageListenerOrderly 顺序消息的接收
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                /**
                 * List<MessageExt> msgs 可以从Broker获取多条数据,默认是32条,可以设置上限
                 */
                //5. 获取消息信息
                //迭代消息信息
                for (MessageExt msg : msgs) {
                    //获取主题
                    String topic = msg.getTopic();
                    //获取标签
                    String tags = msg.getTags();
                    //获取信息
                    byte[] body = msg.getBody();
                    try {
                        String result = new String(body, RemotingHelper.DEFAULT_CHARSET);
                        //todo 实现业务...
                        System.out.println("Consumer消费信息--topic: " + topic + ", tags: " + tags + ", result: "+ result);
                    } catch (UnsupportedEncodingException e) {
                        //throw new RuntimeException(e);
                        //注意这里不要抛出异常,打印异常日志即可,直接返回消息失败枚举值 RECONSUME_LATER 触发重推机制
                        e.printStackTrace();
                        //消息消费失败,重试
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    //6. 返回消息读取状态
                    //消息消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                return null;
            }
        });
        //开启RockerMQ消费端
        consumer.start();
    }
}



















