RocketMQ 5.0 学习笔记

news2025/7/22 18:09:11

1. 需求

背景:业务需要,平台将使用rocketMQ来实现消息的发送与消费,替代redis的消息功能。

需要在搭建好rocketMQ平台后,进行研究和验证。

技术:Springboot + RocketMQ5.0

使用场景:签到活动,给用户推送消息,日志上报等

2. 笔记

2.1 安装RocketMQ 5.0

2.1.1 下载

官网:https://rocketmq.apache.org/zh/docs/quickStart/01quickstart/

请注意下载二进制包,二进制包是已经编译完成后可以直接运行的,源码包是需要编译后运行的。

二进制包:https://dist.apache.org/repos/dist/release/rocketmq/5.1.0/rocketmq-all-5.1.0-bin-release.zip

2.1.2 修改参数

解压文件到 D:\rocketmq-5.0,就相当于安装好了
默认的java运行内存很大,这里要修改一下内存配置:
进入bin目录,修改runbroker.sh文件和runserver.sh(如果是windows系统,修改runbroker.cmd文件runserver.cmd

原本是4g,4g,2g的配置,我这里修改为了256m,256m,256m的配置,两个文件都是修改成这样就差不多了。
linux: JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m"
windows: set "JAVA_OPT=%JAVA_OPT% -server -Xms256m -Xmx256m"

修改保存后,就是启动了。
在这里插入图片描述

2.2 启动

启动可以按照官网的quick start启动,如下:

2.2.1 本地windows启动
  1. 启动NameServer
start mqnamesrv.cmd

在这里插入图片描述

  1. 启动broker
start mqbroker -n 127.0.0.1:9876 autoCreateTopicEnable=true

在这里插入图片描述

  1. 启动proxy
start mqbroker.cmd -n 127.0.0.1:9876

在这里插入图片描述

2.2.2 Linux启动

分别是在解压后的rocketMQ文件夹下执行如下命令:

  1. 启动mqnamesrv请在rocketMQ解压后的文件夹中的bin目录同级使用下面这些命令
### 启动namesrv
$ nohup sh bin/mqnamesrv &
 
### 验证namesrv是否启动成功
$ tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
  1. 再启动Broker + proxy
### 先启动broker
$ nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

### 验证broker是否启动成功, 比如, broker的ip是192.168.1.2 然后名字是broker-a
$ tail -f ~/logs/rocketmqlogs/broker_default.log 
The broker[broker-a,192.169.1.2:10911] boot success...

可以使用jps查看或者使用如下命令查看日志文件:

tail -f ~/logs/rocketmqlogs/broker.log
  1. 如果用jps可查看启动的服务

OK,启动完成

2.3 RocketMQ Dashborad

参见git官网 https://gitcode.net/mirrors/apache/rocketmq-dashboard/

  1. 本地启动rocketmq-dashboard项目

将项目拉到本地后,idea打开,修改配置文件application.yml

rocketmq:
  config:
    # if this value is empty,use env value rocketmq.config.namesrvAddr  NAMESRV_ADDR | now, default localhost:9876
    # configure multiple namesrv addresses to manage multiple different clusters
    namesrvAddrs:
      - 127.0.0.1:9876  #修改namesrv的地址
      - 127.0.0.2:9876

启动:

windos:启动项目,idea -> run application

linux:先用maven打成jar包,然后 java-jar 启动

2.4 rocketmq + springboot

项目结构:
在这里插入图片描述

  1. 导入依赖
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client-java</artifactId>
            <version>5.0.4</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
  1. application.properties
server.port=8080

#自定义proxy地址
rocketmq.proxy = 127.0.0.1:8081
  1. 配置生产者
package com.example.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Slf4j
@Configuration
public class RocketConfig {

    // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
    @Value("${rocketmq.proxy}")
    private String mqProxy;

    @Bean(name="MyProducer")
    public Producer createProducer(){
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(mqProxy);
        ClientConfiguration configuration = builder.build();

        // 初始化Producer时需要设置通信配置以及预绑定的Topic。
        try {
            log.info("初始化rocketmq5.0生产者: proxy:{}",mqProxy);
            Producer producer = provider.newProducerBuilder()
                    .setClientConfiguration(configuration).build();
            log.info("初始化rocketmq5.0生产者成功: proxy:{}", mqProxy);
            return producer;
        } catch (ClientException e) {
            log.info("初始化rocketmq5.0生产者失败:{}", e);
        }
        return null;
    }
}
  1. 消费者
package com.example.service;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;
import java.util.Collections;

@Slf4j
@Component
public class RocketConsumer {

    @Value("${rocketmq.proxy}")
    private String mqProxy;

    // 为消费者指定所属的消费者分组,Group需要提前创建。
    private static final String My_Consumer_Group = "myConsumerGroup1";

    // 指定需要订阅哪个目标Topic,Topic需要提前创建。
    private static final String My_Topic = "myTopicTest1";

    @Bean(name = "MyConsumer")
    public void mqConsumer(){
        ClientServiceProvider provider = ClientServiceProvider.loadService();
        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
                .setEndpoints(mqProxy).build();
        // 订阅消息的过滤规则,表示订阅所有Tag的消息。
        String tag = "*";
        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);

        // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
        try {
            log.info("构建消费者:proxy: {}, consumer_group: {}, topic: {}", mqProxy, My_Consumer_Group, My_Topic);
            provider.newPushConsumerBuilder().setClientConfiguration(clientConfiguration)
                    .setConsumerGroup(My_Consumer_Group)
                    // 设置预绑定的订阅关系。
                    .setSubscriptionExpressions(Collections.singletonMap(My_Topic, filterExpression))
                    // 设置消费监听器。
                    .setMessageListener(messageView -> {
                        // 处理消息并返回消费结果。
                        log.info("消费消息:{}", messageView);
                        log.info("消息内容:messageId={}, messageBody={}", messageView.getMessageId(),
                                StandardCharsets.UTF_8.decode(messageView.getBody()).toString());
                        return ConsumeResult.SUCCESS;
                    }).build();
            log.info("构建消费者成功:proxy: {}, consumer_group: {}, topic: {}", mqProxy, My_Consumer_Group, My_Topic);
        } catch (ClientException e) {
            log.info("构建消费者异常:proxy: {}, consumer_group: {}, topic: {}, Excepiton:", mqProxy, My_Consumer_Group, My_Topic, e);
        }
    }
}
  1. 接口测试
package com.example.controller;

import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.message.MessageBuilder;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;

@RestController
public class TestMqController {

    @Resource(name = "MyProducer")
    private Producer producer;

    @GetMapping("/sendMessage")
    public String sendMessage() throws ClientException {
        MessageBuilder messageBuilder = new MessageBuilderImpl();
        String msgStr = "a test message for rocketmq5.0 ...";
        Message message = messageBuilder.setTopic("myTopicTest1")
                .setBody(msgStr.getBytes(StandardCharsets.UTF_8)).build();
        SendReceipt send = producer.send(message);
        return "success";
    }
}
  1. 启动服务

在这里插入图片描述

  1. 调接口
http://127.0.0.1:8080/sendMessage
  1. 后台日志
2023-03-07 19:19:15.915  INFO 21508 --- [onsumption-1-34] com.example.service.RocketConsumer       : 消费消息:MessageViewImpl{messageId=01A87EEA967B8354040418B7B300000000, topic=myTopicTest1, bornHost=DESKTOP-RNCSLDE, bornTimestamp=1678187955816, endpoints=ipv4:127.0.0.1:8081, deliveryAttempt=1, tag=null, keys=[], messageGroup=null, deliveryTimestamp=null, properties={}}
2023-03-07 19:19:15.915  INFO 21508 --- [onsumption-1-34] com.example.service.RocketConsumer       : 消息内容:messageId=01A87EEA967B8354040418B7B300000000, messageBody=a test message for rocketmq5.0 ...
  1. dashborad

本地启动rocketmq dashborad, 修改服务启动端口 server.port: 8088

访问面板:http://127.0.0.1:8088/#/ (我调用了4次接口)

在这里插入图片描述
在这里插入图片描述

3. 思考

  1. 消费异常如何处理?
    • 打印日志,记录msgId, body等
    • 重试机制是怎样的
  2. 之前的消息是存在redis,消息如何从redis平滑迁移到rocketmq

代码参考文章:RocketMQ 5.0 实战

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/395019.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DBCO intermidate 3,二苯并环辛炔-四乙酰甘露糖胺一种生化小分子糖标记

DBCO-四乙酰甘露糖胺 &#xff0c;二苯并环辛炔-四乙酰甘露糖胺 | 纯度&#xff1a;95% | DBCO intermidate 31.试剂信息&#xff1a;CAS&#xff1a;N/A外观&#xff1a;固体/粉末分子量&#xff1a;C33H34N2O11分子式&#xff1a;634.64溶解性&#xff1a;溶于有机溶剂&#…

Unity项目优化方案2023

每年整个新活&#xff0c;每年出个手游项目。又到了项目收尾的季节&#xff0c;也是最掉头发的时候。这两周开启漫漫的优化之路。老方法&#xff0c;先按住Ctrl7&#xff0c;打开profiler性能分析工具&#xff0c;找到性能占用的大头。不用看也能猜到&#xff0c;Batches是优化…

JavaEE简单示例——MyBatis的二级缓存机制

简单介绍&#xff1a; 在之前&#xff0c;我们介绍了关于MyBatis的一级缓存机制&#xff0c;之前我们说过&#xff0c;一级缓存是基于SqlSession的对同一条SQL语句多次查询的时候&#xff0c;会将第一次查询的结果缓存到内存中&#xff0c;之后的所有的相同的查询会直接从内存…

修改redis改key值不改过期时间

今天在做图片验证码的时候遇到一个问题。用redis的生命周期来存放&#xff0c;用户输入错误次数。 三十秒内输错三次就&#xff0c;等待三十分钟。 那么问题来了&#xff0c;如果说第一次输入错误&#xff0c;应该是 key为用户用&#xff0c;value 为 次数2 ex就为30秒 &…

[数据结构]:13-插入排序(顺序表指针实现形式)(C语言实现)

目录 前言 已完成内容 插入排序实现 01-开发环境 02-文件布局 03-代码 01-主函数 02-头文件 03-PSeqListFunction.cpp 04-SortCommon.cpp 05-SortFunction.cpp 结语 前言 此专栏包含408考研数据结构全部内容&#xff0c;除其中使用到C引用外&#xff0c;全为C语言代…

M2E2: Cross-media Structured Common Space for Multimedia Event Extraction 论文解读

Cross-media Structured Common Space for Multimedia Event Extraction 论文&#xff1a;multimediaspace2020.pdf (illinois.edu) 代码&#xff1a;limanling/m2e2: Cross-media Structured Common Space for Multimedia Event Extraction (ACL2020) (github.com) 期刊/会议…

【3D点云】目标检测总结(持续汇总)

系列文章目录 提示&#xff1a;这里可以添加系列文章的所有文章的目录&#xff0c;目录需要自己手动添加 例如&#xff1a;第一章 Python 机器学习入门之pandas的使用 提示&#xff1a;写完文章后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目…

Java Spring 中 Resources 路径若干问题

ant-style资源路径通配符 ANT通配符有三种&#xff1a; 最长匹配规则&#xff08;has more characters&#xff09;&#xff0c;即越精确的模式越会被优先匹配到。例如&#xff0c;URL请求/app/dir/file.jsp&#xff0c;现在存在两个路径匹配模式/**/*.jsp 和 /app/dir/*.js…

C++回顾(九)——多继承

9.1 多继承 9.1.1 概念 一个类有多个直接基类的继承关系称为多继承&#xff08;多个父类&#xff09;多继承声明语法 class 派生类名 : 访问控制 基类名1 , 访问控制 基类名2 , … , 访问控制 基类名n {数据成员和成员函数声明 }&#xff1b;类 C 可以根据访问控制同时…

Git在某个节点切出新分支

操作前&#xff0c;必须先备份分支&#xff0c;避免丢失代码&#xff01;&#xff01;&#xff01;&#xff01; 操作前&#xff0c;必须先备份分支&#xff0c;避免丢失代码&#xff01;&#xff01;&#xff01;&#xff01; 操作前&#xff0c;必须先备份分支&#xff0c;避…

Mr. Cappuccino的第46杯咖啡——Maven多模块项目可插拔式打包部署方案

Maven多模块项目可插拔式打包部署方案需求调研前准备项目结构模块之间的依赖关系项目pom文件项目代码代码运行效果方案调研需要实现的效果解决方案代码实现打包运行效果完整打包测试插拔式打包测试最佳实践项目结构测试运行效果完整打包测试插拔式打包测试需求 早期的【train-h…

吐血整理,自动化测试pytest测试框架,资深测试带你少走弯路......

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 Pytest框架详解 py…

ledcode----丢失的数字

目录 题目截图&#xff1a; 题目接口&#xff1a; 第一种解法&#xff1a; 思路&#xff1a; 第二种解法&#xff1a;差值法 思路&#xff1a; 第三种解法&#xff1a;位运算异或法 关于异或操作符的预备知识&#xff1a; 思路&#xff1a; 例子&#xff1a;输入数组[0…

海思3531a pjsip交叉编译

学习文档&#xff1a; PJSUA2 Documentation — PJSUA2 Documentation 1.0-alpha documentationhttps://www.pjsip.org/docs/book-latest/html/index.html ./configure --prefix/opensource/pjproject-2.12/build3531a \ --host/opt/hisi-linux/x86-arm/arm-hisi…

MySQL全解[集群篇]

目录日志错误日志二进制日志格式查看删除查询日志慢查询日志主从复制原理docker搭建分库分表拆分策略垂直拆分垂直分库垂直分表水平拆分水平分库水平分表实现技术MyCat2mysql2对比mycat1.xdocker运行mycat2日志 错误日志 错误日志是 MySQL 中最重要的日志之一&#xff0c;它记…

NYUv2生成边界GT(2)

由NYUv2生成边界GT(1)可知&#xff0c;我们每张GT图片都生成一个对应的.bin文件。存放在D:\datasets\data_proc\train\edge_labels_40文件夹下&#xff0c;下一步我们需要生成.png文件&#xff0c;即需要使用convert_bin_to_png.py。 # -*- coding: utf-8 -*- import numpy as…

【Azure 架构师学习笔记】-Azure Storage Account(1)- Queue Storage

本文属于【Azure 架构师学习笔记】系列。 本文属于【Azure Storage Account】系列。 接上文 【Azure 架构师学习笔记】-Azure Storage Account&#xff08;1&#xff09;-类型简介 前言 Azure Storage Queues 是一个专门用来处理基于云环境队列的Azure 服务。每个队列都维护着…

Netty之ChannelHandler初解

目录 目标 Netty版本 Netty官方API 实战 Netty服务器 入栈ChannelHandler读入数据顺序案例 出栈ChannelHandler写出数据顺序案例 ChannelHandlerContext和NioSocketChannel写入数据时有什么不同 Pipeline添加多个ChannelHandler有什么意义 目标 掌握ChannelHandler基…

海康工业相机使用教程

工业相机使用一、硬件连接1、准备材料2、相机供电&#xff08;1&#xff09;区分电源适配器正负极&#xff08;2&#xff09;连接相机电源线缆&#xff08;3&#xff09;连接完成后&#xff0c;相机蓝色灯常亮则成功3、软件连接&#xff08;1&#xff09;MVS客户端下载地址&…

你真的会在阳光下拍照片么?

你好&#xff0c;我是小麥。 上节课我们讲了如何通过影子判断光的质量&#xff0c;也就是光的软硬&#xff0c;这节课我们来接着说一说光的方向和环境光的实际运用。 虽然在现实生活里&#xff0c;我们可能没有从软硬的角度观察过光线&#xff0c;但我相信你在拍照片的时候一…