kafka学习笔记(三、消费者Consumer使用教程——从指定位置消费)

news2025/6/1 0:44:21

在这里插入图片描述


1.简介

Kafka的poll()方法消费无法精准的掌握其消费的起始位置,auto.offset.reset参数也只能在比较粗粒度的指定消费方式。更细粒度的消费方式kafka提供了seek()方法可以指定位移消费允许消费者从特定位置(如固定偏移量、时间戳或分区首尾)开始消费消息。

2.指定消费位置

2.1.从特定偏移量开始消费

使用seek(TopicPartition partition, long offset)指定具体偏移量。

源码分析:

  • seek()方法更新消费者内部的subscriptions对象的position字段,记录目标偏移量。
  • 后续poll()时,Fetcher类根据此位置向Broker发送拉取请求。

代码示例:

consumer.subscribe(Collections.singleton("test-topic"));
Set<TopicPartition> assignment = new HashSet<>();
// 确保分配到分区
while (assignment.isEmpty()) {
    consumer.poll(Duration.ofMillis(100));
    assignment = consumer.assignment();
}
// 设置所有分区从offset=100开始消费
assignment.forEach(tp -> consumer.seek(tp, 100));

2.2.从时间戳开始消费

使用offsetsForTimes()获取时间戳对应的偏移量,再调用seek()

源码分析:

offsetsForTimes()向Broker发送ListOffsetRequest,查询满足时间戳条件的最早或最新偏移量。

代码实例:

Map<TopicPartition, Long> timestamps = assignment.stream()
    .collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 24 * 3600 * 1000L));
// 获取24小时前的偏移量
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
offsets.forEach((tp, offsetAndTs) -> {
    if (offsetAndTs != null) consumer.seek(tp, offsetAndTs.offset());
});

2.3.从分区首尾消费

使用seekToBeginning()seekToEnd(),或通过beginningOffsets()/endOffsets()获取首尾偏移量后手动设置。

代码实例:

// 从分区末尾开始消费(等效于auto.offset.reset=latest)
Map<TopicPartition, Long> endOffsets = consumer.endOffsets(assignment);
assignment.forEach(tp -> consumer.seek(tp, endOffsets.get(tp)));

2.4.注意事项

  1. 分区分配与poll()的依赖
    seek()必须在分区分配完成后调用,否则会抛出IllegalStateException。需通过循环poll()确保分配到分区。

  2. 数据过期问题
    若指定偏移量对应的消息已被删除(如日志清理导致),seek()将失效。此时需使用beginningOffsets()获取当前最小有效偏移量。

  3. 异步提交与位移覆盖风险
    异步提交(commitAsync())失败时不会重试,可能因位移回滚导致重复消费。需结合同步提交(commitSync())保证原子性

  4. seek()方法提供了我们可以将消费者位移保存在外部的能力,还可以配合在均衡监听器来提供更加精准的消费能力。

3.完整代码实例

public class SeekToTimestampDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "seek-demo");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("test-topic"));

        // 等待分区分配
        Set<TopicPartition> assignment = new HashSet<>();
        while (assignment.isEmpty()) {
            consumer.poll(Duration.ofMillis(100));
            assignment = consumer.assignment();
        }

        // 获取24小时前的时间戳对应偏移量
        Map<TopicPartition, Long> timestamps = assignment.stream()
            .collect(Collectors.toMap(tp -> tp, tp -> System.currentTimeMillis() - 86400000L));
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);

        // 指定位移
        offsets.forEach((tp, offsetAndTs) -> {
            if (offsetAndTs != null) {
                consumer.seek(tp, offsetAndTs.offset());
            } else {
                // 处理无有效偏移量的情况(如从头开始)
                consumer.seekToBeginning(Collections.singleton(tp));
            }
        });

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            records.forEach(record -> System.out.printf("offset=%d, value=%s%n", record.offset(), record.value()));
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2392290.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【后端高阶面经:架构篇】46、分布式架构:如何应对高并发的用户请求

一、架构设计原则:构建可扩展的系统基石 在分布式系统中,高并发场景对架构设计提出了极高要求。 分层解耦与模块化是应对复杂业务的核心策略,通过将系统划分为客户端、CDN/边缘节点、API网关、微服务集群、缓存层和数据库层等多个层次,实现各模块的独立演进与维护。 1.1 …

网络编程学习笔记——TCP网络编程

文章目录 1、socket()函数2、bind()函数3、listen()4、accept()5、connect()6、send()/write()7、recv()/read()8、套接字的关闭9、TCP循环服务器模型10、TCP多线程服务器11、TCP多进程并发服务器 网络编程常用函数 socket() 创建套接字bind() 绑定本机地址和端口connect() …

Vue+element-ui,实现表格渲染缩略图,鼠标悬浮缩略图放大,点击缩略图播放视频(一)

Vueelement-ui&#xff0c;实现表格渲染缩略图&#xff0c;鼠标悬浮缩略图放大&#xff0c;点击缩略图播放视频 前言整体代码预览图具体分析基础结构主要标签作用videoel-popover 前言 如标题&#xff0c;需要实现这样的业务 此处文章所实现的&#xff0c;是静态视频资源。 注…

day13 leetcode-hot100-22(链表1)

160. 相交链表 - 力扣&#xff08;LeetCode&#xff09; 1.哈希集合HashSet 思路 &#xff08;1&#xff09;将A链的所有数据存储到HashSet中。 &#xff08;2&#xff09;遍历B链&#xff0c;找到是否在A中存在。 具体代码 /*** Definition for singly-linked list.* pu…

【Oracle】DQL语言

个人主页&#xff1a;Guiat 归属专栏&#xff1a;Oracle 文章目录 1. DQL概述1.1 什么是DQL&#xff1f;1.2 DQL的核心功能 2. SELECT语句基础2.1 基本语法结构2.2 最简单的查询2.3 DISTINCT去重 3. WHERE条件筛选3.1 基本条件运算符3.2 逻辑运算符组合3.3 高级条件筛选 4. 排序…

HUAWEI华为MateBook D 14 2021款i5,i7集显非触屏(NBD-WXX9,NbD-WFH9)原装出厂Win10系统

适用型号&#xff1a;NbD-WFH9、NbD-WFE9A、NbD-WDH9B、NbD-WFE9、 链接&#xff1a;https://pan.baidu.com/s/1qTCbaQQa8xqLR-4Ooe3ytg?pwdvr7t 提取码&#xff1a;vr7t 华为原厂WIN系统自带所有驱动、出厂主题壁纸、系统属性联机支持标志、系统属性专属LOGO标志、Office…

【STIP】安全Transformer推理协议

Secure Transformer Inference Protocol 论文地址&#xff1a;https://arxiv.org/abs/2312.00025 摘要 模型参数和用户数据的安全性对于基于 Transformer 的服务&#xff08;例如 ChatGPT&#xff09;至关重要。虽然最近在安全两方协议方面取得的进步成功地解决了服务 Transf…

leetcode hot100刷题日记——27.对称二叉树

方法一&#xff1a;递归法 class Solution { public:bool check(TreeNode *left,TreeNode *right){//左子树和右子树的节点同时是空的是对称的if(leftnullptr&&rightnullptr){return true;}if(leftnullptr||rightnullptr){return false;}//检查左右子树的值相不相等&a…

高考加油(Python+HTML)

前言 询问DeepSeek根据自己所学到的知识来生成多个可执行的代码&#xff0c;为高考学子加油。最开始生成的都会有点小问题&#xff0c;还是需要自己调试一遍&#xff0c;下面就是完整的代码&#xff0c;当然了最后几天也不会有多少人看&#xff0c;都在专心的备考。 Python励…

贪心算法应用:Ford-Fulkerson最大流问题详解

Java中的贪心算法应用:Ford-Fulkerson最大流问题详解 1. 最大流问题概述 最大流问题(Maximum Flow Problem)是图论中的一个经典问题,旨在找到一个从源节点(source)到汇节点(sink)的最大流量。Ford-Fulkerson方法是解决最大流问题的经典算法之一,它属于贪心算法的范畴…

UE5 Niagara 如何让四元数进行旋转

Axis Angle中&#xff0c;X,Y,Z分别为旋转的轴向&#xff0c;W为旋转的角度&#xff0c;在这里旋转角度不需要除以2&#xff0c;因为里面已经除了&#xff0c;再将计算好的四元数与要进行旋转的四元数进行相乘&#xff0c;结果就是按照原来的角度绕着某一轴向旋转了某一角度

从“黑箱”到透明化:MES如何重构生产执行全流程?

引言 在传统制造企业中&#xff0c;生产执行环节常面临“计划混乱、进度难控、异常频发、数据滞后”的困境。人工派工效率低下、物料错配频发、质量追溯困难等问题&#xff0c;直接导致交付延期、成本攀升、客户流失。深蓝易网MES系统以全流程数字化管理为核心&#xff0c;通过…

探索Linux互斥:线程安全与资源共享

个人主页&#xff1a;chian-ocean 文章专栏-Linux 前言&#xff1a; 互斥是并发编程中避免竞争条件和保护共享资源的核心技术。通过使用锁或信号量等机制&#xff0c;能够确保多线程或多进程环境下对共享资源的安全访问&#xff0c;避免数据不一致、死锁等问题。 竞争条件 竞…

JWT安全:假密钥.【签名随便写实现越权绕过.】

JWT安全&#xff1a;假密钥【签名随便写实现越权绕过.】 JSON Web 令牌 (JWT)是一种在系统之间发送加密签名 JSON 数据的标准化格式。理论上&#xff0c;它们可以包含任何类型的数据&#xff0c;但最常用于在身份验证、会话处理和访问控制机制中发送有关用户的信息(“声明”)。…

Python爬虫实战:抓取百度15天天气预报数据

&#x1f310; 编程基础第一期《9-30》–使用python中的第三方模块requests&#xff0c;和三个内置模块(re、json、pprint)&#xff0c;实现百度地图的近15天天气信息抓取 记得安装 pip install requests&#x1f4d1; 项目介绍 网络爬虫是Python最受欢迎的应用场景之一&…

RV1126 + FFPEG多路码流项目

代码主体思路&#xff1a; 一.VI,VENC,RGA模块初始化 1.先创建一个自定义公共结构体&#xff0c;用于方便管理各个模块 rkmedia_config_public.h //文件名字#ifndef _RV1126_PUBLIC_H #define _RV1126_PUBLIC_H#include <assert.h> #include <fcntl.h> #include …

NodeJS 基于 Koa, 开发一个读取文件,并返回给客户端文件下载,以及读取文件形成列表和文件删除的代码演示

前言 在上一篇文章 《Nodejs 实现 Mysql 数据库的全量备份的代码演示》 中&#xff0c;我们演示了如何将用户的 Mysql 数据库进行备份的代码。但是&#xff0c;这个备份&#xff0c;只是备份在了服务器上了。 而我们用户的真实需求&#xff0c;是需要将备份文件下载到本地进行…

为什么在我的Flask里面有两个路由,但是在网页里有一个却不能正确访问到智能体

1. /zhoushibo 能访问&#xff0c;/chat 直接浏览器访问报 Method Not Allowed 原因&#xff1a; /zhoushibo 路由是你用 app.route(/zhoushibo) 定义的&#xff0c;返回的是一个HTML网页&#xff0c;浏览器访问没问题。 /chat 路由你用的是 app.route(/chat, methods[POST])…

哈工大计算机系统2024大作业——Hello的程序人生

计算机系统 大作业 题 目 程序人生-Hello’s P2P 专 业 人工智能 学   号 2022112040 班 级 2203601 学 生 郄东昕 指 导 教 师 吴锐 计算机科学与技术学院…

2025年软件测试面试八股文(含答案+文档)

&#x1f345; 点击文末小卡片&#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 Part1 1、你的测试职业发展是什么&#xff1f; 测试经验越多&#xff0c;测试能力越高。所以我的职业发展是需要时间积累的&#xff0c;一步步向着高级测试工程师…