项目模拟实现消息队列第二天

news2025/5/12 19:32:31

消息应答的模式

1.自动应答:

消费者把这个消息取走了,就算是应答了(相当于没有应答)

2.手动应答:

basicAck方法属于手动应答(消费者需要主动调用这个api进行应答)

小结

1.需要实现生产者,broker server,消费者这三个部分的

2.针对生产者和消费者来说,

主要编写的是 客户端和服务器的网络通信部分,给客户端提供一组api,让客户端的业务代码来调用,从而通过网络通信的方式远程调用broker server的方法

至于生产者的数据从哪里来,消费者取到数据之后干嘛,生产者和消费者的具体业务逻辑,我们不去关心

3.[重点]

实现broker server以及broker server内部的一些核心概念和核心api

核心概念:Virtual host,Exchange,Queue,Binding,Message

核心api:创建交换机,删除交换机,创建队列,删除队列,创建绑定,解除绑定,发送消息,订阅消息,应答消息

4.持久化

上述这些关键数据,在硬盘上怎么存储,什么格式存储,存储在数据库中,文件中?后续服务器重启了,如何读取当前数据,把内存中的内容恢复过来

上述要做的工作,最终目标就是实现一个"分布式系统"下这样的生产者消费者模型

消息队列中存在的核心概念

1.交换机        Exchange

2.队列.          Queue

3.绑定           Binding

4.消息           Message

针对交换机,队列,绑定,消息---内存中也需要存储(执行效率高),硬盘上也需要存储(持久化),有些交换机,队列,绑定等,需要持久化存储,但是有些则不需要,用户使用的时候,可以通过开关(boolean值)来决定是否真正需要持久化 

管理关键概念需要两个部分

1.内存

2.硬盘 ->数据库,文件

考虑使用MySQL本身比较重量,此处为了使用更方便,简化环境,采用更轻量的数据库,SQLite(一个完整的SQLite数据库,只有一个单独的可执行文件(不到1M),这个数据库相当于是直接操作本地的硬盘文件

SQLite应用非常广泛,在一些性能不高的设备上,使用数据库的首选,尤其是移动端和嵌入式设备Android系统,就是内置的SQLite(不用额外安装,直接使用maven,把SQLite的依赖引入进来即可,会自动加载jar和动态库文件

SQLite如何建库能

当把依赖和配置都搞好了,

难点1:

如何把

private Map<String,Object> arguments=new HashMap<>()转化成数据库中的字符串类型呢?

关键要点是:MyBatis在玩成数据库操作的时候,会自动调用对象的getter和setter

1.比如MyBatis往数据库中写数据,就会调用对象的getter方法拿到属性的值,再往数据库中写,如果这个过程让,getArguments得到的结果是String类型的,此时就可以把数据写到数据库中了

2.比如MyBatis从数据库读数据的时候,就会调用对象的setter方法,把数据库智能鼓独到的结果设置到对象的属性中,比如这个过程,让setArguments,参数是一个String,并且在setArguments内部针对字符串解析,解析成一个Map对象。

使用Mybatis来创建数据表的时候,需要先创建出数据库来,创建meta.db这个文件,由于data目录不存在,所以创建meta.db文件的操作,也就失败了。

Message,如何在硬盘上存储

1.消息操作并不涉及到复杂的增删改查

2.消息数量可能非常多,数据库访问效率是不高的

直接把消息存储在文件中

一下设定了消息如何在文件中存储

消息是依附于队列的

因此存储的时候,就把消息按照队列的维度展开

此处已经有了个data目录,meta.db就在这个目录中

在data中创建一些子目录,每个队列有一个子目录,子目录的名字,就是队列名

data->

         meta.db

         testQueue1               这几个也是目录,用来存储对应的消息

         testQueue2

         testQueue3

每个队列的子目录喜爱,再分配两个文件,来存储信息

第一个文件:queue_data.txt这里保存消息的内容

第二个文件:   queue_stat.txt这里保存消息的统计信息

queue_data这个文件是一个二进制格式的文件

做出以下约定~

这个文件中包含若干个消息,每个消息都以二进制的方式存储

每个消息都由这几个部分构成

二进制的格式是java标准库序列化来实现的

Message对象,是在内存中记录一份,硬盘上也记录一份,内存中这一份,要记录offsetBeg和offsetEnd随时找到内存中的Message对象,就能找到对应的硬盘上的Message对象了。

对于Broker Server来说,消息是需要新增,也是需要删除的,生产者生产一个消息过来,就得新增这个消息,消费者吧这个消息消费掉,这个消息就要删除

新增和删除,对于内存来说-好办(直接使用一些集合类)

但是在文件上就麻烦了,新增消息,可以直接把新的消息追加到文件末尾~删除消息不好处理,文件可以视为一个顺序表,这样的结构,如果此时直接删除中间元素,就需要涉及到类似于顺序表搬运这样的操作-效率非常低的是。

因此我们采用逻辑删除-isValid为1,有效消息,isValid为0-无效消息

随着时间的推移,这个消息文件可能会越来越大~并且,这里可能大部分都是无效的消息,针对这种情况,就需要考虑对当前消息数据文件,进行垃圾回收。

使用复制算法,针对消息数据文件中的垃圾,进行回收

如果某个队列中,消息特别多,而且都是有效消息,此时就会导致整个消息数据文件特别大,后续针对这个文件的各种操作,成本就会上升很多,假如这个文件大小是10G,此时如果出发一次GC,整体耗时就会非常高了

对于RabbitMQ来说,解决方案是把一个大的文件,拆成若干个小文件。

文件拆分:当单个文件长度到达一个阈值之后,就会拆分成两个文件(拆者拆着,就变成了很多文件)

文件合并:每个单独的文件都会进行GC,如果GC之后,发现文件变小了很多,就可能会和相邻的文件合并。

这样做,就可以在消息特别多的时候,也能保证性能上的及时响应

这一块逻辑比较复杂,因此此处不进行实现,咱们只考虑单个文件的情况

如果要实现这个机制,大概思路:需要专门的数据结构,来存储当前队列中有多少个数据文件,每个文件大小是多少,消息数目是多少,无效消息是多少

2.设计策略,什么时候出发文件的拆分,什么时候触发文件的合并

统计文件,读写比较简单

消息数据文件,比较复杂

消息序列化如何实现:

首先什么叫做序列化:把一个对象(结构化数据)转成一个字符串/字节数组->(降维打击了)

注意这样的序列化完成之后,对象的信息是不丢失的。这样后面才可以进行反序列化

序列化之后,方便存储和传输->(相当于干粮脱水)

一般是存储在文件中,文件只能存字符串/二进制数据,不能直接存对象

通过网络传输 JSON来完成序列化,反序列化

由于Message,里面存储的body是二进制数据,不太方便使用JSON进行序列化,JSON序列化得到的结果是文本数据,无法存储二进制。

我们准备直接使用二进制的序列化方式,针对Message对象进行序列化~

针对二进制序列化,也有很多种解决方案

1.JAVA标准库提高了序列化的方案,ObjectInputStream(用来序列化)和ObjectOutputStream(反序列化)

2.Hessian也是一个解决方案

3.protobuffer

4.thrift

咱们此处使用第一个方案,标准库自带的方案,好处:不必引入额外以来,学习成本低

遇到的并发问题。,此时写入文件之后,真实消息所在的位置,就和刚才计算的offset不匹配了!

出路:加锁

如果两个线程,是往同一个队列中写消息,此时需要阻塞等待

如果两个线程,往不同队列中写消息,此时不需要阻塞队列。(不同队列,对应不同的文件,各自写各自的,就不会产生冲突了)

实现消息文件的垃圾回收

由于当前会不停的往消息文件中写入消息,并且删除消息只是逻辑删除,这就可能导致消息文件越来越大,并且里面又包含大量无效消息

此处的垃圾回收,使用复制算法

判定,当文件中信息总数超过2000,并且有效消息的数目不足50%,就要触发垃圾回收~,就把文件中所有有效消息取出来,单独的在写入到一个新的文件中,删除旧文件,使用新文件代替

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2374150.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

5.Redission

5.1 前文锁问题 基于 setnx 实现的分布式锁存在下面的问题&#xff1a; 重入问题&#xff1a;重入问题是指 获得锁的线程可以再次进入到相同的锁的代码块中&#xff0c;可重入锁的意义在于防止死锁&#xff0c;比如 HashTable 这样的代码中&#xff0c;他的方法都是使用 sync…

dify 部署后docker 配置文件修改

1&#xff1a;修改 复制 ./dify/docker/.env.example ./dify/docker/.env 添加一下内容 # 启用自定义模型 CUSTOM_MODEL_ENABLEDtrue# 将OLLAMA_API_BASE_URL 改为宿主机的物理ip OLLAMA_API_BASE_URLhttp://192.168.72.8:11434# vllm 的 OPENAI的兼容 API 地址 CUSTOM_MODE…

数据结构——排序(万字解说)初阶数据结构完

目录 1.排序 2.实现常见的排序算法 2.1 直接插入排序 ​编辑 2.2 希尔排序 2.3 直接选择排序 2.4 堆排序 2.5 冒泡排序 2.6 快速排序 2.6.1 递归版本 2.6.1.1 hoare版本 2.6.1.2 挖坑法 2.6.1.3 lomuto前后指针 2.6.1.4 时间复杂度 2.6.2 非递归版本 2.7 归并排序…

快速入门深度学习系列(3)----神经网络

本文只针对图进行解释重要内容 这就是入门所需要掌握的大部分内容 对于不懂的名词或概念 你可以及时去查 对于层数 标在上面 对于该层的第几个元素 标在下面 输入层算作第0层 对于第一层的w b 参数 维度如下w:4*3 b:4*1 这个叫做神经元 比如对于第一层的神经元 这里说的很…

在线工具源码_字典查询_汉语词典_成语查询_择吉黄历等255个工具数百万数据 养站神器,安装教程

在线工具源码_字典查询_汉语词典_成语查询_择吉黄历等255个工具数百万数据 养站神器&#xff0c;安装教程 资源宝分享&#xff1a;https://www.httple.net/154301.html 一次性打包涵盖200个常用工具&#xff01;无论是日常的图片处理、文件格式转换&#xff0c;还是实用的时间…

Linux 阻塞和非阻塞 I/O 简明指南

目录 声明 1. 阻塞和非阻塞简介 2. 等待队列 2.1 等待队列头 2.2 等待队列项 2.3 将队列项添加/移除等待队列头 2.4 等待唤醒 2.5 等待事件 3. 轮询 3.1 select函数 3.2 poll函数 3.3 epoll函数 4. Linux 驱动下的 poll 操作函数 声明 本博客所记录的关于正点原子…

Java开发经验——阿里巴巴编码规范经验总结2

摘要 这篇文章是关于Java开发中阿里巴巴编码规范的经验总结。它强调了避免使用Apache BeanUtils进行属性复制&#xff0c;因为它效率低下且类型转换不安全。推荐使用Spring BeanUtils、Hutool BeanUtil、MapStruct或手动赋值等替代方案。文章还指出不应在视图模板中加入复杂逻…

机器人手臂“听不懂“指令?Ethercat转PROFINET网关妙解通信僵局

机器人手臂"听不懂"指令&#xff1f;Ethercat转PROFINET网关妙解产线通信僵局 协作机器人&#xff08;如KUKA iiWA&#xff09;使用EtherCAT控制&#xff0c;与Profinet主站&#xff08;如西门子840D CNC&#xff09;同步动作。 客户反馈&#xff1a;基于Profinet…

深度学习 CNN

CNN 简介 什么是 CNN&#xff1f; 卷积神经网络&#xff08;Convolutional Neural Network&#xff09;是专为处理网格数据&#xff08;如图像&#xff09;设计的神经网络。核心组件&#xff1a; 卷积层 &#xff1a;提取局部特征&#xff08;如边缘、纹理&#xff09;通过卷…

MySQL索引原理以及SQL优化(二)

目录 1. 索引与约束 1.1 索引是什么 1.2 索引的目的 1.3 索引分类 1.3.1 数据结构 1.3.2 物理存储 1.3.3 列属性 1.3.4 列的个数 1.4 主键的选择 1.5 索引使用场景 1.6 索引的底层实现 1.6.1 索引存储 1.6.2 页 1.6.3 B 树 1.6.4 B 树层高问题 1.6.5 自增 id 1.7 innod…

MATLAB中矩阵和数组的区别

文章目录 前言环境配置1. 数据结构本质2. 运算规则&#xff08;1&#xff09;基本运算&#xff08;2&#xff09;特殊运算 3. 函数与操作4. 高维支持5. 创建方式 前言 在 MATLAB 中&#xff0c;矩阵&#xff08;Matrix&#xff09; 和 数组&#xff08;Array&#xff09; 的概…

Desfire Ev1\Ev2\Ev3卡DES\3K3DES\AES加解密读写C#示例源码

本示例使用的发卡器&#xff1a;https://item.taobao.com/item.htm?spma21dvs.23580594.0.0.1d292c1bYhsS9c&ftt&id917152255720 using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; using S…

MySQL核心内容【完结】

MySQL核心内容 文章目录 MySQL核心内容1.MySQL核心内容目录2.MySQL知识面扩展3.MySQL安装4.MySQL配置目录介绍Mysql配置远程ip连接 5.MySQL基础1.MySQL数据类型1.数值类型2.字符串类型3.日期和时间类型4.enum和set 2.MySQL运算符1.算数运算符2.逻辑运算符3.比较运算符 3.MySQL完…

C++类和对象进阶 —— 与数据结构的结合

&#x1f381;个人主页&#xff1a;工藤新一 &#x1f50d;系列专栏&#xff1a;C面向对象&#xff08;类和对象篇&#xff09; &#x1f31f;心中的天空之城&#xff0c;终会照亮我前方的路 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 文章目录 […

Django之账号登录及权限管理

账号登录及权限管理 目录 1.登录功能 2.退出登录 3.权限管理 4.代码展示合集 这篇文章, 会讲到如何实现账号登录。账号就是我们上一篇文章写的账号管理功能, 就使用那里面已经创建好的账号。这一次登录, 我们分为三种角色, 分别是员工, 领导, 管理员。不同的角色, 登录进去…

EXCEL中嵌入其他表格等文件

在EXCEL中嵌入其他表格 先放链接&#xff1a;https://jingyan.baidu.com/article/295430f11708c34d7e00509a.html 步骤如下&#xff1a; 1、打开一个需要嵌入新表格的excel表。 2、切换至“插入”菜单中&#xff0c;单击选择“对象”。 3、如下图所示&#xff0c;会弹出“对象…

21. LangChain金融领域:合同审查与风险预警自动化

引言&#xff1a;当AI成为24小时不眠的法律顾问 2025年某商业银行的智能合同系统&#xff0c;将百万级合同审查时间从平均3周缩短至9分钟&#xff0c;风险条款识别准确率达98.7%。本文将基于LangChain的金融法律框架&#xff0c;详解如何构建合规、精准、可追溯的智能风控体系…

Springboot使用事件流调用大模型接口

什么是事件流 事件流&#xff08;Event Stream&#xff09; 是一种处理和传递事件的方式&#xff0c;通常用于系统中的异步消息传递或实时数据流。在事件驱动架构&#xff08;Event-Driven Architecture&#xff09;中&#xff0c;事件流扮演着至关重要的角色。 事件流的概念…

计算机网络--2

TCP三次握手 TCP连接为什么需要三次握手 1. 由于网络情况复杂,可能会出现丢包现象,如果第二次握手的时候服务器就认为这个端口可用,然后一直开启,但是如果客户端未收到服务器发送的回复,那么就会重新发送请求,服务器就会重新开启一个端口连接,这样就会浪费一个端口。 三…

尤雨溪宣布:Vue 生态正式引入 AI

在前端开发领域,Vue 框架一直以其易用性和灵活性受到广大开发者的喜爱。 而如今,Vue 生态在人工智能(AI)领域的应用上又迈出了重要的一步。 尤雨溪近日宣布,Vue、Vite 和 Rolldown 的文档网站均已添加了llms.txt文件,这一举措旨在让大型语言模型(LLM)更方便地理解这些…