Spring Boot 3 整合 MQ 构建聊天消息存储系统

news2025/6/3 15:06:59

引子

在构建实时聊天服务时,我们既要保证消息的即时传递,又需要对消息进行持久化存储以便查询历史记录。然而,直接同步写入数据库在高并发场景下容易成为性能瓶颈,影响消息的实时性。秉承"没有什么问题是加一层解决不了的"理念,引入消息队列(MQ)进行异步存储是一个优雅的解决方案。消息先快速写入MQ确保即时送达,随后由专门的消费者服务从队列取出,平稳写入数据库。

在本文中,我们将详细探讨如何利用Spring Boot 3 结合消息队列技术,构建一个高效可靠的聊天消息存储系统。

在这里插入图片描述

关于MQ

MQ在这里主要的作用是实现解耦,将聊天功能与聊天内容的存储过程分离。这种机制很像工厂与批发商之间的订货关系优化——传统模式下,工厂每次出货都需要逐一通知各个批发商。
在这里插入图片描述

而引入MQ后,这一流程变得优雅高效,就像工厂只需在一个微信群里发布消息,所有批发商便能同时获取信息,无需一对一通知。工厂专注生产,批发商按需处理,两端各司其职。
在这里插入图片描述
消息队列作为服务间通信的中间媒介,在分布式系统中扮演着至关重要的角色。常见的解决方案有专业的消息队列系统(如RabbitMQ、Kafka、RocketMQ等)、分布式协调服务Zookeeper,以及基于Redis实现的轻量级队列。

MQ选型

在众多消息队列产品中,各有其特点和适用场景:

消息队列开发语言特点适用场景
RabbitMQErlang成熟稳定、易于部署、丰富的路由功能、社区活跃复杂路由需求、中小规模消息量、需要可靠性保证
ActiveMQJava老牌MQ、JMS实现、资源消耗较高传统企业应用、与Java生态紧密结合
RocketMQJava高吞吐、低延迟、金融级可靠性、支持大量堆积大规模互联网应用、金融支付场景
KafkaScala/Java超高吞吐量、持久化、分区设计、擅长流处理日志收集、大数据实时处理、流数据分析
ZeroMQC++轻量级、无中心化、嵌入式库对性能极为敏感的场景、点对点通信
Redis队列C轻量简单、基于内存、低延迟简单场景、临时队列、对持久化要求不高

对于我们的聊天消息存储场景,最终选择了 RabbitMQ,主要基于以下考虑:

  1. 成熟稳定:RabbitMQ历史悠久,生产环境验证充分,可靠性有保障
  2. 灵活路由:提供丰富的交换机类型和绑定机制,可针对不同类型消息实现精细化路由
  3. 易于集成:与Spring生态深度整合,Spring Boot 提供了完善的 starter 支持
  4. 运维友好:部署简单,自带管理界面,便于监控和管理
  5. 社区支持:活跃的社区和丰富的文档资源,遇到问题容易找到解决方案

虽然在极高并发场景下 Kafka 或 RocketMQ 可能有更好的吞吐性能,但考虑到我们这里重点在系统的解耦上,RabbitMQ 已经能够很好地满足需求,同时降低了开发和维护成本。

应用场景

消息队列在系统架构中有多种经典应用场景:

异步处理:将耗时操作(如邮件发送、日志处理)交由消息队列异步处理,快速响应用户请求,提升体验。

性能提升:通过异步解耦,减少系统响应时间,提高吞吐量,尤其适合I/O密集型操作。

系统解耦:降低服务间直接依赖,提高系统弹性和可维护性,便于独立扩展和升级。

削峰填谷:在流量高峰期,消息队列可缓存请求,按处理能力逐步消费,防止系统过载崩溃。

在聊天消息存储场景中,我们主要利用RabbitMQ实现消息异步存储,既保证了聊天功能的响应速度,又能可靠地将消息持久化到数据库,同时为系统提供了应对消息高峰的能力。

关于RabbitMQ

一条消息在RabbitMQ中的完整生命周期如下:

  1. 生产者创建消息:在聊天应用中,用户发送一个聊天内容,应用将其封装成MQ消息
  2. 投递到交换机:生产者将消息发送到指定的Exchange,同时指定路由键(Routing Key)
  3. 交换机路由转发:Exchange根据消息的路由键和绑定规则,决定将消息投递到哪个队列
    • 若是Direct交换机,则精确匹配路由键
    • 若是Fanout交换机,则广播给所有绑定队列
    • 若是Topic交换机,则按模式匹配路由
  4. 存入队列:符合条件的队列接收并存储消息,等待消费者处理
  5. 消费者获取消息:存储服务作为消费者从队列中获取消息,可以是推模式(Push)或拉模式(Pull)
  6. 处理确认:消费者成功处理消息后(如将聊天内容存入数据库),向RabbitMQ发送确认(ACK)
  7. 消息删除:收到确认后,RabbitMQ从队列中删除该消息

在这里插入图片描述

安装RabbitMQ

RabbitMQ的安装可以通过多种方式进行,而Docker提供了最便捷的部署方案。以下是使用Docker快速部署RabbitMQ的步骤:

1. 拉取镜像

首先从Docker Hub拉取RabbitMQ官方镜像,建议选择带management标签的版本,它包含了Web管理界面,便于后续的可视化操作和监控:

docker pull rabbitmq:4.1-management

提示:各位读者在实操时可以访问Docker Hub查看并使用最新的版本

2. 启动容器

拉取镜像后,通过以下命令启动RabbitMQ容器:

docker run --name rabbitmq -p 5681:5671 -p 5682:5672 -p 4379:4369 -p 15681:15671 -p 15682:15672 -p 25682:25672 --restart always -d rabbitmq:4.1-management

这里我们做了以下映射和配置:

  • 暴露AMQP端口(5672)和管理界面端口(15672)
  • 配置容器自动重启(–restart always),确保服务器重启后RabbitMQ也能自动启动
  • 后台运行容器(-d)

3. 验证安装

启动成功后,在浏览器中访问http://127.0.0.1:15682打开RabbitMQ管理控制台:
在这里插入图片描述
使用默认的用户名和密码登录(均为guest):
在这里插入图片描述
登录成功后,您将看到RabbitMQ的管理界面,可以在这里创建交换机、队列、查看连接状态以及监控消息吞吐量等重要指标。

注意:默认的guest用户只能从localhost访问,如需远程访问,建议创建新的管理员用户并设置适当的权限。

Spring Boot 整合 RabbitMQ

在开始之前,我们先创建消息表。本文的聊天服务基于之前的文章《Java 工程师进阶必备:Spring Boot 3 + Netty 构建高并发即时通讯服务》,感兴趣的读者可以自行查阅。

DROP TABLE IF EXISTS `chat_message`;
CREATE TABLE `chat_message`  (
  `id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL,
  `sender_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '发送者的用户id',
  `receiver_id` varchar(32) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '接受者的用户id',
  `receiver_type` int(11) NULL DEFAULT NULL COMMENT '消息接受者的类型,可以作为扩展字段',
  `msg` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '聊天内容',
  `msg_type` int(11) NOT NULL COMMENT '消息类型,有文字类、图片类、视频类...等,详见枚举类',
  `chat_time` datetime NOT NULL COMMENT '消息的聊天时间,既是发送者的发送时间、又是接受者的接受时间',
  `show_msg_date_time_flag` int(11) NULL DEFAULT NULL COMMENT '标记存储数据库,用于历史展示。每超过1分钟,则显示聊天时间,前端可以控制时间长短(扩展字段)',
  `video_path` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '视频地址',
  `video_width` int(11) NULL DEFAULT NULL COMMENT '视频宽度',
  `video_height` int(11) NULL DEFAULT NULL COMMENT '视频高度',
  `video_times` int(11) NULL DEFAULT NULL COMMENT '视频时间',
  `voice_path` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '语音地址',
  `speak_voice_duration` int(11) NULL DEFAULT NULL COMMENT '语音时长',
  `is_read` tinyint(1) NULL DEFAULT NULL COMMENT '语音消息标记是否已读未读,true: 已读,false: 未读',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci COMMENT = '聊天信息存储表' ROW_FORMAT = Dynamic;

导入依赖

首先,在项目的 pom.xml 文件中添加 RabbitMQ 依赖:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加配置

application.ymlapplication.properties 文件中添加 RabbitMQ 的配置:

spring:  
  rabbitmq:
    host: 127.0.0.1
    port: 5682
    username: guest
    password: guest
    virtual-host: /

编写生产者

创建一个消息发布者类,用于发送消息到 RabbitMQ:

import com.pitayafruits.pojo.netty.ChatMsg;
import com.pitayafruits.utils.JsonUtils;


public class MessagePublisher {

    // 定义交换机的名字
    public static final String EXCHANGE = "pitayafruits_exchange";

    // 定义队列的名字
    public static final String QUEUE = "pitayafruits_queue";

    // 发送信息到消息队列接受并且保存到数据库的路由地址
    public static final String ROUTING_KEY_SEND = "pitayafruits.wechat.send";


    public static void sendMsgToSave(ChatMsg msg) throws Exception {
        RabbitMQConnectUtils connectUtils = new RabbitMQConnectUtils();
        connectUtils.sendMsg(JsonUtils.objectToJson(msg),
                EXCHANGE,
                ROUTING_KEY_SEND);
    }
    
}

编写发送消息的工具类

import com.rabbitmq.client.*;

import java.util.ArrayList;
import java.util.List;

public class RabbitMQConnectUtils {

    private final List<Connection> connections = new ArrayList<>();
    private final int maxConnection = 20;

    // 开发环境 dev
    private final String host = "127.0.0.1";
    private final int port = 5682;
    private final String username = "guest";
    private final String password = "guest";
    private final String virtualHost = "/";

    public ConnectionFactory factory;

    public ConnectionFactory getRabbitMqConnection() {
        return getFactory();
    }

    public ConnectionFactory getFactory() {
        initFactory();
        return factory;
    }

    private void initFactory() {
        try {
            if (factory == null) {
                factory = new ConnectionFactory();
                factory.setHost(host);
                factory.setPort(port);
                factory.setUsername(username);
                factory.setPassword(password);
                factory.setVirtualHost(virtualHost);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void sendMsg(String message, String queue) throws Exception {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        channel.basicPublish("",
                            queue,
                            MessageProperties.PERSISTENT_TEXT_PLAIN,
                            message.getBytes("utf-8"));
        channel.close();
        setConnection(connection);
    }

    public void sendMsg(String message, String exchange, String routingKey) throws Exception {
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        channel.basicPublish(exchange,
                            routingKey,
                            MessageProperties.PERSISTENT_TEXT_PLAIN,
                            message.getBytes("utf-8"));
        channel.close();
        setConnection(connection);
    }

    public GetResponse basicGet(String queue, boolean autoAck) throws Exception {
        GetResponse getResponse = null;
        Connection connection = getConnection();
        Channel channel = connection.createChannel();
        getResponse = channel.basicGet(queue, autoAck);
        channel.close();
        setConnection(connection);
        return getResponse;
    }

    public Connection getConnection() throws Exception {
        return getAndSetConnection(true, null);
    }

    public void setConnection(Connection connection) throws Exception {
        getAndSetConnection(false, connection);
    }

    private synchronized Connection getAndSetConnection(boolean isGet, Connection connection) throws Exception {
        getRabbitMqConnection();

        if (isGet) {
            if (connections.isEmpty()) {
                return factory.newConnection();
            }
            Connection newConnection = connections.get(0);
            connections.remove(0);
            if (newConnection.isOpen()) {
                return newConnection;
            } else {
                return factory.newConnection();
            }
        } else {
            if (connections.size() < maxConnection) {
                connections.add(connection);
            }
            return null;
        }
    }

}

编写消费者

创建一个消息消费者类,用于接收并处理消息:

import com.pitayafruits.pojo.netty.ChatMsg;
import com.pitayafruits.service.ChatMessageService;
import com.pitayafruits.utils.JsonUtils;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Auther 风间影月
 */
@Component
@Slf4j
public class RabbitMQConsumer {

    @Resource
    private ChatMessageService chatMessageService;

    @RabbitListener(queues = {RabbitMQConfig.QUEUE})
    public void watchQueue(String payload, Message message) {
        String routingKey = message.getMessageProperties().getReceivedRoutingKey();
        log.info("routingKey = " + routingKey);

        if (routingKey.equals(RabbitMQConfig.ROUTING_KEY_SEND)) {
            String msg = payload;
            ChatMsg chatMsg = JsonUtils.jsonToPojo(msg, ChatMsg.class);

            chatMessageService.saveMsg(chatMsg);
        }

    }

方法调用

完成上述封装后,在本次的案例中,直接在聊天服务的发送消息方法中调用消息发布功能即可。

// 把聊天信息作为mq的消息发送给消费者进行消费处理(保存到数据库)
MessagePublisher.sendMsgToSave(chatMsg);

小结

通过 Spring Boot 整合 RabbitMQ,我们实现了消息的异步处理机制,将聊天消息的存储操作解耦,提高了系统的性能和可扩展性。当用户发送消息时,我们将消息发送到 RabbitMQ,然后由消费者异步处理并保存到数据库中,避免了直接操作数据库导致的性能瓶颈。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2395620.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

非线性声学计算与强化学习融合框架:突破复杂环境人机交互的新技术

随着人工智能的快速发展&#xff0c;尤其是在深度学习和强化学习领域&#xff0c;声学计算和人机交互进入前所未有的扩展和创新阶段。尽管传统声学方法取得了显著成功&#xff0c;但这些线性或准线性方法在实际环境中往往存在关键的不足&#xff0c;尤其在动态、复杂或混响环境…

C++ - STL #什么是STL #STL的版本 #闭源开源 #STL的六大组件

文章目录 前言 一、什么是STL 二、STL的版本 1、原始版本 2、P.J.版本 3、RW版本 4、SGI版本 三、闭源、开源 四、STL的六大组件 总结 前言 路漫漫其修远兮&#xff0c;吾将上下而求索&#xff1b; 一、什么是STL STL(standard template libaray 标准模板库)&#…

Flutter - 原生交互 - 相机Camera - 01

环境 Flutter 3.29 macOS Sequoia 15.4.1 Xcode 16.3 集成 Flutter提供了camera插件来拍照和录视频&#xff0c;它提供了一系列可用的相机&#xff0c;并使用特定的相机展示相机预览、拍照、录视频。 添加依赖 camera: 提供使用设备相机模块的工具path_provider: 寻找存储图…

湖北理元理律师事务所:个人债务管理的温度与精度

湖北理元理律师事务所&#xff1a;个人债务管理的温度与精度 面对信用卡、网贷、医疗债等多重债务压力&#xff0c;普通人常陷入“拆东墙补西墙”的恶性循环。湖北理元理律师事务所通过计划集团公司服务平台&#xff0c;推出“有温度的债务优化计划”&#xff0c;其人性化设计…

Compose原理 - 整体架构与主流程

一、整体架构 在官方文档中&#xff08;Jetpack Compose 架构层 | Android Developers&#xff09;&#xff0c;对Compose的分层有所阐述&#xff1a; 其中 Runtime&#xff1a;提供Compose的基础运行能力&#xff0c;包括State、Side-effects、CompositionLocal、Compositio…

CppCon 2014 学习: C++ Test-driven Development

“Elephant in the Room”这个比喻常用来形容那些大家都知道但没人愿意讨论的重大问题。 这段内容讲的是软件质量管理的经典做法和潜在的问题&#xff1a; 经典做法&#xff1a;开发完成后才进行人工测试&#xff08;manual testing after creation&#xff09;。隐喻“Cape o…

RAGflow详解及实战指南

目录 前言 一、RAGflow核心技术解析 1. 技术原理&#xff1a;检索与生成的协同进化 2. 架构设计&#xff1a;分层模块化与高扩展性 3. 核心优势&#xff1a;精准、高效、安全 二、RAGflow实战应用场景 1. 企业知识库搭建 2. 智能客服系统 3. 投资分析报告生成 4. 制造…

[Godot] 如何导出安卓 APK 并在手机上调试

在之前的文章中&#xff0c;我们已经详细介绍了如何配置 Godot 的安卓应用开发环境&#xff0c;包括安装 Android SDK、配置 Java 环境、设置 Godot 的 Android 导出模板等。本篇文章将进一步讲解如何将 Godot 项目导出为安卓 APK 文件&#xff0c;并实现在手机上进行调试运行。…

Linux《文件系统》

在之前的系统IO当中已经了解了“内存”级别的文件操作&#xff0c;了解了文件描述符、重定向、缓冲区等概念&#xff0c;在了解了这些的知识之后还封装出了我们自己的libc库。接下来在本篇当中将会将视角从内存转向磁盘&#xff0c;研究文件在内存当中是如何进行存储的&#xf…

NLP学习路线图(十六):N-gram模型

一、为何需要语言模型&#xff1f;概率视角下的语言本质 自然语言处理的核心挑战在于让机器“理解”人类语言。这种理解的一个关键方面是处理语言的歧义性、创造性和结构性。语言模型&#xff08;Language Model, LM&#xff09;为此提供了一种强大的数学框架&#xff1a;它赋…

Python训练第四十天

DAY 40 训练和测试的规范写法 知识点回顾&#xff1a; 彩色和灰度图片测试和训练的规范写法&#xff1a;封装在函数中展平操作&#xff1a;除第一个维度batchsize外全部展平dropout操作&#xff1a;训练阶段随机丢弃神经元&#xff0c;测试阶段eval模式关闭dropout 昨天我们介绍…

InternVL2.5-多模态大模型评估专业图片

具备图像理解功能的大模型InternVL2.5&#xff0c;能有效解析大部分图片。 对于专业图片如医学细胞切片&#xff0c;从专业角度解析&#xff0c;能推动模型应用到更广泛的领域。 InternVL2.5解析示例 prompt(胸部癌变细胞图片,来自PanNuke) 请评估这个组织的风险 InternVL2.…

医疗数理范式化:从范式迁移到认知革命的深度解析

引言 在当代医疗领域,数理思维已经从辅助工具逐渐发展成为核心决策支持系统的关键组成部分。随着数字技术的迅猛发展,医疗行业正经历着前所未有的变革,而数理思维作为这一变革的核心驱动力,正在深刻重塑医疗实践的方方面面。数理思维在医疗领域的应用,本质上是将抽象的数…

图神经网络在信息检索重排序中的应用:原理、架构与Python代码解析

现代信息检索系统和搜索引擎普遍采用两阶段检索架构&#xff0c;在人工智能应用中也被称为检索增强生成&#xff08;Retrieval-Augmented Generation, RAG&#xff09;。在初始检索阶段&#xff0c;系统采用高效的检索方法&#xff0c;包括词汇检索算法&#xff08;如BM25&…

现代数据湖架构全景解析:存储、表格式、计算引擎与元数据服务的协同生态

本文全面剖析现代数据湖架构的核心组件,深入探讨对象存储(OSS/S3)、表格式(Iceberg/Hudi/Delta Lake)、计算引擎(Spark/Flink/Presto)及元数据服务(HMS/Amoro)的协作关系,并提供企业级选型指南。 一、数据湖架构演进与核心价值 数据湖架构演进历程 现代数据湖核心价…

全志F1c200开发笔记——移植Debian文件系统

1.搭建环境 sudo apt install qemu-user-static -y sudo apt install debootstrap -y mkdir rootfs 2.拉取文件系统 这边我参照墨云大神的文档&#xff0c;但是华为镜像已经没有armel了&#xff0c;我找到了官方仓库&#xff0c;还是有的&#xff0c;拉取速度比较慢 sudo d…

支持功能安全ASIL-B的矩阵管理芯片IS32LT3365,助力ADB大灯系统轻松实现功能安全等级

随着自动驾驶技术的快速发展&#xff0c;汽车前灯智能化也越来越高。自适应远光灯 (ADB) 作为一种智能照明系统&#xff0c;在提升驾驶安全性和舒适性方面发挥着重要作用。ADB 系统通过摄像头和传感器获取前方道路信息&#xff0c;例如来车的位置、距离和速度&#xff0c;并根据…

BFS入门刷题

目录 P1746 离开中山路 P1443 马的遍历 P1747 好奇怪的游戏 P2385 [USACO07FEB] Bronze Lilypad Pond B P1746 离开中山路 #include <iostream> #include <queue> #include <cstring> using namespace std; int n; int startx, starty; int endx, endy; …

UE5 编辑器工具蓝图

文章目录 简述使用方法样例自动生成Actor&#xff0c;并根据模型的包围盒设置Actor的大小批量修改场景中Actor的属性&#xff0c;设置Actor的名字&#xff0c;设置Actor到指定的文件夹 简述 使用编辑器工具好处是可以在非运行时可以对资源或场景做一些操作&#xff0c;例如自动…

数据仓库分层 4 层模型是什么?

企业每天都在产生和收集海量数据。然而&#xff0c;面对这些数据&#xff0c;许多企业却陷入了困境&#xff1a;如何高效管理、处理和分析这些数据&#xff1f;如何从数据中提取有价值的信息来支持业务决策&#xff1f;这些问题困扰着众多数据分析师和 IT 管理者。 在众多架构…