【RabbitMQ】Producer之publisher confirm、transaction - 基于AMQP 0-9-1(二)

news2025/7/19 17:06:30

上篇文章主要介绍Producer的mandatory参数,备份队列和TTL的内容,这篇文章讲继续介绍Producer端的开发,主要包括发布方确认和事务机制。

发布方确认

消息持久化机制可以保证应服务器出现异常导致消息丢失的问题,但是Producer将消息发送出去,并不知道消息是否正确到达服务端并持久化。如果在到达服务端,或者到达服务端未持久化到磁盘,消息就丢失,那么问题仍然存在。如下图,第一步或者第二步出现问题,消息依然会丢失。

 RabbitMQ为解决这个问题,提供了发布方确认(Publisher Confirm)机制。

生产者将Channel设置成confirm模式,所有在改channel上发布的消息都会被指派一个唯一的ID(序号从1开始),消息被路由到队列之后,RabbitMQ就会发送一个确认(ack,包含此消息的ID)给生产者,这样生产者就知道消息已经正确发送到RabbitMQ了,如果消息是持久化,RabbitMQ会等到消息落盘再回复。RabbitMQ回复的消息包含了deliveryTag,表示确认消息的序号,还包含multiple,表示到这个消息序号之前所有的消息都已得到处理了。

 下面通过代码演示发布方确认的使用。

channel.confirmSelect();

String message = "test ack";
channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

// 如果没有调用comfirmSelect方法开启,直接调用waitFormConfirms
// 会报java.lang.IllegalStateException
channel.waitForConfirms();

此中方式也是串行同步等待的方式,生产者发送消息之后,会被阻塞,直到RabbitMQ接收到消息返回(对于持久化消息,等待消息落盘后才返回),生产者接收到ack才会下一条消息的处理,这显然会有性能问题。

解决方案有两种:

  1. 批量发送确认
  2. 异步确认

批量发送

批量发送方法,客户端程序需要定量或者定时调用waitFormConfirms方法等待RabbitMQ的确认返回,相对于上面的方式,性能有极大的提升。

但也有问题,当同一批次中出现消息被nack或者超时,需要客户端程序处理并重试,这有可能导致消息重复。

下面是批量发送的代码实现。

channel.confirmSelect();
int MsgCount = 0;
while (true) {
    channel.basicPublish("exchange", "routingKey", null, "batch confirm test".getBytes());
    //将发送出去的消息存入缓存中,缓存可以是一个ArrayList 或者BlockingQueue 之类的
    if (++MsgCount >= BATCH_COUNT) {
        MsgCount = 0;
        try {
            if (channel.waitForConfirms()) {
                //将缓存中的消息清空
            }
            //将缓存中的消息重新发送
        } catch (InterruptedException e) {
            e.printStackTrace();
            //将缓存中的消息重新发送
        }
    }
}

异步发送

异步方案是推荐使用的方式,它的优点初了性能优良之外,还有只需要在回调方法nack中处理没有被RabbitMQ成功处理的消息,SpringAMQP中也是这种方案。

实现的时候,只需要添加ConfirmListener接口的实现,它主要有两个方法:handleAck和handleNack。下面是代码实现,是SpringAMQP的简化版(关于SpringAMQP的细节,小伙伴们可以关注RabbitMQ系列文章后续的更新)。 

// 维护消息序号和消息,在回调函数中做相应处理
// SpringAMQP也是这种方案,只不过这里简化了
ConcurrentSkipListMap<Long, String> unconfirmMap = new ConcurrentSkipListMap<>();

channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    public void handleAck(long deliveryTag , boolean multiple) throws IOException {
        System.out.println("Nack, SeqNo : " + deliveryTag + ", multiple : " + multiple);
        if (multiple) {
            confirmMap.headMap(deliveryTag - 1).clear();
        } else {
            confirmMap.remove(deliveryTag);
        }
    }

    // 处理发送失败的场景,尝试重发
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        System.out.println("handleNack : " + deliveryTag + "  " + multiple);
                // 注意:为防止消息一直失败,导致死循环,可以在消息上加属性x-retries,每次重发前,先判断已经发送的次数,达到阈值,不再发送
                if(multiple){
                    ConcurrentNavigableMap<Long, String> headMap = unconfirmMap.headMap(deliveryTag + 1);
                    Set<Map.Entry<Long, String>> entrySet = headMap.entrySet();
                    Iterator<Map.Entry<Long, String>> iterator = entrySet.iterator();
                    while(iterator.hasNext()){
                        String removed = iterator.next().getValue();
                        channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, removed.getBytes());
                    }
                } else {
                    String removed = unconfirmMap.remove(deliveryTag);
                    channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, removed.getBytes());
                }
    }
});

//模拟一直发送消息
while (true) {
    long nextSeqNo = channel.getNextPublishSeqNo();
    channel.basicPublish(exchangeName, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes ());
    confirmSet.add(nextSeqNo);
}

事务机制

事务机制是解决发送端无法感知消息是否正确达到服务端的另外一种方案。事务的使用非常简单,先直接上代码感受下。

String message = "tx message";
try{
    channel.txSelect();
    channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    channel.txCommit();
} catch(Exception e){
    e.printStackTrace();
    channel.txRollback();
}

使用txSelect方法开启事务,只有消息成功被Rabbit接收,事务才会提交,如果发生任何异常,消息都会被回滚。

使用事务的缺点就是性能问题,因为发送一条消息之后,会阻塞发送端,直到Rabbit把消息持久化到磁盘,才会返回响应给发送端,之后发送端才能继续发送下一条。所以推荐使用Publisher Confirm方案。

好了,以上就是基于AMQP 0-9-1 协议,关于Producer的常用API使用第二部分的分享。

RabbitMQ系列文章会陆续更新,欢迎各位小伙伴关注后面的技术分享。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/395504.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

线程池ThreadPoolExecutor,从0到0.6

ThreadPoolExecutor是JDK提供的在java.util.concurrent包中的一个用于创建线程池的工具类。 一、ThreadPoolExecutor的7个参数 corePoolSize&#xff1a;核心线程数&#xff0c;线程池中保留的最小的线程数量&#xff0c;即使它们是空闲的也不会被销毁&#xff0c;除非allowCor…

Modbus转profinet网关连接1200PLC在博图组态与驱动器通讯程序案例

本案例给大家介绍由兴达易控modbus转profinet网关连接1200PLC在博图软件无需编程&#xff0c;实现1200Profinet转modbus与驱动器通讯的程序案例 硬件连接&#xff1a;1200PLC一台&#xff1b;英威腾DA180系列驱动器一台&#xff1b;兴达易控modbus转profinet网关一台 下面就是…

【Git】拉取 Pull Requests 测试的两种方法

文章目录前言参考目录方法说明方法一&#xff1a;直接拉取方法二&#xff1a;使用 diff 文件2.1、保存 diff 文件2.2、新建分支并执行文件前言 最近有参与到框架帮忙进行简单的 Pull Requests&#xff08;以下简称 PR&#xff09; 测试&#xff0c;因为也是第一次接触到这种操…

代码随想录 动态规划||01背包理论 416

Day3601背包理论基础01背包有n件物品和一个最多能背重量为w 的背包。第i件物品的重量是weight[i]&#xff0c;得到的价值是value[i] 。每件物品只能用一次&#xff0c;求解将哪些物品装入背包里物品价值总和最大。暴力的解法是指数级别的时间复杂度。进而才需要动态规划的解法来…

Java学习笔记 --- Servlet(1)

一、Servlet技术 1、Servlet基本介绍 1、Servlet 是 JavaEE 规范之一。规范就是接口 2、Servlet 就 JavaWeb 三大组件之一。三大组件分别是&#xff1a;Servlet 程序、Filter 过滤器、Listener 监听器。 3、Servlet 是运行在服务器上的一个 java 小程序&#xff0c;它可以…

院士交锋,专家论道|NLP大模型技术与应用十大挑战,剑指AI未来

2023年2月24日下午&#xff0c;第四届OpenI/O启智开发者大会NLP大模型分论坛在深圳人才研修院隆重举办。NLP大模型论坛会议现场众多NLP领域顶级专家学者与多家国产NLP大模型开发团队汇聚一堂&#xff0c;学术界与产业界破圈交流&#xff0c;激荡尖端思想、分享前沿动态&#xf…

Linux学习第二十二节-网卡IP设置

1.修改网卡IP地址 方式一&#xff1a;通过修改网卡配置文件修改 网卡配置文件位置&#xff1a; /etc/sysconfig/network-scripts/网卡名 #ifconfig 表示用于显示和设置网卡的参数 #ip addr 表示用于显示和设置网卡的参数 #systemctl restart network 表示重启网络 …

Spark Join大小表

Spark Join大小表无法广播过滤后大小表数据分布均匀大小表 : 大小表尺寸相差 3 倍以上 Join 优先考虑 BHJ小表的数据量 > 广播阈值时&#xff0c;优先考虑 SHJ 无法广播 大表 100GB、小表 10GB&#xff0c;都远超广播变量阈值 当小表的尺寸 > 8GB时&#xff0c;创建广…

剑指-Offer-30-包含min函数的栈

剑指 Offer 30.包含min函数的栈 题目描述&#xff1a; 定义栈的数据结构&#xff0c;请在该类型中实现一个能够得到栈的最小元素的 min 函数在该栈中&#xff0c;调用 min、push 及 pop 的时间复杂度都是 O(1)。 示例&#xff1a; MinStack minStack new MinStack(); minSt…

Python中的错误是什么,Python中有哪些错误

7.1 错误(errors) 由于Python代码通常是人类编写的&#xff0c;那么无论代码是在解释之前还是运行之后&#xff0c;或多或少总会出现一些问题。 在Python代码解释时遇到的问题称为错误&#xff0c;通常是语法和缩进问题导致的&#xff0c;这些错误会导致代码无法通过解释器的解…

2023年绿色建筑国际会议(ICoGB 2023)

2023年绿色建筑国际会议&#xff08;ICoGB 2023&#xff09; 重要信息 会议网址&#xff1a;www.icogb.org 会议时间&#xff1a;2023年5月19-21日 召开地点&#xff1a;斯德哥尔摩 截稿时间&#xff1a;2023年4月1日 录用通知&#xff1a;投稿后2周内 收录检索&#xff…

剑指 Offer 61 扑克牌中的顺子

摘要 扑克牌中的顺子 一、集合 Set 遍历 根据题意&#xff0c;此5张牌是顺子的 充分条件 如下&#xff1a; 除大小王外&#xff0c;所有牌 无重复 &#xff1b;设此5张牌中最大的牌为max&#xff0c;最小的牌为min&#xff08;大小王除外&#xff09;&#xff0c;则需满足…

深入理解浏览器解析机制和XSS向量编码

目录 1、HTML解析 字符实体(character entities) HTML字符实体(HTML character entities) 字符引用(character references) 在HTML中有五类元素 五类元素的区别如下 深入了解下RCDATA元素 2、URL解析 3、JavaScript解析 4、解析流 1、HTML解析 从XSS的角度来说&…

es倒排索引原理

1、简介 网上看到的一篇文章&#xff0c;对Lucene的倒排索引是如何执行的&#xff0c;说的比较易懂&#xff0c;就转过来分享下。 Elasticsearch是通过Lucene的倒排索引技术实现比关系型数据库更快的过滤。特别是它对多条件的过滤支持非常好&#xff0c;比如年龄在18和30之间&a…

kubeadm安装K8S(集群)

前言市面上很多k8s的安装工具&#xff0c;作为产品的设计者和推广者&#xff0c;K8S组织也知道自己的产品部署起来十分的困难&#xff0c;于是把开源爱好者写的工具kubeadmn收编为正规军&#xff0c;纳入到了自己的麾下。为什么我们要用kubeadmn来部署&#xff1f;因为kubeadm不…

【代码实践】DeepBDC for few-shot learning代码运行

DeepBDC是Jiangtao Xie等人在CVPR2022上提出的few-shot classification方法&#xff0c;论文全名为“Joint Distribution Matters: Deep Brownian Distance Covariance for Few-Shot Classification”。本文旨在记录在Window系统下运行该官方代码&#xff08;https://github.co…

Linux学习第二十四节-Podman容器

一、容器的概念 容器是由一个或多个与系统其余部分隔离的进程组成的集合。我们可以理解为“集装箱”。 集装箱是打包和装运货物的标准方式。它作为一个箱子进行标记、装载、卸载&#xff0c;以及从一个 位置运输到另一个位置。该容器的内容与其他容器的内容隔离&#xff0c…

传统企业数字化转型真的有必要吗?应该如何做转型?

随着数字经济的快速发展&#xff0c;各行各业数字化转型成为必然。从最初的信息化建设&#xff0c;到数字企业、数字政府建设&#xff0c;再到如今的数字经济建设&#xff0c;传统企业在数字化转型中的作用越来越大。与此同时&#xff0c;数字化转型对传统企业提出了更高的要求…

【Java开发面试】AHXX面试总结

1. java中常用的集合有哪些 java中常用的集合类有List,Set,Map,其中List和Set继承了Collection。 List的实现类有&#xff1a;ArrayList&#xff0c;LinkedList&#xff0c;Vector&#xff0c;Stack Set的实现类有&#xff1a;TreeSet&#xff0c;HashSet Map的实现类有&#…

MySQL运维篇之读写分离

04、读写分离 4.1、介绍 读写分离&#xff0c;简单地说是把对数据库的读和写操作分开&#xff0c;以对应不同的数据库服务器。主数据库提供写操作&#xff0c;从数据库提供读操作&#xff0c;这样能有效地减轻单台数据库的压力。 通过Mycat即可轻易实现上述功能&#xff0c;…