RocketMQ(二十四)整合SpringBoot
SpringBoot整合rabbitmq使用案例
- 一 SpringBoot整合RocketMQ实现消息发送和接收
 - 消息生产者
 - 1)添加依赖
 - 2)配置文件
 - 3)启动类
 - 4)测试类
 
- 消息消费者
 - 1)添加依赖
 - 2)配置文件
 - 3)启动类
 - 4)测试类
 - 5)RocketMQMessageListener参数
 
- 测试
 
- RocketMQ发送同步消息
 - 同步消API介绍
 
- RocketMQ发送异步消息
 - 异步消息API介绍
 
- RocketMQ发送单向消息
 - RocketMQ消费者广播模式和负载均衡模式
 - RocketMQ实现顺序消息
 - RocketMQ实现延迟消息
 
一 SpringBoot整合RocketMQ实现消息发送和接收
消息生产者
1)添加依赖
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>
 
2)配置文件
server:
  port: 8081
  servlet:
    context-path: /
rocketmq:
  name-server: 127.0.0.1:9876
  producer:
    group: producer-demo1
 
3)启动类
@SpringBootApplication
public class RocketmqProducerApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext run = SpringApplication.run(RocketmqProducerApplication.class, args);
    }
}
 
4)测试类
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {RocketmqProducerApplication.class})
public class ProducerTest {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Test
    public void ConvertAndSendTest() {
        rocketMQTemplate.convertAndSend("springboot-mq", "hello springboot rocketmq");
    }
    
}
 
消息消费者
1)添加依赖
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
        </dependency>
 
2)配置文件
server:
  port: 8084
  servlet:
    context-path: /
rocketmq:
  name-server: 127.0.0.1:9876
  consumer:
    group: consumer-demo1
 
3)启动类
@SpringBootApplication
public class RocketmqConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(RocketmqConsumerApplication.class, args);
    }
}
 
4)测试类
@RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "${rocketmq.consumer.group}")
@Component
public class ConsumerService implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("收到消息内容:"+message);
    }
}
 
5)RocketMQMessageListener参数
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {
    String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";
    String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";
    String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";
    String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";
    String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";
    /**
     * 消费者分组
     *
     * @return
     */
    String consumerGroup();
    /**
     * 主题
     */
    String topic();
    /**
     * selectorType:消息选择器类型
     * - SelectorType.TAG:默认值,根据TAG选择,仅支持表达式格式如:“tag1 || tag2 || tag3”,如果表达式为null或者“*”标识订阅所有消息
     * - SelectorType.SQL92:根据SQL92表达式选择
     */
    SelectorType selectorType() default SelectorType.TAG;
    /**
     * selectorType 对应的表达式
     */
    String selectorExpression() default "*";
    /**
     * consumeMode:消费模式
     * - ConsumeMode.CONCURRENTLY:默认值,并行处理
     * - ConsumeMode.ORDERLY:按顺序处理
     */
    ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;
    /**
     * messageMode:消息模型
     * - MessageModel.CLUSTERING:默认值,集群
     * - MessageModel.BROADCASTING:广播
     */
    MessageModel messageModel() default MessageModel.CLUSTERING;
    /**
     * 最大线程数,默认值 64
     */
    int consumeThreadMax() default 64;
    /**
     * 消费失败,最大重试次数
     * <p>
     * - 在并发模式中,-1表示16
     * - 在有序模式中,-1表示整数最大值
     */
    int maxReconsumeTimes() default -1;
    /**
     * 消息可能阻止使用线程的最长时间(分钟)
     */
    long consumeTimeout() default 15L;
    /**
     * 发送回复消息超时
     */
    int replyTimeout() default 3000;
    /**
     * 默认值 ${rocketmq.consumer.access-key:}
     */
    String accessKey() default ACCESS_KEY_PLACEHOLDER;
    /**
     * 默认值 ${rocketmq.consumer.secret-key:}
     */
    String secretKey() default SECRET_KEY_PLACEHOLDER;
    /**
     * 启用消息轨迹,默认值 false
     */
    boolean enableMsgTrace() default false;
    /**
     * 自定义的消息轨迹主题,默认值${rocketmq.consumer.customized-trace-topic:}
     * 没有配置此配置项则使用默认的主题
     */
    String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;
    /**
     * 命名服务器地址,默认值${rocketmq.name-server:}
     */
    String nameServer() default NAME_SERVER_PLACEHOLDER;
    /**
     * 默认值${rocketmq.access-channel:}
     */
    String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}
 
测试
运行生产者中ConvertAndSendTest测试方式,观察消费者监听器日志
RocketMQ发送同步消息
发送同步消息是指producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送结
 果;
 相对异步发送消息,同步会阻塞线程,性能相对差点,但是可靠性高,这种方式得到广泛使用,比如:
 短信通知,邮件通知,站内重要信息通知等。
 RocketMQTemplate 给我们提供了syncSend方法(有多个重载),来实现发送同步消息;
 下面给一个实例:
    /**
     * 发送同步消息
     */
    @Test
    public void sendSyncMessageTest() {
        for (int i = 0; i < 10; i++) {
            SendResult sendResult = rocketMQTemplate.syncSend("springboot-mq", "rocketmq同步消息!" + i);
            System.out.println(sendResult);
        }
    }
 
这里执行完发送同步消息返回执行结果 SendResult ;
同步消API介绍
//发送普通同步消息-Object
syncSend(String destination, Object payload)
//发送普通同步消息-Message
syncSend(String destination, Message<?> message)
//发送批量普通同步消息
syncSend(String destination, Collection<T> messages)
//发送普通同步消息-Object,并设置发送超时时间
syncSend(String destination, Object payload, long timeout)
//发送普通同步消息-Message,并设置发送超时时间
syncSend(String destination, Message<?> message, long timeout)
//发送批量普通同步消息,并设置发送超时时间
syncSend(String destination, Collection<T> messages, long timeout)
//发送普通同步延迟消息,并设置超时
syncSend(String destination, Message<?> message, long timeout, int delayLevel)
 
/**
 * 批量消息
 */
@Test
void asyncSendBatch() {
    Message<String> msg = MessageBuilder.withPayload("hello world test1").build();
    List<Message> msgList = Arrays.asList(msg,msg,msg,msg,msg);
    SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", msgList);
    log.info("批量消息");
}
 
RocketMQ发送异步消息
发送异步消息是指producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API
 后立即返回,producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行
 。
 相对发送同步消息,异步消息性能更高,可靠性略差。适合对响应时间要求高的业务场景。
 RocketMQTemplate 给我们提供了asyncSend方法(有多个重载),来实现发送异步消息;
    /**
     * 异步消息-String
     * 指发送方发出数据后,不等接收方发回响应,接着发送下个数据包
     * 关键实现异步发送回调接口(SendCallback)
     * 在执行消息的异步发送时应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理
     * 这种方式任然需要返回发送消息任务的执行结果,异步不影响后续任务,不会造成阻塞
     */
    @Test
    public void sendAsyncMessage() {
        for (int i = 0; i < 10; i++) {
            rocketMQTemplate.asyncSend("springboot-mq", "rocketmg异步消息!" + i,
                    new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            System.out.println("发送成功!");
                        }
                        @Override
                        public void onException(Throwable throwable) {
                            System.out.println("发送失败!");
                        }
                    });
        }
    }
 
类似发送同步消息,多了一个SendCallback回调接口参数,实现onSuccess和onException方法,分别
 表示异步发送成功和失败;
 
异步消息API介绍
//发送普通异步消息-Object
asyncSend(String destination, Object payload, SendCallback sendCallback)
//发送普通异步消息-Message
asyncSend(String destination, Message<?> message, SendCallback sendCallback)
//发送普通异步消息-Object,并设置发送超时时间
asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout)
//发送普通异步消息-Message,并设置发送超时时间
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout)
//发送普通异步延迟消息,并设置超时
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel)
 
RocketMQ发送单向消息
发送单向消息是 指producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果
 这种方式主要用在不特别关心发送结果的场景,举例:日志发送;
 RocketMQTemplate 给我们提供了sendOneWay方法(有多个重载),来实现发送单向消息;
 下面给一个实例:
  /**
     * 单向消息
     * 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
     * 此方式发送消息的过程耗时非常短,一般在微秒级别
     * 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集
     */
    @Test
    public void sendOneWayMessage() {
        for (int i = 0; i < 10; i++) {
            rocketMQTemplate.sendOneWay("springboot-mq", "rocketmg单向消息!" + i);
        }
    }
 
RocketMQ消费者广播模式和负载均衡模式

 如上图,假如我们有多个消费者,消息生产者发送的消息,是每一个消费者都消费一次呢?还是通过一
 些机制,比如轮询机制,每个消息只被某一个消费者消费一次呢?
 这里涉及到消费者的消费模式,一种是广播模式,还有一种是负载均衡模式;
广播模式是每个消费者,都会消费消息;负载均衡模式是每一个消息只会被某一个消费者消费一次;
我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮
 箱,手机,站内提示;
 我们可以通过 @RocketMQMessageListener 的 messageModel 属性值来设置,
 MessageModel.BROADCASTING 是广播模式, MessageModel.CLUSTERING 是默认集群负载均衡模式;
 我们先集群负载均衡测试,加上 messageModel=MessageModel.CLUSTERING
/**
 * @ClassName: ConsumerService
 * @Description: 消息消费者
 * @Author wxl
 * @Date 2024-04-22
 * @Version 1.0.0
 **/
@RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "${rocketmq.consumer.group}",messageModel = MessageModel.CLUSTERING)
@Component
public class ConsumerService implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("收到消息内容:"+message);
    }
}
 
RocketMQ实现顺序消息
rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是顺序消费消息的;
 
 有时候,我们需要实现顺序消费一批消息,比如电商系统,订单创建,支付,完成等操作,需要顺序执行;
 RocketMQTemplate 给我们提供了SendOrderly方法(有多个重载),来实现发送顺序消息;包括以下:
 syncSendOrderly,发送同步顺序消息;
 asyncSendOrderly,发送异步顺序消息;
 sendOneWayOrderly,发送单向顺序消息;
 一般我们用第一种发送同步顺序消息;
 
 第三个参数hashKe,方法点进去:
 
 因为broker会管理多个消息队列,这个hashKey参数,主要用来计算选择队列的,一般可以把订单ID,
 产品ID作为参数值;
 发送到一个队列,这样方便搞顺序队列;
 以及消费端接收的时候,默认是并发多线程去接收消息。RocketMQMessageListener有个属性
 consumeMode,默认是ConsumeMode.CONCURRENTLY ,我们要改成ConsumeMode.ORDERLY,
 单线程顺序接收消息;
 下面给具体事例,模拟两个订单发送消息;
消息生产者端:
    /**
     * 发送同步顺序消息
     */
    public void sendOrderlyMessage() {
        // hashKey用来计算决定消息发送到哪个消息队列    一般是订单ID,产品ID等
        rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,创建", "98456231");
        rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,支付", "98456231");
        rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,完成", "98456231");
        rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,创建", "98456232");
        rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,支付", "98456232");
        rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,完成", "98456232");
    }
 
消费者端:
@RocketMQMessageListener(topic = "java1234-rocketmq-orderly",
        consumerGroup = "${rocketmq.consumer.group}", consumeMode = ConsumeMode.ORDERLY)
@Component
public class ConsumerService implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("消费者:收到消息内容:" + s);
    }
}
 
运行测试:

RocketMQ实现延迟消息
延迟消息
 对于消息中间件来说,producer 将消息发送到mq的服务器上,但并不希望这条消息马上被消费,
 而是推迟到当前时间节点之后的某个时间点,再将消息投递到 queue 中让 consumer 进行消费。
延迟消息的使用场景很多,一种比较常见的场景就是在电商系统中,订单创建后,会有一个等待用
 户支付的时间窗口,一般为30分钟,30分钟后 customer 会收到这条订单消息,然后程序去订单表中检
 查当前这条订单的支付状态,如果是未支付的状态,则自动清理掉,这样就不需要使用定时任务的方式
 去处理了。
Rocket的延迟消息
 RocketMQ 支持定时的延迟消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s,
 10s, 1m 等。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类
 推。
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
 

 我们会发现,所有消息发送方法都有一个带int类型的delayLevel参数重载方法,这个就是设置延迟消息
 级别的参数。
 同时注意,每个带delayLevel参数的方法,也同时带有long类型的timeout参数,这个是设置消息发送超
 时时间,默认是3秒,我们也可以自行设置;
 同时还有 Message参数,如果发送这种类型的消息,可以携带具体的消息数据;
    /**
     * 发送延迟消息
     */
    @Test
    public void sendDelayMessage() {
        rocketMQTemplate.syncSend("java1234-rocketmq", MessageBuilder.withPayload("rocketmq延迟1秒消息").build(), 3000, 1);
        rocketMQTemplate.syncSend("java1234-rocketmq", MessageBuilder.withPayload("rocketmq延迟5秒消息").build(), 3000, 2);
        rocketMQTemplate.syncSend("java1234-rocketmq", MessageBuilder.withPayload("rocketmq延迟10秒消息").build(), 3000, 3);
    }
 
运行测试:
 











![[极客大挑战 2019]PHP](https://img-blog.csdnimg.cn/direct/53ac43f480bc427ab4cb6f8c9b339d31.png)







