RocketMQ—RocketMQ快速入门
RocketMQ提供了发送多种发送消息的模式,例如同步消息,异步消息,顺序消息,延迟消息,事务消息等。
消息发送和监听的流程
消息生产者
- 创建消息生产者producer,并制定生产者组名
- 指定Nameserver地址
- 启动producer
- 创建消息对象,指定主题Topic、Tag和消息体等
- 发送消息
- 关闭生产者producer
消息消费者
- 创建消费者consumer,制定消费者组名
- 指定Nameserver地址
- 创建监听订阅主题Topic和Tag等
- 处理消息
- 启动消费者consumer
demo程序
简单程序
新建一个springboot项目。
引入以下依赖
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.9.2</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
    </dependency>
</dependencies>
生产者简单案例如下:
/**
     * 生产者
*/
@Test
public void simpleProducer() throws Exception {
    //创建一个生产者,并指定一个组名
    DefaultMQProducer producer = new DefaultMQProducer("test-1-producer-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    producer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    //启动
    producer.start();
    //指定topic,创建一个消息
    Message message = new Message("testTopic1", "一条简单消息".getBytes());
    //发送消息,并获得状态
    SendResult sendResult = producer.send(message);
    System.out.println("发送消息的状态为:"+sendResult);
    //关闭生产者
    producer.shutdown();
}
运行程序,可以看到运行结果如下:

可以看到出现了新的topic

消费者简单案例如下:
/**
     * 接受消息
 */
@Test
public void simpleConsumer() throws Exception {
    //创建一个消费者,并指定一个组名
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-1-consumer-group");
    //连接namesrv,参数是namesrv的ip地址:端口号
    consumer.setNamesrvAddr(MqConstant.NAME_SERVER_ADDR);
    //订阅一个主题 *号表示订阅这个主题中所有的消息
    consumer.subscribe("testTopic1","*");
    //设置一个监听器(一直监听,异步回调方式)
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            //这里就是消费的方式 做业务处理
            System.out.println("消费者");
            System.out.println(list.get(0).toString());
            System.out.println("消息内容:"+new String(list.get(0).getBody()));
            System.out.println("消费上下文:"+consumeConcurrentlyContext);
            //返回消费的状态 ,CONSUME_SUCCESS表示成功,消息会从队列中出队
            //如果返回RECONSUME_LATER/报错/null,表示消费失败,消息会重新回到队列,给当前消费者或者其他消费者消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //启动消费者
    consumer.start();
    //挂起当前jvm,防止主线程结束,让监听器一直监听
    System.in.read();
}
运行截图如下:




















