SpringBoot整合RabbitMQ

news2025/7/26 7:56:30

RabbitMQ安装部署详情可见:RabbitMQ简介及在Linux中安装部署(yum)

 一、导入pom.xml依赖

<dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.8.0</version>
</dependency>
<dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-nop</artifactId>
      <version>1.7.29</version>
</dependency>

二、入门测试案例

  发送消息类:Send.java

public class Send {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("192.168.119.129");
        factory.setUsername("admin");
        factory.setPassword("password");
        //建立连接
        Connection connection = factory.newConnection();
        //获得信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //发布消息
        String message = "你好,老6";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println("发送了消息:" + message);
        //关闭连接
        channel.close();
        connection.close();
    }
}

  接收消息类:Recv.java

public class Recv {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("192.168.119.129");
        factory.setUsername("admin");
        factory.setPassword("password");
        //建立连接
        Connection connection = factory.newConnection();
        //获得信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        //接收消息并消费
        channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到消息:" + message);
            }
        });
    }
}

运行结果:            ​​​​​

 三、多任务消息队列案例

发送多任务消息类NewTask.java

public class NewTask {

    private final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("192.168.119.129");
        factory.setUsername("admin");
        factory.setPassword("password");
        //建立连接
        Connection connection = factory.newConnection();
        //获得信道
        Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        for (int i = 0; i < 10; i++) {
            String message;
            if (i % 2 == 0) {
                message = i + "...";
            } else {
                message = String.valueOf(i);
            }
            channel.basicPublish("", TASK_QUEUE_NAME, null, message.getBytes("UTF-8"));
            System.out.println("发送了消息:" + message);
        }

        channel.close();
        connection.close();
    }
}

 接收多任务消息类Worker.java

public class Worker {

    private final static String TASK_QUEUE_NAME = "task_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        //创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        //设置RabbitMQ地址
        factory.setHost("192.168.119.129");
        factory.setUsername("admin");
        factory.setPassword("password");
        //建立连接
        Connection connection = factory.newConnection();
        //获得信道
        final Channel channel = connection.createChannel();
        //声明队列
        channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
        System.out.println("开始接收消息");
        channel.basicQos(1);

        channel.basicConsume(TASK_QUEUE_NAME, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到了消息:" + message);
                try {
                    doWork(message);
                } finally {
                    System.out.println("消息处理完成");
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        });
    }

    private static void doWork(String task) {
        char[] chars = task.toCharArray();
        for (char ch : chars) {
            if (ch == '.') {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

运行结果:

四、接收指定消息案例

发送消息类:EmitLogDirect.java

public class EmitLogDirect {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.119.129");
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        String message = "info:Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes("UTF-8"));
        System.out.println("发送了消息," + "等级为info,消息内容:" + message);

        message = "warning:Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "warning", null, message.getBytes("UTF-8"));
        System.out.println("发送了消息," + "等级为warning,消息内容:" + message);

        message = "error:Hello World!";
        channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("UTF-8"));
        System.out.println("发送了消息," + "等级为error,消息内容:" + message);
        channel.close();
        connection.close();
    }
}

接收所有消息类:ReceiveLogsDirect1.java

public class ReceiveLogsDirect1 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.119.129");
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //生成一个随机的临时的queue
        String queueName = channel.queueDeclare().getQueue();
        //一个交换机同时绑定3个queue
        channel.queueBind(queueName, EXCHANGE_NAME, "info");
        channel.queueBind(queueName, EXCHANGE_NAME, "warning");
        channel.queueBind(queueName, EXCHANGE_NAME, "error");

        System.out.println("开始接收消息");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到消息:" + message);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

仅仅接收error消息类:ReceiveLogsDirect2.java

public class ReceiveLogsDirect2 {
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.119.129");
        factory.setUsername("admin");
        factory.setPassword("password");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
        //生成一个随机的临时的queue
        String queueName = channel.queueDeclare().getQueue();
        //一个交换机绑定1个queue
        channel.queueBind(queueName, EXCHANGE_NAME, "error");

        System.out.println("开始接收消息");
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                    BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println("收到消息:" + message);
            }
        };
        channel.basicConsume(queueName, true, consumer);
    }
}

运行结果:

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/16799.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

简单的抓包

验证TCP/IP协议栈结构、帧结构 准备&#xff1a; 用到的软件&#xff1a;科来网络分析系统、putty 1&#xff09;准备两台虚拟机&#xff08;我这里准备windows2003、7&#xff09;&#xff0c;并让两台虚拟机可以互相通信&#xff08;配置IP&#xff0c;让他们处于同一局域网…

基于SSM的网络教学(作业)管理系统

1、项目介绍 基于SSM的网络教学&#xff08;作业&#xff09;管理系统拥有三种角色&#xff1a;管理员、教师和学生 管理员&#xff1a;教师管理、学生管理、课程管理、教学信息管理、作业管理、试卷管理、试题管理、论坛管理、系统管理、考试管理等 教师&#xff1a;课程信…

【快速上手系列】使用支付宝沙箱环境进行支付测试的快速上手

【快速上手系列】使用支付宝沙箱环境进行支付测试的快速上手 步骤 一、支付宝开放平台 1、进入支付宝开放平台&#xff0c;登录&#xff0c;然后点击控制台&#xff0c;划到最下面&#xff0c;点击沙箱 2、APPID一会要用到&#xff0c;然后将接口加签方式选择为自定义密钥&a…

【Bio】基础生物学 - 细胞膜 cell membrane

文章目录1. 细胞膜1.1 内平衡1.2 选择透过性1.3 流动镶嵌模型Ref1. 细胞膜 没有细胞膜&#xff0c;就没有细胞。 不论是否有细胞壁&#xff0c;所有的细胞都有细胞膜&#xff0c;也叫做 质膜 (plasma membrane)\blue{\text{质膜 (plasma membrane)}}质膜 (plasma membrane)。…

RabbitMQ系列【10】死信队列

有道无术&#xff0c;术尚可求&#xff0c;有术无道&#xff0c;止于术。 文章目录概念创建死信交换机、队列过期导致死信拒接消费长度限制概念 无法被消费的消息被称为死信&#xff0c;存放死信的队列也就是死信队列。 由于某些特定的原因导致队列中的某些消息无法被消费&am…

【数据结构】c++栈的应用:波兰式、逆波兰式和中缀表达式计算器

********************************************************************************************************* 本文作者科大MF22某班Noah懒羊羊同学&#xff0c;为大家提供一个作业思路&#xff0c;请勿直接copy&#xff01;&#xff01;&#xff01;一起进步学习~ ********…

Mac 通过docker安装MinIO

前言 最近MeterSphere出了新版本&#xff0c;新版本架构是这样的&#xff08;如下图&#xff09;。采用了SpringCloudSpringBoot 微服务的架构的。跟以往相比&#xff0c;多了一个新的组件&#xff0c;MinIO。也就是分布式存储。 关于MinIO 1、MinIO是什么&#xff1f; 官方…

[附源码]java毕业设计某互联网公司人力资源管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

【C语言】操作符详解

目录 1、操作符分类 2、算术操作符 3、移位操作符 (二进制) 3.1 左移操作符 3.2 右移操作符 4、位操作符 5、赋值操作符 6、单目操作符 6.1 单目操作符介绍 6.2 sizeof和数组 7、关系操作符 8、逻辑操作符 9、条件操作符 10、逗号表达式 11、下标引用、函…

微信对账单功能开发(V2)

下载交易账单接口开发 应用场景&#xff1a; 商户可以通过该接口下载历史交易清单。比如掉单、系统错误等导致商户侧和微信侧数据不一致&#xff0c;通过对账单核对后可校正支付状态。 注意&#xff1a; 1、微信侧未成功下单的交易不会出现在对账单中。支付成功后撤销的交易…

Azure Integrator Delphi版

Azure Integrator Delphi版 Azure Integrator包括表、队列和Blob等标准Windows Azure结构的实现&#xff0c;使开发人员能够快速轻松地将基于云的数据存储、队列管理、表配置等添加到任何桌面、Web或移动应用程序中。 Azure Integrator功能 用于访问Windows Azure表、Blob和队列…

python folium 实现地图平台制作

python实现泸定地震点观测平台制作 数据来自[走天涯徐小洋地理数据科学]&#xff0c;原始数据来自微博中国地震台网 的正式测定数据。 以下是地震点的数据&#xff1a; MagnitudeDateTimeLongitudeLatitudeDepth6.82022-9-512:52102.0829.59163.12022-9-65:28102.0729.64113…

[附源码]java毕业设计览宏公司会议管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

01. 信息搜集:Web 1~10

Web 1~10web1知识点题解web2知识点题解web3知识点题解web4知识点题解web5知识点题解web6知识点题解web7知识点题解web8知识点题解web9知识点题解web10知识点题解web1 知识点 查看网页源码&#xff1a;ctrl u 或 F12 开发注释未及时删除 题解 查看网页源码即可。 web2 知识…

学习UI设计,哪些软件是必学的

UI设计软件的学习并不重要。许多设计软件功能相似&#xff0c;操作相似&#xff0c;设计效果相似&#xff0c;此时我们只需要选择相同类型的软件进行深入学习&#xff0c;当我们掌握软件时&#xff0c;使用其他类型的软件基本上不会有太大的困难。 ​一、位图软件&#xff1a; …

spring-data-mongodb生成的Query语句order字段顺序错误

前言&#xff1a; 最近在实现一个需求的时候&#xff0c;需求要求查询的数据需要根据播放量倒叙、创建时间倒叙来排序&#xff0c;考虑到播放量、创建时间都有可能是相同的&#xff0c;就会出现排序不稳定的情况&#xff0c;于是就加入了"_id"作为第三个排序字段&am…

Unity程序在VR一体机(Android)上卡死(闪退)后怎么办?——用adb查看android上某Unity app的debug信息

一、之前面临的困境 Unity的程序build到android一体机后&#xff0c;仿佛进入了一个黑箱子&#xff0c;你既看不到脚本的debug报错信息&#xff0c;也看不到任务管理器里的内存和CPU使用情况&#xff1f;如果黑屏、闪屏、花屏怎么办&#xff1f; 最近面临的一个问题就是&…

【快速上手系列】使用七牛云+webuploader实现对象云存储(OSS)

【快速上手系列】使用七牛云webuploader实现对象云存储&#xff08;OSS&#xff09; 步骤 一、七牛云配置 1、新建存储空间 进入七牛云&#xff0c;注册登录&#xff0c;进入控制台&#xff0c;选择对象存储 Kodo 选择空间管理——新建空间&#xff08;免费30天&#xff0…

[附源码]java毕业设计企业人力资源管理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

【Bio】基础生物学 - 基因 gene

文章目录1. DNA 脱氧核糖核酸、RNA 核糖核酸1.1 核苷酸1.2 脱氧核糖核酸1.3 核糖核酸2. 基因2.1 基因组2.2 染色体2.3 基因与脱氧核苷酸的牵连2.4 基因与DNA的牵连2.5 基因与染色体的牵连Ref1. DNA 脱氧核糖核酸、RNA 核糖核酸 1.1 核苷酸 核苷酸 (Nucleotide)\blue{\text{核苷…