目录
docker
Java
导包
配置文件
Work Queues
消息堆积
消息生产者发送消息到队列
消息消费者接收消息
Fanout交换机
Direct交换机发送消息
用Java代码创建交换机和队列、绑定
Direct交换机
Direct交换机发送消息
用Java代码创建交换机和队列、绑定
基于注解声明队列和交换机
Topic交换机
Topic交换机发送消息
发送对象类型的消息
使用json序列化代替默认的jdk序列化
对象类型的消息对象的接收
RabbitMQ是一个高性能的异步通信组件;
docker
我部署在docker下
docker run \
-e RABBITMQ_DEFAULT_USER=root \ #用户名
-e RABBITMQ_DEFAULT_PASS=root \ #密码
-v mq-plugins:/plugins \ #目录挂载,mq-plugins是数据卷
--name mq \ #容器名
--hostname mq \ #主机名
-p 15672:15672 \ #端口映射
-p 5672:5672 \
--network hmall \ #网咯
-d \
rabbitmq:3.8-management
15672端口是控制台端口,图形化界面;
5672端口是通信的端口,发收消息用的;
publisher 消息的发送者
exchange 交换机,复制路由消息
queue 队列,存储信息
consumer 消息的消费者
virtual-host 虚拟主机,起数据隔离作用;每个项目可以建一个自己的虚拟主机;
Java
AMQP 是用于在应用程序之间传递业务消息的开放标准。该协议与平台无关,更符合微服务中独立性的要求;
Spring AMQP 是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息,是抽象;底层的实现默认是Spring-rabbitmq;
导包
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件
消息提供者/消费者的配置文件
spring:
rabbitmq:
host: 192.168.88.130 # 主机名
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码
Work Queues
消息堆积
默认情况下,RabbitMQ会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否以及处理完消息,可能会出现消息的堆积;
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完才能获取下一个消息
消息生产者发送消息到队列
生产者发送消息到队列而不是交换机;
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendMessage2Queue() {
String queueName = "simple.queue";
String msg = "hello ampq";
rabbitTemplate.convertAndSend(queueName, msg);
}
}
消息消费者接收消息
@Slf4j
@Component
public class MQListener {
@RabbitListener(queues = {"simple.queue"})
public void listenSimpleQueue(String msg) {
System.out.println("消费者收到了simple.queue队列的消息:【" + msg + "】");
}
}
Fanout交换机
Fanout交换机会把接收到的消息广播到每一个跟其绑定的队列,所以也叫广播模式;
Direct交换机发送消息
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendFanout() {
String exchangeName = "hmall.fanout";
String msg = "hello every";
rabbitTemplate.convertAndSend(exchangeName, null, msg);
}
}
因为接收消息是一样的,所以这里就不赘述了;
用Java代码创建交换机和队列、绑定
一般在消费者项目声明
@Configuration
public class FanoutConfig {
/**
* 声明fanout交换机
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("hmall.fanout");
}
/**
* 第二种声明fanout交换机的写法
*
* @return
*/
@Bean
public FanoutExchange fanoutExchange1() {
return ExchangeBuilder.fanoutExchange("hmall.fanout2").build();
}
/**
* 声明一个队列
*
* @return
*/
@Bean
public Queue fanoutQueue() {
return new Queue("fanout.queue1");
}
/**
* 第二种声明队列的写法
*
* @return
*/
@Bean
public Queue fanoutQueue2() {
// 持久化
return QueueBuilder.durable("fanout.quque2").build();
}
/**
* 绑定队列和交换机
*
* @param fanoutQueue
* @param fanoutExchange
* @return
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
}
/**
* 第二种绑定队列和交换机的方法
*
* @return
*/
@Bean
public Binding bindingQueue2() {
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
}
Direct交换机
Direct交换机会将接收到的消息根据规则路由到指定的队列,因此称为定向路由;
每一个队列都与一个交换机设置一个BindingKey;
发布者发布消息时,指定消息的RoutingKey;
交换机将消息路由到BindingKey与RoutingKey一致的队列;
将BindingKey写成一致就可以实现广播消息的功能;
Direct交换机发送消息
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendDirect() {
String exchangeName = "hmall.direct";
String routingKey = "blue";
String msg = "hello every";
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}
}
用Java代码创建交换机和队列、绑定
@Configuration
public class DirectConfig {
/**
* 声明direct交换机
*
* @return
*/
@Bean
public DirectExchange directExchange() {
return new DirectExchange("hmall.direct");
}
/**
* 第二种声明direct交换机的写法
*
* @return
*/
@Bean
public DirectExchange directExchange1() {
return ExchangeBuilder.directExchange("hmall.direct2").build();
}
/**
* 声明一个队列
*
* @return
*/
@Bean
public Queue directQueue() {
return new Queue("direct.queue1");
}
/**
* 第二种声明队列的写法
*
* @return
*/
@Bean
public Queue directQueue2() {
// 持久化
return QueueBuilder.durable("direct.quque2").build();
}
/**
* 绑定队列和交换机
*
* @param directQueue
* @param directExchange
* @return
*/
@Bean
public Binding bindingQueue3(Queue directQueue, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with("red");
}
@Bean
public Binding bindingQueue4(Queue directQueue, DirectExchange directExchange) {
return BindingBuilder.bind(directQueue).to(directExchange).with("yellow");
}
}
基于注解声明队列和交换机
@Slf4j
@Component
public class MQListener {
@RabbitListener(bindings = {@QueueBinding(
value = @Queue(name = "direct.queue3", declare = "true"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
)})
public void listenDirectQueue3(String msg) {
System.out.println("消费者3收到了direct.queue3队列的消息:【" + msg + "】");
}
}
Topic交换机
类似于direct交换机,与direct交换机的区别是,topic交换机的routingKey可以是多个单词的列表,并以 . 分割;
队列和交换机指定BindingKey时可以使用通配符;
# 代表0个或多个单词
* 代表一个单词
Topic交换机发送消息
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendTopic() {
String exchangeName = "hmall.topic";
String routingKey = "china.news";
String msg = "这是一条消息通知";
rabbitTemplate.convertAndSend(exchangeName, routingKey, msg);
}
}
发送对象类型的消息
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendObject() {
Map<String, Object> msg = new HashMap<>();
msg.put("name", "jack");
msg.put("age", 21);
rabbitTemplate.convertAndSend("object.queue", msg);
}
}
使用json序列化代替默认的jdk序列化
<!--jackson-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
@Configuration
public class MessageConverterConfig {
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
对象类型的消息对象的接收
@Slf4j
@Component
public class MQListener {
@RabbitListener(queues = {"object.queue"})
public void listenObjecQueue(Map<String, Object> msg) {
System.out.println("消费者收到了object.queue队列的消息:【" + msg + "】");
}
}
消息的可靠性质
发送者的可靠性
消息发送时丢了
生产者重连
spring:
rabbitmq:
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长=initial-interval * multiplier
max-attempts: 3 # 最大重试次数
生产者确认
开启确认机制后,在MQ成功后收到消息后会返回确认消息给生产者。返回的结果有一下几种情况;
消息到了MQ,但路由失败。此时会通过PublisherReturn返回路由异常原因,然后返回ACK,告知投递成功;
临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功;
持久消息投递到了MQ,并且入队完成持久化,返回ACK,告知投递成功;
其他情况都会返回NACK,告知投递失败;
配置文件
spring:
rabbitmq:
publisher-confirm-type: correlated # MQ异步回调的方式返回回执消息
publisher-returns: true # 开启返回机制
配置类
@Slf4j
@Configuration
public class MqConfig implements ApplicationContextAware {
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);
// 配置回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.debug("收到消息的回调,exchange:{},key:{},msg:{},code:{},text:{}", returnedMessage.getExchange(),
returnedMessage.getRoutingKey(), returnedMessage.getMessage(),
returnedMessage.getReplyCode(), returnedMessage.getReplyText());
}
});
}
}
发送消息
@Test
void testConfirmCallback() throws InterruptedException {
// 1、创建id
// CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
CorrelationData cd = new CorrelationData();
// 2、添加confirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
log.error("消息回调失败", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
log.debug("收到confirm callback回执");
if (result.isAck()) {
log.info("消息发送成功,收到ack");
} else {
log.error("消息发送失败,收到nack,原因:{}", result.getReason());
}
}
});
rabbitTemplate.convertAndSend("hmall.direct", "red", "hello");
Thread.sleep(2000);
}
但是上面的代码我尝试了但回调方法始终没有触发;
MQ的可靠性
mq把消息丢了
在默认情况下,RabbitMQ会将接收到的消息保存在内存以降低消息的收发延迟。这样会导致两个问题:
一旦MQ宕机,内存中的消息会丢失;
内存空间有限,当消费者故障或处理过慢时,会导致消息积压,引发MQ阻塞;
数据持久化
交换机的持久化
@Test
void testPageOut() {
Message msg = MessageBuilder
.withBody("hello".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT).build(); // PERSISTENT持久化
for (int i = 0; i < 1E6; i++) {
rabbitTemplate.convertAndSend("simple.queue", msg);
}
}
Lazy Queue
惰性队列
接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认2048条);
消费者要消费消息时才会从磁盘中读取并加载到内存;
支持数百万的消息存储;
3.12版本后,所有队列都是惰性队列,无法更改;
消费者的可靠性
消费者把消息丢了
消费者确认机制
当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:
ack 成功处理消息,RabbitMQ从队列中删除该消息;
nack 消息处理失败,RabbitMQ需要再次投递消息;
reject 消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息;
SpringAMQP有三种ack方式
none 不处理,即消息投递给消费者后立刻ack,消息会立刻从MQ删除。非常不安全,不建议使用;
manual 手动模式,需要自己在业务代码中调用api,发送ack或reject,存在业务入侵,但更灵活;
auto 自动模式,SpringAMQP利用AOP对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回ack,当业务出现异常时,根据异常判断返回不同结果。
如果是业务异常,会自动返回nack;
如果是消息处理或校验异常,自动返回reject;
消息消费者配置文件
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto