使用 Python、Kafka 和 Faust 进行流处理
原文towardsdatascience.com/stream-processing-with-python-kafka-faust-a11740d0910c?sourcecollection_archive---------2-----------------------#2024-02-18如何在高吞吐量时间序列数据上进行流处理并应用实时预测模型https://medium.com/aliosia?sourcepost_page---byline--a11740d0910c--------------------------------https://towardsdatascience.com/?sourcepost_page---byline--a11740d0910c-------------------------------- Ali Osia·发表于Towards Data Science ·阅读时长7 分钟·2024 年 2 月 18 日–https://github.com/OpenDocCN/towardsdatascience-blog-zh-2024/raw/master/docs/img/ef4f06938ee58f847fdbc6bc1a65aaf2.png图片来自JJ Ying于Unsplash大多数流处理库并不适合 Python而大多数机器学习和数据挖掘库却是基于 Python 的。尽管Faust库旨在将 Kafka 流处理理念引入 Python 生态系统但在易用性方面可能会带来挑战。本文件作为一个教程提供了有效使用 Faust 的最佳实践。在第一部分我介绍了流处理概念的基本概述广泛借鉴了《设计数据密集型应用》一书[1]。随后我探讨了 Faust 库的关键功能特别是 Faust 窗口这部分通常在现有文档中难以理解且难以高效利用。因此我提出了一种通过利用库自身函数的替代方法来使用 Faust 窗口。最后我分享了在 Google Cloud Platform 上实现类似管道的经验。流处理流stream指的是随着时间推移逐步可用的无界数据。事件event是一个小型的、独立的对象包含某一时刻发生的事情的详细信息例如用户交互。事件由生产者producer生成例如温度传感器并可能被一些消费者consumers消费例如在线仪表盘。传统的数据库不适合存储高吞吐量的事件流。这是因为消费者需要定期轮询数据库以识别新事件从而产生显著的开销。相反最好是让消费者在新事件出现时收到通知而消息系统messaging systems正是为此而设计的。消息代理message broker是一种广泛采用的消息传递系统其中生产者将消息写入代理消费者由代理通知并接收这些消息。基于AMQP 的消息代理AMQP-based message brokers如RabbitMQ常用于服务之间的异步消息传递和任务队列。与数据库不同它们采用瞬时消息的理念仅在消息被消费者确认后才会删除消息。当消息处理变得资源密集时可以通过使用多个消费者以负载均衡的方式从相同主题读取来实现并行处理。在这种方法中消息会被随机分配给消费者进行处理这可能导致处理的顺序与接收的顺序不同。另一方面基于日志的消息代理log-based message brokers如Apache Kafka将数据库存储的持久性与消息系统的低延迟通知能力结合在一起。它们利用分区日志结构其中每个分区表示按追加顺序存储在磁盘上的记录序列。这一设计使得重新读取旧消息成为可能。Kafka 中的负载均衡是通过将每个消费者分配给一个分区来实现的从而消息处理的顺序与接收的顺序一致但消费者的数量受限于可用分区的数量。流处理stream processing涉及对流执行操作如处理流并生成新的流、将事件数据存储在数据库中或在仪表盘上可视化数据。流分析stream analytics是一个常见的使用案例其中我们在定义的时间窗口内聚合来自一系列事件的信息。滚动窗口Tumbling windows非重叠和跳动窗口Hopping windows重叠是流分析中常用的窗口类型。流分析使用案例的例子可以是简单地计算过去一小时内的事件数或者对事件应用复杂的时间序列预测模型。流分析面临着区分事件创建时间*(事件时间)和事件处理时间的挑战因为事件处理可能由于排队或网络问题引入延迟。基于处理时间定义窗口是一种更简单的方法特别是当处理延迟较小时。然而基于事件时间定义窗口则更具挑战性。这是因为无法确定窗口内的所有数据是否已经接收完毕或者是否还有未处理的事件。因此需要处理在窗口被认为已完成后仍然到达的滞后事件*。在涉及复杂流分析的应用中如时间序列预测通常需要将一系列有序的消息作为一个整体在窗口内进行处理。在这种情况下消息之间存在强烈的相互依赖关系导致很难从代理中确认并移除单个消息。因此基于日志的消息代理成为了一种更可取的选择。此外在这种情况下由于窗口中的所有消息需要一起考虑平行处理可能不可行或实现过于复杂。然而应用复杂的机器学习模型来处理数据可能需要大量计算资源因此必须采取替代的平行处理方法。本文旨在提出一种解决方案以在高吞吐量流处理应用中有效地使用资源密集型机器学习模型。Faust 流处理有多个流处理库可供选择例如 Apache Kafka Streams、Flink、Samza、Storm 和 Spark Streaming。每个库都有自己的优缺点但其中许多并不是特别适合 Python。不过Faust是一个基于 Python 的流处理库使用 Kafka 作为底层消息系统旨在将 Kafka Streams 的理念引入 Python 生态系统。不幸的是Faust 的文档可能会让人困惑源代码也可能难以理解。例如理解 Faust 中窗口的工作方式在不参考复杂的源代码的情况下是具有挑战性的。此外Faustv1和Faust-Streamingv2仓库中存在许多开放问题解决这些问题并非一件简单的事情。接下来将提供有关 Faust 底层结构的必要知识并附上代码片段帮助有效利用 Faust 库。使用 Faust 的第一步是创建一个应用并配置项目通过指定代理和其他必要的参数。一个有用的参数是table_cleanup_interval将在后续讨论。appfaust.App(app_name,brokerbroker_address,storerocksdb_address,table_cleanup_intervaltable_cleanup_interval)然后你可以使用agent装饰器定义一个流处理器从 Kafka 主题中消费数据并对每个接收到的事件执行某些操作。schemafaust.Schema(value_serializerjson)topicapp.topic(topic_name,schemaschema)app.agent(topic)asyncdefprocessor(stream):asyncforeventinstream:print(event)为了在流处理器中保持状态我们可以使用 Faust 的Table。表是一个分布式的内存字典由 Kafka 变更日志主题支持。你可以将table视为一个可以在流处理器中设置的 Python 字典。tableapp.Table(table_name,defaultint)app.agent(topic)asyncdefprocessor(stream):asyncforeventinstream:table[key]eventFaust 窗口让我们考虑一个时间序列问题每秒我们需要从前 10 秒钟的样本中进行预测。因此我们需要 10 秒重叠的窗口重叠时间为 1 秒。为了实现这个功能我们可以利用 Faust 的windowed tables但在 Faust 文档中对它们的解释不够充分常常导致困惑。理想情况下流处理库应该自动执行以下任务为每个窗口保持状态事件列表确定新事件的相关窗口最后 10 个窗口更新这些窗口的状态将新事件附加到它们各自列表的末尾在窗口关闭时应用一个函数使用窗口的状态作为输入。在下面的代码片段中你可以观察到 Faust 文档中建议的构建窗口并在流处理器中使用它的方法参考 Faust 库中的这个示例# Based on Fuast example# Do not use thiswindow_wrapperapp.Table(table_name,defaultlist,on_window_closewindow_close).hopping(10,1,expiresexpire_time)app.agent(topic)asyncdefprocessor(stream):asyncforeventinstream:window_setwindow_wrapper[key]prevwindow_set.value()prev.append(event)window_wrapper[key]prev在提供的代码中window_wrapper对象是WindowWrapper类的一个实例提供了一些所需的功能。expires参数决定了窗口生命周期的持续时间从创建开始计算。一旦这个指定的时间过去窗口就被视为关闭。Faust 会定期检查table_cleanup_interval持续时间以识别已关闭的窗口。然后它会应用window_close函数使用窗口状态作为输入。当你调用window_wrapper[key]时它返回一个类型为WindowSet的对象该对象内部包含所有相关的窗口。通过调用window_set.value()你可以访问最新窗口的状态另外通过调用window_set.delta(30)你可以访问 30 秒前的窗口状态。此外你还可以通过为window_wrapper[key]赋新值来更新最新窗口的状态。这种方法适用于滚动窗口但不适用于跳跃窗口跳跃窗口需要更新多个窗口的状态。[Faust 文档] 此时在访问跳跃表中的数据时我们总是访问给定时间戳的最新窗口而且我们无法修改这种行为。虽然 Faust 支持维护窗口状态、识别相关窗口并在已关闭的窗口上应用函数但它并没有完全解决第三个功能即更新所有相关窗口的状态。Google Cloud 解决方案我想简要讨论一下我在使用 Google Cloud PlatformGCP时的负面体验。GCP 推荐使用 Google Pub/Sub 作为消息代理Apache Beam 作为流处理库Google Dataflow 作为执行工具Google BigQuery 作为数据库。然而当我尝试使用这个技术栈时我遇到了许多问题导致使用起来非常具有挑战性。在 Python 中使用 Google Pub/Sub 证明是比较慢的可以查看这个和这个这让我放弃了它转而使用 Kafka。Apache Beam 是一个文档齐全的库但与 Kafka 一起使用时却遇到了自己的一些问题。直接运行器有漏洞需要使用 Dataflow且由于等待机器配置导致了显著的时间延迟。此外我还遇到了窗口触发延迟的问题尽管我尝试过解决这个问题但都没有成功可以查看这个GitHub 问题和这个Stack Overflow 贴文。而且由于多个组件的复杂集成调试整个系统是一个巨大的挑战这让我对日志的控制非常有限也使得很难 pinpoint定位Pub/Sub、Beam、Dataflow 或 BigQuery 中问题的根本原因。总的来说我在使用 Google Cloud Platform 的过程中遇到了 Python 中 Google Pub/Sub 性能慢、使用 Apache Beam 与 Kafka 时的 bugs 以及调试这些互联系统的整体困难。[1] Kleppmann, Martin.设计数据密集型应用可靠、可扩展和可维护系统背后的核心理念。 “ O’Reilly Media, Inc.”, 2017。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2585265.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!