目录
前言
1. 生产者
2. 消费者
3. 启动消息队列服务器
4. 运行效果
结语
前言
在上一章节,我们完成了消息队列的客户端部分,至此我们整个消息队列项目就构建完成了,那我们做的这个消息队列到底有什么效果,以及如何去使用我们自己的消息队列呢?那么本文,就将我们的MQ进行实战操作,写一个基于MQ的生产者消费者模型.本项目全部代码已上传Gitee,链接放在文章末尾,欢迎大家访问!

1. 生产者
我们的生产者就是一个客户端,需要将自己生产出来的消息发送到消息队列中,供消费者进行使用.
我们创建一个生产者,在服务器端创建交换机(直接),队列,然后往对应的队列进行投递消息.
1. 实例化创建连接的工厂类
2. 设置消息队列服务器的IP地址以及端口号
3. 新建一个连接,创建Channel,交换机,队列
4. 新建一个消息转换成字节文件进行发送,此时给线程一个休眠的时间,确保已经发送到消息队列服务器
5. 关闭通道,关闭连接
package com.example.demo.demo;
import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqserver.core.ExchangeType;
import java.io.IOException;
/**
 * Created with IntelliJ IDEA.
 * Description:生产者  通常是一个单独的服务器程序
 * User: YAO
 * Date: 2023-08-03
 * Time: 16:06
 */
public class DemoProducer {
    public static void main(String[] args) throws IOException, InterruptedException {
        System.out.println("启动生产者");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // 创建交换机和队列
        channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
        channel.queueDeclare("testQueue", true, false, false, null);
        // 创建一个消息并发送
        byte[] body = "hello".getBytes();
        boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);
        System.out.println("消息投递完成! ok=" + ok);
        Thread.sleep(500);
        channel.close();
        connection.close();
    }
}
 
2. 消费者
消费者也是客户端,所做的前期工作是一样的,只不过是发送的请求不同.
1. 消费者需要进行订阅消息,接收到消息之后,执行回调进行消费消息.
2. 消费者需要循环等待消息队列的响应,等待消费.
package com.example.demo.demo;
import com.example.demo.common.Consumer;
import com.example.demo.common.MqException;
import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqserver.core.BasicProperties;
import com.example.demo.mqserver.core.ExchangeType;
import java.io.IOException;
/**
 * Created with IntelliJ IDEA.
 * Description:消费者  通常是一个单独的服务器程序
 * User: YAO
 * Date: 2023-08-03
 * Time: 16:07
 */
public class DemoConsumer {
    public static void main(String[] args) throws MqException, InterruptedException, IOException {
        System.out.println("启动消费者!");
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(9090);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
        channel.queueDeclare("testQueue", true, false, false, null);
        channel.basicConsume("testQueue", true, new Consumer() {
            @Override
            public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
                System.out.println("[消费数据] 开始!");
                System.out.println("consumerTag=" + consumerTag);
                System.out.println("basicProperties=" + basicProperties);
                String bodyString = new String(body, 0, body.length);
                System.out.println("body=" + bodyString);
                System.out.println("[消费数据] 结束!");
            }
        });
        // 由于消费者也不知道生产者要生产多少, 就在这里通过这个循环模拟一直等待消费.
        while (true) {
            Thread.sleep(500);
        }
    }
} 
3. 启动消息队列服务器
在Spring Boot 项目的启动类中,实例化Broker Server,传入端口号,进行启动服务器.
package com.example.demo;
import com.example.demo.mqserver.BrokerServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import java.io.IOException;
@SpringBootApplication
public class DemoApplication {
	public static ConfigurableApplicationContext context;
	public static void main(String[] args) throws IOException {
		context = SpringApplication.run(DemoApplication.class, args);
		BrokerServer brokerServer = new BrokerServer(9090);
		brokerServer.start();
	}
}
 
4. 运行效果
1. 服务器启动:

2. 此时如果再重启服务器,会提示数据库已经存在,就会将数据恢复到内存

3. 启动生产者进行投递消息

上述就是按照我们自定义的应用层协议进行发送请求.
我们再来看服务器这边的日志:

4. 启动消费者进行消费消息

我们再来看服务器这边日志

结语
以上就是一个简单的Demo,实现了基于MQ的生产者消费者模型.其他的功能,大家可以在做完这个项目之后自行进行测试.至此这个消息队列的项目就全部完结了,内容还是很多的,希望可以通过这个系列能够帮助到大家去了解消息队列的实现原理.也希望大家能够有所收获,那就到这里吧.接下来就要开始新的项目了(实现论坛系统),又是一个挑战,我们一起加油!❤️
完整的项目代码已上传Gitee,欢迎大家访问.👇👇👇
模拟实现消息队列
https://gitee.com/yao-fa/advanced-java-ee/tree/master/My-mq













![P4381 [IOI2008] Island (求基环树直径)](https://img-blog.csdnimg.cn/9bad0e782a124e44921beff6fbc798b8.jpeg#pic_center)





