消息确认机制分为消息发送确认机制与消息消费确认机制

消息发送确认机制
消息发送确认机制:消息由producer发送后,确认其是否到达broker,又是否被exchange转发至对应queue的机制
该机制分为两部分:producer---broker,exchange---queue
前者的实现依靠ConfirmCallback机制,后者的实现依靠ReturrnsCallback机制
ConfirmCallback
实现ConfirmCallback接口,并重写confirm方法
confirm方法参数含义:
correlationData:CorrelationData类只有一个 id 属性 用于唯一标识该消息
public CorrelationData() {
    this.id = UUID.randomUUID().toString();
}
ack:消息是否成功传输到 broker (true表示成功传输 false表示传输失败)
cause:传输失败的原因 
当消息传输至broker后就会触发ConfirmCallback回调,无论传输是否成功,可根据传输的结果进行后续处理
@Component
// ConfirmCallback 用于确认消息是否到达 broker(rabbitmq服务器)
// 实现 ConfirmCallback 接口 重写confirm()方法
public class ConfirmCallbackComponent implements RabbitTemplate.ConfirmCallback {
    /*
    correlationData:CorrelationData类只有一个 id 属性 用于唯一标识该消息
    public CorrelationData() {
        this.id = UUID.randomUUID().toString();
    }
    ack:消息是否成功传输到 broker (true表示成功传输 false表示传输失败)
    cause:传输失败的原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            System.out.println("消息发送异常");
        } else {
            System.out.println("消息发送成功" + " correlationData=" + correlationData.getId() + " ack=" + ack + " cause=" + cause);
        }
    }
}ReturrnsCallback
实现ReturnsCallback接口,并重写returnedMessage方法
当消息转发失败后就会触发ReturrnsCallback,会将消息返回给生产者,同时会返回与消息转发失败的相关信息(包含在参数returned内),可对此采取后续处理
@Component
// 实现ReturnCallback接口 重写returnedMessage()方法
public class ReturnsCallbackComponent implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.println("ReturnCallback: replyCode=" + returned.getReplyCode() + " replyText=" + returned.getReplyText() + " message= " + returned.getMessage() + " exchange=" + returned.getExchange() + " routingKey=" + returned.getRoutingKey());
    }
}配置文件
注:生产者端配置文件
ConfirmCallback
publisher-confirm-type: correlated
    #NONE:
      #禁用发布确认 是默认值。
    #CORRELATED:
      #发布消息后 交换机会触发回调方法。
    #SIMPLE:
      #有两种效果:
        #1:和CORRELATED一样会触发回调方法
        #2:发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果
    #根据返回结果来判定下一步的逻辑: waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel 则接下来无法发送消息到 brokerReturnsCallback
    template:
      mandatory: true # 设置当交换机分发消息失败时 将消息返回至生产者(否则直接丢弃)
    publisher-returns: true # 允许消息返回至生产者消息消费确认机制
生产者Service
此处需要调用ConfirmCallback接口与ReturnsCallback接口的实现类实例
@Service
public class WorkService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ConfirmCallbackComponent confirmCallbackComponent;
    @Autowired
    private ReturnsCallbackComponent returnsCallbackComponent;
    public void sendMessage(String exchange, String routingKey, Object msg) {
        // 消息被手动ack时的处理
        rabbitTemplate.setConfirmCallback(confirmCallbackComponent);
        // 消息重返队列时的处理
        rabbitTemplate.setReturnsCallback(returnsCallbackComponent);
        // 发送消息
        rabbitTemplate.convertAndSend(exchange, routingKey, msg,
                // 是否持久化消息
                message -> {
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
                    return message;
                },
                // 实现ConfirmCallback接口 重写其confirm方法时 作为confirm方法的参数
                new CorrelationData(UUID.randomUUID().toString()));
    }
}消费者Service
deliveryTag:表示消息投递序号 接收消息后deliveryTag++
手动确认模式下 我们可以对指定deliveryTag的消息进行ack、nack、reject等操作
multiple:是否批量确认消息 值为true则会一次性ack所有小于当前消息deliveryTag的消息
举个栗子:
        假设已发送三条消息 deliveryTag分别是1、2、3 但均未被确认
        此时发送第四条消息 其deliveryTag为4 且该消息被确认
        若multiple被设置为true 则会将1、2、3、4的消息全部进行确认
requeue:消息是否重入队列 true为重入
方法参数: 
basicAck: deliveryTag multiple
basicReject: deliveryTag requeue
basicNack: deliveryTag multiple requeue
 
@Service
@RabbitListener(queues = "work_confirm_queue")
public class WorkerService {
    @RabbitHandler
    public void workerMessage(String msg, Channel channel, Message message) throws IOException {
        /*
        deliveryTag:表示消息投递序号 接收消息后deliveryTag++
        手动确认模式下 我们可以对指定deliveryTag的消息进行ack、nack、reject等操作
        multiple:是否批量确认消息 值为true则会一次性ack所有小于当前消息deliveryTag的消息
        举个栗子:
                假设已发送三条消息 deliveryTag分别是1、2、3 但均未被确认
                此时发送第四条消息 其deliveryTag为4 且该消息被确认
                若multiple被设置为true 则会将1、2、3、4的消息全部进行确认
        requeue:消息是否重入队列 true为重入
        方法参数:
        basicAck: deliveryTag multiple
        basicReject: deliveryTag requeue
        basicNack: deliveryTag multiple requeue
         */
        try {
            System.out.println("worker收到消息: " + msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 确认消息
        }  catch (Exception e) {
            // 判断消息是否已重返过队列
            if (message.getMessageProperties().getRedelivered()) {
                System.out.println("worker再次接收消息失败 队列拒绝消息的重返 " + msg);
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息重返
            } else {
                System.out.println("worker接收消息失败 消息将返回队列 " + msg);
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // 消息重入队列
            }
        }
    }
}配置文件
注:消费者端配置文件
    listener:
      simple:
        prefetch: 1 # 消费者一次性可以消费的最大消息数
        acknowledge-mode: manual # 开启手动应答
        # none 一律视为应答
        # manual 手动应答
        # auto 自动应答(与none区别在于有应答条件)
        retry:
          enabled: true # 开启重试
          max-attempts: 10 # 最大重试数(若使用try-catch 则该设置失效)
          initial-interval: 1000ms # 重试间隔测试
关于sleep方法:单元测试运行完毕后即关闭,而调用方法与进行通信需要时间,为了确保能收到消费者端的应答,需要保证信道处于开启状态,故sleep
    @Autowired
    WorkService workService;
    @Test
    void workQueuesOrders() throws InterruptedException {
        workService.sendMessage("", "work_confirm_queue", "hello");
        TimeUnit.SECONDS.sleep(5);
    }









![[本人毕业设计] 别踩白块_计算机科学与技术_前端H5游戏毕设](https://img-blog.csdnimg.cn/42bb197631804ac5b29d1e2cc966f54f.png)








