RabbitMQ消息队列笔记

news2025/7/27 20:45:09

目录

docker

Java

导包

配置文件

Work Queues

消息堆积

消息生产者发送消息到队列

 消息消费者接收消息

Fanout交换机

Direct交换机发送消息

用Java代码创建交换机和队列、绑定

Direct交换机

Direct交换机发送消息

 用Java代码创建交换机和队列、绑定

基于注解声明队列和交换机

Topic交换机

Topic交换机发送消息

发送对象类型的消息

使用json序列化代替默认的jdk序列化

对象类型的消息对象的接收


RabbitMQ是一个高性能的异步通信组件;

docker

我部署在docker下

docker run \
-e RABBITMQ_DEFAULT_USER=root \ #用户名
-e RABBITMQ_DEFAULT_PASS=root \ #密码
-v mq-plugins:/plugins \ #目录挂载,mq-plugins是数据卷
--name mq \ #容器名
--hostname mq \ #主机名
-p 15672:15672 \ #端口映射
-p 5672:5672 \
--network hmall \ #网咯
-d \
rabbitmq:3.8-management

15672端口是控制台端口,图形化界面;

5672端口是通信的端口,发收消息用的;

publisher 消息的发送者

exchange 交换机,复制路由消息

queue 队列,存储信息

consumer 消息的消费者

virtual-host 虚拟主机,起数据隔离作用;每个项目可以建一个自己的虚拟主机;

Java

AMQP 是用于在应用程序之间传递业务消息的开放标准。该协议与平台无关,更符合微服务中独立性的要求;

Spring AMQP 是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,是抽象;底层的实现默认是Spring-rabbitmq;

导包

<!--AMQP依赖,包含RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

消息提供者/消费者的配置文件

spring:
  rabbitmq:
    host: 192.168.88.130 # 主机名
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码

Work Queues

消息堆积

默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否以及处理完消息,可能会出现消息的堆积;

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完才能获取下一个消息

消息生产者发送消息到队列

生产者发送消息到队列而不是交换机;

@Autowired
private RabbitTemplate rabbitTemplate;
    @Test
    void testSendMessage2Queue() {
        String queueName = "simple.queue";
        String msg = "hello ampq";
        rabbitTemplate.convertAndSend(queueName, msg);
    }
}

 消息消费者接收消息

@Slf4j
@Component
public class MQListener {
    @RabbitListener(queues = {"simple.queue"})
    public void listenSimpleQueue(String msg) {
        System.out.println("消费者收到了simple.queue队列的消息:【" + msg + "】");
    }
}

Fanout交换机

Fanout交换机会把接收到的消息广播到每一个跟其绑定的队列,所以也叫广播模式;

Direct交换机发送消息

@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void testSendFanout() {
        String exchangeName = "hmall.fanout";
        String msg = "hello every";
        rabbitTemplate.convertAndSend(exchangeName, null, msg);
    }
}

因为接收消息是一样的,所以这里就不赘述了;

用Java代码创建交换机和队列、绑定

一般在消费者项目声明

@Configuration
public class FanoutConfig {
    /**
     * 声明fanout交换机
     *
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("hmall.fanout");
    }

    /**
     * 第二种声明fanout交换机的写法
     *
     * @return
     */
    @Bean
    public FanoutExchange fanoutExchange1() {
        return ExchangeBuilder.fanoutExchange("hmall.fanout2").build();
    }

    /**
     * 声明一个队列
     *
     * @return
     */
    @Bean
    public Queue fanoutQueue() {
        return new Queue("fanout.queue1");
    }

    /**
     * 第二种声明队列的写法
     *
     * @return
     */
    @Bean
    public Queue fanoutQueue2() {
        // 持久化
        return QueueBuilder.durable("fanout.quque2").build();
    }

    /**
     * 绑定队列和交换机
     *
     * @param fanoutQueue
     * @param fanoutExchange
     * @return
     */
    @Bean
    public Binding bindingQueue1(Queue fanoutQueue, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
    }

    /**
     * 第二种绑定队列和交换机的方法
     *
     * @return
     */
    @Bean
    public Binding bindingQueue2() {
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
    }
}

Direct交换机

Direct交换机会将接收到的消息根据规则路由到指定的队列,因此称为定向路由;

每一个队列都与一个交换机设置一个BindingKey;

发布者发布消息时,指定消息的RoutingKey;

交换机将消息路由到BindingKey与RoutingKey一致的队列;

将BindingKey写成一致就可以实现广播消息的功能; 

Direct交换机发送消息

@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void testSendDirect() {
        String exchangeName = "hmall.direct";
        String routingKey = "blue";
        String msg = "hello every";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
    }
}

 用Java代码创建交换机和队列、绑定

@Configuration
public class DirectConfig {
    /**
     * 声明direct交换机
     *
     * @return
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("hmall.direct");
    }

    /**
     * 第二种声明direct交换机的写法
     *
     * @return
     */
    @Bean
    public DirectExchange directExchange1() {
        return ExchangeBuilder.directExchange("hmall.direct2").build();
    }

    /**
     * 声明一个队列
     *
     * @return
     */
    @Bean
    public Queue directQueue() {
        return new Queue("direct.queue1");
    }

    /**
     * 第二种声明队列的写法
     *
     * @return
     */
    @Bean
    public Queue directQueue2() {
        // 持久化
        return QueueBuilder.durable("direct.quque2").build();
    }

    /**
     * 绑定队列和交换机
     *
     * @param directQueue
     * @param directExchange
     * @return
     */
    @Bean
    public Binding bindingQueue3(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("red");
    }

    @Bean
    public Binding bindingQueue4(Queue directQueue, DirectExchange directExchange) {
        return BindingBuilder.bind(directQueue).to(directExchange).with("yellow");
    }
}

基于注解声明队列和交换机

@Slf4j
@Component
public class MQListener {
    @RabbitListener(bindings = {@QueueBinding(
            value = @Queue(name = "direct.queue3", declare = "true"),
            exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
            key = {"red", "blue"}
    )})
    public void listenDirectQueue3(String msg) {
        System.out.println("消费者3收到了direct.queue3队列的消息:【" + msg + "】");
    }
}

Topic交换机

类似于direct交换机,与direct交换机的区别是,topic交换机的routingKey可以是多个单词的列表,并以 . 分割;

队列和交换机指定BindingKey时可以使用通配符;

# 代表0个或多个单词

* 代表一个单词

Topic交换机发送消息

@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void testSendTopic() {
        String exchangeName = "hmall.topic";
        String routingKey = "china.news";
        String msg = "这是一条消息通知";
        rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
    }
}

发送对象类型的消息

@SpringBootTest
public class SpringAmqpTest {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    void testSendObject() {
        Map<String, Object> msg = new HashMap<>();
        msg.put("name", "jack");
        msg.put("age", 21);
        rabbitTemplate.convertAndSend("object.queue", msg);
    }
}

使用json序列化代替默认的jdk序列化

<!--jackson-->
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>
@Configuration
public class MessageConverterConfig {
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

对象类型的消息对象的接收

@Slf4j
@Component
public class MQListener {
    @RabbitListener(queues = {"object.queue"})
    public void listenObjecQueue(Map<String, Object> msg) {
        System.out.println("消费者收到了object.queue队列的消息:【" + msg + "】");
    }
}

消息的可靠性质

发送者的可靠性

消息发送时丢了

生产者重连

spring:
  rabbitmq:
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长=initial-interval * multiplier
        max-attempts: 3 # 最大重试次数

生产者确认

开启确认机制后,在MQ成功后收到消息后会返回确认消息给生产者。返回的结果有一下几种情况;

消息到了MQ,但路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功;

临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功;

持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功;

其他情况都会返回NACK,告知投递失败;

配置文件

spring:
  rabbitmq:
    publisher-confirm-type: correlated # MQ异步回调的方式返回回执消息
    publisher-returns: true # 开启返回机制

配置类

@Slf4j
@Configuration
public class MqConfig implements ApplicationContextAware {
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
        // 配置回调
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returnedMessage) {
                log.debug("收到消息的回调,exchange:{},key:{},msg:{},code:{},text:{}", returnedMessage.getExchange(),
                        returnedMessage.getRoutingKey(), returnedMessage.getMessage(),
                        returnedMessage.getReplyCode(), returnedMessage.getReplyText());
            }
        });
    }
}

发送消息

@Test
void testConfirmCallback() throws InterruptedException {
    // 1、创建id
//        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
    CorrelationData cd = new CorrelationData();
    // 2、添加confirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            log.error("消息回调失败", ex);
        }

        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            log.debug("收到confirm callback回执");
            if (result.isAck()) {
                log.info("消息发送成功,收到ack");
            } else {
                log.error("消息发送失败,收到nack,原因:{}", result.getReason());
            }
        }
    });
    rabbitTemplate.convertAndSend("hmall.direct", "red", "hello");
    Thread.sleep(2000);
}

但是上面的代码我尝试了但回调方法始终没有触发;

MQ的可靠性

mq把消息丢了

在默认情况下,RabbitMQ会将接收到的消息保存在内存以降低消息的收发延迟。这样会导致两个问题:

一旦MQ宕机,内存中的消息会丢失;

内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞;

数据持久化

交换机的持久化

@Test
void testPageOut() {
    Message msg = MessageBuilder
            .withBody("hello".getBytes(StandardCharsets.UTF_8))
            .setDeliveryMode(MessageDeliveryMode.PERSISTENT).build(); // PERSISTENT持久化
    for (int i = 0; i < 1E6; i++) {
        rabbitTemplate.convertAndSend("simple.queue", msg);
    }
}

Lazy Queue

惰性队列

接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条);

消费者要消费消息时才会从磁盘中读取并加载到内存;

支持数百万的消息存储;

3.12版本后,所有队列都是惰性队列,无法更改;

消费者的可靠性

消费者把消息丢了

消费者确认机制

当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

ack 成功处理消息,RabbitMQ从队列中删除该消息;

nack 消息处理失败,RabbitMQ需要再次投递消息;

reject 消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息;

SpringAMQP有三种ack方式

none 不处理,即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用;

manual 手动模式,需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活;

auto 自动模式,SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack,当业务出现异常时,根据异常判断返回不同结果。

如果是业务异常,会自动返回nack;

如果是消息处理或校验异常,自动返回reject;

消息消费者配置文件

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto

延时消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1156700.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Rust 语言常见的一些概念(上)

目录 1、变量的可变性 常量 隐藏 2、数据类型 2.1 标量类型 整型 浮点型 数值运算 布尔型 字符类型 复合类型 元组类型 数组类型 1、变量的可变性 变量默认是不可改变的&#xff08;immutable&#xff09;。这是 Rust 提供给你的众多优势之一&#xff0c;让你得以…

win32 读写UTF-8格式的文件的方法

1&#xff0c;写入数据 最开始是在写入数据前先写入三个字节 BYTE btHead[] { 0xEF,0xBB,0xBF }; ::WriteFile(hFile, btHead, 3, &dwWrite, 0); ::WriteFile(hFile, str, lstrlen(str)*sizeof(TCHAR), &dwWrite, 0);这样写入后文件样式为&#xff1a; 格式是UTF-8…

基于Python制作一个动物识别小程序

目录 引言研究背景目的与意义 动物识别技术概述基本原理图像处理与特征提取机器学习与深度学习方法 数据集与数据预处理数据收集与构建数据预处理步骤数据增强技术 特征提取与选择基础特征提取方法特征选择与降维 引言 研究背景 动物识别是计算机视觉和模式识别领域的重要研究…

《深入浅出OCR》实战:基于CRNN的文字识别

✨专栏介绍: 经过几个月的精心筹备,本作者推出全新系列《深入浅出OCR》专栏,对标最全OCR教程,具体章节如导图所示,将分别从OCR技术发展、方向、概念、算法、论文、数据集等各种角度展开详细介绍。 💙个人主页: GoAI |💚 公众号: GoAI的学习小屋 | 💛交流群: 7049325…

在python中加载tensorflow-probability模块和numpy模块

目录 操作步骤&#xff1a; 注意&#xff1a; 问题&#xff1a; 解决办法&#xff1a; 操作步骤&#xff1a; 在虚拟环境的文件夹中&#xff0c;找到Scripts文件夹&#xff0c;点击进去&#xff0c;找到地址栏&#xff0c;在地址栏中输入cmd&#xff0c;进入如下界面。 输…

国产数据库兼容过程中涉及的MySQL非严格模式

点击上方蓝字关注我 在国产数据库兼容适配过程中&#xff0c;经常遇到因源数据库是MySQL&#xff0c;迁移至其他国产数据库后&#xff0c;因MySQL端兼容模式有非严格模式&#xff0c;导致适配过程过程中需要做调整。那么&#xff0c;MySQL主要的非严格模式小结如下&#xff1a;…

约会杭州云栖2023:为了无法计算的价值一起努力

&#x1f3c6;作者简介&#xff0c;黑夜开发者&#xff0c;CSDN领军人物&#xff0c;全栈领域优质创作者✌&#xff0c;CSDN博客专家&#xff0c;阿里云社区专家博主&#xff0c;2023年6月CSDN上海赛道top4。 &#x1f3c6;数年电商行业从业经验&#xff0c;历任核心研发工程师…

C++初阶 类和对象(上)

前言&#xff1a;C初阶系列&#xff0c;每一期博主都会使用简单朴素的语言将对应的知识分享给大家&#xff0c;争取让所有人都可以听懂&#xff0c;C初阶系列会持续更新&#xff0c;上学期间将不定时更新&#xff0c;但总会更的 目录 一、什么是面向对象编程 二、什么是类和如…

AST反混淆实战|变种ob混淆还原指南一

关注它&#xff0c;不迷路。 本文章中所有内容仅供学习交流&#xff0c;不可用于任何商业用途和非法用途&#xff0c;否则后果自负&#xff0c;如有侵权&#xff0c;请联系作者立即删除&#xff01; 1.需求 ob混淆是我们最常见的混淆代码&#xff0c;标准的混淆 可以用星…

如何读懂深度学习python项目,以`Multi-label learning from single positive label`为例

Paper : Multi-label learning from single positive label Code 先读一读README.md 可能有意想不到的收获&#xff1b; 实验环境设置要仔细看哦&#xff01; 读论文 如何读论文&#xff0c;Readpaper经典十问 &#xff08;可能在我博客里有写&#xff09; How to read a …

【UE5】如何在UE5.1中创建级联粒子系统

1. 可以先新建一个actor蓝图&#xff0c;然后在该蓝图中添加一个“Cascade Particle System Component” 2. 在右侧的细节面板中&#xff0c;点击“模板”一项中的下拉框&#xff0c;然后点击“Cascade粒子系统&#xff08;旧版&#xff09;” 然后就可以选择在哪个路径下创建级…

Notepad++下载、使用

下载 https://notepad-plus-plus.org/downloads/ 安装 双击安装 选择安装路径 使用 在文件夹中搜索 文件类型可以根据需要设置 如 *.* 说明是所有文件类型&#xff1b; *.tar 说明是所有文件后缀是是tar的文件‘&#xff1b;

【Rust日报】2023-10-30 理解 Rust 中的生命周期

理解 Rust 中的生命周期 生命周期&#xff08;Lifetime&#xff09;是让 Rust 成为 Rust 的关键因素。 没有了生命周期&#xff0c;轻松的并发、直接的内存分配和整体的数据安全都是不可能的。 但是&#xff0c;生命周期也很难理解&#xff0c;这篇教程会帮助人们理解生命周期的…

【强化学习】13 —— Actor-Critic 算法

文章目录 REINFORCE 存在的问题Actor-CriticA2C&#xff1a; Advantageous Actor-Critic代码实践结果 参考 REINFORCE 存在的问题 基于片段式数据的任务 通常情况下&#xff0c;任务需要有终止状态&#xff0c;REINFORCE才能直接计算累计折扣奖励 低数据利用效率 实际中&#…

【Java】多线程案例(单例模式,阻塞队列,定时器,线程池)

❤️ Author&#xff1a; 老九 ☕️ 个人博客&#xff1a;老九的CSDN博客 &#x1f64f; 个人名言&#xff1a;不可控之事 乐观面对 &#x1f60d; 系列专栏&#xff1a; 文章目录 实现安全版本的单例模式饿汉模式类和对象的概念类对象类的静态成员与实例成员 懒汉模式如何保证…

C++设计模式_21_Iterator 迭代器(理解;面向对象的迭代器已过时;C++中使用泛型编程的方式实现)

Iterator 迭代器也是属于“数据结构”模式。GoF中面向对象的迭代器已经过时&#xff0c;C中目前使用泛型编程的方式实现&#xff0c;其他语言还在使用面向对象的迭代器。 文章目录 1. 动机(Motivation)2. 模式定义3. Iterator 迭代器代码分析4. 面向对象的迭代器与泛型编程实现…

一天写一个(前端、后端、全栈)个人简历项目(附详源码)

一、项目简介 此项目是用前端技术HTMLCSSjquery写的一个简单的个人简历项目模板&#xff0c;图片可点击放大查看&#xff0c;还可以直接下载你的word或者PDF的简历模板。 如果有需要的同学可以直接拿去使用&#xff0c;需自行填写个人的详细信息&#xff0c;发布&#xff0c;…

uniapp 开发微信小程序 v-bind给子组件传递函数,该函数中的this不是父组件的二是子组件的this

解决办法&#xff1a;子组件通过缓存子组件this然后&#xff0c;用bind改写this 这个方法因为定义了全局变量that 那么该变量就只能用一次&#xff0c;不然会有赋值覆盖的情况。 要么就弃用v-bind传入函数,改为emit传入自定义事件 [uniapp] uview(1.x) 二次封装u-navbar 导致…

程序开发设计原则

&#xff08;图片来自网络&#xff09; 单一职责 Single Responsibility Principle 不论是在设计类&#xff0c;接口还是方法&#xff0c;单一职责都会处处体现&#xff0c;单一职责的定义&#xff1a;我们把职责定义为系统变化的原因。所有在定 义类&#xff0c;接口&#xff…

CV2 将图片中某个点与中心点的角度变换成0-360度

众所周知&#xff0c;CV2中的坐标方向是这样的&#xff1a; 所以一般我们想计算图片中某个点P1(x1,y1)与中心点P0(x0,y0)的方向时&#xff0c;我们会先将y坐标翻上去,然后计算角度。即&#xff1a; p1_xint(x1) # p1_yint(y1)p0_xint(x0) #图像大小为512*512中心点坐标为25…