spring中的@RabbitListener注解详解

news2025/6/9 20:54:28

    • 基本用法
    • 主要属性
      • 1. queues / queueNames
      • 2. containerFactory
      • 3. id
      • 4. concurrency
      • 5. ackMode
      • 6. priority
      • 7. bindings
    • 高级特性
      • 1. 消息转换器
      • 2. 手动确认
      • 3. 条件监听
      • 4. 错误处理
    • 配置监听容器工厂
    • 注意事项
    • 完整示例
    • 循环依赖解决
      • 1. 使用 Setter 注入
      • 2. 使用 `@Lazy` 注解
      • 3. 重构代码结构
      • 4. 使用事件驱动架构
      • 5. 使用 `@PostConstruct` 方法
    • @RabbitListener注解与@Profile注解组合使用
      • 基本用法
      • 配置文件激活
        • 1. 在 `application.properties` 或 `application.yml` 中设置
        • 2. 在启动应用程序时通过命令行参数设置
        • 3. 在 IDE 中设置
      • 高级用法
      • 注意事项
    • @RabbitListener的autoStartup配置说明
      • 作用
      • 使用场景
      • 示例代码
      • 手动启动监听器


@RabbitListener 是 Spring AMQP 框架中的一个核心注解,用于简化 RabbitMQ 消息监听器的开发。下面详细介绍这个注解的用法和特性。

基本用法

@RabbitListener 可以标注在方法上,表示该方法是一个 RabbitMQ 消息监听器。

@Component
public class MyRabbitListener {
    
    @RabbitListener(queues = "myQueue")
    public void receiveMessage(String message) {
        System.out.println("Received <" + message + ">");
    }
}

在这里插入图片描述

主要属性

1. queues / queueNames

指定要监听的队列名称:

@RabbitListener(queues = "myQueue")
// 或
@RabbitListener(queueNames = "myQueue")

可以监听多个队列:

@RabbitListener(queues = {"queue1", "queue2"})

2. containerFactory

指定使用的消息监听容器工厂:

@RabbitListener(queues = "myQueue", containerFactory = "myFactory")

3. id

为监听器指定一个唯一ID:

@RabbitListener(id = "myListener", queues = "myQueue")

4. concurrency

设置并发消费者数量:

@RabbitListener(queues = "myQueue", concurrency = "3-5")

5. ackMode

设置确认模式:

@RabbitListener(queues = "myQueue", ackMode = "MANUAL")

6. priority

设置监听器优先级:

@RabbitListener(queues = "myQueue", priority = "10")

7. bindings

使用绑定器配置(更复杂的路由配置):

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(value = "myQueue", durable = "true"),
    exchange = @Exchange(value = "myExchange", type = ExchangeTypes.TOPIC),
    key = "myRoutingKey"
))

高级特性

1. 消息转换器

可以指定消息转换器:

@RabbitListener(queues = "myQueue")
public void receiveMessage(@Payload MyObject obj, 
                          @Header(AmqpHeaders.CONTENT_TYPE) String contentType) {
    // ...
}

2. 手动确认

@RabbitListener(queues = "myQueue")
public void receiveMessage(String message, Channel channel, 
                          @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    try {
        // 处理消息
        channel.basicAck(tag, false);
    } catch (Exception e) {
        channel.basicNack(tag, false, true);
    }
}

3. 条件监听

使用 SpEL 表达式控制是否处理消息:

@RabbitListener(queues = "myQueue", condition = "headers['type']=='important'")
public void receiveImportantMessage(String message) {
    // 只处理带有特定header的消息
}

4. 错误处理

可以结合 @RabbitHandler@ServiceActivator 实现错误处理:

@RabbitListener(queues = "myQueue")
public void receiveMessage(String message) {
    // ...
}

@ServiceActivator(inputChannel = "errorChannel")
public void handleError(ErrorMessage errorMessage) {
    // 处理错误
}

配置监听容器工厂

通常需要配置一个 SimpleRabbitListenerContainerFactory

@Configuration
public class RabbitConfig {
    
    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setPrefetchCount(10);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
}

注意事项

  1. 监听器方法应该是 public 的
  2. 方法可以有多个参数,Spring 会尝试匹配消息头和内容
  3. 默认情况下,消息会自动确认,除非抛出异常
  4. 对于 POJO 消息,需要配置适当的消息转换器

完整示例

@Configuration
public class RabbitConfig {
    
    @Bean
    public Queue myQueue() {
        return new Queue("myQueue", true);
    }
    
    @Bean
    public TopicExchange myExchange() {
        return new TopicExchange("myExchange");
    }
    
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(myQueue())
                            .to(myExchange())
                            .with("myRoutingKey");
    }
    
    @Bean
    public SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(3);
        return factory;
    }
}

@Component
public class MyRabbitListener {
    
    @RabbitListener(
        queues = "myQueue",
        containerFactory = "myFactory",
        ackMode = "MANUAL"
    )
    public void receiveMessage(
            @Payload MyObject obj,
            @Header(AmqpHeaders.DELIVERY_TAG) long tag,
            Channel channel) throws IOException {
        try {
            // 处理消息
            System.out.println("Received: " + obj);
            channel.basicAck(tag, false);
        } catch (Exception e) {
            channel.basicNack(tag, false, true);
        }
    }
}

@RabbitListener 注解极大地简化了 RabbitMQ 消费者的开发,通过合理的配置可以满足各种消息处理场景的需求。

循环依赖解决

在使用 Spring 的 @RabbitListener 注解时,循环依赖问题通常发生在监听器方法中直接或间接地引用了其他需要注入的 Bean,而这些 Bean 又依赖于监听器本身或其相关组件。以下是解决这类循环依赖问题的几种方法:

1. 使用 Setter 注入

Setter 注入可以帮助解决循环依赖问题,因为 Spring 可以在创建 Bean 后设置依赖关系。

@Component
public class MyService {
    private MyListener myListener;

    @Autowired
    public void setMyListener(MyListener myListener) {
        this.myListener = myListener;
    }
}

@Component
public class MyListener {
    private final MyService myService;

    @Autowired
    public MyListener(MyService myService) {
        this.myService = myService;
    }

    @RabbitListener(queues = "myQueue")
    public void receiveMessage(String message) {
        myService.processMessage(message);
    }
}

2. 使用 @Lazy 注解

@Lazy 注解可以延迟 Bean 的初始化,从而打破循环依赖。

@Component
public class MyListener {
    private final MyService myService;

    @Autowired
    public MyListener(@Lazy MyService myService) {
        this.myService = myService;
    }

    @RabbitListener(queues = "myQueue")
    public void receiveMessage(String message) {
        myService.processMessage(message);
    }
}

3. 重构代码结构

重构代码以消除循环依赖。这通常涉及将共享逻辑提取到一个单独的 Bean 中,或者重新设计组件之间的交互。

@Component
public class MessageProcessor {
    public void processMessage(String message) {
        // 处理消息的逻辑
    }
}

@Component
public class MyListener {
    private final MessageProcessor messageProcessor;

    @Autowired
    public MyListener(MessageProcessor messageProcessor) {
        this.messageProcessor = messageProcessor;
    }

    @RabbitListener(queues = "myQueue")
    public void receiveMessage(String message) {
        messageProcessor.processMessage(message);
    }
}

@Component
public class MyService {
    private final MessageProcessor messageProcessor;

    @Autowired
    public MyService(MessageProcessor messageProcessor) {
        this.messageProcessor = messageProcessor;
    }

    public void someMethod() {
        // 使用 messageProcessor
    }
}

4. 使用事件驱动架构

通过引入事件驱动架构,可以将监听器与业务逻辑解耦。监听器只负责发布事件,具体的业务逻辑由事件监听器处理。

@Component
public class MyListener {
    private final ApplicationEventPublisher eventPublisher;

    @Autowired
    public MyListener(ApplicationEventPublisher eventPublisher) {
        this.eventPublisher = eventPublisher;
    }

    @RabbitListener(queues = "myQueue")
    public void receiveMessage(String message) {
        eventPublisher.publishEvent(new MessageReceivedEvent(message));
    }
}

@Component
public class MessageEventListener {
    @EventListener
    public void handleMessageEvent(MessageReceivedEvent event) {
        // 处理消息事件
    }
}

5. 使用 @PostConstruct 方法

在某些情况下,可以使用 @PostConstruct 方法来初始化依赖关系。

@Component
public class MyListener {
    private MyService myService;

    @Autowired
    public void setMyService(MyService myService) {
        this.myService = myService;
    }

    @PostConstruct
    public void init() {
        // 确保在初始化时设置好所有依赖
    }

    @RabbitListener(queues = "myQueue")
    public void receiveMessage(String message) {
        myService.processMessage(message);
    }
}

通过以上方法,可以有效地解决在使用 @RabbitListener 注解时可能遇到的循环依赖问题。选择哪种方法取决于具体的应用场景和架构设计。

@RabbitListener注解与@Profile注解组合使用

在 Spring 中,@RabbitListener@Profile 注解可以组合使用,以便根据不同的环境或配置文件来启用或禁用特定的消息监听器。这在需要根据环境(如开发、测试、生产)来调整消息处理逻辑时非常有用。

基本用法

你可以将 @Profile 注解与 @RabbitListener 注解一起使用,以便只有在特定配置文件激活时,监听器才会被注册和运行。

@Component
@Profile("dev") // 只有在 "dev" 配置文件激活时,这个 Bean 才会被创建
public class DevRabbitListener {

    @RabbitListener(queues = "devQueue")
    public void receiveMessage(String message) {
        System.out.println("Dev environment received: " + message);
    }
}

@Component
@Profile("prod") // 只有在 "prod" 配置文件激活时,这个 Bean 才会被创建
public class ProdRabbitListener {

    @RabbitListener(queues = "prodQueue")
    public void receiveMessage(String message) {
        System.out.println("Prod environment received: " + message);
    }
}

配置文件激活

要激活特定的配置文件,可以在应用程序启动时设置 spring.profiles.active 属性。这可以通过多种方式实现:

1. 在 application.propertiesapplication.yml 中设置
spring.profiles.active=dev
2. 在启动应用程序时通过命令行参数设置
java -jar myapp.jar --spring.profiles.active=dev
3. 在 IDE 中设置

在 IntelliJ IDEA 或 Eclipse 中,可以在运行配置中设置激活的配置文件。

高级用法

如果需要更复杂的逻辑,可以使用 @ConditionalOnProperty 或其他条件注解来进一步控制监听器的行为。

@Component
@Profile("dev")
public class DevRabbitListener {

    @RabbitListener(queues = "devQueue")
    public void receiveMessage(String message) {
        System.out.println("Dev environment received: " + message);
    }
}

@Component
@Profile("prod")
public class ProdRabbitListener {

    @RabbitListener(queues = "prodQueue")
    public void receiveMessage(String message) {
        System.out.println("Prod environment received: " + message);
    }
}

@Component
@Profile("!dev & !prod") // 默认情况下激活,如果 dev 和 prod 都没有激活
public class DefaultRabbitListener {

    @RabbitListener(queues = "defaultQueue")
    public void receiveMessage(String message) {
        System.out.println("Default environment received: " + message);
    }
}

注意事项

  • 确保在配置文件中正确设置了激活的配置文件,否则相关的监听器不会被注册。
  • 使用 @Profile 注解时,确保在应用程序上下文初始化时配置文件已经被正确加载。
  • 可以通过组合多个配置文件来实现更复杂的条件逻辑。

通过将 @RabbitListener@Profile 注解结合使用,可以灵活地管理不同环境下的消息处理逻辑,使应用程序在不同环境中表现不同的行为。

@RabbitListener的autoStartup配置说明

@RabbitListenerautoStartup 配置用于控制监听器容器在应用启动时是否自动启动,以下是详细说明:

作用

autoStartup 属性是一个布尔值,默认情况下其值为 true,表示监听器容器将会在应用启动时自动启动,开始监听指定队列中的消息。当设置为 false 时,监听器容器在应用启动时不会自动启动,需要手动触发启动。

使用场景

  • 测试场景:在测试环境中,可能需要关闭某些监听器,以便单独测试其他组件,或者模拟特定的消息处理场景。此时可以将 autoStartup 设置为 false,在需要时再手动启动监听器。
  • 条件性启动:根据某些条件(如配置文件中的参数、环境变量等)来决定是否启动监听器。例如,只有在特定的配置文件激活时才启动某个监听器,可以通过结合 @Profile 注解和 autoStartup 属性来实现。
  • 资源优化:如果某些监听器在应用启动初期并不需要立即运行,可以将其 autoStartup 设置为 false,以减少应用启动时的资源占用,在需要时再启动。

示例代码

@Component
public class MyListener {

    @RabbitListener(
        bindings = @QueueBinding(
            value = @Queue(value = "myQueue", durable = "true"),
            exchange = @Exchange(value = "myExchange", type = ExchangeTypes.TOPIC),
            key = "myRoutingKey"
        ),
        autoStartup = "false" // 设置为false,监听器容器在应用启动时不会自动启动
    )
    public void receiveMessage(String message) {
        System.out.println("Received: " + message);
    }
}

手动启动监听器

如果将 autoStartup 设置为 false,可以通过编程方式手动启动监听器容器。例如,可以在一个 @PostConstruct 方法中启动监听器:

@Component
public class ListenerStarter {

    @Autowired
    private RabbitListenerEndpointRegistry registry;

    @PostConstruct
    public void startListeners() {
        registry.getListenerContainer("myListenerContainerId").start(); // "myListenerContainerId"是监听器的ID,如果未指定,可以使用默认生成的ID
    }
}

或者,也可以使用 RabbitAdmin 或其他管理组件来控制监听器的启动和停止。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2405828.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL-运维篇

运维篇 日志 错误日志 错误日志是 MySQL 中最重要的日志之一&#xff0c;它记录了当 mysqld 启动和停止时&#xff0c;以及服务器在运行过程中发生任何严重错误时的相关信息当数据库出现任何故障导致无法正常使用时&#xff0c;建议首先查看此日志。 该日志是默认开启的&am…

深度优先算法学习

1: 从 1点出发到 15点 #include <stdio.h>#define MAX_NODES 100typedef struct {int node_id;int *nextNodes;int nextNodesSize; } Node;// 假设我们有一个节点数组&#xff0c;全局保存了所有节点 Node nodes[MAX_NODES];void dfs(int node_id) {Node *node &n…

前端技能包

ES6 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>Title</title> </head> <body><script>// 变量定义var a1;let b5; // 现在使用let 定义变量// 对象解构let person{&quo…

【笔记】PyCharm 使用问题反馈与官方进展速览

#工作记录 https://youtrack.jetbrains.com/issue/IJPL-190308 【笔记】记一次PyCharm的问题反馈_the polyglot context is using an implementation th-CSDN博客 【笔记】与PyCharm官方沟通解决开发环境问题-CSDN博客 与 JetBrains 官方沟通记录&#xff08;PyCharm 相关问题…

操作系统期末版

文章目录 概论处理机管理进程线程处理机调度生产者消费者问题 死锁简介死锁的四个必要条件解决死锁的方法 存储管理链接的三种方式静态链接装入时动态链接运行时链接 装入内存的三种方式绝对装入可重定位装入动态运行时装入 覆盖交换存储管理方式连续分配**分段存储管理方式***…

自然语言处理——语言模型

语言模型 n元文法参数估计数据平滑方法加1法 神经网络模型提出原因前馈神经网络&#xff08;FNN&#xff09;循环神经网络 n元文法 大规模语料库的出现为自然语言统计处理方法的实现提供了可能&#xff0c;统计方法的成功应用推动了语料库语言学的发展。 语句 &#x1d460; …

数据库管理与高可用-MySQL高可用

目录 #1.1什么是MySQL高可用 1.1.1MySQL主主复制keepalivedhaproxy的高可用 1.1.2优势 #2.1MySQL主主复制keepalivedhaproxy的实验案例 1.1什么是MySQL高可用 MySQL 高可用是指通过技术手段确保 MySQL 数据库在面临硬件故障、软件错误、网络中断、人为误操作等异常情况时&…

免费工具-微软Bing Video Creator

目录 引言 一、揭秘Bing Video Creator 二、轻松上手&#xff1a;三步玩转Bing Video Creator 2.1 获取与访问&#xff1a; 2.2 创作流程&#xff1a; 2.3 提示词撰写技巧——释放AI的想象力&#xff1a; 三、核心特性详解&#xff1a;灵活满足多样化需求 3.1 双重使用模…

【笔记】解决MSYS2安装后cargo-install-update.exe-System Error

#工作记录 cargo-install-update.exe-System Error The code execution cannot proceed because libgit2-1.9.dll wasnot found. Reinstalling the program may fix this problem. …

银行卡二三四要素实名接口如何用PHP实现调用?

一、什么是银行卡二三四要素实名接口 输入银行卡卡号、姓名、身份证号码、手机号&#xff0c;验证此二三四要素是否一致。 二、核心价值 1. 提升风控效率 通过实时拦截冒用身份开户&#xff0c;银行卡二三四要素实名接口显著降低了人工审核成本&#xff0c;效率提升50%以上…

itvbox绿豆影视tvbox手机版影视APP源码分享搭建教程

我们先来看看今天的主题&#xff0c;tvbox手机版&#xff0c;然后再看看如何搭建&#xff1a; 很多爱好者都希望搭建自己的影视平台&#xff0c;那该如何搭建呢&#xff1f; 后端开发环境&#xff1a; 1.易如意后台管理优化版源码&#xff1b; 2.宝塔面板&#xff1b; 3.ph…

网页抓取混淆与嵌套数据处理流程

当我们在网页抓取中&#xff0c;遇到混淆和多层嵌套的情况是比较常见的挑战。混淆大部分都是为了防止爬虫而设计的&#xff0c;例如使用JavaScript动态加载、数据加密、字符替换、CSS偏移等。多层嵌套则可能是指HTML结构复杂&#xff0c;数据隐藏在多层标签或者多个iframe中。 …

高性能MYSQL:复制同步的问题和解决方案

一、复制的问题和解决方案 中断MySQL的复制并不是件难事。因为实现简单&#xff0c;配置相当容易&#xff0c;但也意味着有很多方式会导致复制停止&#xff0c;陷入混乱并中断。 &#xff08;一&#xff09;数据损坏或丢失的错误 由于各种各样的原因&#xff0c;MySQL 的复制…

大话软工笔记—架构模型

1. 架构模型1—拓扑图 &#xff08;1&#xff09;拓扑图概念 拓扑图&#xff0c;将多个软件系统用网络图连接起来的表达方式。 &#xff08;2&#xff09;拓扑图分类 总线型结构 比较普遍采用的方式&#xff0c;将所有的系统接到一条总线上。 星状结构 各个系统通过点到…

javaweb -html -CSS

HTML是一种超文本标记语言 超文本&#xff1a;超过了文本的限制&#xff0c;比普通文本更强大&#xff0c;除了文字信息&#xff0c;还可以定义图片、音频、视频等内容。 标记语言&#xff1a;由标签"<标签名>"构成的语言。 CSS:层叠样式表&#xff0c;用于…

spring task定时任务快速入门

spring task它基于注解和配置&#xff0c;可以轻松实现任务的周期性调度、延迟执行或固定频率触发。按照我们约定的时间自动执行某段代码。例如闹钟 使用场景 每月还款提醒&#xff0c;未支付的订单自动过期&#xff0c;收到快递后自动收货&#xff0c;系统自动祝你生日快乐等…

搭建nginx的负载均衡

1、编写一个configMap的配置文件 events {worker_connections 1024; # 定义每个worker进程的最大连接数 }http {# 定义通用代理参数&#xff08;替代proxy_params文件&#xff09;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-F…

Appium+python自动化(八)- 认识Appium- 下章

1、界面认识 在之前安装appium的时候说过我们有两种方法安装&#xff0c;也就有两种结果&#xff0c;一种是有界面的&#xff08;客户端安装&#xff09;&#xff0c;一种是没有界面的&#xff08;终端安装&#xff09;&#xff0c;首先我们先讲一下有界面的&#xff0c;以及界…

LabVIEW的MathScript Node 绘图功能

该VI 借助 LabVIEW 的 MathScript Node&#xff0c;结合事件监听机制&#xff0c;实现基于 MathScript 的绘图功能&#xff0c;并支持通过交互控件自定义绘图属性。利用 MathScript 编写脚本完成图形初始化&#xff0c;再通过LabVIEW 事件结构响应用户操作&#xff0c;动态修改…

每日Prompt:治愈动漫插画

提示词 现代都市治愈动漫插画风格&#xff0c;现代女子&#xff0c;漂亮&#xff0c;长直发&#xff0c;20岁&#xff0c;豆沙唇&#xff0c;白皙&#xff0c;气质&#xff0c;清纯现代都市背景下&#xff0c;夕阳西下&#xff0c;一位穿着白色露脐短袖&#xff0c;粉色工装裤…