RabbitMQ个人理解与基本使用

news2025/7/19 12:44:01

目录

一. 作用:

二. RabbitMQ的5中队列模式:

1. 简单模式

2. Work模式

3. 发布/订阅模式

4. 路由模式

5. 主题模式

三. 消息持久化:

消息过期时间

ACK应答 

四. 同步接收和异步接收:

应用场景

五. 基本使用 :

引入依赖库:

配置文件RabbitMQConfig: 

创建消息任务类: 

解析:


一. 作用:

        RabbitMQ主要用于消息队列的实现。

二. RabbitMQ的5中队列模式:

1. 简单模式

一个生产者(发送方)对应一个消费者(接收方)

2. Work模式

一个生产者对应多个消费者,但是只能有一个消费者获得消息(排他)

3. 发布/订阅模式

一个消费者将消息首先发送到fanout交换器,交换器绑定到多个队列,然后与之对应的所有消费者都能接收到消息(不排他)

4. 路由模式

生产者将消息发送到direct交换器,交换器按照关键字(Key),把消息路由到某个队列

5. 主题模式

生产者将消息发送到Topic交换器,交换器按照复杂的规则,把消息路由到某个队列

三. 消息持久化:

        消息的可靠性是RabbitMQ的一大特色,那么RabbitMQ是如何保证消息可靠性的呢?答案就是消息持久化。持久化可以防止在异常情况下丢失数据。除了消息持久化之外,甚至交换器和队列都能持久化。也就是说rabbitmq的消息会被存储在磁盘上,只有当消费收到消息,rabbitmq确认消费者收到消息(Acknowledgments--简称ACK)后才会将消息从队列中删除。  

  • 消息过期时间

        如果消费者一直不接收消息,消息会一直保存在消息队列当中,短期内可能不会有什么影响,但是如果经过长时间的积累后,消息会变得很多很多 ,浪费大量的资源,内存。

        为了应对这种情况,就可以对rabbitmq设置消息的过期时间,在规定时间内消息没有被接收,就会删除掉该消息。

  • ACK应答 

        消费者接收到消息后,为了让RabbitMQ 知道,就需要返回一个ACK应答,告诉RabbitMQ消费者已经收到了消息,如果收到消息后我们需要删除该消息,只需要在ACK应答中加上deliveryTag标志位。

四. 同步接收和异步接收:

        同步接收:指消费者调用方法时,会阻塞来等待消息,直到消息被成功消费或者队列为空。(没有消息等待消息再接着处理)。

        异步接收: 指消费者不会在接收消息时阻塞,而是通过回调函数处理消息。消费者在等待消息的同时不会停下,可以处理其他任务。(当有消息时才来处理消息)。

  • 应用场景

        同步接收 :当消息的处理顺序对业务逻辑非常重要,就使用同步接收,消费者一次只处理一个消息,确保了每条消息的处理顺序。

        异步接收:当处理消息的时间比较长,或者系统的并发量大时,采用异步接收会更好。

RabbitMQ还有一个杀手锏——同时使用异步收发和同步收发。

五. 基本使用 :

引入依赖库:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.9.0</version>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency> 

配置文件RabbitMQConfig: 

import com.rabbitmq.client.ConnectionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {
    @Value("${rabbitmq.factoryHost}")
    private String host;

    @Bean
    public ConnectionFactory connectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(host);
        factory.setPort(5672);
        return factory;
    }
}

 host配置,我将rabbitMQ放在虚拟机上的,所有ip是虚拟机的地址:

创建消息任务类: 

@Slf4j
@Component
public class MessageTask {
    @Autowired
    private ConnectionFactory factory;
    @Autowired
    private MessageService messageService;

    /*
    * 同步发送消息
    * */
    public void send(String topic, MessageEntity entity) {
        //向MongoDB保存消息数据,返回消息ID
        String id = messageService.insertMessage(entity);
        //向RabbitMQ发送消息
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()){
            //连接到某个topic
            channel.queueDeclare(topic, true, false, false, null);
            HashMap header = new HashMap();
            header.put("messageId",id);
            //创建AMQP协议参与对象,添加附加属性
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().headers(header).build();
            channel.basicPublish("",topic,properties,entity.getMsg().getBytes());
            log.debug("消息发送成功");
        } catch (Exception e){
            log.error(e.getMessage());
            throw new EmosException("向MQ发送消息失败");
        }
    }

    /*
    * 异步发送消息
    * */
    @Async("AsyncTaskExecutor")
    public void sendAsync(String topic, MessageEntity entity) {
        send(topic, entity);
    }

    /*
    * 同步接收消息
    * */
    public int receive(String topic) {
        int i = 0;
        try (//接收消息数据
             Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 从队列中获取消息,不自动确认
            channel.queueDeclare(topic, true, false, false, null);
            //Topic中有多少条数据未知,所以使用死循环接收数据,直到接收不到消息,退出死循环
            while (true) {
                //创建响应接收数据,禁止自动发送Ack应答
                GetResponse response = channel.basicGet(topic, false);
                if (response != null) {
                    AMQP.BasicProperties properties = response.getProps();
                    Map<String, Object> header = properties.getHeaders(); //获取附加属性对象
                    String messageId = header.get("messageId").toString();
                    byte[] body = response.getBody();//获取消息正文
                    String message = new String(body);
                    log.debug("从RabbitMQ接收的消息:" + message);
                    MessageRefEntity entity = new MessageRefEntity();
                    entity.setMessageId(messageId);
                    entity.setReceiverId(Integer.parseInt(topic));
                    entity.setReadFlag(false);
                    entity.setLastFlag(true);
                    messageService.insertRef(entity); //把消息存储在MongoDB中
                    //数据保存到MongoDB后,才发送Ack应答,让Topic删除这条消息
                    long deliveryTag = response.getEnvelope().getDeliveryTag();
                    channel.basicAck(deliveryTag, false);
                    i++;
                } else {
                    break; //接收不到消息,则退出死循环
                }
            }
        } catch (Exception e) {
            log.error("执行异常", e);
        }
        return i;
    }

    /*
    * 异步接收消息
    * */
    @Async
    public int receiveAsync(String topic) {
        return receive(topic);
    }

    /*
    * 同步删除消息
    * */
    public void deleteQueue(String topic) {
        try(//接收消息数据
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel()){
            channel.queueDelete(topic);
            log.debug("成功删除消息队列:"+topic);
        } catch (Exception e){
            log.error("删除消息队列失败:",e);
            throw new EmosException("删除消息队列失败");
        }
    }

    /*
    * 异步删除消息
    * */
    @Async
    public void deleteAsync(String topic) {
        deleteQueue(topic);
    }
}

解析:

channel.queueDeclare(String queueName, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);
  • queueName:队列的名称,用于标识消息的存储位置。
  • durable:

        true,表示队列是持久化的。

        false,表示队列是非持久化的。

  • exclusive:

        true:队列仅供当前连接使用,连接断开时队列会自动删除。

        false:队列可供多个连接共享。

  • autoDelete:
    true:当队列不再被任何消费者订阅时,队列会自动删除。
    false:队列即使没有消费者订阅也会一直存在,直到手动删除。

  • arguments:额外的参数,null表示没有额外参数

Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 60000); // 设置队列中消息的过期时间为 60 秒(60000 毫秒)

channel.queueDeclare("myQueue", true, false, false, arguments);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2261281.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Y3编辑器文档4:触发器1(界面及使用简介、变量作用域、入门案例)

文章目录 一、触发器简介1.1 触发器界面1.2 ECA语句编辑及快捷键1.3 参数设置1.4 变量设置1.5 实体触发器1.6 触发器复用 二、触发器的多层结构2.1 子触发器&#xff08;在游戏内对新的事件进行注册&#xff09;2.2 触发器变量作用域 三、入门案例3.1 使用触发器实现瞬间移动3.…

【DBeaver】连接带kerberos的hive[Apache|HDP]

目录 一、安装配置Kerberos客户端环境 1.1 安装Kerberos客户端 1.2 环境配置 二、基于Cloudera驱动创建连接 三、基于Hive原生驱动创建连接 一、安装配置Kerberos客户端环境 1.1 安装Kerberos客户端 在Kerberos官网下载,地址如下&#xff1a;https://web.mit.edu/kerberos…

bug:uniapp运行到微信开发者工具 白屏 页面空白

1、没有报错信息 2、预览和真机调试都能正常显示&#xff0c;说明代码没错 3、微信开发者工具版本已经是win7能装的最高版本了&#xff0c;1.05版 链接 不打算回滚旧版本 4、解决&#xff1a;最后改调试基础库为2.25.4解决了&#xff0c;使用更高版本的都会报错&#xff0c;所…

【前端】JavaScript自定义 forEach方法详解与原理分析

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: 前端 文章目录 &#x1f4af;前言&#x1f4af;题目演示与效果演示代码控制台输出结果 &#x1f4af;代码分析与源理解释1. 构造函数 Brray2. 实例化 Brray3. 调用自定义的 forEach &#x1f4af;比较与拓展1. 比较原…

基于卷积神经网络的Caser算法

将一段交互序列嵌入到一个以时间为纵轴的平面空间中形成“一张图”后&#xff0c;基于卷积序列嵌入的推荐&#xff08;Caser&#xff09;算法利用多个不同大小的卷积滤波器&#xff0c;来捕捉序列中物品间的点级&#xff08;point-level&#xff09;、联合的&#xff08;union-…

挑战一个月基本掌握C++(第三天)了解注释、数据类型、变量

一 C注释 程序的注释是解释性语句&#xff0c;您可以在 C 代码中包含注释&#xff0c;这将提高源代码的可读性。所有的编程语言都允许某种形式的注释。 C 支持单行注释和多行注释。注释中的所有字符会被 C 编译器忽略。 C 注释一般有两种&#xff1a; // - 一般用于单行注释…

211-基于FMC的1路1.5G ADC 1路 2.5G DAC子卡

一、板卡概述 FMC-1AD-1DA-1SYNC是我司自主研发的一款1路1G AD采集、1路2.5G DA回放的FMC、1路AD同步信号子卡。板卡采用标准FMC子卡架构&#xff0c;可方便地与其他FMC板卡实现高速互联&#xff0c;可广泛用于高频模拟信号采集等领域。 二、功能介绍 2.1 原理框图 2.2 硬件…

Cloudlog 电台日志系统 request_form SQL注入漏洞复现

0x01 产品简介 Cloudlog 是一个自托管的 PHP 应用程序,可让您在任何地方记录您的业余无线电联系人。使用PHP和MySQL构建的基于Web的业余无线电记录应用程序支持从HF到微波的一般站记录任务。 0x02 漏洞概述 Cloudlog request_form 接口存在未授权SQL注入漏洞,未经身份验证…

Jenkins容器使用宿主机Docker(五)

DevOps之安装和配置 Jenkins (一) DevOps 之 CI/CD入门操作 (二) Sonar Qube介绍和安装&#xff08;三&#xff09; Harbor镜像仓库介绍&安装 &#xff08;四&#xff09; Jenkins容器使用宿主机Docker&#xff08;五&#xff09; Jenkins流水线初体验&#xff08;六&#…

【大模型】LLaMA-2:Open Foundation and Fine-Tuned Chat Models, July. 2023.

论文&#xff1a;LLaMA-2&#xff1a;Open Foundation and Fine-Tuned Chat Models, July. 2023. 链接&#xff1a;https://arxiv.org/abs/2307.09288 Introduction 创新点 7B - 70B 预训练 微调 开源Llama 2 和Llama 2-Chat&#xff0c;针对对话用例进行了优化Motivation A…

Jmeter直连数据库,jar包下载

运行报错信息&#xff1a;jmeter连接mysql异常&#xff1a;Cannot load JDBC driver class ‘com.mysql.jdbc.Driver‘ 1、下载地址&#xff1a; https://mvnrepository.com/artifact/mysql/mysql-connector-java/ 2、将下载好的jar包 &#xff08;我的是&#xff1a;mysql-con…

安全攻击平台介绍

目录 XSS攻击平台 Attack API BeEF XSS-Proxy 漏洞平台 cnvd 阿里云漏洞库 攻防演练平台 XCTF 攻防平台 零日靶场&#xff08;0ops&#xff09; 安恒靶场&#xff08;赛宁安全&#xff09; XSS攻击平台 XSS Payload如此强大&#xff0c;为了使用方便&#xff0c;有安…

webpack处理图片资源

过去在Webpack4时&#xff0c;我们处理图片资源通过file-loader和url-loader进行处理。 现在Webpack5已经将两个Loader功能内置到Webpack里了&#xff0c;我们只需要简单配置即可处理图片资源。 1. 配置 //webpack.config.js const path require("path");module.…

QT数据库(四):QSqlRelationalTableModel 类

关系数据库概念 例如下列departments、majors、studInfo 这 3 个数据表之间存在关系。 主键与外键 标记“**”的是主键字段&#xff0c;标记“*”的是外键字段。主键字段是一个数据表中表示记录唯一性的字段&#xff0c;例如 studInfo 数据表中的 studID 字段。外键字段是与其…

深入C语言文件操作:从库函数到系统调用

引言 文件操作是编程中不可或缺的一部分&#xff0c;尤其在C语言中&#xff0c;文件操作不仅是处理数据的基本手段&#xff0c;也是连接程序与外部世界的重要桥梁。C语言提供了丰富的库函数来处理文件&#xff0c;如 fopen、fclose、fread、fwrite 等。然而&#xff0c;这些库…

【HarmonyOS NEXT】ArkTs数据类型解析与使用

1. 背景 为什么设计ArkTS&#xff1f; 1.1 其它语言有版权【Java&#xff1f;Kotlin&#xff1f;】以及历史问题【Java内存&#xff1f;】 1.2 生态&#xff0c;可复用前端生态的三方库&#xff0c;兼容JS/TS语言生态ArkTs解决了JS/TS中的哪些问题&#xff1f; 2.1 **程序健壮性…

H5 scss 移动端的样式适配

在移动端样式的scss文件中&#xff0c;出现了这些变量 env() 与 constant() 设置安全区域&#xff0c;是css里IOS11新增的属性&#xff0c;webkit的css函数&#xff0c;用于设定安全区域与边界的距离&#xff0c;有4个预定义变量&#xff1a; safe-area-inset-left: 安全区域距…

YOLOv5与ViT目标检测中的热力图应用教程

文章目录 前言一、热力图介绍1、热力图应用说明2、热力图代码整体思路3、实验效果二、heatmap类解读三、GradCAM、GradCAMPlusPlus, GradCAM, XGradCAM, EigenCAM, HiResCAM, LayerCAM等类源码解读1、GradCAM类源码2、BaseCAM类源码解读1、BaseCAM源码2、forward函数源码解读ou…

组织病理学图像的再识别|文献速递-生成式模型与transformer在医学影像中的应用

Title 题目 Re-identification from histopathology images 组织病理学图像的再识别 01 文献速递介绍 在光学显微镜下评估苏木精和伊红&#xff08;H&E&#xff09;染色切片是肿瘤病理诊断的标准程序。随着全切片扫描仪的出现&#xff0c;能够将玻璃切片数字化为所谓的…