(八)RabbitMQ发布确认

news2025/8/12 18:32:26

发布确认

  • 1、发布确认原理
  • 2、发布确认策略
    • 2.1、开启发布确认的方法
    • 2.2、单个确认发布
    • 2.3、批量确认发布
    • 2.4、异步确认发布
    • 2.5、处理异步未确认消息

1、发布确认原理

书面文:生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置basic.ack 的multiple 域,表示到这个序列号之前的所有消息都已经得到了处理

confirm 模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果 RabbitMQ 因为自身内部错误导致消息丢失,就会发送一条 nack 消息,生产者应用程序同样可以在回调方法中处理该 nack 消息。

声明队列持久化,然后生产消费消息时设置持久化,最后消息持久化到磁盘里后才进行发布确认,保证消息的持久化不丢失。这个是消息持久化的发布确认

2、发布确认策略

2.1、开启发布确认的方法

Channel channel = RabbitMQUtil.getChannel();
 //开启发布确认
 channel.confirmSelect();

2.2、单个确认发布

它是一种同步确认发布的方式,也就是发布一个消息之后只有它被确认发布,后续的消息才能继续发布,最大的缺点就是:发布速度特别的慢,这种方式最多提供每秒不超过数百条发布消息的吞吐量

    /**
     * 单个确认
     */
    public static void single() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMQUtil.getChannel();
        //队列名字
        String queueName = UUID.randomUUID().toString();
        //开启发布确认
        channel.confirmSelect();
        channel.queueDeclare(queueName,true,false,false,null);
        //开始时间
        long begin = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = i+"";
            channel.basicPublish("",queueName,null,msg.getBytes());
            // waitForConfirm():消息发布确认是否成功
            if(channel.waitForConfirms()){
                System.out.println("消息发布成功");
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
    }

2.3、批量确认发布

与单个等待确认消息相比,先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现问题了,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息。当然这种方案仍然是同步的,也一样阻塞消息的发布


    /**
     * 批量确认
     */
    public static void batch() throws IOException, TimeoutException, InterruptedException {
        Channel channel = RabbitMQUtil.getChannel();
        //队列名字
        String queueName = UUID.randomUUID().toString();
        //开启发布确认
        channel.confirmSelect();
        channel.queueDeclare(queueName,true,false,false,null);
        //开始时间
        long begin = System.currentTimeMillis();
        //批量确认消息大小
        int batchSize = 100;
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = i+"";
            channel.basicPublish("",queueName,null,msg.getBytes());
            int j  = 0;
            j++;
            //发100条确认一次
            if(j == 99){
                channel.waitForConfirms();
                j =0;
            }
        }
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
    }

2.4、异步确认发布

异步确认虽然编程逻辑比上两个要复杂,但是性价比最高,无论是可靠性还是效率都没得说,是利用回调函数来达到消息可靠性传递的,这个中间件也是通过函数回调来保证是否投递成功,也因为是异步的,发消息和确认回调是异步的,所以生产者不需要管确认,只需要疯狂发送消息就行

在这里插入图片描述


    /**
     * 异步发布确认
     */
    public static  void Async() throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        //队列名字
        String queueName = UUID.randomUUID().toString();
        //开启发布确认
        channel.confirmSelect();
        channel.queueDeclare(queueName,true,false,false,null);
        //开始时间
        long begin = System.currentTimeMillis();
        //消息确认成功的回调函数,两个参数:一个是消息的标识,一个是开启批量
        ConfirmCallback ackCallback = (long deliveryTag, boolean multiple)->{
            System.out.println("成功发布确认的消息:"+deliveryTag);
        };
        //消息确认失败回调函数
        ConfirmCallback nackCallback = (long deliveryTag, boolean multiple)->{
            System.out.println("未成功发布确认的消息:"+deliveryTag);
        };
        //准备消息的监听器,监听成功发布消息以及没成功的
        channel.addConfirmListener(ackCallback,nackCallback);//这是异步的
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = i+"";
            channel.basicPublish("",queueName,null,msg.getBytes());
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
    }

2.5、处理异步未确认消息

这个高并发有序Map进行线程之间消息的传递,很有用

最好的解决的解决方案就是把未确认的消息放到一个基于内存的能被发布线程访问的队列,比如说用 ==ConcurrentSkipListMap(基于跳表的Map,有序)==这个队列在 confirm callbacks 线程与发布线程之间进行消息的传递,或者 ConcurrentNavigableMap

  1. 记录所有需要发送的消息,消息总和
  2. 删除已经发送确认的消息,剩下的就是未确认的消息
  3. 剩下的就是没有确认的,可以进行其他操作
/**
     * 异步发布确认
     */
    public static void Async() throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        //队列名字
        String queueName = UUID.randomUUID().toString();
        //开启发布确认
        channel.confirmSelect();
        channel.queueDeclare(queueName, true, false, false, null);

        /**
         * 用于回调线程与发布线程之间的消息传递
         * 线程安全有序的哈希表,适用于高并发有序map
         * 1、将序号标签与消息内容进行关联
         * 2、轻松批量的根据序号删除。因为是有序的
         * 3、支持高并发
         */
        ConcurrentSkipListMap<Long, String> outstandingConfirm = new ConcurrentSkipListMap<>();

        //消息确认成功的回调函数,两个参数:一个是消息的标识,一个是开启批量
        ConfirmCallback ackCallback = (long deliveryTag, boolean multiple) -> {
            if (multiple) {
                //2、删除已经发送确认的消息,剩下的就是未确认的消息
                //这里 headMap() 是拿到这个小于等于该参数为key的map,也就是确认了的消息map,第二个参数是包不包括这个key
                //因为这里是批量的,所以这个deliveryTag之前的都是确认的,没有不确认的
                ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirm.headMap(deliveryTag, true);
                confirmed.clear();
            }else {
                //如果不是批量的直接删除就行,因为剩下的就是没有确认的消息
                outstandingConfirm.remove(deliveryTag);
            }
            System.out.println("成功发布确认的消息:" + deliveryTag);
        };
        //消息确认失败回调函数
        ConfirmCallback nackCallback = (long deliveryTag, boolean multiple) -> {
            //打印未确认的消息
            String msg = outstandingConfirm.get(deliveryTag);
            System.out.println("未成功发布确认的消息标记:" + deliveryTag+"消息内容:"+msg);
        };
        //准备消息的监听器,监听成功发布消息以及没成功的
        channel.addConfirmListener(ackCallback, nackCallback);//这是异步的
        //开始时间
        long begin = System.currentTimeMillis();
        for (int i = 0; i < MESSAGE_COUNT; i++) {
            String msg = i + "";
            //1、记录所有需要发送的消息,消息总和
            //其实这个PublishSeqNo也就是消息的标识 deliveryTag
            outstandingConfirm.put(channel.getNextPublishSeqNo(), msg);
            channel.basicPublish("", queueName, null, msg.getBytes());
        }
        //结束时间
        long end = System.currentTimeMillis();
        System.out.println("发布" + MESSAGE_COUNT + "个单独确认消息,耗时" + (end - begin) + "ms");
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/33424.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python集合类型详解(一)——集合定义与集合操作符

今天继续给大家介绍Python相关知识&#xff0c;本文主要内容是Python集合类型定义与集合操作符。 一、集合类型定义 在Python中&#xff0c;集合是一种非常重要的组合数据类型。Python中的集合与数学中的集合非常相似&#xff0c;集合中的数据没有顺序&#xff0c;并且每个元…

第二章:Pythonocc官方demo 案例45(几何轴向曲线偏置)

源代码&#xff1a; #!/usr/bin/env python##Copyright 2009-2016 Jelle Feringa (jelleferingagmail.com) ## ##This file is part of pythonOCC. ## ##pythonOCC is free software: you can redistribute it and/or modify ##it under the terms of the GNU Lesser General …

【优化调度】遗传算法求解工件的并行调度组合优化问题【含Matlab源码 2234期】

⛄ 一、 遗传算法简介 1 问题描述 假定一个加工系统有m台机器和n件工件&#xff0c;每个工件包含一道或多道工序,工件的加工顺序是确定的,但每个工件可能有几条可行的加工路线,即每道工序可在多台不同的机床上加工,工序的加工时间和加工费用随机床的性能不同而变化。作业调度的…

并查集解析

文章目录&#x1f6a9;并查集的理解&#x1f6a9;并查集的结构与原理&#x1f6a9;并查集的实现&#x1f341;整体框架&#x1f341;路径压缩&#x1f6a9;总结&#x1f6a9;并查集的理解 并查集是基于数组操作的一个特殊数据结构&#xff0c;和以前学习[数组的堆排序]时有点相…

分析设备树文件

1.设备树是干嘛的 硬件资源有很多&#xff0c;想要实现分类管理&#xff0c;方便驱动去控制它&#xff0c;则需要设备树来管理硬件信息。 所以&#xff0c;设备树主要存放了一些设备节点信息&#xff0c;键值对&#xff0c;和属性&#xff1b;节点中也可以包含子节点。 2.设…

安全架构中的前端安全防护研究

国家互联网应急中心发布的被篡改网站数据让很多人触目惊心&#xff0c;近年来各种Web网站攻击事件频频发生&#xff0c;网站SQL注入&#xff0c;网页被篡改、信息失窃、甚至被利用成传播木马的载体Web安全形势日益严峻&#xff0c;越来越受到人们的关注。 Gartner 对安全架构的…

创建计划协议、维护创建计划、收货

创建计划协议事务码&#xff1a;ME31L创建计划协议 &#xff08;ME32L 修改计划协议 ME33L查询计划协议 ME2L查询采购订单&#xff09; 输入&#xff1a;供应商、协议类型、协议日期、采购组织、采购组、工厂、存储地点等信息后回车。 然后输入有效截至日期&#xff0c; 再点击…

计算机毕业设计java+springboot宠物商城系统

一、项目运行 环境配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; Springboot Maven mybatis Vue 等等组成&#xff0c;B…

【JVM】垃圾回收:垃圾收集器

一、语境中的并行与并发 并行 并行描述的时多条垃圾收集器线程之间的关系&#xff0c;说明同一时间有多条这样的线程在协同工作&#xff0c;通常默认此时用户线程是处于等待状态。 并发 并发描述的是垃圾收集器线程与用户线程之间的关系&#xff0c;说明同一时间垃圾收集器线程…

简单实现一个虚拟形象系统

前言 上周启动居家开会的时候&#xff0c;看到有人通过「虚拟形象」功能&#xff0c;给自己带上了口罩、眼镜之类&#xff0c;于是想到了是不是也可以搞一个简单的虚拟形象系统。 大致想来&#xff0c;分为以下几个部分&#xff1a; 卷积神经网络(CNN) 下面讲解一下三层CN…

视频格式转换器哪个好用?万兴优转-好用的视频格式转换器

视频格式转换器是用于转换视频格式的软件&#xff0c;是指用于视频转换、音频转换、CD轨抓取、音视频混合转换、音视频剪切、连接转换、视频水印叠加、滚动字幕、个性化文字、图片叠加、视频相框叠加的音视频转换工具。 也就是说&#xff0c;视频有非常多的格式如AVI、VCD、SVC…

【JavaWeb从零到一】会话技术CookieSessionJSP

&#x1f680;【JavaWeb从零到一】系列文章目录 &#x1f6a9;【JavaWeb从零到一】前置知识 &#x1f6a9;【JavaWeb从零到一】Mysql基础总结 &#x1f6a9;【JavaWeb从零到一】JDBC详解 &#x1f6a9;【JavaWeb从零到一】JDBC连接池&JDBCTemplate Cookie&Session&…

王学岗音视频开发(二)—————OpenGLES开发实践

矩阵以及矩阵运算 上图就是m x a 的矩阵 1x30x22x1 :为左侧第一行乘以右侧第一列。 1x10x12x0 :为左侧第一行乘以右侧第二列。 -1x33x21x1:为左侧第二行乘以右侧第一列。 -1x13x11x0:为左侧第二行乘以右侧第二列。 矩阵的行列式 伴随矩阵 A*表示伴随矩阵 OpenGL 教程----屏…

Grails SpringBoot国际化不生效

问题描述&#xff1a; grails项目使用了国际化&#xff0c;按照官方文档的说法&#xff1a; 会根据用户浏览器访问时使用的Accept-Language头自动选择合适的语言。 但无论浏览器了配置什么语言甚至配置了Tomcat启动参数 -Duser.languagexxx -Duser.regionxxx页面显示依旧是英…

[附源码]java毕业设计一点到家小区微帮服务系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

QPushButton按钮用法

QPushButton 简介 QPushButton是一个很常用的一个按钮控件&#xff0c;主要用于创建一个可按压的按键。它显示了一 个文本和一个图标。另外&#xff0c;你也可以在创建时&#xff0c;指定一个快捷键。 基本用法 1. 创建 QPushButton主要有两种创建方法&#xff0c;一种是直…

SQLite实现的学生管理系统

SQLite数据库 案例资源所在地址&#xff1a; https://download.csdn.net/download/weixin_41957626/87150608?spm1001.2014.3001.5503 1.简介 1.1引入 1.前面学习的文件存储和SharedPreference存储的方式只能存储一些小型的数据但是对于复杂关系以及复杂数据结构的数据仅仅靠…

交互与前端16 Tabulator 表格实践4

说明 继续给表格来加一些小功能。 内容 1 分页 在表格初始化的地方加两行配置,表格就实现了分页 pagination:true, //enable.paginationSize:20, // this option can take any positive integer value2 超链接 这个需求的来源是,一些微服务需要注释,所以我写了很多文档…

科研教育「双目视觉技术」首选!维视MV-VS220双目立体视觉系统开发平台

NO.1产品背景 在最近大热的自动驾驶赛道&#xff0c;大疆采用新的技术路线——双目立体视觉。具体来说&#xff0c;它就是模拟人的视觉系统&#xff0c;通过两个临近摄像头所拍摄到的画面的视差&#xff0c;来还原出三维立体结构。不需要对海量数据进行标注和训练&#xff0c;可…

Echarts 散点象限图(二)动态绘制

之前发布过一篇文章Echarts散点象限图,基于死数据来绘制的,但实际开放场景中,需要请求数据,而且可能会动态更改数据,这时候需要如何处理,有什么要注意的地方,这篇文章详细说明一下。 主要需要处理的地方就是四个象限的markArea,需要根据中心的位置来画,你可以想象成…