六、死信队列

news2025/7/17 4:05:09

1、死信的概念

2、死信来源

3、死信实战

3.1 代码架构

在这里插入图片描述

  • 正常队列绑定正常交换机
  • 正常队列绑定死信交换机
  • 死信队列绑定死信

3.2 消息TTL过期变成死信

生产者向 normal_exchange发送消息,通过路由键zhangsan路由到 normal-queue中,消息设置TTL属性

/**
 * @author houChen
 * @date 2022/11/12 20:09
 * @Description: 死信队列实战: ttl
 * 生产者
 * 设置消息具有过期时间属性,当消息过期后会经过死信交换机路由到死信队列
 */
public class Producer {
    private static final String NORMAL_EXECAGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel();) {
            //1、创建一个交换机
            channel.exchangeDeclare(NORMAL_EXECAGE, "direct");
            //设置消息的TTL时间
            AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().expiration("10000").build();
            for (int i = 0; i < 11; i++) {
                String message = "info" + i;
                channel.basicPublish(NORMAL_EXECAGE, "zhangsan", basicProperties, message.getBytes());
                System.out.println("生产者发送消息:" + message);
            }
        }
    }
}

消费者c1不启动,模拟消息在normal-queue中逗留超过10s,导致消息过期,经过dead_exchange路由到dead-queue

/**
 * @author houChen
 * @date 2022/11/12 20:33
 * @Description: 消费者c1代码
 */
public class Consumer01 {

    private static final String NORMAL_EXCHANGE = "normal_exchange";

    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();
        //正常队列绑定死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        Map<String, Object> params = new HashMap<>();
        //key是固定的
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        params.put("x-dead-letter-routing-key", "lisi");
        String normalQueueName = "normal-queue";
        channel.queueDeclare(normalQueueName, false, false, false, params);
        channel.queueBind(normalQueueName, NORMAL_EXCHANGE, "zhangsan");
        System.out.println("等待接收消息");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "utf-8");
            System.out.println("Consumer01接收到消息:" + message);
        };
        channel.basicConsume(normalQueueName, false, deliverCallback, consumerTag -> {
        });
    }
}

消费者c2消费死信队列中的消息

/**
 * @author houChen
 * @date 2022/11/12 21:11
 * @Description: Consumer02 会消费死信队列的消息
 */
public class Consumer02 {

    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        //声明死信队列,绑定
        String deadQueueName = "dead-queue";
        channel.queueDeclare(deadQueueName, false, false, false, null);
        channel.queueBind(deadQueueName, DEAD_EXCHANGE, "lisi");
        System.out.println("等待死信接收消息");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "utf-8");
            System.out.println("Consumer02接收死信队列消息:" + message);

        };
        channel.basicConsume(deadQueueName, true, deliverCallback, consumerTag -> {
        });
    }
}

启动生产者和消费者c2,发现经过10s后,消费者c2消费到生产者生产的消息,表明normal-queue中的消息过期后,确实经由dead-exchange 路由到dead-queue
在这里插入图片描述

3.3 队列达到最大长度

当 队列达到最大长度后,再往队列中投递消息时,消息会变成死信

1) 消息生产者代码去掉 TTL 属性:

/**
 * @author houChen
 * @date 2022/11/12 20:09
 * @Description: 死信队列实战: 队列达到最大长度导致消息进入死信队列
 * 生产者
 *  去掉消息的过期属性
 */
public class Producer {
    private static final String NORMAL_EXECAGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        try (Channel channel = RabbitMqUtils.getChannel();) {
            //1、创建一个交换机
            channel.exchangeDeclare(NORMAL_EXECAGE, "direct");
            for (int i = 0; i < 10; i++) {
                String message = "info" + i;
                channel.basicPublish(NORMAL_EXECAGE, "zhangsan", null, message.getBytes());
                System.out.println("生产者发送消息:" + message);
            }

        }
    }
}

2) C1 消费者给 normal-queue 添加最大长度的属性 (启动之后关闭该消费者 模拟其接收不到消息)
【注意】 此时需要在RabbitMQ的控制台将 normal-queue删除,不然创建队列会报错

/**
 * @author houChen
 * @date 2022/11/12 20:33
 * @Description: 消费者c1代码
 */
public class Consumer01 {

    private static final String NORMAL_EXCHANGE = "normal_exchange";

    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        
        //正常队列绑定死信交换机
        Map<String, Object> params = new HashMap<>();
        //key是固定的
        //设置死信交换机
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //设置死信路由键
        params.put("x-dead-letter-routing-key", "lisi");
        //设置正常队列的最大长度
        params.put("x-max-length", 6);

        //声明正常队列绑定正常交换机
        String normalQueueName = "normal-queue";
        channel.queueDeclare(normalQueueName, false, false, false, params);
        channel.queueBind(normalQueueName, NORMAL_EXCHANGE, "zhangsan");

        System.out.println("等待接收消息");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "utf-8");
            System.out.println("Consumer01接收到消息:" + message);
        };
        channel.basicConsume(normalQueueName, true, deliverCallback, consumerTag -> {
        });
    }
}

3)测试结果
消费者生产 10 条消息后,有 6 条消息进入dead-queue
在这里插入图片描述

3.4 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false

1) 消息生产者代码同上生产者一致

2)C1 消费者代码 : 对某个消息进行拒绝

/**
 * @author houChen
 * @date 2022/11/12 20:33
 * @Description: 消费者c1代码
 *
 * 对 某个消息拒绝签收,并且不重新入队
 */
public class Consumer01 {

    private static final String NORMAL_EXCHANGE = "normal_exchange";

    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {

        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //正常队列绑定死信交换机
        Map<String, Object> params = new HashMap<>();
        //key是固定的
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        params.put("x-dead-letter-routing-key", "lisi");

        //声明正常队列,绑定, 已经
        String normalQueueName = "normal-queue";
        channel.queueDeclare(normalQueueName, false, false, false, params);
        channel.queueBind(normalQueueName, NORMAL_EXCHANGE, "zhangsan");


        System.out.println("等待接收消息");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "utf-8");
            if(message.equals("info5")) {
                System.out.println("Consumer01接收到消息:" + message + ",并拒绝签收该消息");
                //param1: 消息的标记   param2: 被拒绝的消息是否重新入队
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false);
            } else {
                System.out.println("Consumer01接收到消息:" + message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
            }
        };
        channel.basicConsume(normalQueueName, false, deliverCallback, consumerTag -> {
        });
    }
}

3)C2 消费者代码不变
启动消费者 1 然后再启动消费者 2

4)结果
消费者c1,会拒绝消费消息info5,并且拒绝消息重新入队
在这里插入图片描述
被拒绝消息会路由到 dead-queue
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/395871.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【自监督论文阅读笔记】What Makes for Good Views for Contrastive Learning?

Abstract 数据的多个视图之间的对比学习最近在自监督表示学习领域取得了最先进的性能。尽管取得了成功&#xff0c;但对不同视角选择的影响研究较少。在本文中&#xff0c;我们使用理论和实证分析来 更好地理解视图选择的重要性&#xff0c;并认为我们应该减少视图之间的互信息…

三分钟完成Stable Diffusion本地安装(零基础体验AI绘画)

三分钟完成Stable Diffusion本地安装前言安装步骤下载链接前言 最近AI绘画很火&#xff0c;很多无编程基础的小伙伴也想体验一下&#xff0c;所以写这篇博客来帮助小伙伴们愉快的体验一下~废话少说&#xff0c;我们直接开整&#xff01; 安装步骤 首先&#xff0c;下载本项目的…

电脑启动后显示器黑屏怎么办?排查下面4个问题,快速解决

电脑启动出现显示器黑屏是一个相当常见的问题。如果您遇到了这个问题&#xff0c;不要惊慌&#xff0c;因为它有很多可能的原因&#xff0c;可以采取一些简单的措施来解决它。在本文中&#xff0c;小编将介绍下面4种常见的电脑启动后显示器黑屏的原因&#xff0c;排查这些原因&…

合并两个有序链表(精美图示详解哦)

全文目录引言合并两个有序链表题目描述方法一&#xff1a;将第二个链表合并到第一个思路实现方法二&#xff1a;尾插到哨兵位的头节点思路实现总结引言 在前面两篇文章中&#xff0c;我们介绍了几道链表的习题&#xff1a;反转链表、链表的中间结点、链表的倒数第k个结点&…

基于单细胞多组学数据无监督构建基因调控网络

在单细胞分辨率下识别基因调控网络&#xff08;GRNs&#xff0c;gene regulatory networks&#xff09;一直是一个巨大的挑战&#xff0c;而单细胞多组学数据的出现为构建GRNs提供了机会。 来自&#xff1a;Unsupervised construction of gene regulatory network based on si…

力扣sql简单篇练习(二十四)

力扣sql简单篇练习(二十四) 1 各赛事的用户注册率 1.1 题目内容 1.1.1 基本题目信息 1.1.2 示例输入输出 a 示例输入 b 示例输出 1.2 示例sql语句 SELECT contest_id,ROUND(count(*)/(SELECT count(user_id) FROM Users)*100,2) percentage FROM Register GROUP BY contes…

MQTT协议-使用CONNECT报文连接阿里云

使用网络调试助手发送CONNECT报文连接阿里云 参考&#xff1a;https://blog.csdn.net/daniaoxp/article/details/103039296 在前面文章介绍了如何组装CONNECT报文&#xff0c;以及如何计算剩余长度 CONNECT报文&#xff1a;https://blog.csdn.net/weixin_46251230/article/d…

【C语言】详解静态变量static

关键字static 在C语言中&#xff1a;static是用来修饰变量和函数的static主要作用为:1. 修饰局部变量-静态局部变量 2. 修饰全局变量-静态全局变量3. 修饰函数-静态函数在讲解静态变量之前&#xff0c;我们应该了解静态变量和其他变量的区别: 修饰局部变量 //代码1 #include &l…

OpenTelemetry 实现方案

OpenTelemetry 有很多种组合和实现方案&#xff0c;我们分别来了解一下 OpenTelemetry 在三种不同技术架构下的使用方式。 1、OpenTelemetry to 开源工具组合 作为经典的对各种遥测数据的处理架构&#xff0c;开源工具可将不同类型的数据存储在不同的平台&#xff0c;比如日志…

倒立摆建模

前言 系统由一辆具有动力的小车和安装在小车上的倒立摆组成&#xff0c;系统是不稳定&#xff0c;我们需要通过控制移动小车使得倒立摆保持平衡。 具体地&#xff0c;考虑二维情形如下图&#xff0c;控制力为水平力FFF&#xff0c;输出为角度θ\thetaθ以及小车的位置xxx。 力…

【WebRTC---序篇】(五)信令逻辑

关于信令的几个问题 信令发送的过程信令发送的时机:用户点connec按钮;选中connect按钮后,按回车键; Windows会分发给消息处理机制,而触发OnDefaultAction中调用Conduction的StartLogin; StartLogin里面会调用pcc_client(信令模块)的Connect; 如果是域名,进行域名解析,之后…

SQL注入——布尔盲注

目录 一&#xff0c;盲注的概念 二&#xff0c;盲注分类 三&#xff0c;注入方法的选择 四&#xff0c;关键函数 五&#xff0c;实例 一&#xff0c;盲注的概念 页面没有报错回显&#xff0c;不知道数据库具体返回值的情况下&#xff0c;对数据库中的内容进行猜解&#x…

【历史上的今天】3 月 8 日:游戏机之父诞辰;搜索技术理论之父出生;MIT 公开演示旋风计算机

整理 | 王启隆 透过「历史上的今天」&#xff0c;从过去看未来&#xff0c;从现在亦可以改变未来。 今天是 2023 年 3 月 8 日&#xff0c;在 1857 年的今天&#xff0c;美国纽约制衣和纺织女工举行了首次大型抗议活动。妇女节是纪念妇女权利运动的国际性节日。设立国际妇女节…

【打卡-Coggle竞赛学习2023年3月】对话意图识别

学习链接&#xff1a; https://coggle.club/blog/30days-of-ml-202303 ## Part1 内容介绍 本月竞赛学习将以对话意图识别展开&#xff0c;意图识别是指分析用户的核心需求&#xff0c;错误的识别几乎可以确定找不到能满足用户需求的内容&#xff0c;导致产生非常差的用户体验…

2.6 棋盘覆盖

在一个2*x2‘个方格组成的棋盘中&#xff0c;若怡有一个方格与其他方格不同&#xff0c;则称该方格为特殊方格&#xff0c;且称该棋盘为一特殊棋盘。显然&#xff0c;特殊方格在棋盘上出现的位置有 4种情形因而对任何k0&#xff0c;有4‘种特殊棋盘。图2-4 申的特殊棋益是12时 …

【项目设计】高并发内存池(七)[性能测试和提升]

&#x1f387;C学习历程&#xff1a;入门 博客主页&#xff1a;一起去看日落吗持续分享博主的C学习历程博主的能力有限&#xff0c;出现错误希望大家不吝赐教分享给大家一句我很喜欢的话&#xff1a; 也许你现在做的事情&#xff0c;暂时看不到成果&#xff0c;但不要忘记&…

初学JavaScript有困难?看过来,详细安排

你肯定没有尝试归纳&#xff0c;可以把每天学习的内容&#xff0c;用思维导图整理归类&#xff0c;这样看着就清晰多了。把基础入门做成5天的学习计划&#xff0c;其实很简单&#xff0c;你可以参考以下内容 第一天学习目标&#xff1a; 1. 理解变量是存储数据的“容器” 2.…

Linux -- 磁盘存储管理 分区类型(MBR,GPT)

首先呢&#xff0c;大家要清楚&#xff0c;在 Linux 上&#xff0c;分区类型有两种 &#xff1a;一种是MBR, 一种 GPT ~&#xff01;&#xff01;&#xff01;我们所谓的分区、分盘&#xff0c;其实是一回事儿。分区&#xff0c;就是对磁盘划分 逻辑边界&#xff0c; 注意是逻辑…

LAY-EXCEL导出excel并实现单元格合并

通过lay-excel插件实现Excel导出&#xff0c;并实现单元格合并&#xff0c;样式设置等功能。更详细描述&#xff0c;请去lay-excel插件文档查看&#xff0c;地址&#xff1a;http://excel.wj2015.com/_book/docs/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B.html一、安装这里使用Vue…

带你感受一次JVM调优实战

本文分成两部分&#xff0c;先了解理论&#xff0c;然后再进行实战。 理论篇 1.1 调优目标 JVM调优的两大目标是&#xff1a; 提高应用程序的性能和吞吐量&#xff1a; 通过优化JVM的垃圾回收机制、调整线程池大小和优化代码&#xff0c;可以提高应用程序的性能和吞吐量。…