mq调用流程

创建消息转换器
package com.wd.config;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqMessageConvertConfig {
/**
* 公共的消息转换器
*
* @return MessageConverter
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
创建exchange交换机:普通交换机、延迟交换机、死信交换机
package com.wd.config;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqExchangeConfig {
public static final int DELAY_TIME = 20 * 1000;
/**
* 普通交换机名称
*/
public static final String EXCHANGE_NAME = "wd_exchange";
/**
* 延迟交换机名称
*/
public static final String DELAY_EXCHANGE_NAME = "wd_delay_exchange";
/**
* 死信交换机
*/
public static final String DEAD_EXCHANGE_NAME = "wd_dead_exchange";
@Bean
public DirectExchange exchange() {
return new DirectExchange(EXCHANGE_NAME, true, false);
}
@Bean
public DirectExchange delayExchange() {
return new DirectExchange(DELAY_EXCHANGE_NAME, true, false);
}
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(DEAD_EXCHANGE_NAME, true, false);
}
}
创建master的connection
package com.wd.config.master;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqMasterConnectionConfig {
@Value("${rabbitmq.master.vhost}")
private String vhost;
@Value("${rabbitmq.master.addresses}")
private String addresses;
@Value("${rabbitmq.master.username}")
private String username;
@Value("${rabbitmq.master.password}")
private String password;
@Bean
public ConnectionFactory masterConnectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setAddresses(addresses);
cachingConnectionFactory.setVirtualHost(vhost);
cachingConnectionFactory.setUsername(username);
cachingConnectionFactory.setPassword(password);
return cachingConnectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(@Qualifier("masterConnectionFactory") ConnectionFactory masterConnectionFactory,
MessageConverter messageConverter)
{
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(masterConnectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
}
创建slave的connection
package com.wd.config.slave;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqSlaveConnectionConfig {
@Value("${rabbitmq.slave.vhost}")
private String vhost;
@Value("${rabbitmq.slave.addresses}")
private String addresses;
@Value("${rabbitmq.slave.username}")
private String username;
@Value("${rabbitmq.slave.password}")
private String password;
@Bean
public ConnectionFactory slaveConnectionFactory() {
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
cachingConnectionFactory.setAddresses(addresses);
cachingConnectionFactory.setVirtualHost(vhost);
cachingConnectionFactory.setUsername(username);
cachingConnectionFactory.setPassword(password);
return cachingConnectionFactory;
}
@Bean
public RabbitTemplate slaveRabbitTemplate(@Qualifier("slaveConnectionFactory") ConnectionFactory slaveConnectionFactory,
MessageConverter messageConverter)
{
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(slaveConnectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setMessageConverter(messageConverter);
return rabbitTemplate;
}
}
创建队列A: 分为普通队列、延迟队列、死信队列
package com.wd.config.queue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import static com.wd.config.RabbitmqExchangeConfig.DEAD_EXCHANGE_NAME;
import static com.wd.config.RabbitmqExchangeConfig.DELAY_TIME;
@Configuration
public class QueueAConfig {
private static final String QUEUE_A_NAME = "wd_queue_a";
private static final String DELAY_QUEUE_A_NAME = "wd_delay_queue_a";
private static final String DEAD_QUEUE_A_NAME = "wd_dead_queue_a";
private static final String QUEUE_A_ROUTING_KEY = "queue_A_routing_key";
private static final String DELAY_QUEUE_A_ROUTING_KEY = "delay_queue_a_routing_key";
private static final String DEAD_LETTER_QUEUE_A_ROUTING_KEY = "dead_letter_queue_A_routing_key";
@Bean
public Queue queueA() {
return new Queue(QUEUE_A_NAME, true);
}
@Bean
public Binding queueABinding(@Qualifier("queueA") Queue queueA,
@Qualifier("exchange") DirectExchange exchange) {
return BindingBuilder.bind(queueA).to(exchange).with(QUEUE_A_ROUTING_KEY);
}
@Bean
public Queue delayQueueA() {
Map<String, Object> args = new HashMap<>();
//设置延迟队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
//设置延迟队列绑定的死信路由键
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_A_ROUTING_KEY);
//设置延迟队列的 TTL 消息存活时间
args.put("x-message-ttl", DELAY_TIME);
return new Queue(DELAY_QUEUE_A_NAME, true, false, false, args);
}
@Bean
public Binding delayQueueABinding(@Qualifier("delayQueueA") Queue delayQueueA,
@Qualifier("delayExchange") DirectExchange delayExchange) {
return BindingBuilder.bind(delayQueueA).to(delayExchange).with(DELAY_QUEUE_A_ROUTING_KEY);
}
@Bean
public Queue deadQueueA() {
return new Queue(DEAD_QUEUE_A_NAME, true);
}
@Bean
public Binding deadQueueABinding(@Qualifier("deadQueueA") Queue deadQueueA,
@Qualifier("deadExchange") DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueueA).to(deadExchange).with(DEAD_LETTER_QUEUE_A_ROUTING_KEY);
}
}
创建队列B: 分为普通队列、延迟队列、死信队列
package com.wd.config.queue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import static com.wd.config.RabbitmqExchangeConfig.DEAD_EXCHANGE_NAME;
import static com.wd.config.RabbitmqExchangeConfig.DELAY_TIME;
@Configuration
public class QueueBConfig {
private static final String QUEUE_B_NAME = "wd_queue_b";
private static final String DELAY_QUEUE_B_NAME = "wd_delay_queue_b";
private static final String DEAD_QUEUE_B_NAME = "wd_dead_queue_b";
private static final String QUEUE_B_ROUTING_KEY = "queue_b_routing_key";
private static final String DELAY_QUEUE_B_ROUTING_KEY = "delay_queue_b_routing_key";
private static final String DEAD_LETTER_QUEUE_B_ROUTING_KEY = "dead_letter_queue_b_routing_key";
@Bean
public Queue queueB() {
return new Queue(QUEUE_B_NAME, true);
}
@Bean
public Binding queueBBinding(@Qualifier("queueB") Queue queueB,
@Qualifier("exchange") DirectExchange exchange) {
return BindingBuilder.bind(queueB).to(exchange).with(QUEUE_B_ROUTING_KEY);
}
@Bean
public Queue delayQueueB() {
Map<String, Object> args = new HashMap<>();
//设置延迟队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
//设置延迟队列绑定的死信路由键
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_B_ROUTING_KEY);
//设置延迟队列的 TTL 消息存活时间
args.put("x-message-ttl", DELAY_TIME);
return new Queue(DELAY_QUEUE_B_NAME, true, false, false, args);
}
@Bean
public Binding delayQueueBBinding(@Qualifier("delayQueueB") Queue delayQueueB,
@Qualifier("delayExchange") DirectExchange delayExchange) {
return BindingBuilder.bind(delayQueueB).to(delayExchange).with(DELAY_QUEUE_B_ROUTING_KEY);
}
@Bean
public Queue deadQueueB() {
return new Queue(DEAD_QUEUE_B_NAME, true);
}
@Bean
public Binding deadQueueABinding(@Qualifier("deadQueueB") Queue deadQueueB,
@Qualifier("deadExchange") DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueueB).to(deadExchange).with(DEAD_LETTER_QUEUE_B_ROUTING_KEY);
}
}
创建队列C: 分为普通队列、延迟队列、死信队列
package com.wd.config.queue;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
import static com.wd.config.RabbitmqExchangeConfig.DEAD_EXCHANGE_NAME;
import static com.wd.config.RabbitmqExchangeConfig.DELAY_TIME;
@Configuration
public class QueueCConfig {
private static final String QUEUE_C_NAME = "wd_queue_c";
private static final String DELAY_QUEUE_C_NAME = "wd_delay_queue_c";
private static final String DEAD_QUEUE_C_NAME = "wd_dead_queue_c";
private static final String QUEUE_C_ROUTING_KEY = "queue_c_routing_key";
private static final String DELAY_QUEUE_C_ROUTING_KEY = "delay_queue_c_routing_key";
private static final String DEAD_LETTER_QUEUE_C_ROUTING_KEY = "dead_letter_queue_c_routing_key";
@Bean
public Queue queueC() {
return new Queue(QUEUE_C_NAME, true);
}
@Bean
public Binding queueCBinding(@Qualifier("queueC") Queue queueC,
@Qualifier("exchange") DirectExchange exchange) {
return BindingBuilder.bind(queueC).to(exchange).with(QUEUE_C_ROUTING_KEY);
}
@Bean
public Queue delayQueueC() {
Map<String, Object> args = new HashMap<>();
//设置延迟队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME);
//设置延迟队列绑定的死信路由键
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUE_C_ROUTING_KEY);
//设置延迟队列的 TTL 消息存活时间
args.put("x-message-ttl", DELAY_TIME);
return new Queue(DELAY_QUEUE_C_NAME, true, false, false, args);
}
@Bean
public Binding delayQueueCBinding(@Qualifier("delayQueueC") Queue delayQueueC,
@Qualifier("delayExchange") DirectExchange delayExchange) {
return BindingBuilder.bind(delayQueueC).to(delayExchange).with(DELAY_QUEUE_C_ROUTING_KEY);
}
@Bean
public Queue deadQueueC() {
return new Queue(DEAD_QUEUE_C_NAME, true);
}
@Bean
public Binding deadQueueCBinding(@Qualifier("deadQueueC") Queue deadQueueC,
@Qualifier("deadExchange") DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueueC).to(deadExchange).with(DEAD_LETTER_QUEUE_C_ROUTING_KEY);
}
}
创建master的消息监听RabbitListenerContainerFactory
后续使用注解 @RabbitListener 时指定ListenerContainerFactory
@RabbitListener(queues = DEAD_LETTER_QUEUE_B, containerFactory = "masterListenerContainerFactory")
package com.wd.config.master;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqMasterListenerConfig {
@Bean
public SimpleRabbitListenerContainerFactory masterListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier(value = "masterConnectionFactory") ConnectionFactory masterConnectionFactory,
MessageConverter messageConverter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 设置消息转换器
factory.setMessageConverter(messageConverter);
// 关闭自动ACK
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
configurer.configure(factory, masterConnectionFactory);
return factory;
}
}
创建slave的消息监听RabbitListenerContainerFactory
后续使用注解 @RabbitListener 时指定ListenerContainerFactory
@RabbitListener(queues = DEAD_LETTER_QUEUE_B, containerFactory = "slaveListenerContainerFactory")
package com.wd.config.slave;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitmqSlaveListenerConfig {
@Bean
public SimpleRabbitListenerContainerFactory slaveListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier(value = "slaveConnectionFactory") ConnectionFactory slaveConnectionFactory,
MessageConverter messageConverter) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
// 设置消息转换器
factory.setMessageConverter(messageConverter);
// 关闭自动ACK
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
configurer.configure(factory, slaveConnectionFactory);
return factory;
}
}



















