RocketMQ5.0–部署与实例
一、Idea调试
1.相关配置文件
在E:\rocketmq创建conf、logs、store三个文件夹。从RocketMQ distribution部署目录中将broker.conf、logback_namesrv.xml、logback_broker.xml文件复制到conf目录。如下图所示。
 
 其中logback_namesrv.xml、logback_broker.xml分别是NameServer、Broker的日志配置文件,修改打印日志文件路径即可。broker.conf文件是Broker启动时的加载配置文件,如下代码所示。
注意:NameServer启动端口默认是9876,Broker启动端口默认10911。
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
 
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
 
namesrvAddr=127.0.0.1:9876
brokerIP1=192.168.156.245
 
# 存储路径
storePathRootDir=E:\\rocketmq\\store
#commitLog 存储路径
storePathCommitLog=E:\\rocketmq\\store\\commitlog
# 消费队列存储路径
storePathConsumeQueue=E:\\rocketmq\\store\\consumequeue
# 消息索引|存储路径
storePathindex=E:\\rocketmq\\store\\index
#checkpoint 文件存储路径
storeCheckpoint=E:\\rocketmq\\store\\checkpoint
#abort 文件存储路径
abortFile=E:\\rocketmq\\store\\abort
 
2.启动NameServer
org.apache.rocketmq.namesrv.NamesrvStartup启动类配置环境变量ROCKETMQ_HOME,值是配置主目录“E:\rocketmq”,如下图所示。
 
 出现“The Name Server boot success. serializeType=JSON”时,则NameServer启动成功。
 
 3.启动Broker
org.apache.rocketmq.broker.BrokerStartup启动类配置环境变量ROCKETMQ_HOME,值是配置主目录“E:\rocketmq”;配置启动参数:-c E:\rocketmq\conf\broker.conf。如下图所示。
 
 出现“The broker[broker-a, 192.168.156.245:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876”时,则Broker启动成功。
 
 二、Linux部署
1.执行Maven命令
mvn -Prelease-all -DskipTests clean install -U
 
查看打包的可部署文件路径:.\distribution\target\rocketmq-5.0.0\rocketmq-5.0.0,如下图所示。
 
 2.复制rocketmq-5.0.0
 
 3.启动NameServer
nohup sh bin/mqnamesrv &
 
可能出现问题,如下图所示。原因是Windows系统下打包,换行符出现问题。解决:notepad++编辑器在Windows环境下将文本转换为Unix格式,步骤为:用Notepad++打开脚本 >> 编辑 >> 档案格式转换 >> 选择转换为UNIX格式。
 
 4.启动Broker
nohup sh bin/mqbroker -n 192.168.1.55:9876 -c /home/rocketmq-5.0.0/conf/broker.conf &
 
启动命令中,-n是指定NameServer地址,-c 是broker的配置文件。
 
 5.关闭NameServer、Broker命令
关闭NameServer:sh bin/mqshutdown namesrv
关闭Broker:sh bin/mqshutdown broker
三、事务消息实例
1.事务消息监听类
实现org.apache.rocketmq.client.producer.TransactionListener,该类有两个方法:
- executeLocalTransaction():保存本地事务中间表,用于Broker回查事务状态
 - checkLocalTransaction():Broker定时回查事务状态,根据事务状态提交或回滚事务消息
 
package com.common.instance.demo.config.rocketmq;
 
import com.common.instance.demo.entity.TMessageTransaction;
import com.common.instance.demo.service.TMessageTransactionService;
import com.log.util.LogUtil;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
import java.util.Date;
import java.util.List;
 
/**
 * @description 订单事务消息监听器实现类
 * @author TCM
 * @version 1.0
 * @date 2023/1/1 16:44
 **/
@Component
public class OrderTransactionListenerImpl implements TransactionListener {
 
    @Resource
    private TMessageTransactionService tMessageTransactionService;
 
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
        // 组装事务
        TMessageTransaction tMessageTransaction = packageTMessageTransaction(message);
 
        // 保存事务中间表
        tMessageTransactionService.insert(tMessageTransaction);
 
        // 推荐返回UNKNOW状态,待事务状态回查
        return LocalTransactionState.UNKNOW;
    }
 
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        // 获取用户属性tabId
        String tabId = messageExt.getUserProperty("tabId");
 
        // 查询事务消息
        List<TMessageTransaction> tMessageTransactions = tMessageTransactionService.queryByTabId(tabId);
 
        if (!tMessageTransactions.isEmpty() && tMessageTransactions.size() <= 6) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
 
        LogUtil.error("orderTransaction rollBack, tabId: " + tabId);
        return LocalTransactionState.ROLLBACK_MESSAGE;
    }
 
    // 组装事务
    private TMessageTransaction packageTMessageTransaction(Message message) {
        TMessageTransaction tMessageTransaction = new TMessageTransaction();
 
        // 获取用户属性tabId
        String tabId = message.getUserProperty("tabId");
        // 事务ID
        String transactionId = message.getTransactionId();
        tMessageTransaction.setTabId(tabId);
        tMessageTransaction.setTransactionId(transactionId);
        tMessageTransaction.setCreateBy("auto");
        tMessageTransaction.setCreateTime(new Date());
 
        return tMessageTransaction;
    }
 
}
 
2. 事务消息生产者
package com.common.instance.demo.config.rocketmq;
 
import com.alibaba.fastjson.JSON;
import com.common.instance.demo.entity.WcPendantTab;
import com.log.util.LogUtil;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.springframework.stereotype.Component;
 
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
 
/**
 * @description 订单事务消息生产者
 * @author TCM
 * @version 1.0
 * @date 2023/1/1 16:54
 **/
@Component
public class OrderTransactionProducer {
 
    @Resource
    private OrderProducerProperties orderProducerProperties;
 
    @Resource
    private OrderTransactionListenerImpl orderTransactionListener;
 
    private TransactionMQProducer orderTransactionMQProducer;
 
    @PostConstruct
    public void start() {
        try {
            LogUtil.info("start rocketmq: order transactionProducer");
            orderTransactionMQProducer = new TransactionMQProducer(orderProducerProperties.getProducerGroup());
            orderTransactionMQProducer.setNamesrvAddr(orderProducerProperties.getNameSrcAddr());
            orderTransactionMQProducer.setSendMsgTimeout(orderProducerProperties.getSendMsgTimeout());
            // 注册事务监听器
            orderTransactionMQProducer.setTransactionListener(orderTransactionListener);
            orderTransactionMQProducer.start();
        } catch (MQClientException e) {
            LogUtil.error("OrderTransactionProducer.start()", "start rocketmq failed!", e);
        }
    }
 
    public void sendTransactionMessage(WcPendantTab data) {
        sendTransactionMessage(data, orderProducerProperties.getTopic(), orderProducerProperties.getTag(), null);
    }
 
    public void sendTransactionMessage(WcPendantTab data, String topic, String tags, String keys) {
        try {
            // 消息内容
            byte[] msgBody = JSON.toJSONString(data).getBytes(StandardCharsets.UTF_8);
            // 消息对象
            Message message = new Message(topic, tags, keys, msgBody);
            message.putUserProperty("tabId", data.getTabId());
 
            // 发送事务消息
            orderTransactionMQProducer.sendMessageInTransaction(message, null);
        } catch (Exception e) {
            LogUtil.error("OrderTransactionProducer.sendMessage()","send order rocketmq error", e);
        }
    }
 
    @PreDestroy
    public void stop() {
        if (orderTransactionMQProducer != null) {
            orderTransactionMQProducer.shutdown();
        }
    }
 
}
 
3. 事务消息消费者
package com.common.instance.demo.config.rocketmq;
 
import com.log.util.LogUtil;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
 
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.List;
 
/**
 * @description 订单消费者
 * @author TCM
 * @version 1.0
 * @date 2023/1/1 14:29
 **/
@Component
public class OrderConsumer implements MessageListenerConcurrently {
 
    @Resource
    private OrderConsumerProperties orderConsumerProperties;
 
    private DefaultMQPushConsumer orderMQConsumer;
 
    @PostConstruct
    public void start() {
        try {
            LogUtil.info("start rocketmq: order consumer");
            orderMQConsumer = new DefaultMQPushConsumer(orderConsumerProperties.getConsumerGroup());
            orderMQConsumer.setNamesrvAddr(orderConsumerProperties.getNameSrcAddr());
            orderMQConsumer.subscribe(orderConsumerProperties.getTopic(), orderConsumerProperties.getTag() == null ? "*":orderConsumerProperties.getTag());
            orderMQConsumer.setConsumeFromWhere(ConsumeFromWhere.valueOf(orderConsumerProperties.getConsumeFromWhere()));
            orderMQConsumer.registerMessageListener(this); // 注册监听器
            orderMQConsumer.start();
        } catch (MQClientException e) {
            LogUtil.error("OrderProducer.start()", "start rocketmq failed!", e);
        }
    }
 
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        int index = 0;
        try {
            for (; index < msgs.size(); index++) {
                // 完整消息
                MessageExt msg = msgs.get(index);
                // 消息内容
                String messageBody = new String(msg.getBody(), StandardCharsets.UTF_8);
 
                LogUtil.info("消费组消息内容:" + messageBody);
            }
        } catch (Exception e) {
            LogUtil.error("OrderConsumer.consumeMessage()", "consume order rocketmq error", e);
        } finally {
            if (index < msgs.size()) {
                // 消费应答
                context.setAckIndex(index + 1);
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
 
    @PreDestroy
    public void stop() {
        if (orderMQConsumer != null) {
            orderMQConsumer.shutdown();
        }
    }
 
}
 
四、参考资料
 https://www.cnblogs.com/qdhxhz/p/11094624.html












![[Java基础] StringBuffer 和 StringBuilder 类应用及源码分析](https://img-blog.csdnimg.cn/fc9da7fb38224178a086701ed112ed15.png)






