Kafka 消息模式实战:从简单队列到流处理(二)

news2025/6/8 13:26:04

四、Kafka 流处理实战

4.1 Kafka Streams 简介

Kafka Streams 是 Kafka 提供的流处理库,它为开发者提供了一套简洁而强大的 API,用于构建实时流处理应用程序。Kafka Streams 基于 Kafka 的高吞吐量、分布式和容错特性,能够处理大规模的实时数据流,并提供低延迟的处理能力。

Kafka Streams 的设计理念是将流处理逻辑简化为一系列的操作,开发者可以使用类似于 SQL 的语法来定义这些操作,从而实现复杂的流处理任务。它支持有状态和无状态的处理,并且能够自动管理分布式环境下的状态存储和故障恢复。

4.2 流处理拓扑(Topology)

流处理拓扑定义了流处理的逻辑和流程,它是一个有向无环图(DAG),由数据源(Source)、处理器(Processor)和接收器(Sink)组成。

  • 数据源:数据源是拓扑的起点,它从 Kafka 主题中读取数据,并将数据发送给下游的处理器。数据源可以是一个或多个 Kafka 主题。
  • 处理器:处理器是拓扑的核心组件,它对输入的数据进行处理和转换。处理器可以执行各种操作,如过滤、映射、聚合、连接等。一个拓扑中可以包含多个处理器,它们按照顺序依次对数据进行处理。
  • 接收器:接收器是拓扑的终点,它将处理后的结果数据发送到 Kafka 主题或其他外部系统中。接收器可以是一个或多个 Kafka 主题,也可以是其他类型的输出目标,如文件系统、数据库等。

4.3 单词计数示例

下面我们通过一个 Java 代码示例,展示如何使用 Kafka Streams 实现单词计数功能。在这个示例中,我们从一个 Kafka 主题读取文本数据,对每个单词进行计数,并将结果输出到另一个 Kafka 主题。

首先,在 Maven 项目的pom.xml文件中添加 Kafka Streams 依赖:

 

<dependency>

<groupId>org.apache.kafka</groupId>

<artifactId>kafka-streams</artifactId>

<version>3.5.1</version>

</dependency>

接下来,编写实现单词计数功能的代码:

 

import org.apache.kafka.common.serialization.Serdes;

import org.apache.kafka.streams.KafkaStreams;

import org.apache.kafka.streams.StreamsBuilder;

import org.apache.kafka.streams.StreamsConfig;

import org.apache.kafka.streams.kstream.KStream;

import org.apache.kafka.streams.kstream.KTable;

import java.util.Arrays;

import java.util.Properties;

public class WordCountExample {

public static void main(String[] args) {

// 配置Kafka Streams应用

Properties props = new Properties();

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app");

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// 构建流处理拓扑

StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> source = builder.stream("input-topic");

KTable<String, Long> wordCounts = source

.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

.filter((key, word) ->!word.isEmpty())

.groupBy((key, word) -> word)

.count();

wordCounts.toStream().to("output-topic",

org.apache.kafka.streams.kstream.Produced.with(Serdes.String(), Serdes.Long()));

// 创建并启动Kafka Streams实例

KafkaStreams streams = new KafkaStreams(builder.build(), props);

streams.start();

// 添加关闭钩子,在程序终止时优雅地关闭Kafka Streams

Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

}

}

在上述代码中:

  • 首先配置了 Kafka Streams 应用的基本属性,包括应用 ID、Kafka 集群地址以及默认的键和值序列化器。
  • 然后使用StreamsBuilder构建流处理拓扑。从input-topic主题读取数据,将每行文本拆分成单词,过滤掉空单词,按单词分组并计数。
  • 最后将计数结果转换为流,并输出到output-topic主题。
  • 创建并启动KafkaStreams实例,并添加关闭钩子,确保程序在终止时能够优雅地关闭 Kafka Streams。

4.4 高级功能

Kafka Streams 提供了许多高级功能,使其能够满足复杂的实时流处理需求。

窗口操作:窗口操作允许在特定的时间范围内对流数据进行聚合和计算。Kafka Streams 支持固定窗口(Tumbling Window)、滑动窗口(Hopping Window)和会话窗口(Session Window)。例如,使用固定窗口计算每 5 分钟内的订单数量:

 

KTable<Windowed<String>, Long> windowedCounts = source

.groupByKey()

.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))

.count();

连接操作:连接操作可以将多个流或表的数据进行合并。Kafka Streams 支持内连接(Inner Join)、左连接(Left Join)和外连接(Outer Join)。例如,将用户信息表和订单流进行连接,获取每个订单对应的用户信息:

 

KTable<String, User> userTable = builder.table("user-topic");

KStream<String, Order> orderStream = builder.stream("order-topic");

KStream<String, OrderWithUser> joinedStream = orderStream.join(userTable,

(order, user) -> new OrderWithUser(order, user));

状态存储:Kafka Streams 支持有状态处理,能够在处理过程中保存中间状态。状态存储可以保存在内存中或使用 RocksDB 持久化存储。例如,在单词计数示例中,count操作会将计数结果存储在状态存储中,以便后续查询和更新:

 

KTable<String, Long> wordCounts = source

.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

.filter((key, word) ->!word.isEmpty())

.groupBy((key, word) -> word)

.count(Materialized.as("word-count-store"));

容错处理:Kafka Streams 内置了容错机制,能够自动处理数据丢失、节点故障等问题,保证数据处理的一致性和完整性。它会将应用程序的状态保存到 Kafka 中,以便在发生故障时恢复状态。当某个 Kafka Streams 实例发生故障时,其他实例可以接管其工作,继续处理数据,确保流处理任务的连续性。

五、总结与展望

在本次 Kafka 消息模式的探索之旅中,我们从简单队列起步,逐步深入到流处理的复杂领域,全面领略了 Kafka 作为强大分布式消息系统的魅力与实力。

在简单队列场景中,Kafka 展现了其作为消息队列的基础能力。通过搭建 Kafka 和 Zookeeper 环境,我们顺利创建主题,实现了生产者与消费者之间的消息传递。生产者可以灵活地选择同步或异步方式发送消息,消费者则通过自动或手动提交偏移量来确保消息的可靠消费。这种简单而高效的消息队列模式,在许多应用场景中发挥了关键作用,如解耦系统组件、实现异步通信以及流量控制等,为构建稳定、可扩展的应用架构提供了有力支持。

而当我们踏入 Kafka 流处理的世界,更是发现了其无限的潜力。Kafka Streams 提供了一套简洁而强大的 API,使我们能够轻松构建实时流处理应用。通过单词计数示例,我们看到了如何从 Kafka 主题读取数据,对数据进行处理和转换,并将结果输出到其他主题。窗口操作、连接操作、状态存储以及容错处理等高级功能,进一步拓展了 Kafka 流处理的应用范围,使其能够应对各种复杂的实时数据处理需求,如实时监控、实时推荐、欺诈检测等。

展望未来,Kafka 在大数据和实时处理领域的发展前景一片光明。随着技术的不断进步,Kafka 有望在以下几个方面取得更大的突破:

  • 流处理能力持续增强:Kafka Streams 和 KSQL 将不断进化,提供更强大的功能和更高的性能。未来,它们可能会支持更多复杂的流处理任务,以及更多 SQL 特性,使开发者能够更加便捷地处理实时数据流。
  • 云原生支持不断深化:随着 Kubernetes 等云原生技术的普及,Kafka 将更好地融入云原生环境。未来,Kafka 在 Kubernetes 上的部署和管理将变得更加简单,资源利用将更加高效,弹性扩展能力也将进一步增强,为企业在云端构建实时数据处理平台提供更优质的解决方案。
  • 多租户支持更加完善:为了满足多租户环境下的应用需求,Kafka 将进一步增强其安全性和隔离性。通过更细粒度的访问控制和配额管理,Kafka 将确保不同租户之间的数据和资源得到有效隔离,同时提供更好的审计和监控功能,保障多租户环境的稳定运行。
  • 运维和监控工具不断优化:Kafka 将持续提升其运维和监控工具的能力,增强 Kafka Manager、Confluent Control Center 等工具的功能,并与 Prometheus、Grafana 等主流监控系统实现更好的集成,为用户提供更全面、更实时的监控和报警机制,降低 Kafka 集群的运维成本。
  • 存储引擎持续演进:分层存储(Tiered Storage)等新技术的应用,将使 Kafka 能够将数据分层存储到不同的存储介质上,从而降低存储成本并提高存储效率。未来,Kafka 的存储引擎可能会进一步优化,以适应不断增长的数据量和多样化的存储需求。
  • 性能和可靠性进一步提升:Kafka 社区正在考虑引入 Raft 协议来替代目前的 ZooKeeper 协议,这将有望简化 Kafka 的部署和管理,并提供更高的可用性和一致性保障。此外,Kafka 还可能在数据处理速度、容错能力等方面进行优化,以满足对性能和可靠性要求极高的应用场景。
  • 智能数据路由和处理成为趋势:借助机器学习和人工智能技术,Kafka 未来可能会实现智能数据路由和处理。通过动态调整数据路由策略,Kafka 能够更高效地处理和分发数据,提高整个系统的性能和效率,为用户提供更加智能化的实时数据处理服务。

Kafka 作为大数据和实时处理领域的重要工具,将继续引领技术发展的潮流。无论是在简单队列场景还是复杂的流处理应用中,Kafka 都将发挥不可替代的作用,为企业的数字化转型和创新发展提供强大的技术支持。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2404180.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

固定ip和非固定ip的区别是什么?如何固定ip地址

在互联网中&#xff0c;我们常会接触到固定IP和非固定IP的概念。它们究竟有何不同&#xff1f;如何固定IP地址&#xff1f;让我们一起来探究这个问题。 一、固定IP和非固定IP的区别是什么 固定IP&#xff08;静态IP&#xff09;和非固定IP&#xff08;动态IP&#xff09;是两种…

使用矩阵乘法+线段树解决区间历史和问题的一种通用解法

文章目录 前言P8868 [NOIP2022] 比赛CF1824DP9990/2020 ICPC EcFinal G 前言 一般解决普通的区间历史和&#xff0c;只需要定义辅助 c h s − t ⋅ a chs-t\cdot a chs−t⋅a&#xff0c; h s hs hs是历史和&#xff0c; a a a是区间和&#xff0c; t t t是时间戳&#xff0c…

如何从浏览器中导出网站证书

以导出 GitHub 证书为例&#xff0c;点击 小锁 点击 导出 注意&#xff1a;这里需要根据你想要证书格式手动加上后缀名&#xff0c;我的是加 .crt 双击文件打开

低功耗MQTT物联网架构Java实现揭秘

文章目录 一、引言二、相关技术概述2.1 物联网概述2.2 MQTT协议java三、基于MQTT的Iot物联网架构设计3.1 架构总体设计3.2 MQTT代理服务器选择3.3 物联网设备设计3.4 应用服务器设计四、基于MQTT的Iot物联网架构的Java实现4.1 开发环境搭建4.2 MQTT客户端实现4.3 应用服务器实现…

ideal2022.3.1版本编译项目报java: OutOfMemoryError: insufficient memory

最近换了新电脑&#xff0c;用新电脑拉项目配置后&#xff0c;启动时报错&#xff0c;错误描述 idea 启动Springboot项目在编译阶段报错&#xff1a;java: OutOfMemoryError: insufficient memory 2. 处理方案 修改VM参数&#xff0c;分配更多内存 ❌ 刚刚开始以为时JVM内存设置…

centos7编译安装LNMP架构

一、LNMP概念 LNMP架构是一种常见的网站服务器架构&#xff0c;由Linux操作系统、Nginx Web服务器、MySQL数据库和PHP后端脚本语言组成。 1 用户请求&#xff1a;用户通过浏览器输入网址&#xff0c;请求发送到Nginx Web服务器。 2 Nginx处理&#xff1a;Nginx接收请求后&…

Spring Boot 3.3 + MyBatis 基础教程:从入门到实践

Spring Boot 3.3 MyBatis 基础教程&#xff1a;从入门到实践 在当今的Java开发领域&#xff0c;Spring Boot和MyBatis是构建高效、可维护的后端应用的两个强大工具。Spring Boot简化了Spring应用的初始搭建和开发过程&#xff0c;而MyBatis则提供了一种灵活的ORM&#xff08;…

征文投稿:如何写一份实用的技术文档?——以软件配置为例

&#x1f4dd; 征文投稿&#xff1a;如何写一份实用的技术文档&#xff1f;——以软件配置为例 目录 [TOC](目录)&#x1f9ed; 技术文档是通往成功的“说明书”&#x1f4a1; 一、明确目标读者&#xff1a;他们需要什么&#xff1f;&#x1f4cb; 二、结构清晰&#xff1a;让读…

tensorflow image_dataset_from_directory 训练数据集构建

以数据集 https://www.kaggle.com/datasets/vipoooool/new-plant-diseases-dataset 为例 目录结构 训练图像数据集要求&#xff1a; 主目录下包含多个子目录&#xff0c;每个子目录代表一个类别。每个子目录中存储属于该类别的图像文件。 例如 main_directory/ ...cat/ ...…

GOOUUU ESP32-S3-CAM 果云科技开发板开发指南(一)(超详细!)Vscode+espidf 通过摄像头拍摄照片并存取到SD卡中,文末附源码

看到最近好玩的开源项目比较多&#xff0c;就想要学习一下esp32的开发&#xff0c;目前使用比较多的ide基本上是arduino、esp-idf和platformio&#xff0c;前者编译比较慢&#xff0c;后两者看到开源大佬的项目做的比较多&#xff0c;所以主要学习后两者。 本次使用的硬件是GO…

全流程开源!高德3D贴图生成系统,白模一键生成真实感纹理贴图

导读 MVPainter 随着3D生成从几何建模迈向真实感还原&#xff0c;贴图质量正逐渐成为决定3D资产视觉表现的核心因素。我们团队自研的MVPainter系统&#xff0c;作为业内首个全流程开源的3D贴图生成方案&#xff0c;仅需一张参考图与任意白模&#xff0c;即可自动生成对齐精确…

html 滚动条滚动过快会留下边框线

滚动条滚动过快时&#xff0c;会留下边框线 但其实大部分时候是这样的&#xff0c;没有多出边框线的 滚动条滚动过快时留下边框线的问题通常与滚动条样式和滚动行为有关。这种问题可能出现在使用了自定义滚动条样式的情况下。 注意&#xff1a;使用方法 6 好使&#xff0c;其它…

数据通信与计算机网络——数据与信号

主要内容 模拟与数字 周期模拟信号 数字信号 传输减损 数据速率限制 性能 注&#xff1a;数据必须被转换成电磁信号才能进行传输。 一、模拟与数字 数据以及表示数据的信号可以使用模拟或者数字的形式。数据可以是模拟的也可以是数字的&#xff0c;模拟数据是连续的采用…

【LLM大模型技术专题】「入门到精通系列教程」LangChain4j与Spring Boot集成开发实战指南

LangChain4j和SpringBoot入门指南 LangChain4jLangchain4j API语言模型消息类型内存对象ChatMemory接口的主要实现设置 API 密钥SpringBoot Configuration配置ChatLanguageModelStreamingChatLanguageModel初始化ChatModel对象模型配置分析介绍说明通过JavaConfig创建ChatModel…

Vue3 GSAP动画库绑定滚动条视差效果 绑定滚动条 滚动条动画 时间轴

介绍 GSAP 用于创建高性能、可控制的动画效果。由 GreenSock 团队开发&#xff0c;旨在提供流畅、快速、稳定的动画效果&#xff0c;并且兼容各种浏览器。 提供了多个插件&#xff0c;扩展了动画的功能&#xff0c;如 ScrollTrigger&#xff08;滚动触发动画&#xff09;、Dra…

grafana-mcp-analyzer:基于 MCP 的轻量 AI 分析监控图表的运维神器!

还在深夜盯着 Grafana 图表手动排查问题&#xff1f;今天推荐一个让 AI 能“读图说话”的开源神器 —— grafana-mcp-analyzer。 想象一下这样的场景&#xff1a; 凌晨3点&#xff0c;服务器告警响起。。。你睁着惺忪的眼睛盯着复杂的监控图表 &#x1f635;‍&#x1f4ab;花…

【题解-洛谷】B3622 枚举子集(递归实现指数型枚举)

题目&#xff1a;B3622 枚举子集&#xff08;递归实现指数型枚举&#xff09; 题目描述 今有 n n n 位同学&#xff0c;可以从中选出任意名同学参加合唱。 请输出所有可能的选择方案。 输入格式 仅一行&#xff0c;一个正整数 n n n。 输出格式 若干行&#xff0c;每行…

(LeetCode 每日一题)3170. 删除星号以后字典序最小的字符串(贪心+栈)

题目&#xff1a;3170. 删除星号以后字典序最小的字符串 思路&#xff1a;贪心栈&#xff0c;时间复杂度0(n)。 对于每一个‘ * ’&#xff0c;优先选最右边的最小字符&#xff0c;才会使最终得到的字符串最小。 用栈&#xff0c;来记录每个字符的位置下标。细节看注释。 C版本…

使用 HTML + JavaScript 实现文章逐句高亮朗读功能

在这个信息爆炸的时代&#xff0c;我们每天都要面对大量的文字阅读。无论是学习、工作还是个人成长&#xff0c;阅读都扮演着至关重要的角色。然而&#xff0c;在快节奏的生活中&#xff0c;我们往往难以找到足够的安静时间专注于阅读。本文用 HTML JavaScript 实现了一个基于…

双碳时代,能源调度的难题正从“发电侧”转向“企业侧”

安科瑞刘鸿鹏 摘要 在“双碳”战略和能源结构转型的大背景下&#xff0c;企业储能电站逐步成为提升能源利用效率、增强用能韧性的重要手段。随着系统规模扩大与运行复杂度提升&#xff0c;如何对光伏、储能、负荷等流进行实时调控&#xff0c;成为智慧用能的关键。ACCU100微…