文章目录
- 一、创建SpringBoot项目
- 二、创建核心类
- 创建 Exchange类
- 创建 MSGQueue类
- 创建 Binding类
- 创建Message类
 
一、创建SpringBoot项目
在项目中添加这四个依赖!
 
二、创建核心类
交换机 :Exchange
 队列 :Queue
 绑定关系: Binding
 消息 :Message
 这些核心类都存在于 BrokerServer 中.
先创建出服务器与客户端的包.
 再在服务器中创建 core包,用来存放这些核心类.
 
创建 Exchange类
首先考虑,咱们在此处共实现了三种交换机类型,所以咱们可以创建一个枚举类来表示交换机类型.
/**
 * 表示交换机类型
 */
public enum ExchangeType {
    DIRECT(0),
    FANOUT(1),
    TOPIC(2);
    private final int type;
    private ExchangeType(int type) {
        this.type = type;
    }
    public int getType() {
        return type;
    }
}
咱们再考虑,Exchange类中有哪些属性?
- 1.name,当作交换机的唯一身份标识
- 2.ExchangeType,表示交换机类型
- 3.durable,表示这个交换机是否需要持久化存储
- 4.autoDelete,表示该交换机在无人使用后,是否会自动删除
- 5.arguments,表示后续的一些拓展功能
/**
 * 表示一个交换机
 * 交换机的使用者是生产者
 */
@Data
public class Exchange {
    // 此处使用 name 作为交换机的身份标识,(唯一的)
    private String name;
    // 交换机类型,DIRECT,FANOUT,TOPIC
    private ExchangeType type = ExchangeType.DIRECT;
    // 该交换机是否要持久化存储,true表示要,false表示不要
    private boolean durable = false;
    // 如果当前交换机,没人使用了,就会自动删除
    // 这个属性暂时放在这(后续代码中没有实现,RabbitMQ中实现了)
    private boolean autoDelete = false;
    // arguments 表示的是创建交换机时指定的一些额外参数
    // 这个属性也暂时放在这(后续代码中没有实现,RabbitMQ中实现了)
    // 为了把这个 arguments 存到数据库中,需要将 arguments 转换为 json 格式的字符串
    private Map<String,Object> arguments = new HashMap<>();
    // 这里的 get set 用于与数据库交互使用
    public String getArguments() {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            // 将 arguments 按照 JSON 格式 转换成 字符串
            return objectMapper.writeValueAsString(arguments);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        // 如果代码抛出异常,返回一个空的 json 字符串
        return "{}";
    }
    public void setArguments(String arguments) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            // 将库中的 arguments 按照 JSON 格式解析,转换成 Map 对象
            this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String,Object>>() {});
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
    public void setArguments(Map<String,Object> arguments) {
        this.arguments = arguments;
    }
    // 这里的 get set ,用来更方便的获取/设置 arguments 中的键值对
    // 这一组 getter setter 是在Java内部代码使用的(比如测试的时候)
    public Object getArguments(String key) {
        return arguments.get(key);
    }
    public void setArguments(String key,Object value) {
        arguments.put(key, value);
    }
}
创建 MSGQueue类
MSGQueue类中有哪些属性?
 与Exchange类大差不差.
 直接贴代码
/**
 * 表示一个存储消息的队列
 * MSG =》Message
 * 消息队列的使用者是消费者
 */
@Data
public class MSGQueue {
    // 表示队列的身份标识
    private String name;
    // 表示队列是否持久化
    private boolean durable = false;
    // true -> 这个队列只能被一个消费者使用,false -> 大家都能使用这个队列
    // 后续代码不实现相关功能
    private boolean exclusive = false;
    // true -> 没人使用后,自动删除,false -> 没人使用,不自动删除
    private boolean autoDelete = false;
    // 表示扩展参数,后续代码没有实现
    private Map<String,Object> arguments = new HashMap<>();
    public String getArguments() {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            return objectMapper.writeValueAsString(arguments);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        return null;
    }
    public void setArguments(String arguments) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            this.arguments = objectMapper.readValue(arguments, new TypeReference<HashMap<String,Object>>() {});
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
    }
    public void setArguments(Map<String,Object> arguments) {
        this.arguments = arguments;
    }
    public Object getArguments(String key) {
        return arguments.get(key);
    }
    public void setArguments(String key,Object value) {
        arguments.put(key, value);
    }
}
创建 Binding类
/**
 * 表示队列和交换机之间的绑定关系
 */
@Data
public class Binding {
    private String exchangeName;
    private String queueName;
    // 主题交换机的匹配key
    private String bindingKey;
}
创建Message类
Message类,大致可以分为三个部分.
- 消息自身的属性
- 消息的正文
- 消息的持久化存储所需属性
我们新建一个 BasicProperties 类来表示 消息的属性.
/**
 * 这个类表示消息的属性
 */
@Data					// 实现 Serializable 接口是为了后续的序列化操作
public class BasicProperties implements Serializable {
    // 消息的唯一身份标识
    private String messageId;
    // 如果当前交换机是 DIRECT,此时 routingKey 表示要转发的队列名
    // 如果当前交换机是 FANOUT,此时 routingKey 无意义
    // 如果当前交换机是 TOPIC,此时 routingKey 就要和bindingKey进行匹配,匹配成功才转发给对应的消息队列
    private String routingKey;
    // 这个属性表示消息是否要持久化,1表示不持久化,2 表示持久化
    private int deliverMode = 1;
}
持久化存储会在下面讲到,莫慌.
/**
 * 这个类表示一个消息
 */
@Data					// 实现 Serializable 接口是为了后续的序列化操作
public class Message implements Serializable {
    // 消息的属性
    private BasicProperties basicProperties = new BasicProperties();
    // 消息的正文
    private byte[] body;
    // 相当于消息的版本号,主要针对 Message 类有改动后,再去反序列化之前旧的 message时,可能会出现错误
    // 因此引入消息版本号,如果版本号不匹配,就不允许反序列化直接报错,来告知程序猿,后续代码中并未实现该功能
    private static final long serialVersionUid = 1L;
    // 下面的属性是持久化存储需要的属性
    // 消息存储到文件中,使用一下两个偏移量来确定消息在文件中的位置 [offsetBeg,offsetEnd)
    // 这两个属性不需要 序列化 存储到文件中,存储到文件中后位置就固定了,
    // 这两个属性的作用是让 内存 中的 message 能顺利找到 文件 中的 message
    // 被 transient 修饰的属性,不会被 标准库 的 序列化方式 序列化
    private transient long offsetBeg = 0; // 消息数据的开头举例文件开头的位置偏移(字节)
    private transient long offsetEnd = 0; // 消息数据的结尾举例文件开头的位置偏移(字节)
    // 使用这个属性表示该消息在文件中是否是有效信息(逻辑删除)
    // 0x1表示有效,0x0表示无效
    private byte isValid = 0x1;
    // 创建工厂方法,让工厂方法封装 new Message 对象的过程
    // 该方法创建的 Message 对象,会自动生成唯一的MessageId
    public static Message createMessageWithId(String routingKey,BasicProperties basicProperties,byte[] body) {
        Message message = new Message();
        if (basicProperties != null) {
            message.setBasicProperties(basicProperties);
        }
        message.basicProperties.setRoutingKey(routingKey);
        // 此处生成的 MessageId 以 M- 作为前缀
        message.setMessageId("M-" + UUID.randomUUID());
        message.setBody(body);
        // 此处先将 message的核心部分 basicProperties 与 body设置了
        // 而 offsetBeg,offsetEnd,isValid,这些属性是持久化时才设置的
        return message;
    }
    // 直接获取消息id
    public String getMessageId() {
        return basicProperties.getMessageId();
    }
    // 直接更改消息id
    public void setMessageId(String messageId) {
        basicProperties.setMessageId(messageId);
    }
    // 直接获取 消息的key
    public String getRoutingKey() {
        return basicProperties.getRoutingKey();
    }
    // 直接更改 消息的key
    public void setRoutingKey(String routingKey) {
        basicProperties.setRoutingKey(routingKey);
    }
    // 直接获取 消息的是否持久化存储字段
    public int getDeliverMode() {
        return basicProperties.getDeliverMode();
    }
    // 直接修改 消息的是否持久化存储字段
    public void setDeliverMode(int mode) {
        basicProperties.setDeliverMode(mode);
    }
}

这些核心类就都建好了,下篇文章就来考虑他们的持久化存储与内存存储!



















