在并发量很高的时候,服务端处理不过来客户端发的请求,这个时候可以使用消息队列,实现削峰。原理就是请求先打到队列上,服务端从队列里取出消息进行处理,处理不过来的消息就堆积在消息队列里等待。
可以模拟一下这个过程:
发送方把10万条消息在短时间内发送到消息队列
接收方把这些消息存储到数据库
目录
一、具体实现
1. 创建两个spring项目
2. 分别引入 RabbitMQ 的依赖
3. 配置文件中配置RabbitMQ的信息(这里是.yml文件的格式)
4. 发送方 Sender
发送消息
多线程实现Runnable接口
测试类
5. 接收方
二、结果
一、具体实现
1. 创建两个spring项目
2. 分别引入 RabbitMQ 的依赖
<!--        rabbitMQ 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>3. 配置文件中配置RabbitMQ的信息(这里是.yml文件的格式)
spring:
  rabbitmq:
    host: 这里写IP地址
    port: 5672 #端口号
    username: 用户名
    password: 密码
    virtual-host: /4. 发送方 Sender
发送消息
@Component
public class Sender {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    public void send(String str) {
        //first是消息队列的名称
        rabbitTemplate.convertAndSend("first", str);
    }
}多线程实现Runnable接口
@Data
public class MyRunnable implements Runnable {
    private String str;
    private Sender sender;
    private CountDownLatch countDownLatch;
    public MyRunnable(CountDownLatch countDownLatch,Sender sender, String str){
            this.countDownLatch = countDownLatch;
        this.sender = sender;
        this.str = str;
    }
    @Override
    public void run() {
        try {
            String threadName = Thread.currentThread().getName();
            //发送线程名和消息
            sender.send(threadName + " " + str);
            //控制台输出
            System.out.println(threadName + " " + str);
            countDownLatch.countDown();
        } catch (Exception e) {
            System.out.println(new Date() + " Send mq message failed.");
            e.printStackTrace();
        }
    }
}测试类
@SpringBootTest
class Rabbitmq1ApplicationTests {
    @Autowired
    Sender sender;
    @Test
    void contextLoads() throws InterruptedException {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10,
                300, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
        //循环100000次,模拟10万条消息
        final int count = 100000;
        CountDownLatch countDownLatch = new CountDownLatch(count);
        for(int i = 0; i < count; i++){
            MyRunnable myRunnable = new MyRunnable(countDownLatch,sender,"" + i);
            threadPoolExecutor.execute(myRunnable);
        }
        countDownLatch.await();
    }
}5. 接收方
@Component
@RabbitListener(queues = "first")
public class Receiver {
    @Autowired
    MessageService messageService;
    @RabbitHandler
    public void process(String str){
        //控制台输出
        System.out.println("msg: " + str);
        //对消息的处理,字符串分割
        String[] strings = str.split(" ");
        Message message = new Message(strings[0],strings[1]);
        //插入数据库
        messageService.insert(message);
    }
}这里@RabbitListener注解监听着 first 队列 ,当有收到消息的时候,就交给 @RabbitHandler 的方法处理。
二、结果
可以看到发送方很快就发送完毕了,接收方这边还在慢慢的处理中
过了一会。。。

又过了一会。。。

完美地发挥了削峰的作用。


















