Kafka 如何保证顺序消费

news2025/6/7 6:19:08

在消息队列的应用场景中,保证消息的顺序消费对于一些业务至关重要,例如金融交易中的订单处理、电商系统的库存变更等。Kafka 作为高性能的分布式消息队列系统,通过巧妙的设计和配置,能够实现消息的顺序消费。接下来,我们将深入探讨 Kafka 保证顺序消费的原理与方法,并结合图文进行详细说明。

一、Kafka 顺序消费的基础:分区特性

Kafka 的主题(Topic)由多个分区(Partition)组成,每个分区都是一个有序的、不可变的消息序列。在单个分区内,消息按照生产者发送的顺序依次追加到日志文件中,并且每个消息都有唯一的偏移量(Offset)来标识其在分区内的位置。这就意味着,在同一个分区内,消息天然是有序的,这是 Kafka 实现顺序消费的基础。

1.1 分区的作用

分区的存在使得 Kafka 能够实现水平扩展,提高消息处理的并行度。多个生产者可以同时向不同的分区发送消息,多个消费者也可以同时从不同的分区消费消息。然而,这种并行处理的方式可能会导致消息在主题层面的顺序被打乱,因为不同分区之间的消息是相互独立的,没有严格的顺序关系。所以,要保证消息的顺序消费,就需要对分区进行合理的管理和控制。

1.2 分区与消息顺序的关系

如下图所示,一个主题包含三个分区,每个分区内的消息都是有序的,但分区之间的消息顺序无法保证。例如,分区 1 中的消息 M1、M2、M3 按顺序写入,分区 2 中的消息 N1、N2、N3 也按顺序写入,但从主题整体来看,无法确定 M1 和 N1 谁先被处理。

二、保证顺序消费的方法

2.1 生产者端:控制消息发送到同一分区

为了保证消息的顺序消费,生产者需要将具有顺序依赖关系的消息发送到同一个分区。可以通过以下几种方式实现:

  • 自定义分区器:开发者可以实现Partitioner接口来自定义分区策略。例如,在电商系统中,可以根据订单 ID 的哈希值将同一订单相关的消息发送到同一个分区。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class OrderPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 假设key为订单ID,通过哈希值取模分配到固定分区
        return Math.abs(key.hashCode()) % cluster.partitionCountForTopic(topic);
    }
    @Override
    public void close() {}
    @Override
    public void configure(Map<String, ?> configs) {}
}

在生产者配置中指定自定义分区器:

Properties properties = new Properties();
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, OrderPartitioner.class.getName());

  • 固定分区策略:如果业务场景允许,也可以直接将消息发送到固定的分区。例如,在一些简单的日志记录场景中,将所有日志消息都发送到分区 0。

ProducerRecord<String, String> record = new ProducerRecord<>("log_topic", 0, "log_key", "log_message");

2.2 消费者端:一个分区仅由一个消费者消费

在消费者端,为了确保分区内的消息按顺序消费,需要保证一个分区只能被消费者组内的一个消费者消费。Kafka 的消费者组机制天然支持这一点,当消费者组内的消费者数量小于或等于分区数量时,Kafka 会自动将分区分配给消费者,且每个分区最多被一个消费者消费。

如下图所示,消费者组中有两个消费者,主题有四个分区,Kafka 会将分区 0 和 1 分配给消费者 1,分区 2 和 3 分配给消费者 2,这样每个消费者都能按顺序消费自己负责的分区内的消息。

2.3 消费者端:控制消费线程

如果消费者使用多线程处理消息,需要注意控制线程的消费顺序,避免出现乱序消费的情况。一种常见的方法是为每个分区分配一个独立的消费线程,确保同一分区的消息在同一个线程中按顺序处理。例如:

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class OrderedConsumer {
    private static final int THREAD_POOL_SIZE = 10;
    private final KafkaConsumer<String, String> consumer;
    private final ExecutorService executorService;
    private final Map<String, Thread> partitionThreads = new ConcurrentHashMap<>();
    public OrderedConsumer(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
        this.executorService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
    }
    public void startConsuming() {
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    String partition = record.partition() + "";
                    Thread thread = partitionThreads.get(partition);
                    if (thread == null ||!thread.isAlive()) {
                        thread = new Thread(() -> consumePartition(partition));
                        partitionThreads.put(partition, thread);
                        executorService.submit(thread);
                    }
                }
            }
        } finally {
            consumer.close();
            executorService.shutdown();
        }
    }
    private void consumePartition(String partition) {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                if (record.partition() + "".equals(partition)) {
                    // 处理消息
                    processMessage(record);
                }
            }
        }
    }
    private void processMessage(ConsumerRecord<String, String> record) {
        // 具体的消息处理逻辑
        System.out.println("Consumed message: " + record.value());
    }
}

三、顺序消费的应用场景

3.1 金融交易场景

在金融交易系统中,订单的创建、支付、退款等操作必须按照顺序进行处理,否则可能会导致资金错误或业务逻辑混乱。通过将同一订单的相关消息发送到同一个分区,并保证分区按顺序消费,可以确保订单操作的正确性和一致性。

3.2 数据库变更日志

在数据库的变更数据捕获(Change Data Capture,CDC)场景中,Kafka 可以用于记录数据库表的增删改操作。为了保证数据库状态的一致性,这些变更日志必须按照顺序消费和应用。利用 Kafka 的顺序消费特性,能够准确地将数据库变更同步到其他系统。

3.3 电商库存管理

在电商系统中,库存的扣减和回补操作需要严格按顺序执行,否则可能会出现超卖或库存数据不准确的问题。将库存相关的消息发送到同一分区并顺序消费,可以保证库存操作的准确性。

Kafka 通过分区特性、生产者分区策略以及消费者消费方式的控制,能够有效地保证消息的顺序消费。在实际应用中,开发者需要根据具体的业务场景,合理配置和使用这些机制,以满足业务对消息顺序性的要求。同时,也要注意顺序消费可能带来的性能影响,在保证顺序的前提下,通过合理的优化措施提高系统的整体性能。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2401244.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【算法题】算法一本通

每周更新至完结&#xff0c;建议关注收藏点赞。 目录 待整理文章已整理的文章方法论思想总结模版工具总结排序 数组与哈希表栈双指针&#xff08;滑动窗口、二分查找、链表&#xff09;树前缀树堆 优先队列&#xff08;区间/间隔问题、贪心 &#xff09;回溯图一维DP位操作数学…

Modbus转Ethernet IP赋能挤出吹塑机智能监控

在现代工业自动化领域&#xff0c;小疆智控Modbus转Ethernet IP网关GW-EIP-001与挤出吹塑机的应用越来越广泛。这篇文章将为您详细解读这两者的结合是如何提高生产效率&#xff0c;降低维护成本的。首先了解什么是Modbus和Ethernet IP。Modbus是一种串行通信协议&#xff0c;它…

什么是终端安全管理系统(终端安全管理软件2024科普)

在当今数字化迅速发展的时代&#xff0c;企业面临着越来越多的信息安全威胁。为了应对这些挑战&#xff0c;保障公司数据的安全性和完整性&#xff0c;终端安全管理系统&#xff08;Endpoint Security Management System&#xff09;应运而生。 本文将为您深入浅出地科普2024年…

【JVM】Java类加载机制

【JVM】Java类加载机制 什么是类加载&#xff1f; 在 Java 的世界里&#xff0c;每一个类或接口在经过编译后&#xff0c;都会生成对应的 .class 字节码文件。 所谓类加载机制&#xff0c;就是 JVM 将这些 .class 文件中的二进制数据加载到内存中&#xff0c;并对其进行校验…

《C++初阶之入门基础》【C++的前世今生】

【C的前世今生】目录 前言&#xff1a;---------------起源---------------一、历史背景二、横空出世---------------发展---------------三、标准立世C98&#xff1a;首个国际标准版本C03&#xff1a;小修订版本 四、现代进化C11&#xff1a;现代C的开端C14&#xff1a;对C11的…

Apache APISIX

目录 Apache APISIX是什么&#xff1f; Lua Lua 的主要特点&#xff1a; Lua 的常见应用&#xff1a; CVE-2020-13945(Apache APISIX默认API Token导致远程Lua代码执行) ​编辑Lua脚本解析 CVE-2021-45232(Apache APISIX Dashboard API权限绕过导致RCE) Apache …

如何在 git dev 中创建合并请求

先将 自己的代码 推到 自己的远程的 分支上 在 创建 合并请求 根据提示 将 自己的远程的 源码 合并到 对应的分支上 然后 创建 合并请求 等待 对应的 人 来 进行合并就行

基于nlohmann/json 实现 从C++对象转换成JSON数据格式

C对象的JSON序列化与反序列化 基于JsonCpp库实现C对象序列化与反序列化 JSON 介绍 JSON作为一种轻量级的数据交换格式&#xff0c;在Web服务和应用程序中广泛使用。 JSON&#xff08;JavaScript Object Notation&#xff09;是一种轻量级的数据交换格式&#xff0c;易于人阅读…

《T/CI 404-2024 医疗大数据智能采集及管理技术规范》全面解读与实施分析

规范背景与详细信息 《T/CI 404-2024 医疗大数据智能采集及管理技术规范》是由中国国际科技促进会联合河南科技大学、河南科技大学第一附属医院、深圳市人民医院等十余家医疗机构与企业共同制定的团体标准,于2024年5月正式发布实施。该规范是我国医疗大数据领域的重要技术标准…

国产三维CAD皇冠CAD在「金属压力容器制造」建模教程:蒸汽锅炉

面对蒸汽锅炉设计中复杂的曲面封头、密集的管板开孔、多变的支撑结构以及严格的强度与安全规范&#xff08;如GB150、ASME等&#xff09;&#xff0c;传统二维设计手段往往捉襟见肘&#xff0c;易出错、效率低、协同难。国产三维CAD皇冠CAD&#xff08;CrownCAD&#xff09;凭借…

C++中单例模式详解

在C中&#xff0c;单例模式 (Singleton Pattern) 确保一个类只有一个实例&#xff0c;并提供一个全局访问点来获取这个实例。这在需要一个全局对象来协调整个系统行为的场景中非常有用。 为什么要有单例模式&#xff1f; 在许多项目中&#xff0c;某些类从逻辑上讲只需要一个实…

舆情监控系统爬虫技术解析

之前我已经详细解释过爬虫在系统中的角色和技术要点&#xff0c;这次需要更聚焦“如何实现”这个动作。 我注意到上次回复偏重架构设计&#xff0c;这次应该拆解为更具体的操作步骤&#xff1a;从目标定义到数据落地的完整流水线。尤其要强调动态调度这个容易被忽视的环节——…

Vue3中Ant-design-vue的使用-附完整代码

前言 首先介绍一下什么是Ant-design-vue Ant Design Vue 是基于 Vue 3 的企业级 UI 组件库&#xff08;同时兼容 Vue 2&#xff09;&#xff0c;是蚂蚁金服开源项目 Ant Design 的 Vue 实现版本。它遵循 Ant Design 的设计规范&#xff0c;提供丰富的组件和高质量的设计体系&…

Redis Sorted Set 深度解析:从原理到实战应用

Redis Sorted Set 深度解析&#xff1a;从原理到实战应用 在 Redis 丰富的数据结构家族中&#xff0c;Sorted Set&#xff08;有序集合&#xff09;凭借独特的设计和强大的功能&#xff0c;成为处理有序数据场景的得力工具。无论是构建实时排行榜&#xff0c;还是实现基于时间的…

若依框架修改模板,添加通过excel导入数据功能

版本&#xff1a;我后端使用的是RuoYi-Vue-fast版本&#xff0c;前端是RuoYi-Vue3 需求: 我需要每个侧边栏功能都需要具有导入excel功能&#xff0c;但是若依只有用户才具备&#xff0c;我需要代码生成的每个功能都拥有导入功能。​ 每次生成一个一个改实在是太麻烦了。索性…

web全栈开发学习-01html基础

背景 最近在付费网站学习web全栈开发&#xff0c;记录一下阶段性学习。今天刚好学完html基础&#xff0c;跟着教程画了个基础的网站。 样品展示: 开发工具 vscode Visual Studio Code - Code Editing. Redefined 常用插件 Prettier&#xff1a;格式优化 Live Sever:实时调…

多线程环境中,如果多个线程同时尝试向同一个TCP客户端发送数据,添加同步机制

原代码 public async Task SendToClientAsync(TcpClient targetClient, byte[] data, int offset, int length) {try{// 1. 检查客户端是否有效if (targetClient null || !targetClient.Connected){Console.WriteLine("Cannot send: client is not connected");ret…

【含文档+PPT+源码】基于微信小程序的旅游论坛系统的设计与实现

项目介绍 本课程演示的是一款基于微信小程序的旅游论坛系统的设计与实现&#xff0c;主要针对计算机相关专业的正在做毕设的学生与需要项目实战练习的 Java 学习者。 1.包含&#xff1a;项目源码、项目文档、数据库脚本、软件工具等所有资料 2.带你从零开始部署运行本套系统 …

贝叶斯优化+LSTM+时序预测=Nature子刊!

贝叶斯优化与LSTM的融合在时间序列预测领域取得了显著成效&#xff0c;特别是在处理那些涉及众多超参数调整的复杂问题时。 1.这种结合不仅极大提高了预测的精确度&#xff0c;还优化了模型训练流程&#xff0c;提升了效率和成本效益。超参数优化的新篇章&#xff1a;LSTM因其…

Vue3(ref与reactive)

一&#xff0c;ref创建_基本类型的响应式数据 在 Vue 3 中&#xff0c;ref是创建响应式数据的核心 API 之一 ** ref的基本概念** ref用于创建一个可变的响应式数据引用&#xff0c;适用于任何类型的值&#xff08;基本类型、对象、数组等&#xff09;。通过ref包装的值会被转…