【RocketMQ】RocketMQ入门
文章目录
- 【RocketMQ】RocketMQ入门
- 1. 消费模式
- 2. 发送/消费 消息
- 2.1 同步消息
- 2.2 异步消息
- 2.3 单向消息
- 2.4 延迟消息
- 2.5 批量消息
- 2.6 顺序消息
 
 
1. 消费模式
MQ的消费模式大致分为两种,一种是推Push,一种是拉pull。
Push模式:
- 优点: 
  - 及时性较好
 
- 缺点: 
  - 客户端没有做好流控的话容易导致客户端消息堆积甚至崩溃。
 
Pull模式:
- 优点: 
  - 客户端可以根据自己的消费能力进行消费
 
- 缺点: 
  - 拉取频率不好控制,频繁容易造成客户端压力过大,拉取间隔长容易造成消费不及时。
 
Push模式也是基于pull模式的,只能客户端内部封装了api,一般场景下,上游消息生产量小或者均速的时候,选择push模式。在特殊场景下,例如电商大促,抢优惠券等场景可以选择pull模式
2. 发送/消费 消息
参考文档:RocketMQ官方文档
以下代码采用的都是rocketmq的原生api
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>
2.1 同步消息
同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。

生产者发送消息代码如下:
@Test
public void simpleProducer() throws Exception {
    //创建一个生产者 (制定一个组名)
    DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");
    //连接namesrv
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    //启动
    producer.start();
    for (int i = 1; i <= 10; i++) {
        //创建消息
        Message message = new Message("testTopic", ("我是一个简单的消息" + i).getBytes());
        //发送消息
        SendResult sendResult = producer.send(message);
        System.out.println(sendResult.getSendStatus());
    }
    //关闭生产者
    producer.shutdown();
}
消费者消费信息代码如下:
@Test
public void simpleConsumer() throws Exception {
    //创建一个消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");
    //连接namesrv
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    //订阅一个主题 * 标识订阅这个主题中所有消息,后期会有消息过滤
    consumer.subscribe("testTopic", "*");
    //设置一个监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            //这个就是消费的方法(业务处理)
            System.out.println("我是消费者");
            System.out.println(list.get(0).toString());
            System.out.println("消息内容:" + new String(list.get(0).getBody()));
            System.out.println("消费上下文" + consumeConcurrentlyContext);
            //返回值 CONSUME_SUCCESS 成功,消息会从mq出队
            // RECONSUME_LATER(报错/null) 失败,消息会重新回到队列,过一会重新投递出来,给当前消费者或者其他消费者消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //启动
    consumer.start();
    //TimeUnit.SECONDS.sleep(100);
    //挂起当前jvm
    System.in.read();
}
2.2 异步消息
异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。
消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

注:异步发送生产者需要实现异步发送回调接口。
生产者发送消息代码如下:
@Test
public void asyncProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
    //连接
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    //启动
    producer.start();
    Message message = new Message("asyncTopic", "我是一个异步消息".getBytes());
    producer.send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("发送成功");
        }
        @Override
        public void onException(Throwable throwable) {
            System.out.println("发送失败" + throwable.getMessage());
        }
    });
    System.out.println("我先执行");
    System.in.read();
}
消费者代码基本和同步消息的相同,不展示。
2.3 单向消息
发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

生产者发送消息代码如下:
@Test
public void onewayProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    Message message = new Message("onewayTopic", "这是一条单向消息".getBytes());
    producer.sendOneway(message);
    System.out.println("成功");
    producer.shutdown();
}
2.4 延迟消息
延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。应用场景是外卖15分钟未支付则取消订单。
RokcketMQ一共支持18个等级的延迟投递,具体时间如下:
| 投递等级(delay level) | 延迟时间 | 投递等级(delay level) | 延迟时间 | 
|---|---|---|---|
| 1 | 1s | 10 | 6min | 
| 2 | 5s | 11 | 7min | 
| 3 | 10s | 12 | 8min | 
| 4 | 30s | 13 | 9min | 
| 5 | 1min | 14 | 10min | 
| 6 | 2min | 15 | 20min | 
| 7 | 3min | 16 | 30min | 
| 8 | 4min | 17 | 1h | 
| 9 | 5min | 18 | 2h | 
生产者发送消息代码如下:
@Test
public void msProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    Message message = new Message("orderMsTopic", "订单消息".getBytes());
    //设置延迟等级
    //messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
    message.setDelayTimeLevel(3);//10s
    producer.send(message);
    System.out.println("发送事件:" + new Date());
    producer.shutdown();
}
2.5 批量消息
在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。

注:批量消息大小不能超过1MIB(1024*1024),同一批的 topic 必须相同
生产者发送消息代码如下:
@Test
public void testBatchProducer() throws Exception{
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("batch-producer-group");
    // 设置nameServer地址
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    // 启动实例
    producer.start();
    List<Message> msgs = Arrays.asList(
            new Message("batchTopic", "我是一组消息的A消息".getBytes()),
            new Message("batchTopic", "我是一组消息的B消息".getBytes()),
            new Message("batchTopic", "我是一组消息的C消息".getBytes())
    );
    SendResult send = producer.send(msgs);
    System.out.println(send);
    // 关闭实例
    producer.shutdown();
}
消费者消费信息代码如下:
@Test
public void msConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("bathc-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("batchTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            System.out.println("收到消息了:" + new Date());
            System.out.println(list.size());
            System.out.println("消息体是:" + new String(list.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}
2.6 顺序消息
待续。。。



















