一、数据丢失
第一种:生产者弄丢了数据。生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题,都有可能。
第二种:RabbitMQ 弄丢了数据。MQ还没有持久化自己挂了。
第三种:消费端弄丢了数据。刚消费到,还没处理,结果进程挂了,比如重启了。
解决方案
消费者从队列中获取到数据,应答成功之后,队列认为消费者对消息处理完成,从队列中删除消息。
生产者
mq告知生产者,消息接收成功,没有成功,生产者可以继续重新发送消息
RabbitMQ有两种方式来解决这个问题:
-
通过AMQP提供的事务机制实现(同步,不推荐);
-
使用发送者确认模式实现(confirm,异步);
事务使用
事务的实现主要是对信道(Channel)的设置,主要的方法有三个:
-
channel.txSelect()声明启动事务模式;
-
channel.txCommit()提交事务;
-
channel.txRollback()回滚事务;
public class Product {
public static void main(String[] args) throws IOException, TimeoutException {
Connection conn = ConnUtils.getConn();
Channel channel = conn.createChannel();
String queueName = "my-que";
// queueDeclare(String queue队列名, boolean durable持久化, boolean exclusive是否排外, boolean autoDelete自动删除,Map<String, Object> arguments队列其他属性信息)
channel.queueDeclare(queueName,false,false,false,null);
String message = "hello rabbitmq";
try{
// 开启事务
channel.txSelect();
// 发布消息
channel.basicPublish("",queueName,null,message.getBytes());
// 提交事务
channel.txCommit();
}catch (Exception e){
e.printStackTrace();
// 回滚事务
channel.txRollback();
}
channel.close();
conn.close();
}
}
Confirm发送方确认模式
Confirm发送方确认模式使用和事务类似,也是通过设置Channel进行发送方确认的。
Confirm的三种实现方式:
-
channel.waitForConfirms()普通发送方确认模式,单条数据;
-
channel.waitForConfirmsOrDie()批量确认模式,多条数据;
-
channel.addConfirmListener()异步监听发送方确认模式;
public class Product {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
Connection conn = ConnUtils.getConn();
// 创建信道:处理消息
Channel channel = conn.createChannel();
// 创建队列:存储消息
String queueName = "my-que";
// queueDeclare(String queue队列名, boolean durable持久化, boolean exclusive是否排外, boolean autoDelete自动删除,Map<String, Object> arguments队列其他属性信息)
channel.queueDeclare(queueName, false, false, false, null);
// 定义消息
String message = "hello rabbitmq";
// 开启发送方确认模式
channel.confirmSelect();
// basicPublish(交换器,序列名,参数信息,消息)
channel.basicPublish("", queueName, null, message.getBytes());
if(channel.waitForConfirms()){
System.out.println("发送成功");
}
channel.close();
conn.close();
}
}
// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
channel.basicPublish("", queueName, null, message.getBytes());
}
// 直到所有信息都发布,只要有一个未确认就会IOException
channel.waitForConfirmsOrDie();
System.out.println("全部执行完成");
// 开启发送方确认模式
channel.confirmSelect();
for (int i = 0; i < 10; i++) {
channel.basicPublish("", queueName, null, message.getBytes());
}
//异步监听确认和未确认的消息
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("未确认消息,标识:" + deliveryTag);
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));
}
});
System.out.println("执行操作...");
MQ
rabbitmq做持久化,持久化主要分为:
-
交换器的持久化
// 参数1 exchange :交换器名 // 参数2 type :交换器类型 // 参数3 durable :是否持久化 channel.exchangeDeclare(EXCHANGE_NAME, "topic", true);
-
队列持久化
// 参数1 queue :队列名 // 参数2 durable :是否持久化 // 参数3 exclusive :仅创建者可以使用的私有队列,断开后自动删除 // 参数4 autoDelete : 当所有消费客户端连接断开后,是否自动删除队列 // 参数5 arguments channel.queueDeclare(QUEUE_NAME, true, false, false, null);
-
消息持久化
// 参数1 exchange :交换器 // 参数2 routingKey : 路由键 // 参数3 props : 消息的其他参数,其中 MessageProperties.PERSISTENT_TEXT_PLAIN 表示持久化 // 参数4 body : 消息体 channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
消费者
关闭自动应答,改为手动应答,处理完成,MQ中再做删除
// 消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@SneakyThrows
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("deliveryTag:" + envelope.getDeliveryTag());
Thread.sleep(1000);
System.out.println("获取到的消息内容:" + new String(body));
// 手动应答
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
// false:关闭自动应答
channel.basicConsume(queueName, false, consumer);
二、死信队列
-
消息过期会自动删除
-
消费者立刻接收消息进行处理
消费者立刻接收到数据处理,需求队列得数据不立刻被消费者处理,而是希望等待一段时间才被处理

死信队列:是延迟队列得一种实现方式,不是一种特殊得队列,死信交换机+消息有效期
-
交换机的类型:fanout,direct,topic,header
-
队列:不区分类型,也是一个普通得队列
-
死信交换机:将一个交换机设置为某一个队列的死信交换机,当前队列过期的消息会发送给交换机
-
消息:有效期
消息传递顺序
生产者发布消息 --> 交换机(转发消息) --> 队列(接收存储消息) --> 消费者(接收处理消息)
队列转发消息:
-
正常消息:转给消费者处理;
-
过期消息:默认删除,也可以通过配置(死信交换机)转发给交换机;
交换机的消息来源:
-
生产者发布的(普通得生产者)
-
其他队列到期删除的消息自动进入队列(队列需要绑定交换机,队列发布)
队列指的内容:
-
消息来源都是从交换机发送得
死信队列:交换机B作为队列A的死信交换机,接收队列A中被删除的消息,实现队列A中的消息延迟处理的效果
死信队列DLX(dead-letter-exchange),利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新投递到另一个exchange上,这个exchange就是DLX,似乎叫死信交换机更加贴切,当死信投递到这个exchange后,我们也可以用一个queue来绑定该exchange,该exchange就可以根据路由规则把这个死信路由到这个queue了,同样可以创建消费端去进行消费,以便对死信进行相应的处理。
延时队列:实际是不存在直接可用的延时队列,可通过死信消息和死信队列来实现延时队列的功能。RabbitMQ中的所有队列,消息存进来,立刻分发给对应的消费者进行处理。
死信交换机: DLX 全称(Dead-Letter-Exchange)。其实它是个普通的交换机,但它是设置在队列上某个参数的值对应的交换机。
死信队列:如果某个队列上存在参数:x-dead-letter-exchange, 当这个队列里的消息变成死信消息(dead message)后会被重新Pushlish到 x-dead-letter-exchange 所对应参数值的交换机上,跟这个交换机所绑定的队列就是死信队列。
为什么使用延迟队列
订单五分钟内支付:未支付订单超时
以前:1.用户做支付时,判断是否超时,修改订单状态;(不调用)
2.定时任务,每一秒刷新一次;(频繁刷新)
延时队列的应用场景很多,在我的项目开发中也涉及到很多,例如:订单五分钟未支付自动取消、订单准备超时30分钟推送提醒给门店、订单完成后两小时推送评价邀请给用户等等,这些间隔指定时间后的操作都可以使用延时队列。
消息变成死信的情况
死信消息:
-
消息被拒绝(basic.reject / basic.nack),并且requeue = false(不重回队列)。
-
消息TTL过期。
-
队列达到最大长度。
过期消息:RabbitMq 有两种设置消息过期的方式:
-
创建队列时通过 x-message-ttl 参数指定该队列消息的过期时间,这种队列里的消息过期时间全部相同。
-
生产者Pushlish消息时,通过设置消息的 expiration 参数指定过期时间,每个消息的过期时间都不一样。
如果两者同时使用,过期时间按照小的一方为准,两种方式设置的时间都是 毫秒。
Queue参数信息
-
name: 队列的名称;
-
actualName: 队列的真实名称,默认用name参数,如果name为空,则根据规则生成一个;
-
durable: 是否持久化;
-
exclusive: 是否独享、排外的;
-
autoDelete: 是否自动删除;
-
arguments:队列的其他属性参数,有如下可选项,可参看图2的arguments:
-
x-message-ttl:消息的过期时间,单位:毫秒;
-
x-expires:队列过期时间,队列在多长时间未被访问将被删除,单位:毫秒;
-
x-max-length:队列最大长度,超过该最大值,则将从队列头部开始删除消息;
-
x-max-length-bytes:队列消息内容占用最大空间,受限于内存大小,超过该阈值则从队列头部开始删除消息;
-
x-overflow:设置队列溢出行为。这决定了当达到队列的最大长度时消息会发生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁队列类型仅支持drop-head;
-
x-dead-letter-exchange:死信交换器名称,过期或被删除(因队列长度超长或因空间超出阈值)的消息可指定发送到该交换器中;
-
x-dead-letter-routing-key:死信消息路由键,在消息发送到死信交换器时会使用该路由键,如果不设置,则使用消息的原来的路由键值
-
x-single-active-consumer:表示队列是否是单一活动消费者,true时,注册的消费组内只有一个消费者消费消息,其他被忽略,false时消息循环分发给所有消费者(默认false)
-
x-max-priority:队列要支持的最大优先级数;如果未设置,队列将不支持消息优先级;
-
x-queue-mode(Lazy mode):将队列设置为延迟模式,在磁盘上保留尽可能多的消息,以减少RAM的使用;如果未设置,队列将保留内存缓存以尽可能快地传递消息;
-
x-queue-master-locator:在集群模式下设置镜像队列的主节点信息。
-
交换机参数信息
类型
-
FanoutExchange :扇形 交换机
-
DirectExchange :直连交换机 routing模式用的交换机
-
TopicExchange :主题模式交换机
参数信息
-
name:交换机名称
-
durable:是否把交换机持久化到磁盘上
-
autoDelete :是否自动删除交换机 用法有点类似上面的队列中的autoDelete 只有所有队列都和交换机接触订阅,说白了就是所有绑定到交换机上的队列不再需要改交换机,他就该死了,有点残酷
-
artuments :额外参数
实现
生产者
发布消息,设置消息过期时间
public class Producer {
// default exchange
private static String exchange = "test_dlx_exchange";
// default exchange 的路由规则: routingKey(test) 将匹配同名的 queue(test)
private static String routingKey = "dlx.abc";
public static void main(String[] args) throws IOException, TimeoutException {
// 1 创建Connection
Connection connection = ConnUtils.getConn();
// 2 创建Channel
Channel channel = connection.createChannel();
// 3 发送消息
for (int i = 0; i < 5; i++) {
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
// deliveryMode:1(nopersistent)非持久化,2(persistent)持久化
.deliveryMode(2)
.contentEncoding("UTF-8")
// expiration:消息的失效时间
.expiration("50000")
.build();
String msg = "RabbitMQ: dlx message" + i;
channel.basicPublish(exchange , routingKey , properties , msg.getBytes());
}
// 5 关闭连接
channel.close();
connection.close();
}
}
正常消费者
绑定死信队列,当消息变为死信消息时,会自动存入私信队列
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnUtils.getConn();
Channel channel = connection.createChannel();
// 定义死信队的Exchange
String dlxExchange = "dlx.exchange";
channel.exchangeDeclare(dlxExchange, "topic");
// 死信队列名
String dlxQueue = "dlx.queue";
channel.queueDeclare(dlxQueue, true, false, false, null);
// # 表示所有的key都可以路由到s死信队列
String dlxRoutingKey = "#";
// 绑定死信队列和exchange
channel.queueBind(dlxQueue, dlxExchange, dlxRoutingKey, null);
// 定义正常的消费者j监听队列
// 申明exchange
String exchangeName = "test_dlx_exchange";
channel.exchangeDeclare(exchangeName, "topic");
// 申明队列
Map<String, Object> arguments = new HashMap<>();
// 设置死信队列,arguments要设置到申明的队列上
arguments.put("x-dead-letter-exchange", dlxExchange);
String queueName = "test_dlx_queue";
channel.queueDeclare(queueName, true, false, false, arguments);
// 队列绑定到exchange
String routingKey = "dlx.#";
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicQos(1);
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("---消费者-- " + new String(message.getBody(), "UTF-8"));
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("---消费者--:cancelCallback ");
}
};
// callback: 消费者会调函数,处理发送过来的消息。
// cancelCallback: 消费者取消订阅时的回调方法。
// 消费消息,autoAck一定要设为false,手工ack
channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
}
}
死信消费者
处理死信队列中得信息
// DLXConusmer,监听消费死信队列中的消息
public class DLXConusmer {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnUtils.getConn();
Channel channel = connection.createChannel();
String queueName = "dlx.queue";
String exchangeName = "dlx.exchange";
String routingKey = "#";
// 申明exchange
channel.exchangeDeclare(exchangeName, "topic");
// 申明队列
channel.queueDeclare(queueName, true, false, false, null);
// 队列绑定到exchange
channel.queueBind(queueName, exchangeName, routingKey, null);
DeliverCallback deliverCallback = new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
try {
System.out.println("---死信队列消费者---");
System.out.println(new String(message.getBody(), "UTF-8"));
} finally {
// consumer手动 ack 给broker
channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
}
}
};
CancelCallback cancelCallback = new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("---消费者:cancelCallback");
}
};
// 消费消息,autoAck一定要设置为false
channel.basicConsume(queueName, false, deliverCallback, cancelCallback);
}
}


















