目录
1.创建一个SpringBoot项目
2.创建核心类
2.1 Exchange类
2.2 MessageQueue类
2.3 Binding类
2.4 Message类
1.Message的组成
2.逻辑删除
3.工厂方法
4.序列化与反序列化
5.offsetBeg和offsetEnd
1.创建一个SpringBoot项目
1.点击
2.填写表单
3.添加依赖
2.创建核心类
创建好Spring Boot项目就可以开始创建核心类了
消息队列中核心概念:
1.交换机
2.队列
3.绑定
4.消息
都存在于Broker Server中
2.1 Exchange类
1.关于持久化的理解
// 交换机是否要持久化存储 true 需要持久化; false 不必持久化
private boolean durable = false;有些交换机,队列,绑定,是需要持久化的,有些则不需要,所以可以设置开关来让用户决定是否需要持久化
2.关于arguments的理解
RabbitMQ中 Exchange的 x-arguments
参考网址
https://www.rabbitmq.com/docs/exchanges#ae
2.2 MessageQueue类
所需要的属性及注释
// 队列的身份标识
private String name;
// 队列是否持久化 true 表示持久化保存
private boolean durable = false;
// 队列是否独占 ture 表示这个队列只能被一个消费者使用
// 并未实现该功能(留着扩展)(RabbitMQ实现了)
private boolean exclusive = false;
// 如果当前队列 没人(消费者)使用 就会自动删除
// 并未实现自动删除功能(留着扩展)(RabbitMQ实现了)
private boolean autoDelete = false;
// 创建队列时 指定的一些额外的参数选项
// 并未实现该功能(留着扩展)(RabbitMQ实现了)
private Map<String, Object> arguments = new HashMap<>();
2.3 Binding类
public class Binding {
private String exchangeName;
private String queueName;
// 支持 TOPIC主题交换机 相当于 出题角色
private String bindingKey;
public String getExchangeName() {
return exchangeName;
}
public void setExchangeName(String exchangeName) {
this.exchangeName = exchangeName;
}
public String getQueueName() {
return queueName;
}
public void setQueueName(String queueName) {
this.queueName = queueName;
}
public String getBindingKey() {
return bindingKey;
}
public void setBindingKey(String bindingKey) {
this.bindingKey = bindingKey;
}
}
为什么Binding没有durable 、autoDelete 这两个属性 ?
因为Exchange或者MessageQueue中有一个没有持久化,单独持久化他们的Binding是没有意义的。
2.4 Message类
public class Message implements Serializable {
// Massage 最核心的部分
private BasicProperties basicProperties = new BasicProperties();
private byte[] body;
// 以下是: 辅助属性
// 如果要存储到文件中(持久化)
// 一个文件会存储很多的消息, 如何找到某个消息, 在文件中的具体位置?
// 使用下列的 两个偏移量 来进行表示 [offsetBeg, offsetEnd)
// 这两个属性并不需要序列化保存到文件中
// 1.此时消息被写入文件后 所在的位置固定 并不需要存储这两个属性
// 2.属性存在的目的:为了让内存中的Message对象, 能够快速找到对应的硬盘上的Message的位置
private transient long offsetBeg = 0; // 消息数据的开头距离文件开头的位置偏移(字节)
private transient long offsetEnd = 0; // 消息数据的结尾距离文件开头的位置便宜(字节)
// 使用这个属性表示该消息在文件中是否是有效消息(逻辑删除)
// 0x1 表示有效 0x0 表示无效
private byte isValid = 0x1;
// 创建一个工厂方法 封装 创建Message对象的而过程
// 此方法 创建的Message对象 会 自动生成 唯一的MessageId
// 如果 routingKey 和 basicProperties 中的 routingKey 冲突了 以外面为主
public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body){
Message message = new Message();
if (basicProperties != null) {
message.setBasicProperties(basicProperties);
}
// 此处加上 "M-" 前缀 区分不同实体的ID
message.setMessageId("M-" + UUID.randomUUID());
message.basicProperties.setRoutingKey(routingKey);
message.body = body;
return message;
}
public String getMessageId() {
return basicProperties.getMessageId();
}
public void setMessageId(String messageId) {
basicProperties.setMessageId(messageId);
}
public String getRoutingKey() {
return basicProperties.getRoutingKey();
}
public void setRoutingKey(String routingKey) {
basicProperties.setRoutingKey(routingKey);
}
public int getDeliverMode() {
return basicProperties.getDeliverMode();
}
public void setDeliverMode(int mode) {
basicProperties.setDeliverMode(mode);
}
public BasicProperties getBasicProperties() {
return basicProperties;
}
public void setBasicProperties(BasicProperties basicProperties) {
this.basicProperties = basicProperties;
}
public byte[] getBody() {
return body;
}
public void setBody(byte[] body) {
this.body = body;
}
public long getOffsetBeg() {
return offsetBeg;
}
public void setOffsetBeg(long offsetBeg) {
this.offsetBeg = offsetBeg;
}
public long getOffsetEnd() {
return offsetEnd;
}
public void setOffsetEnd(long offsetEnd) {
this.offsetEnd = offsetEnd;
}
public byte getIsValid() {
return isValid;
}
public void setIsValid(byte isValid) {
this.isValid = isValid;
}
}
1.Message的组成
1.属性部分 BasicProperties
2.正文部分 byte[]
为什么正文部分不用 String[]?
因为String[] 只能存储文本,所以采用byte[] 增加 通用性
2.逻辑删除
字段解析:private byte isValid = 0x1;
与数据库相同,删除数据使用逻辑删除。不是真正的删除而是标记为无效。
3.工厂方法
// 创建一个工厂方法 封装 创建Message对象的而过程
// 此方法 创建的Message对象 会 自动生成 唯一的MessageId
// 如果 routingKey 和 basicProperties 中的 routingKey 冲突了 以外面为主
public static Message createMessageWithId(String routingKey, BasicProperties basicProperties, byte[] body){
Message message = new Message();
if (basicProperties != null) {
message.setBasicProperties(basicProperties);
}
// 此处加上 "M-" 前缀 区分不同实体的ID
message.setMessageId("M-" + UUID.randomUUID());
message.basicProperties.setRoutingKey(routingKey);
message.body = body;
return message;
}
为什么不用构造方法?
使用构造方法也能达成同样目的,但是工厂方法可以让名字更清晰,毕竟工厂方法就是来填构造 方法不能随便取名字的坑。
4.序列化与反序列化
准备工作:实现一个Serializable接口(标记接口)
此处准备使用标准库自带的方式进行序列化和反序列化,为什么不使用Json方式?
Json的本质是文本格式
而此处的Message里面存储的是二进制
5.offsetBeg和offsetEnd
offsetBeg 和 offsetEnd 不需要序列化 ?
1.此时消息被写入文件后 所在的位置固定 并不需要存储这两个属性
2.属性存在的目的:为了让内存中的Message对象, 能够快速找到对应的硬盘上的Message的位置