目录
1. 直连交换机(Direct实战)
provider生产者(publisher)
consumer消费者
2. 主题交换机(Topic实战)
provider生产者(publisher)
consumer消费者
3. 扇形交换机(Fanout实战)
provider生产者(publisher)
consumer消费者
前言
想学习RabbitMQ基础的请阅读下边博文链接
RabbitMQ【基本使用】_JoneClassMate的博客-CSDN博客
1. 直连交换机(Direct实战)
provider生产者(publisher)
-
DirectConfig
package com.jmh.provider.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 蒋明辉
* @data 2022/11/25 19:02
*/
@Configuration
@SuppressWarnings("all")
public class DirectConfig {
/**
* 创建队列
*/
@Bean
public Queue directQueueA(){
return new Queue("directQueueA",true);
}
@Bean
public Queue directQueueB(){
return new Queue("directQueueB",true);
}
@Bean
public Queue directQueueC(){
return new Queue("directQueueC",true);
}
/**
* 创建交换机
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directExchange");
}
/**
* 设置队列和交换机的绑定
*/
@Bean
public Binding bindingA(){
return BindingBuilder.bind(directQueueA()).to(directExchange()).with("AA");
}
@Bean
public Binding bindingB(){
return BindingBuilder.bind(directQueueB()).to(directExchange()).with("BB");
}
@Bean
public Binding bindingC(){
return BindingBuilder.bind(directQueueC()).to(directExchange()).with("CC");
}
}
- controller
package com.jmh.provider.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 蒋明辉
* @data 2022/11/25 19:08
*/
@RestController
@SuppressWarnings("all")
public class ProviderController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 直连交换机
* @param key
* @return
*/
@RequestMapping("/directSend")
public String directSend(String key){
rabbitTemplate.convertAndSend("directExchange",key,"Hello World");
return "yes";
}
}
consumer消费者
-
DirectReceiverA
package com.jmh.consumer.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 蒋明辉
* @data 2022/11/25 19:12
*/
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "directQueueA")
@Slf4j
public class DirectReceiverA {
@RabbitHandler
public void info(String msg){
log.info("A接收到了"+msg);
}
}
- DirectReceiverB
package com.jmh.consumer.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 蒋明辉
* @data 2022/11/25 19:12
*/
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "directQueueB")
@Slf4j
public class DirectReceiverB {
@RabbitHandler
public void info(String msg){
log.info("B接收到了"+msg);
}
}
- DirectReceiverC
package com.jmh.consumer.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 蒋明辉
* @data 2022/11/25 19:12
*/
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "directQueueC")
@Slf4j
public class DirectReceiverC {
@RabbitHandler
public void info(String msg){
log.info("C接收到了"+msg);
}
}
2. 主题交换机(Topic实战)
provider生产者(publisher)
-
TopicConfig
package com.jmh.provider.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 蒋明辉
* @data 2022/11/25 19:02
*/
@Configuration
@SuppressWarnings("all")
public class TopicConfig {
private static final String KEY_A="*.a.*";
private static final String KEY_B="*.*.a";
private static final String KEY_C="a.#";
/**
* 创建队列
*/
@Bean
public Queue topicQueueA(){
return new Queue("topicQueueA",true);
}
@Bean
public Queue topicQueueB(){
return new Queue("topicQueueB",true);
}
@Bean
public Queue topicQueueC(){
return new Queue("topicQueueC",true);
}
/**
* 创建交换机
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}
/**
* 设置队列和交换机的绑定
*/
@Bean
public Binding topicBindingA(){
return BindingBuilder.bind(topicQueueA()).to(topicExchange()).with(KEY_A);
}
@Bean
public Binding topicBindingB(){
return BindingBuilder.bind(topicQueueB()).to(topicExchange()).with(KEY_B);
}
@Bean
public Binding topicBindingC(){
return BindingBuilder.bind(topicQueueC()).to(topicExchange()).with(KEY_C);
}
}
- controller
package com.jmh.provider.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 蒋明辉
* @data 2022/11/25 19:08
*/
@RestController
@SuppressWarnings("all")
public class ProviderController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 主题交换机
* @param key
* @return
*/
@RequestMapping("/topicSend")
public String topicSend(String key){
rabbitTemplate.convertAndSend("topicExchange",key,"Hello World");
return "yes";
}
}
consumer消费者
-
TopicReceiverA
package com.jmh.consumer.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 蒋明辉
* @data 2022/11/25 19:12
*/
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "topicQueueA")
@Slf4j
public class TopicReceiverA {
@RabbitHandler
public void info(String msg){
log.info("A接收到了"+msg);
}
}
- TopicReceiverB
package com.jmh.consumer.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 蒋明辉
* @data 2022/11/25 19:12
*/
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "topicQueueB")
@Slf4j
public class TopicReceiverB {
@RabbitHandler
public void info(String msg){
log.info("B接收到了"+msg);
}
}
- TopicReceiverC
package com.jmh.consumer.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author 蒋明辉
* @data 2022/11/25 19:12
*/
@Component
@SuppressWarnings("all")
@RabbitListener(queues = "topicQueueC")
@Slf4j
public class TopicReceiverC {
@RabbitHandler
public void info(String msg){
log.info("C接收到了"+msg);
}
}
3. 扇形交换机(Fanout实战)
provider生产者(publisher)
-
FanoutConfig
package com.jmh.provider.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 蒋明辉
* @data 2022/11/25 19:02
*/
@Configuration
@SuppressWarnings("all")
public class FanoutConfig {
/**
* 创建队列
*/
@Bean
public Queue fanoutQueueA(){
return new Queue("fanoutQueueA",true);
}
@Bean
public Queue fanoutQueueB(){
return new Queue("fanoutQueueB",true);
}
@Bean
public Queue fanoutQueueC(){
return new Queue("fanoutQueueC",true);
}
/**
* 创建交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
/**
* 设置队列和交换机的绑定
*/
@Bean
public Binding fanoutBindingA(){
return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
}
@Bean
public Binding fanoutBindingB(){
return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
}
@Bean
public Binding fanoutBindingC(){
return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
}
}
- controller
package com.jmh.provider.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @author 蒋明辉
* @data 2022/11/25 19:08
*/
@RestController
@SuppressWarnings("all")
public class ProviderController {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 扇形交换机
* @param key
* @return
*/
@RequestMapping("/fanoutSend")
public String fanoutSend(){
rabbitTemplate.convertAndSend("fanoutExchange",null,"Hello World");
return "yes";
}
}
consumer消费者
-
FanoutConfig
package com.jmh.provider.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author 蒋明辉
* @data 2022/11/25 19:02
*/
@Configuration
@SuppressWarnings("all")
public class FanoutConfig {
/**
* 创建队列
*/
@Bean
public Queue fanoutQueueA(){
return new Queue("fanoutQueueA",true);
}
@Bean
public Queue fanoutQueueB(){
return new Queue("fanoutQueueB",true);
}
@Bean
public Queue fanoutQueueC(){
return new Queue("fanoutQueueC",true);
}
/**
* 创建交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
/**
* 设置队列和交换机的绑定
*/
@Bean
public Binding fanoutBindingA(){
return BindingBuilder.bind(fanoutQueueA()).to(fanoutExchange());
}
@Bean
public Binding fanoutBindingB(){
return BindingBuilder.bind(fanoutQueueB()).to(fanoutExchange());
}
@Bean
public Binding fanoutBindingC(){
return BindingBuilder.bind(fanoutQueueC()).to(fanoutExchange());
}
}