kafka学习笔记(三、消费者Consumer使用教程——消费性能多线程提升思考)

news2025/6/2 16:14:53

在这里插入图片描述


1.简介

KafkaConsumer是非线程安全的,它定义了一个acquire()方法来检测当前是否只有一个线程在操作,如不是则会抛出ConcurrentModifcationException异常。

acquire()可以看做是一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()方法和release()方法成对出现,表示相应的加锁操作和解锁操作。

KafkaConsumer虽然是单线程的执行方式,但是在某些情况下如:生产者发送消息的速度远大于消费者消费的速度,这样长时间可能会造成消息的丢失,此时我们就需要消费者采用多线程消费的方式增加消费速度。

2.多线程实现的方式

2.1.线程封闭多线程

即为每个线程实例化一个KafkaConsumer,如图所示,一个线程对应一个KafkaConsumer实例,所有的消费线程都属于同一个消费者组。

这种方式的并发度受限分区的实际个数

在这里插入图片描述
实现代码示例:

public class kafkaConsumer1 {

    public void ConsuermMultithread1() {
        Properties props = initConsifg(); // 此处初始化消费者配置参数省略
        int consumerThreadNum = 5;
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(props, topic).start();
        }
    }
    
    // 消费线程
    public static class KafkaConsumerThread extends Thread {
        private KafkaConsumer<String, String> kafkaConsumer;

        public KafkaConsumerThread(Properties prop, String topic) {
          this.kafkaConsumer = new KafkaConsumer<>(prop);
          this.kafkaConsumer.subscribe(Arrays.asList(topic));
        }
        
        @Override
        public void run() {
          try {
              while (true) {
                  ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                  for (ConsumerRecord<String, String> record: records) {
                      // 处理消息
                  }
              }
          } catch (Exception e) {
              e.printStackTrace();
          } finally {
              kafkaConsumer.close();
          }
        }
    }
}

2.1.消息处理模块多线程

此方法是对上面方法的进一步优化,在消息处理模块增加多线程来处理消息,进一步提升消息消费的速度。
在这里插入图片描述

public class kafkaConsumer1 {

    public void ConsuermMultithread1() {
        Properties props = initConsifg(); // 此处初始化消费者配置参数省略
        int consumerThreadNum = 5;
        for (int i = 0; i < consumerThreadNum; i++) {
            new KafkaConsumerThread(props, topic).start();
        }
    }

    public static class KafkaConsumerThread extends Thread {
        private KafkaConsumer<String, String> kafkaConsumer;
        private ExecutorService executorService;
        private int threadNumber;

        public KafkaConsumerThread(Properties prop, String topic, int threadNumber) {
            this.kafkaConsumer = new KafkaConsumer<>(prop);
            this.kafkaConsumer.subscribe(Collections.singletonList(topic));
            this.threadNumber = threadNumber;

            executorService = new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        }

        @Override
        public void run() {
            try {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                    if (!records.isEmpty()) {
                        executorService.submit(new RecordsHandler(records));
                    }
                    
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                kafkaConsumer.close();
            }
        }
    }

    public static class RecordsHandler extends Thread {
        public final ConsumerRecords<String, String> records;

        public RecordsHandler(ConsumerRecords<String, String> records) {
            this.records = records;
        }

        @Override
        public void run() {
            /// 处理records  
        }
    }
}

此方法需要引入一个共享的offsets来参与提交。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2394351.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[JVM] JVM内存调优

&#x1f338;个人主页:https://blog.csdn.net/2301_80050796?spm1000.2115.3001.5343 &#x1f3f5;️热门专栏: &#x1f9ca; Java基本语法(97平均质量分)https://blog.csdn.net/2301_80050796/category_12615970.html?spm1001.2014.3001.5482 &#x1f355; Collection与…

秒出PPT正式改名秒出AI,开启AI赋能新体验!

在现代办公环境中&#xff0c;借助智能工具提升工作效率已经成为趋势。秒出AI作为一款集AI PPT制作、动画、巨幕、视频、设计以及智能简历功能于一体的综合办公平台&#xff0c;为用户提供一站式智能内容生成解决方案&#xff0c;极大地简化了内容创作流程。 1. AI驱动的一键P…

VM改MAC电脑密码(截图)

进入恢复模式重置密码 重启mac并同时按下CommandR&#xff0c;进入恢复模式。进入「菜单栏-实用程序-终端」&#xff0c;输入命令「resetpassword」回车运行&#xff0c;调出密码重置工具。选择包含密码的启动磁盘卷宗、需重设密码的用户账户&#xff1b;输入并确认新的用户密…

SpringBoot+Vue+微信小程序校园自助打印系统

概述​​ 校园自助打印系统是现代化校园建设中不可或缺的一部分&#xff0c;基于SpringBootVue微信小程序开发的​​免费Java源码​​项目&#xff0c;包含完整的用户预约、打印店管理等功能模块。 ​​主要内容​​ ​​ 系统功能模块​​ ​​登录验证模块​​&#xff1a;…

【论文精读】2024 CVPR--Upscale-A-Video现实世界视频超分辨率(RealWorld VSR)

文章目录 一、摘要二、挑战三、Method3.1 前置知识3.1.1 预训练SD 4 Upscaler3.1.2 Inflated 2D Convolution 扩展2D卷积 3.2 Local Consistency within Video Segments 视频片段中的一致性3.2.1 微调时序U-Net3.2.2 微调时序VAE-Decoder 3.3 跨片段的全局一致性 Global Consis…

学术合作交流

想找志同道合的科研小伙伴&#xff01;研究方向包括&#xff1a;计算机视觉&#xff08;CV&#xff09;、人工智能&#xff08;AI&#xff09;、目标检测、行人重识别、行人搜索、虹膜识别等。欢迎具备扎实基础的本科、硕士及博士生加入&#xff0c;共同致力于高质量 SCI 期刊和…

【LUT技术专题】图像自适应3DLUT

3DLUT开山之作: Learning Image-adaptive 3D Lookup Tables for High Performance Photo Enhancement in Real-time&#xff08;2020 TPAMI &#xff09; 专题介绍一、研究背景二、图像自适应3DLUT方法2.1 前置知识2.2 整体流程2.3 损失函数的设计 三、实验结果四、局限五、总结…

德拜温度热容推导

目录 一、背景与基本假设 一、态密度的定义 二、从波矢空间出发 三、振动模式数与波矢体积关系 四、模式总数计算 五、态密度求导 六、德拜频率确定与归一化条件 二、内能表达式的推导 三、态密度代入与变量替换 四、求比热容 五、低温时&#xff08;&#xff09; …

【iOS】源码阅读(五)——类类的结构分析

文章目录 前言类的分析类的本质objc_class 、objc_object和NSObjectobjc_object&#xff1a;所有对象的基类型objc_class&#xff1a;类的底层结构NSObject&#xff1a;面向用户的根类 小结 指针内存偏移普通指针----值拷贝对象----指针拷贝或引用拷贝用数组指针引出----内存偏…

基于CangjieMagic的RAG技术赋能智能问答系统

目录 引言 示例程序分析 代码结构剖析 导入模块解读 智能体配置详情 提示词模板说明 主程序功能解析 异步聊天功能实现 检索信息展示 技术要点总结 ollama 本地部署nomic-embed-text 运行测试 结语 引言 这段时间一直在学习CangjieMagic。前几天完成了在CangjieMa…

算力租赁革命:弹性模式如何重构数字时代的创新门槛​

一、算力革命&#xff1a;第四次工业革命的核心驱动力​ 在科技飞速发展的当下&#xff0c;我们正悄然迎来第四次工业革命。华为创始人任正非在一场程序设计竞赛中曾深刻指出&#xff0c;这场革命的基础便是大算力。随着 5G、人工智能、大数据、物联网等信息技术的迅猛发展&am…

图论回溯

图论 200.岛屿数量DFS 给你一个由 ‘1’&#xff08;陆地&#xff09;和 ‘0’&#xff08;水&#xff09;组成的的二维网格&#xff0c;请你计算网格中岛屿的数量。岛屿总是被水包围&#xff0c;并且每座岛屿只能由水平方向和/或竖直方向上相邻的陆地连接形成。此外&#xff…

RFID测温芯片助力新能源产业安全与能效提升

在“双碳”目标驱动下&#xff0c;新能源产业正经历爆发式增长。无论是电动汽车、储能电站还是风光发电场&#xff0c;设备安全与能效提升始终是行业核心命题。而温度&#xff0c;这个看似普通的物理参数&#xff0c;却成为破解这一命题的关键密码。RFID测温芯片&#xff08;集…

S32K3 工具篇9:如何在无源码情况下灵活调试elf文件

S32K3 工具篇9&#xff1a;如何在无源码情况下灵活调试elf文件 一&#xff0c;文档简介二&#xff0c; 功能实现2.1 代码工具准备2.2 elf修改功能实现&#xff1a;Fun2功能跳过2.2.1 PC越过Fun22.2.2 Fun2替换为nop 2.3 elf修改功能实现&#xff1a;Fun4替换Fun2入口2.3.1 link…

Nacos 配置文件总结

Nacos 配置文件总结 文章目录 Nacos 配置文件总结1 、在 Nacos 服务端添加配置文件1. 启动Nacos Server。2. 新建配置文件。3. 发布配置集后&#xff0c;我们便可以在配置列表中查看相应的配置文件。4. 配置nacos数据库5. 运行 Nacos 容器6. 验证安装结果7. 配置验证 2 、在 Na…

ASP.NET Web Forms框架识别

ASP.NET 支持三种不同的开发模式&#xff1a; Web Pages&#xff08;Web 页面&#xff09;、MVC&#xff08;Model View Controller 模型-视图-控制器&#xff09;、Web Forms&#xff08;Web 窗体&#xff09;&#xff1a; Web Pages 单页面模式MVC 模型-视图-控制器Web Form…

哈工大计统大作业-程序人生

摘 要 本项目以“程序人生-Hellos P2P”为核心&#xff0c;通过编写、预处理、编译、汇编、链接及运行一个简单的Hello程序&#xff0c;系统探讨了计算机系统中程序从代码到进程的全生命周期。实验基于Ubuntu环境&#xff0c;使用GCC工具链完成代码转换&#xff0c;分析了预处…

设计模式——装饰器设计模式(结构型)

摘要 文中主要介绍了装饰器设计模式&#xff0c;它是一种结构型设计模式&#xff0c;可在不改变原有类代码的情况下&#xff0c;动态为对象添加额外功能。文中详细阐述了装饰器模式的角色、结构、实现方式、适合场景以及实战示例等内容&#xff0c;还探讨了其与其他设计模式的…

途景VR智拍APP:开启沉浸式VR拍摄体验

在数字化时代&#xff0c;VR技术以其沉浸式的体验逐渐走进了人们的日常生活。途景VR智拍APP作为一款集看图和拍照于一体的VR软件&#xff0c;为用户带来了全新的视觉体验和便捷的拍摄方式&#xff0c;无论是专业摄影师还是普通用户&#xff0c;都能轻松上手&#xff0c;拍出令人…

Linux环境搭建MCU开发环境

操作系统版本&#xff1a; ubuntu 22.04 文本编辑器&#xff1a; vscode 开发板&#xff1a; stm32f103c8t6 调试器&#xff1a; st-link 前言 步骤一&#xff1a; 安装交叉编译工具链 步骤二&#xff1a; 创建工程目录结构 步骤三&#xff1a; 调试…