使用 Python、Kafka 和 Faust 进行流处理

news2026/5/5 14:37:53
原文towardsdatascience.com/stream-processing-with-python-kafka-faust-a11740d0910c?sourcecollection_archive---------2-----------------------#2024-02-18如何在高吞吐量时间序列数据上进行流处理并应用实时预测模型https://medium.com/aliosia?sourcepost_page---byline--a11740d0910c--------------------------------https://towardsdatascience.com/?sourcepost_page---byline--a11740d0910c-------------------------------- Ali Osia·发表于Towards Data Science ·阅读时长7 分钟·2024 年 2 月 18 日–https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/ef4f06938ee58f847fdbc6bc1a65aaf2.png图片来自JJ Ying于Unsplash大多数流处理库并不适合 Python而大多数机器学习和数据挖掘库却是基于 Python 的。尽管Faust库旨在将 Kafka 流处理理念引入 Python 生态系统但在易用性方面可能会带来挑战。本文件作为一个教程提供了有效使用 Faust 的最佳实践。在第一部分我介绍了流处理概念的基本概述广泛借鉴了《设计数据密集型应用》一书[1]。随后我探讨了 Faust 库的关键功能特别是 Faust 窗口这部分通常在现有文档中难以理解且难以高效利用。因此我提出了一种通过利用库自身函数的替代方法来使用 Faust 窗口。最后我分享了在 Google Cloud Platform 上实现类似管道的经验。流处理流stream指的是随着时间推移逐步可用的无界数据。事件event是一个小型的、独立的对象包含某一时刻发生的事情的详细信息例如用户交互。事件由生产者producer生成例如温度传感器并可能被一些消费者consumers消费例如在线仪表盘。传统的数据库不适合存储高吞吐量的事件流。这是因为消费者需要定期轮询数据库以识别新事件从而产生显著的开销。相反最好是让消费者在新事件出现时收到通知而消息系统messaging systems正是为此而设计的。消息代理message broker是一种广泛采用的消息传递系统其中生产者将消息写入代理消费者由代理通知并接收这些消息。基于AMQP 的消息代理AMQP-based message brokers如RabbitMQ常用于服务之间的异步消息传递和任务队列。与数据库不同它们采用瞬时消息的理念仅在消息被消费者确认后才会删除消息。当消息处理变得资源密集时可以通过使用多个消费者以负载均衡的方式从相同主题读取来实现并行处理。在这种方法中消息会被随机分配给消费者进行处理这可能导致处理的顺序与接收的顺序不同。另一方面基于日志的消息代理log-based message brokers如Apache Kafka将数据库存储的持久性与消息系统的低延迟通知能力结合在一起。它们利用分区日志结构其中每个分区表示按追加顺序存储在磁盘上的记录序列。这一设计使得重新读取旧消息成为可能。Kafka 中的负载均衡是通过将每个消费者分配给一个分区来实现的从而消息处理的顺序与接收的顺序一致但消费者的数量受限于可用分区的数量。流处理stream processing涉及对流执行操作如处理流并生成新的流、将事件数据存储在数据库中或在仪表盘上可视化数据。流分析stream analytics是一个常见的使用案例其中我们在定义的时间窗口内聚合来自一系列事件的信息。滚动窗口Tumbling windows非重叠和跳动窗口Hopping windows重叠是流分析中常用的窗口类型。流分析使用案例的例子可以是简单地计算过去一小时内的事件数或者对事件应用复杂的时间序列预测模型。流分析面临着区分事件创建时间*(事件时间)和事件处理时间的挑战因为事件处理可能由于排队或网络问题引入延迟。基于处理时间定义窗口是一种更简单的方法特别是当处理延迟较小时。然而基于事件时间定义窗口则更具挑战性。这是因为无法确定窗口内的所有数据是否已经接收完毕或者是否还有未处理的事件。因此需要处理在窗口被认为已完成后仍然到达的滞后事件*。在涉及复杂流分析的应用中如时间序列预测通常需要将一系列有序的消息作为一个整体在窗口内进行处理。在这种情况下消息之间存在强烈的相互依赖关系导致很难从代理中确认并移除单个消息。因此基于日志的消息代理成为了一种更可取的选择。此外在这种情况下由于窗口中的所有消息需要一起考虑平行处理可能不可行或实现过于复杂。然而应用复杂的机器学习模型来处理数据可能需要大量计算资源因此必须采取替代的平行处理方法。本文旨在提出一种解决方案以在高吞吐量流处理应用中有效地使用资源密集型机器学习模型。Faust 流处理有多个流处理库可供选择例如 Apache Kafka Streams、Flink、Samza、Storm 和 Spark Streaming。每个库都有自己的优缺点但其中许多并不是特别适合 Python。不过Faust是一个基于 Python 的流处理库使用 Kafka 作为底层消息系统旨在将 Kafka Streams 的理念引入 Python 生态系统。不幸的是Faust 的文档可能会让人困惑源代码也可能难以理解。例如理解 Faust 中窗口的工作方式在不参考复杂的源代码的情况下是具有挑战性的。此外Faustv1和Faust-Streamingv2仓库中存在许多开放问题解决这些问题并非一件简单的事情。接下来将提供有关 Faust 底层结构的必要知识并附上代码片段帮助有效利用 Faust 库。使用 Faust 的第一步是创建一个应用并配置项目通过指定代理和其他必要的参数。一个有用的参数是table_cleanup_interval将在后续讨论。appfaust.App(app_name,brokerbroker_address,storerocksdb_address,table_cleanup_intervaltable_cleanup_interval)然后你可以使用agent装饰器定义一个流处理器从 Kafka 主题中消费数据并对每个接收到的事件执行某些操作。schemafaust.Schema(value_serializerjson)topicapp.topic(topic_name,schemaschema)app.agent(topic)asyncdefprocessor(stream):asyncforeventinstream:print(event)为了在流处理器中保持状态我们可以使用 Faust 的Table。表是一个分布式的内存字典由 Kafka 变更日志主题支持。你可以将table视为一个可以在流处理器中设置的 Python 字典。tableapp.Table(table_name,defaultint)app.agent(topic)asyncdefprocessor(stream):asyncforeventinstream:table[key]eventFaust 窗口让我们考虑一个时间序列问题每秒我们需要从前 10 秒钟的样本中进行预测。因此我们需要 10 秒重叠的窗口重叠时间为 1 秒。为了实现这个功能我们可以利用 Faust 的windowed tables但在 Faust 文档中对它们的解释不够充分常常导致困惑。理想情况下流处理库应该自动执行以下任务为每个窗口保持状态事件列表确定新事件的相关窗口最后 10 个窗口更新这些窗口的状态将新事件附加到它们各自列表的末尾在窗口关闭时应用一个函数使用窗口的状态作为输入。在下面的代码片段中你可以观察到 Faust 文档中建议的构建窗口并在流处理器中使用它的方法参考 Faust 库中的这个示例# Based on Fuast example# Do not use thiswindow_wrapperapp.Table(table_name,defaultlist,on_window_closewindow_close).hopping(10,1,expiresexpire_time)app.agent(topic)asyncdefprocessor(stream):asyncforeventinstream:window_setwindow_wrapper[key]prevwindow_set.value()prev.append(event)window_wrapper[key]prev在提供的代码中window_wrapper对象是WindowWrapper类的一个实例提供了一些所需的功能。expires参数决定了窗口生命周期的持续时间从创建开始计算。一旦这个指定的时间过去窗口就被视为关闭。Faust 会定期检查table_cleanup_interval持续时间以识别已关闭的窗口。然后它会应用window_close函数使用窗口状态作为输入。当你调用window_wrapper[key]时它返回一个类型为WindowSet的对象该对象内部包含所有相关的窗口。通过调用window_set.value()你可以访问最新窗口的状态另外通过调用window_set.delta(30)你可以访问 30 秒前的窗口状态。此外你还可以通过为window_wrapper[key]赋新值来更新最新窗口的状态。这种方法适用于滚动窗口但不适用于跳跃窗口跳跃窗口需要更新多个窗口的状态。[Faust 文档] 此时在访问跳跃表中的数据时我们总是访问给定时间戳的最新窗口而且我们无法修改这种行为。虽然 Faust 支持维护窗口状态、识别相关窗口并在已关闭的窗口上应用函数但它并没有完全解决第三个功能即更新所有相关窗口的状态。Google Cloud 解决方案我想简要讨论一下我在使用 Google Cloud PlatformGCP时的负面体验。GCP 推荐使用 Google Pub/Sub 作为消息代理Apache Beam 作为流处理库Google Dataflow 作为执行工具Google BigQuery 作为数据库。然而当我尝试使用这个技术栈时我遇到了许多问题导致使用起来非常具有挑战性。在 Python 中使用 Google Pub/Sub 证明是比较慢的可以查看这个和这个这让我放弃了它转而使用 Kafka。Apache Beam 是一个文档齐全的库但与 Kafka 一起使用时却遇到了自己的一些问题。直接运行器有漏洞需要使用 Dataflow且由于等待机器配置导致了显著的时间延迟。此外我还遇到了窗口触发延迟的问题尽管我尝试过解决这个问题但都没有成功可以查看这个GitHub 问题和这个Stack Overflow 贴文。而且由于多个组件的复杂集成调试整个系统是一个巨大的挑战这让我对日志的控制非常有限也使得很难 pinpoint定位Pub/Sub、Beam、Dataflow 或 BigQuery 中问题的根本原因。总的来说我在使用 Google Cloud Platform 的过程中遇到了 Python 中 Google Pub/Sub 性能慢、使用 Apache Beam 与 Kafka 时的 bugs 以及调试这些互联系统的整体困难。[1] Kleppmann, Martin.设计数据密集型应用可靠、可扩展和可维护系统背后的核心理念。 “ O’Reilly Media, Inc.”, 2017。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2585265.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot-17-MyBatis动态SQL标签之常用标签

文章目录 1 代码1.1 实体User.java1.2 接口UserMapper.java1.3 映射UserMapper.xml1.3.1 标签if1.3.2 标签if和where1.3.3 标签choose和when和otherwise1.4 UserController.java2 常用动态SQL标签2.1 标签set2.1.1 UserMapper.java2.1.2 UserMapper.xml2.1.3 UserController.ja…

wordpress后台更新后 前端没变化的解决方法

使用siteground主机的wordpress网站,会出现更新了网站内容和修改了php模板文件、js文件、css文件、图片文件后,网站没有变化的情况。 不熟悉siteground主机的新手,遇到这个问题,就很抓狂,明明是哪都没操作错误&#x…

网络编程(Modbus进阶)

思维导图 Modbus RTU(先学一点理论) 概念 Modbus RTU 是工业自动化领域 最广泛应用的串行通信协议,由 Modicon 公司(现施耐德电气)于 1979 年推出。它以 高效率、强健性、易实现的特点成为工业控制系统的通信标准。 包…

UE5 学习系列(二)用户操作界面及介绍

这篇博客是 UE5 学习系列博客的第二篇,在第一篇的基础上展开这篇内容。博客参考的 B 站视频资料和第一篇的链接如下: 【Note】:如果你已经完成安装等操作,可以只执行第一篇博客中 2. 新建一个空白游戏项目 章节操作,重…

IDEA运行Tomcat出现乱码问题解决汇总

最近正值期末周,有很多同学在写期末Java web作业时,运行tomcat出现乱码问题,经过多次解决与研究,我做了如下整理: 原因: IDEA本身编码与tomcat的编码与Windows编码不同导致,Windows 系统控制台…

利用最小二乘法找圆心和半径

#include <iostream> #include <vector> #include <cmath> #include <Eigen/Dense> // 需安装Eigen库用于矩阵运算 // 定义点结构 struct Point { double x, y; Point(double x_, double y_) : x(x_), y(y_) {} }; // 最小二乘法求圆心和半径 …

使用docker在3台服务器上搭建基于redis 6.x的一主两从三台均是哨兵模式

一、环境及版本说明 如果服务器已经安装了docker,则忽略此步骤,如果没有安装,则可以按照一下方式安装: 1. 在线安装(有互联网环境): 请看我这篇文章 传送阵>> 点我查看 2. 离线安装(内网环境):请看我这篇文章 传送阵>> 点我查看 说明&#xff1a;假设每台服务器已…

XML Group端口详解

在XML数据映射过程中&#xff0c;经常需要对数据进行分组聚合操作。例如&#xff0c;当处理包含多个物料明细的XML文件时&#xff0c;可能需要将相同物料号的明细归为一组&#xff0c;或对相同物料号的数量进行求和计算。传统实现方式通常需要编写脚本代码&#xff0c;增加了开…

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器的上位机配置操作说明

LBE-LEX系列工业语音播放器|预警播报器|喇叭蜂鸣器专为工业环境精心打造&#xff0c;完美适配AGV和无人叉车。同时&#xff0c;集成以太网与语音合成技术&#xff0c;为各类高级系统&#xff08;如MES、调度系统、库位管理、立库等&#xff09;提供高效便捷的语音交互体验。 L…

(LeetCode 每日一题) 3442. 奇偶频次间的最大差值 I (哈希、字符串)

题目&#xff1a;3442. 奇偶频次间的最大差值 I 思路 &#xff1a;哈希&#xff0c;时间复杂度0(n)。 用哈希表来记录每个字符串中字符的分布情况&#xff0c;哈希表这里用数组即可实现。 C版本&#xff1a; class Solution { public:int maxDifference(string s) {int a[26]…

【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型

摘要 拍照搜题系统采用“三层管道&#xff08;多模态 OCR → 语义检索 → 答案渲染&#xff09;、两级检索&#xff08;倒排 BM25 向量 HNSW&#xff09;并以大语言模型兜底”的整体框架&#xff1a; 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后&#xff0c;分别用…

【Axure高保真原型】引导弹窗

今天和大家中分享引导弹窗的原型模板&#xff0c;载入页面后&#xff0c;会显示引导弹窗&#xff0c;适用于引导用户使用页面&#xff0c;点击完成后&#xff0c;会显示下一个引导弹窗&#xff0c;直至最后一个引导弹窗完成后进入首页。具体效果可以点击下方视频观看或打开下方…

接口测试中缓存处理策略

在接口测试中&#xff0c;缓存处理策略是一个关键环节&#xff0c;直接影响测试结果的准确性和可靠性。合理的缓存处理策略能够确保测试环境的一致性&#xff0c;避免因缓存数据导致的测试偏差。以下是接口测试中常见的缓存处理策略及其详细说明&#xff1a; 一、缓存处理的核…

龙虎榜——20250610

上证指数放量收阴线&#xff0c;个股多数下跌&#xff0c;盘中受消息影响大幅波动。 深证指数放量收阴线形成顶分型&#xff0c;指数短线有调整的需求&#xff0c;大概需要一两天。 2025年6月10日龙虎榜行业方向分析 1. 金融科技 代表标的&#xff1a;御银股份、雄帝科技 驱动…

观成科技:隐蔽隧道工具Ligolo-ng加密流量分析

1.工具介绍 Ligolo-ng是一款由go编写的高效隧道工具&#xff0c;该工具基于TUN接口实现其功能&#xff0c;利用反向TCP/TLS连接建立一条隐蔽的通信信道&#xff0c;支持使用Let’s Encrypt自动生成证书。Ligolo-ng的通信隐蔽性体现在其支持多种连接方式&#xff0c;适应复杂网…

铭豹扩展坞 USB转网口 突然无法识别解决方法

当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…

未来机器人的大脑:如何用神经网络模拟器实现更智能的决策?

编辑&#xff1a;陈萍萍的公主一点人工一点智能 未来机器人的大脑&#xff1a;如何用神经网络模拟器实现更智能的决策&#xff1f;RWM通过双自回归机制有效解决了复合误差、部分可观测性和随机动力学等关键挑战&#xff0c;在不依赖领域特定归纳偏见的条件下实现了卓越的预测准…

Linux应用开发之网络套接字编程(实例篇)

服务端与客户端单连接 服务端代码 #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #include <stdio.h> #include <stdlib.h> #include <string.h> #include <arpa/inet.h> #include <pthread.h> …

华为云AI开发平台ModelArts

华为云ModelArts&#xff1a;重塑AI开发流程的“智能引擎”与“创新加速器”&#xff01; 在人工智能浪潮席卷全球的2025年&#xff0c;企业拥抱AI的意愿空前高涨&#xff0c;但技术门槛高、流程复杂、资源投入巨大的现实&#xff0c;却让许多创新构想止步于实验室。数据科学家…

深度学习在微纳光子学中的应用

深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向&#xff1a; 逆向设计 通过神经网络快速预测微纳结构的光学响应&#xff0c;替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…