(十)死信队列

news2025/8/3 23:50:20

死信队列

  • 1、概念
  • 2、死信产生的原因
  • 3、代码实现
    • 3.1. 流程图
    • 3.2. 消息TTL 过期
    • 3.3. 队列达到最大长度
    • 3.4. 消息被拒

1、概念

某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有
后续的处理,就变成了死信,有死信自然就有了死信队列

**应用场景:**为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效,这种就放入死信队列进行处理。

2、死信产生的原因

  • 消息 TTL(Time to Live存活时间) 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

3、代码实现

3.1. 流程图

在这里插入图片描述

3.2. 消息TTL 过期

消费者1 (最复杂的)

package com.feng.deadQueue;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/25 15:09
 * @Version 1.0
 * @Description 死信队列消费者01
 */
public class Consumer01 {

    //普通交换机名字
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名字
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列名
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信队列名
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机:普通和死信
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        /**
         *  声明普通队列
         */
        //设置声明队列的参数
        Map<String, Object> arguments = new HashMap<>();
        //设置过期时间,单位是毫秒,这里设置10秒
        arguments.put("x-message-ttl",10000);
        //设置队列的死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信交换机的RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);

        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //绑定普通交换机与队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        //绑定死信换机与死信队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        channel.basicConsume(NORMAL_QUEUE,true,(String consumerTag, Delivery message) -> {
            System.out.println("消费者1接收消息是:" + new String(message.getBody(), "UTF-8"));
            System.out.println("绑定的路由是:"+message.getEnvelope().getRoutingKey());
        }, consumerTag -> {});
    }
}

生产者

package com.feng.deadQueue;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/25 15:37
 * @Version 1.0
 * @Description 死信队列生产者
 */
public class ProductDead {
    //普通交换机名字
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        //死信消息,设置ttl时间
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties()
                .builder()
                .expiration("10000")//单位是ms
                .build();
        for (int i = 1; i < 11; i++) {
            String msg = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, msg.getBytes());
        }
    }
}

只需要启动消费者01再宕机,然后启动生产者发消息,消息内容就会从普通队列到死信队列

在这里插入图片描述
在这里插入图片描述

死信队列消费者

package com.feng.deadQueue;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/25 15:09
 * @Version 1.0
 * @Description 死信队列消费者02
 */
public class Consumer02 {
    //死信交换机名字
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //死信队列名
    public static final String DEAD_QUEUE = "dead_queue";
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        channel.basicConsume(DEAD_QUEUE,true,(String consumerTag, Delivery message) -> {
            System.out.println("消费者1接收消息是:" + new String(message.getBody(), "UTF-8"));
            System.out.println("绑定的路由是:"+message.getEnvelope().getRoutingKey());
        }, consumerTag -> {});
    }
}

3.3. 队列达到最大长度

声明队列的时候设置参数就行如下

//设置队列长度限制,这里是6
arguments.put("x-max-length",6);

注意此时需要把原先队列删除 因为参数改变了

在这里插入图片描述

发十条消息测试如下

在这里插入图片描述
这里注意超出队列长度进入死信队列的消息是先入队的消息

3.4. 消息被拒

  • 消息生产者代码同上生产者一致
  • C1 消费者代码(启动之后关闭该消费者 模拟其接收不到消息。让消息阻塞在队列中)
package com.feng.deadQueue;

import com.feng.utils.RabbitMQUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Delivery;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

/**
 * @Author Feng
 * @Date 2022/11/25 15:09
 * @Version 1.0
 * @Description 死信队列消费者01
 */
public class Consumer01 {

    //普通交换机名字
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名字
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列名
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信队列名
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        //声明交换机:普通和死信
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        /**
         *  声明普通队列
         */
        //设置声明队列的参数
        Map<String, Object> arguments = new HashMap<>();
        //设置过期时间,单位是毫秒,这里设置10秒
//        arguments.put("x-message-ttl",10000);
        //设置队列的死信交换机
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //设置死信交换机的RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");
        //设置队列长度限制
//        arguments.put("x-max-length",6);
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);

        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //绑定普通交换机与队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        //绑定死信换机与死信队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        //开启手动应答
        channel.basicConsume(NORMAL_QUEUE,false,(String consumerTag, Delivery message) -> {
            String msg = new String(message.getBody(), "UTF-8");
            if("info5".equals(msg)){
                System.out.println("消费者1接收消息是:" + msg+"这个是拒绝消息");
                //第二个参数是不重新入队
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            }
            System.out.println("消费者1接收消息是:" + msg);
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
        }, consumerTag -> {});
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/34714.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

(九)RabbitMQ交换机(Exchange)

交换机Exchange1、交换机1.1. Exchanges 概念1.2. Exchanges 的类型1.3. 无名exchange&#xff08;默认交换机&#xff09;2、临时队列3、绑定&#xff08;bindings&#xff09;4、Fanout&#xff08;发布/订阅&#xff09;5、Direct exchange、6、Topics在这里插入图片描述1、…

学生HTML个人网页作业作品 基于HTML+CSS+JavaScript明星个人主页(15页)

&#x1f389;精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业…

0105 蓝桥杯真题08

/* * 中国古代文献中&#xff0c;曾记载过“大衍数列”, 主要用于解释中国传统文化中的太极衍生原理。 * 它的前几项是&#xff1a;0、2、4、8、12、18、24、32、40、50 ... * 其规律是&#xff1a;对偶数项&#xff0c;是序号平方再除2&#xff0c;奇数项&#xff0c;是序号…

【RT-Thread Studio更新】英飞凌 PSOC62-IFX-PROTO-KIT 开发环境搭建指南

本文将介绍在 RT-Thread Studio 上如何基于 PSOC62-IFX-PROTO-KIT 开发板搭建开发环境进行开发、烧录、调试功能。开发环境搭建步骤1、PSOC62-IFX-PROTO-KIT 开发板资源包安装打开Studio&#xff0c;点击工具栏上的SDK管理器在Board_Support_Packages 找到 Infineon 下的 PSOC6…

TOWER 成就徽章 NFT 系列介绍——TOWER 生态系统的第一个灵魂通证(SBT)

2022 年 7 月&#xff0c;团队推出了成就徽章 NFT 系列&#xff0c;记录每个成员在 TOWER 生态系统中的努力。这是第一个不可转让的灵魂 NFT 系列&#xff08;SBT&#xff09;&#xff0c;代表了每个玩家的独特身份。 关于灵魂通证&#xff08;SBT&#xff09; 以太坊联合创始人…

力扣(LeetCode)809. 情感丰富的文字(C++)

模拟 分析单词可扩张条件 : 对于某个字母&#xff0c;设目标字母长度 c1c1c1 &#xff0c;待扩张字母长度 c2c2c2 当 c1<c2c1<c2c1<c2&#xff0c;目标字母比待扩张字母少&#xff0c;false 当 c1≥c2c1\ge c2c1≥c2&#xff0c;目标字母比待扩张字母多或者相等&…

大数据开发——Hive实战案例

文章目录1. 创建表结构1.1 视频表结构1.2 用户表结构2. 准备工作2.1 创建临时表2.2 创建最终使用表2.3 对创建表进行解读3. 业务分析1. 创建表结构 1.1 视频表结构 1.2 用户表结构 2. 准备工作 2.1 创建临时表 由于使用的是orc方式进行存储&#xff0c;所以我们需要建立一个…

OpenFlow协议原理及基本配置-网络测试仪实操

一、OpenFlow协议原理 1.OpenFlow技术背景 ●转发和控制分离是SDN网络的本质特点之一。在SDN网络架构中&#xff0c;控制平面与转发平面分离&#xff0c;网络的管理和状态在逻辑上集中到一起&#xff0c;底层的网络基础从应用中独立出来&#xff0c;由此&#xff0c;网络获得…

不知道照片上怎么文字翻译成英文?来看看这篇文章

不知道你们在遇到看不懂的英文图片时&#xff0c;是不是和以前的我一样&#xff0c;一个一个的把图片内容输到翻译软件里&#xff0c;然后再进行翻译&#xff0c;其实这种办法不仅费时还费力&#xff0c;而且一旦遇到其它的外文就彻底没辙了&#xff0c;那怎么办呢&#xff1f;…

[附源码]java毕业设计音乐交流平台

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

我做数画ai绘画教程日赚过千,良心分享给想兼职赚钱的人

ai绘画能实现日赚过千&#xff0c;你信吗&#xff1f; 现在什么是风口&#xff1f;当然是ai绘画。而AI绘画里&#xff0c;什么最受欢迎&#xff0c;不用说&#xff0c;自然是二次元。 然后&#xff0c;很多人一拥而上&#xff0c;都去拍自己的二次元照片&#xff0c;然后在各…

10000m3d城镇生活污水处理工艺设计

目 录 1前 言 1 1.1 设计任务 2 1.2 设计目的 2 1.3 设计要求 2 1.4设计的数据以及资料 2 1.5 处理程度的计算 3 2 总体设计 5 2.1工艺比较的选择 5 2.2设计流量 8 2.2.1 设计规模 8 2.2.2 设计最大流量 8 2.3 格栅的设计计算 8 2.3.1格栅的作用及种类 8 2.3.2格栅的设计原则 8…

Linux系统编程(一)——环境搭建

准备写系统的总结Linux系统的一些知识以及Linux系统编程。这一篇先讲Linux搭建常用的开发环境。 目录 0x01 Linux开发环境搭建 一、远程链接操作 0x02 GCC 一、安装 二、了解GCC 0x03 静态库的制作及使用 一、库的介绍 二、静态库 0x04 动态库的制作和使用 一、配置…

Python遥感开发之批量掩膜和裁剪

Python遥感开发之批量掩膜和裁剪1.使用arcpy进行批量掩膜1.1 批量掩膜代码1.2 单个掩膜代码2.使用GDAL进行批量掩膜3.使用rasterio进行批量裁剪前言&#xff1a;主要介绍了使用arcpy、gdal、rasterio对遥感影像进行批量掩膜和裁剪。 1.使用arcpy进行批量掩膜 注意&#xff1a;…

Spring Boot——日志文件

文章目录1.日志的作用2.日志的用法3.自定义日志打印日志格式的说明4.日志级别5.在配置文件中设置日志级别5.1设置全局的日志级别和局部文件夹的日志级别6. 日志持久化7. 更简单的日志输出-lombok1.日志的作用 日志的作用&#xff1a;用来排除和定位问题 通过日志还可以具有以…

Greenplum数据库故障排查及修复

场景一&#xff1a;gp服务正常&#xff0c;存在部分segment实例丢失 1、异常现象 主节点切换gpadmin用户输入gpstate查看状态 如果红色框内有指向左边的箭头则说明存在部分segment实例丢失。 2、排查思路 首先查看主节点日志&#xff0c;重点关注发生segment丢失那段时间的…

【ML特征工程】第 3 章 :文本数据:扁平化、过滤和分块

&#x1f50e;大家好&#xff0c;我是Sonhhxg_柒&#xff0c;希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流&#x1f50e; &#x1f4dd;个人主页&#xff0d;Sonhhxg_柒的博客_CSDN博客 &#x1f4c3; &#x1f381;欢迎各位→点赞…

关于SD-WAN的十问十答(最强攻略戳这里)

1. WAN和SD-WAN之间的区别&#xff1f; 从底层来看&#xff0c;相较基于硬件物理设施的WAN&#xff0c;SD-WAN是一种覆盖现有网络的软件技术&#xff0c;是部署在物理基础设施下层的流量管理网络。 和常规WAN相比&#xff0c;SD-WAN具有虚拟WAN体系结构和软件驱动技术&#xff…

[附源码]java毕业设计圆梦山区贫困学生助学系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

EasyRecovery适用于Windows和Mac的专业硬盘恢复软件

电脑中的数据文件对很多的小伙伴来说都是非常重要的&#xff0c;在下载安装新的软件设备时都需要非常谨慎&#xff0c;一旦碰到一些问题就可能会导致文件丢失&#xff0c;想要恢复这些文件并不是很容易&#xff0c;需要使用专业的数据恢复工具才可以对其进行恢复&#xff0c;那…