消息队列如何保证消息可靠性(kafka以及RabbitMQ)

news2025/5/14 4:00:43

目录

RabbitMQ保证消息可靠性

生产者丢失消息

MQ丢失消息

消费端丢失了数据

Kakfa的消息可靠性

生产者的消息可靠性

Kakfa的消息可靠性

消费者的消息可靠性


RabbitMQ保证消息可靠性

生产者丢失消息

1.事务消息保证

生产者在发送消息之前,开启事务消息随后生产者发送消息,消息发送之后,如果消息没有被MQ接收到的话,生产者会收到异常报错,生产者回滚事务,然后重试消息,如果收到了消息,就能提交事务了

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendTransactionalMessage() {
    ConnectionFactory connectionFactory = rabbitTemplate.getConnectionFactory();
    Channel channel = connectionFactory.createConnection().createChannel(false);

    try {
        channel.txSelect(); // 开启事务
        channel.basicPublish("exchange", "routing.key", null, "message".getBytes());
        channel.txCommit(); // 提交事务
    } catch (Exception e) {
        channel.txRollback(); // 出错回滚
    }
}

2.使用confirm机制

  • 普通confirm机制,就是发送消息之后,等待服务器confirm之后再发送下一个消息
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack) {
        System.out.println("消息成功发送到Broker");
    } else {
        System.out.println("消息发送失败,原因:" + cause);
    }
});

rabbitTemplate.convertAndSend("exchange", "routing.key", "message");
  • 批量confirm机制,每发送一批消息之后,等待服务器confirm
Channel channel = connection.createChannel(false);
channel.confirmSelect();

for (int i = 0; i < 100; i++) {
    channel.basicPublish("exchange", "routing.key", null, ("msg" + i).getBytes());
}
channel.waitForConfirms(); // 等待所有消息确认
  • 异步confirm机制,服务器confirm一个或者多个消息之后,客户端(生产者)能够通过回调函数来确定消息是否被confirm(推荐)
SortedSet<Long> pendingSet = Collections.synchronizedSortedSet(new TreeSet<>());
channel.confirmSelect();

channel.addConfirmListener(new ConfirmListener() {
    public void handleAck(long tag, boolean multiple) {
        if (multiple) pendingSet.headSet(tag + 1).clear();
        else pendingSet.remove(tag);
    }

    public void handleNack(long tag, boolean multiple) {
        System.err.println("未确认消息:" + tag);
        if (multiple) pendingSet.headSet(tag + 1).clear();
        else pendingSet.remove(tag);
    }
});

while (true) {
    long seq = channel.getNextPublishSeqNo();
    channel.basicPublish("demo.exchange", "demo.key",
        MessageProperties.PERSISTENT_TEXT_PLAIN, "hello".getBytes());
    pendingSet.add(seq);
}

MQ丢失消息

防止MQ的丢失数据的话,方法就是开启RabbitMQ的持久化,消息写入之后(也就是到了MQ之后)就直接持久化到磁盘中,即使Rabbimq自己挂了之后,会恢复数据。

设置持久化步骤

  • 创建queue的时候直接设置持久化,此时就能持久化queue的元数据(不是消息)
@Bean
public Queue durableQueue() {
    return new Queue("myQueue", true); // true 表示持久化
}
  • 发送消息的时候指定消息为deliveryMode设置为2,也就是设置消息为持久化,此时消息可以持久化磁盘上
MessageProperties props = new MessageProperties();
props.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Message message = new Message("content".getBytes(), props);
rabbitTemplate.send("exchange", "routing.key", message);

极端情况:

消息写到RabbitMQ之后,但是还没有持久化到磁盘之后直接挂了,导致内存中消息丢失。

解决方法:持久化与生产者的confirm机制配合,当且仅当持久化了消息之后,再confirm,避免数据与消息丢失,此时生产者收不到ack,也是可以自己重发

消费端丢失了数据

意思就是消息已经拉取到了信息,还没有处理(注意这是已经告诉MQ我拉取到数据了),结果进程挂了,重启之后继续消费下一条消息,导致中间的这一条没有消费到,此时数据丢失了。

利用ack机制处理

取消RabbiMQ的自动ack,也就是一个api,可以在消费端消费完了消息之后再调用api告诉MQ我们收到并且处理了该消息。如果没有返回ack,RabbitMQ会把该消息分配给其他的consumer处理,消息不会丢失。通过配置处理

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

Kakfa的消息可靠性

生产者的消息可靠性

在kafka中,可以在producer(生产段)设置一个参数,也就是ack=all,要求每个数据,必须写入所有的replica(也就是所有该分区的副本),才认为是接收成功。该参数设置的是你的leader接收到消息后,所有的follower都同步到消息后才认为写成功

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all"); // 等待所有副本确认
props.put("retries", Integer.MAX_VALUE); // 无限重试
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key", "value");

producer.send(record, (metadata, exception) -> {
    if (exception == null) {
        System.out.println("发送成功:" + metadata.offset());
    } else {
        exception.printStackTrace();
    }
});
producer.close();

Kakfa的消息可靠性

kafka默认是会将消息持久化到磁盘上的,但是还是有情况会导致丢失数据

kafka某个broker宕机,随后重新选举partition的leader。倘若在该broker中的partition中的leader副本中的消息,还没有被其他broker中的follower同步,此时同步缺失的数据就丢失了,也就是少了一些数据

解决方法:

  • 给 topic 设置 replication.factor 参数:这个值必须大于 1,要求每个 partition 必须有至少 2 个副本。
  • 在 Kafka 服务端设置 min.insync.replicas 参数:这个值必须大于 1,这个是要求一个 leader 至少感知到有至少一个 follower 还跟自己保持联系。
  • 在 producer 端设置 acks=all :这个是要求每条数据,必须是写入所有 replica 之后,才能认为是写成功了。
  • 在 producer 端设置 retries=MAX (很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了。

按照上面的配置之后,leader的切换就不会导致数据缺失了。

消费者的消息可靠性

唯一可能也是类似于RabbitMQ中的,也就是说你消费到该消息的时候,消费者自动提交offset,让kafka以为你消费好了该消息,但是自己还没处理就宕机后,会导致重启后没有消费该消息。

解决方法:

关闭kafka默认的自动提交offset,通过消费端业务逻辑处理完消息后,再手动提交offset,当然这里就是会导致重复消费了,这里就是幂等性的问题了。比如你刚处理完,还没提交 offset,结果自己挂了,此时肯定会重复消费一次

手动提交api:consumer.commitSync();

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("enable.auto.commit", "false"); // 关闭自动提交
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理消息
        System.out.println("处理消息:" + record.value());
    }
    // 手动提交 offset
    consumer.commitSync();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2375090.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HTTP学习

HTTP知识 01. 经典五层模型 应用层 为应用软件提供了很多服务&#xff0c;构建于协议之上。 传输层 数据的传输都是在这层定义的&#xff0c;数据过大分包&#xff0c;分片。 网络层 为数据在节点之间传输创建逻辑链路 数据链路层 通讯实体间建立数据链路连接 物理层 主要作用…

go语言实现IP归属地查询

效果: 实现代码main.go package mainimport ("encoding/json""fmt""io/ioutil""net/http""os" )type AreaData struct {Continent string json:"continent"Country string json:"country"ZipCode …

Android RxJava框架分析:它的执行流程是如何的?它的线程是如何切换的?如何自定义RxJava操作符?

目录 RxJava是什么&#xff1f;为什么使用。RxJava是如何使用的呢&#xff1f;RxJava如何和Retrofit一起使用。RxJava源码分析。 &#xff08;1&#xff09;他执行流程是如何的。&#xff08;2&#xff09;map&#xff08;3&#xff09;线程的切换。 如何自定义RxJava操作符…

MySQL及线程关于锁的面试题

目录 1.了解过 MySQL 死锁问题吗&#xff1f; 2.什么是线程死锁&#xff1f;死锁相关面试题 2.1 什么是死锁&#xff1a; 2.2 形成死锁的四个必要条件是什么&#xff1f; 2.3 如何避免线程死锁&#xff1f; 3. MySQL 怎么排查死锁问题&#xff1f; 4.Java线上死锁问题如…

【工作记录】crmeb后端项目打开、运行

1、下载代码 1&#xff09;安装git 不再详述 2&#xff09;git拉代码 项目地址如下&#xff0c;在vscode-分支中拉代码 # 克隆项目 git clone https://gitee.com/ZhongBangKeJi/crmeb_java/ 截图如下是已经成功拉下来 注意安装对应版本 2、maven配置 安装配置见&#x…

智能手表测试计划文档(软/硬件)

&#x1f4c4; 智能手表测试计划文档&#xff08;软/硬件&#xff09; 项目名称&#xff1a;Aurora Watch S1 文档编号&#xff1a;AW-S1-QA-TP-001 编制日期&#xff1a;2025-xx-xx 版本&#xff1a;V1.0 编写人&#xff1a;xxx&#xff08;测试主管&#xff09; 一、测试目标…

k8s监控方案实践(三):部署与配置Grafana可视化平台

k8s监控方案实践&#xff08;三&#xff09;&#xff1a;部署与配置Grafana可视化平台 文章目录 k8s监控方案实践&#xff08;三&#xff09;&#xff1a;部署与配置Grafana可视化平台一、Grafana简介1. 什么是Grafana&#xff1f;2. Grafana与Prometheus的关系3. Grafana应用场…

嵌入式系统架构验证工具:AADL Inspector v1.10 全新升级

软件架构建模与早期验证是嵌入式应用的关键环节。架构分析与设计语言&#xff08;AADL&#xff09;是专为应用软件及执行平台架构模型设计的语言&#xff0c;兼具文本与图形化的双重特性。AADL Inspector是一款轻量级的独立工具&#xff1a; 核心处理能力包括 √ 支持处理AA…

STM32-模电

目录 一、MOS管 二、二极管 三、IGBT 四、运算放大器 五、推挽、开漏、上拉电阻 一、MOS管 1. MOS简介 这里以nmos管为例&#xff0c;注意箭头方向。G门极/栅极&#xff0c;D漏极&#xff0c;S源极。 当给G通高电平时&#xff0c;灯泡点亮&#xff0c;给G通低电平时&a…

华为云Flexus+DeepSeek征文|从开通到应用:华为云DeepSeek-V3/R1商用服务深度体验

前言 本文章主要讲述在华为云ModelArts Studio上 开通DeepSeek-V3/R1商用服务的流程&#xff0c;以及开通过程中的经验分享和使用感受帮我更多开发者&#xff0c;在华为云平台快速完成 DeepSeek-V3/R1商用服务的开通以及使用入门注意&#xff1a;避免测试过程中出现部署失败等问…

鸿蒙NEXT开发动画案例5

1.创建空白项目 2.Page文件夹下面新建Spin.ets文件&#xff0c;代码如下&#xff1a; /*** TODO SpinKit动画组件 - Pulse 脉冲动画* author: CSDN—鸿蒙布道师* since: 2024/05/09*/ ComponentV2 export struct SpinFive {// 参数定义Require Param spinSize: number 48;Re…

ctfshow——web入门351~356

SSRF没有出网的部分 web入门351 $ch curl_init($url); 作用&#xff1a;初始化一个 cURL 会话&#xff0c;并设置目标 URL。解释&#xff1a; curl_init($url) 创建一个新的 cURL 资源&#xff0c;并将其与 $url 关联。这里的 $url 是用户提供的&#xff0c;因此目标地址完全…

【PostgreSQL数据分析实战:从数据清洗到可视化全流程】金融风控分析案例-10.1 风险数据清洗与特征工程

&#x1f449; 点击关注不迷路 &#x1f449; 点击关注不迷路 &#x1f449; 点击关注不迷路 文章大纲 PostgreSQL金融风控分析案例&#xff1a;风险数据清洗与特征工程实战一、案例背景&#xff1a;金融风控数据处理需求二、风险数据清洗实战&#xff08;一&#xff09;缺失值…

美女热舞混剪视频批量剪辑生产技术实践:智能处理与原创性提升方案解析

一、引言&#xff1a;短视频工业化生产的技术转型 在美女类短视频内容运营中&#xff0c;通过标准化技术流程实现「高质量、规模化」产出成为核心需求。本文结合实战经验&#xff0c;解析如何通过智能素材重组、AI 语音合成、动态元素叠加等技术手段&#xff0c;构建自动化生产…

神经网络基础-从零开始搭建一个神经网络

一、什么是神经网络 人工神经网络(Articial Neural Network,简写为ANN)也称为神经网络(NN),是一种模仿生物神经网络和功能的计算模型,人脑可以看做是一个生物神经网络,由众多的神经元连接而成,各个神经元传递复杂的电信号,树突接收到输入信号,然后对信号进行处理,通…

#Redis黑马点评#(五)Redisson原理详解

目录 一 基于Redis的分布式锁优化 二 Redisson 1 实现步骤 2 Redisson可重入锁机制 3 Redisson可重试机制 4 Redisson超时释放机制 5 RedissonMultiLock解决主从一致性 三 trylock与lock两者有何区别 四 Redis优化秒杀 一 基于Redis的分布式锁优化 二 Redisson Redis…

23.(vue3.x+vite)引入组件并动态切换(component)

让多个组件使用同一个挂载点,并动态切换,这就是动态组件 效果截图 A组件代码: <template><div><div>{{message }}</</

VBA会被Python代替吗

VBA不会完全被Python取代、但Python在自动化、数据分析与跨平台开发等方面的优势使其越来越受欢迎、两者将长期并存且各具优势。 Python以其易于学习的语法、强大的开源生态系统和跨平台支持&#xff0c;逐渐成为自动化和数据分析领域的主流工具。然而&#xff0c;VBA依旧在Exc…

SEMI E40-0200 STANDARD FOR PROCESSING MANAGEMENT(加工管理标准)-(三)完结

10 消息服务详情 10.1 本章定义实现加工管理概念所需的消息服务。这些消息已在第8.1节中初步介绍。 协议无关性&#xff1a;这些服务独立于所使用的消息协议&#xff0c;可映射至SECS-II&#xff08;SEMI E5&#xff09;或其他类似协议。 10.1.1 消息服务定义内容包括&#…

MySQL数据库创建、删除、修改

一&#xff1a;建库建表 我们以学校体系进行建表。将数据库命名为school。 以下代码中的大写均可小写不影响。如CREATE DATABASE与create database相同 四个关键的实体分别是学院、老师、学生和课程&#xff0c;其中&#xff0c;学生跟学院是从属关系&#xff0c;这个关系从…