Flink 编程基础:Scala 版 DataStream API 入门

news2025/5/20 21:21:09

大家好!我是心海

流处理技术在大数据时代正变得越来越重要,而 Apache Flink 作为领先的流处理引擎,凭借其高性能、低延迟和丰富的 API 受到了广泛关注。本文将以 Scala 语言为例,详细讲解 Flink DataStream API 的基本编程模型,从数据源、数据转换、数据输出,到窗口划分与时间概念,最后结合经典的 WordCount 案例,带大家一步步动手实践。

目录

一、DataStream API:构建流处理世界的基石 

二、 基本编程实践:WordCount 示例

2.1 代码示例

2.2 代码解析

三、窗口的划分:在无限流中框定边界 

3.1 时间概念

3.2 窗口划分

 3.3 窗口计算

3.4 窗口计算示例

 四、总结


 

一、DataStream API:构建流处理世界的基石 

想象一下,现实世界的数据就像一条奔流不息的河流,时刻都在产生新的信息。DataStream API 就是 Flink 提供给我们的工具箱,里面装满了各种强大的工具,帮助我们捕获、转换和分析这条“数据河流”。

DataStream 编程模型:三段论

一个典型的 Flink DataStream 应用程序可以概括为以下三个核心步骤:

  1. 数据源(Source):数据的起点

  2. 数据转换(Transformation):数据的加工厂

  3. 数据输出(Sink):数据的归宿

可以用一张简单的图来表示这个过程:


二、 基本编程实践:WordCount 示例

为了让大家更直观地理解 Flink 编程,我们以经典的 WordCount 案例来讲解。下面的示例代码使用 Scala 编写,涵盖了数据源、数据转换、窗口计算以及数据输出的全流程。

2.1 代码示例

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object WordCount {
  def main(args: Array[String]): Unit = {
    // 创建流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 1. 数据源:从本地集合创建数据流(实际项目中可替换为读取文件或 Kafka)
    val text = env.fromElements(
      "Flink is a streaming engine",
      "Flink supports batch processing",
      "Scala makes Flink programming concise"
    )

    // 2. 数据转换:拆分单词,转换为 (word, 1) 格式
    val counts = text
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .keyBy(_._1)
      .reduce((a, b) => (a._1, a._2 + b._2))

    // 3. 数据输出:打印到控制台(实际项目中可写入文件、Kafka 或数据库)
    counts.print()

    // 执行程序
    env.execute("Scala Flink WordCount Example")
  }
}

2.2 代码解析

  • 数据源
    使用 env.fromElements 创建一个包含多条文本数据的流。在实际生产中,可以使用 env.readTextFile 或者 KafkaSource 读取实时数据。

  • 数据转换

    1. flatMap:将每行文本拆分成单词,并转换为小写。

    2. filter:过滤掉空字符串。

    3. map:将每个单词映射成 (word, 1) 元组。

    4. keyBy:根据单词进行分组。

    5. reduce:聚合相同单词的计数。

  • 数据输出
    使用 print 将计算结果输出到控制台。在实际项目中可以替换成其他 Sink(例如写入 Kafka、数据库或文件)。


三、窗口的划分:在无限流中框定边界 

由于流数据是无限的,我们需要将无限的流划分成有限大小的“窗口”,然后在每个窗口上进行计算。Flink 提供了灵活的窗口机制,可以根据时间、数量或其他条件来划分窗口。

无限延伸的蓝色波浪线代表数据流,上面被垂直的虚线分割成若干个矩形区域,每个矩形区域代表一个窗口。每个窗口内部包含若干个数据点。 

3.1 时间概念

在定义和计算窗口时,时间是一个至关重要的概念

  • 事件时间(Event Time)
    指数据中携带的时间戳,反映数据生成的真实时间

  • 处理时间(Processing Time)
    指系统接收到数据时的时间,不受数据本身时间戳影响。

  • 摄取时间(Ingestion Time)
    数据进入 Flink 系统的时间,一般介于事件时间和处理时间之间。

选择哪种时间语义取决于你的应用场景和对时间准确性的要求。事件时间通常是最准确的,但也可能涉及到处理乱序事件的问题。

3.2 窗口划分

常见的窗口类型有:

  • 滚动窗口(Tumbling Window)
    固定长度且互不重叠的窗口,如上面 WordCount 示例中的 5 秒窗口。

  • 滑动窗口(Sliding Window)
    窗口大小固定,但窗口之间存在重叠部分,可设置窗口滑动步长

  • 会话窗口(Session Window)
    根据数据之间的间隔动态划分,当间隔超过设定阈值时视为新窗口。

 3.3 窗口计算

在窗口内执行聚合

一旦我们定义了窗口的划分方式和使用的时间语义,就可以在每个窗口内执行各种计算,例如计数、求和、平均值、最大值、最小值等等。

Flink 提供了不同的 Window Assigners(窗口分配器)来定义如何将数据分配到窗口中,常见的有:

  • 时间窗口(Time Windows): 基于时间长度划分窗口,例如滚动时间窗口(Tumbling Time Window)、滑动时间窗口(Sliding Time Window)、会话窗口(Session Window)。

  • 计数窗口(Count Windows): 基于元素的数量划分窗口,例如滚动计数窗口(Tumbling Count Window)、滑动计数窗口(Sliding Count Window)。

3.4 窗口计算示例

下面是一个使用滑动窗口的示例,统计每个单词在 10 秒窗口内每隔 5 秒统计一次的计数:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SlidingWindowWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val text = env.fromElements(
      "Flink streaming window example",
      "Flink supports different time semantics",
      "Scala and Flink make a great combination"
    )

    val counts = text
      .flatMap(_.toLowerCase.split("\\W+"))
      .filter(_.nonEmpty)
      .map(word => (word, 1))
      .keyBy(_._1)
      // 定义滑动窗口:窗口长度 10 秒,滑动步长 5 秒
      .timeWindow(Time.seconds(10), Time.seconds(5))
      .sum(1)

    counts.print()
    env.execute("Scala Flink Sliding Window WordCount")
  }
}
  • flatMap(_.toLowerCase.split("\\W+")):将每行文本转换为小写,然后按非单词字符(如空格、标点符号)进行拆分,将每个单词作为一个独立的元素输出。
  • filter(_.nonEmpty):过滤掉空字符串。
  • map(word => (word, 1)):将每个单词映射为一个二元组(word, 1),其中word是单词,1表示该单词出现了一次。
  • keyBy(_._1):根据二元组的第一个元素(即单词)进行分组,以便后续对每个单词进行独立的统计。
  • .timeWindow(Time.seconds(10), Time.seconds(5)):定义一个滑动窗口,窗口长度为 10 秒,滑动步长为 5 秒。这意味着每 5 秒会生成一个新的窗口,每个窗口包含最近 10 秒内的数据。
  • .sum(1):对每个窗口内的二元组的第二个元素(即计数)进行求和,得到每个单词在该窗口内的出现次数。

 


 

 四、总结

恭喜你!通过本篇文章,你已经对 Scala 版 Flink DataStream API 的编程基础有了初步的了解。我们学习了 DataStream API 的核心组成部分:数据源、数据转换和数据输出,并通过 WordCount 示例进行了实践。同时,我们也初步接触了窗口的概念、时间语义和基本的窗口计算。

Flink DataStream API 的功能远不止于此,还有更高级的转换算子、更灵活的窗口操作、状态管理、容错机制等等等待我们去探索。在接下来的文章中,我们将继续深入学习这些更高级的主题,带你逐步成为 Flink 流处理的专家!

希望这篇文章能够帮助你迈出 Flink Scala 编程的第一步。如果你有任何问题或建议,欢迎在评论区留言交流。让我们一起在 Flink 的世界里扬帆起航!

 如果这篇文章对你有所启发,期待你的点赞关注!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2337274.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

HTML5好看的水果蔬菜在线商城网站源码系列模板5

文章目录 1.设计来源1.1 主界面1.2 关于我们1.3 商品服务1.4 果蔬展示1.5 联系我们1.6 商品具体信息1.7 登录注册 2.效果和源码2.1 动态效果2.2 源代码 源码下载万套模板,程序开发,在线开发,在线沟通 作者:xcLeigh 文章地址&#…

宜搭与金蝶互通——连接器建立

一、 进入连接器工厂 图1 连接器入口 二、 新建连接器 图2 新建连接器第一步 1、 连接器显示名,如图2中①所示; 2、 图2中②域名,是金蝶系统API接口里面的“完整服务地址”com之前的信息,不含“https”,如图3中①所示; 3、 Base Url通常为“/”,如图2…

SP7733:HPYNOS - Happy Numbers I(参考我之前的文章,哈希)

题目大意 我们定义“破坏”整数的过程是对其每一位上的数字的平方求和成为一个新数,如果一个数在经过若干次“破坏”以后变成了 1,那么这个数就是一个高兴的数字,输出变化次数,否则如果永远不会变成 1,输出 −1。 例如…

【JavaWeb】详细讲解 HTTP 协议

文章目录 一、HTTP简介1.1 概念1.2 特点 二、协议2.1 HTTP-请求协议(1)GET方式(2)POST方式(3)GET和POST的区别: 2.2 HTTP-响应协议(1)格式(2)响应…

“星睿O6” AI PC开发套件评测 - Windows on Arm 安装指南和性能测评

引言 Radxa联合此芯科技和安谋科技推出全新的"星睿O6"迷你 ITX 主板。该系统搭载了 CIX P1(CD8180)12 核 Armv9 处理器,拥有高达30T算力的NPU和高性能的GPU,最高配备64GB LPDDR内存,并提供了如 5GbE、HDMI …

Python 调用 YOLOv11 ONNX

Python 调用 YOLO ONNX 1 下载ONNX文件2 Python代码 1 下载ONNX文件 ONNX下载地址 2 Python代码 import cv2 from ultralytics import YOLOdef check(yolo:str, path:str):# 加载 YOLOv11model YOLO(yolo)# 读取图片img cv2.imread(path)# 推理(可以传文件路径…

数据通信学习笔记之OSPF路由汇总

区域间路由汇总 路由汇总又被称为路由聚合,即是将一组前缀相同的路由汇聚成一条路由,从而达到减小路由表规模以及优化设备资源利用率的目的,我们把汇聚之前的这组路由称为精细路由或明细路由,把汇聚之后的这条路由称为汇总路由或…

ASP.NET Core Web API 配置系统集成

文章目录 前言一、配置源与默认设置二、使用步骤1)创建项目并添加配置2)配置文件3)强类型配置类4)配置Program.cs5)控制器中使用配置6)配置优先级测试7)动态重载配置测试8)运行结果示…

如何判断单片机性能极限?

目录 1、CPU 负载 2、内存使用情况 3、实时性能 4、外设带宽 5、功耗与温度 在嵌入式系统设计中,当系统变得复杂、功能增加时,单片机可能会逐渐逼近其性能极限。及时识别这些极限点对于保证产品质量、稳定性和用户体验至关重要。 当你的嵌入式系统…

AI在多Agent协同领域的核心概念、技术方法、应用场景及挑战 的详细解析

以下是 AI在多Agent协同领域的核心概念、技术方法、应用场景及挑战 的详细解析: 1. 多Agent协同的定义与核心目标 多Agent系统(MAS, Multi-Agent System): 由多个独立或协作的智能体(Agent)组成&#xff…

1.凸包、极点、极边基础概念

目录 1.凸包 2.调色问题 3.极性(Extrem) 4.凸组合(Convex Combination) 5.问题转化(Strategy)​编辑 6.In-Triangle test 7.To-Left-test 8.极边(Extream Edges) 1.凸包 凸包就是上面蓝色皮筋围出来的范围 这些钉子可以转换到坐标轴中&#xff0…

OSCP - Proving Grounds - DriftingBlues6

主要知识点 路径爆破dirtycow内核漏洞提权 具体步骤 总体来讲,这台靶机还是比较直接的,没有那么多的陷阱,非常适合用来学习 依旧是nmap开始,只开放了80端口 Nmap scan report for 192.168.192.219 Host is up (0.42s latency). Not shown: 65534 cl…

深度理解指针之例题

文章目录 前言题目分析与讲解涉及知识点 前言 对指针有一定了解后,讲一下一道初学者的易错题 题目分析与讲解 先定义一个数组跟一个指针变量 然后把数组名赋值给指针变量————也就是把首地址传到pulPtr中 重点是分析这一句: *(pulPtr…

LeetCode算法题(Go语言实现)_51

题目 给你两个下标从 0 开始的整数数组 nums1 和 nums2 ,两者长度都是 n ,再给你一个正整数 k 。你必须从 nums1 中选一个长度为 k 的 子序列 对应的下标。 对于选择的下标 i0 ,i1 ,…, ik - 1 ,你的 分数 …

Solon AI MCP Server 入门:Helloworld (支持 java8 到 java24。国产解决方案)

目前网上能看到的 MCP Server 基本上都是基于 Python 或者 nodejs ,虽然也有 Java 版本的 MCP SDK,但是鲜有基于 Java 开发的。 作为Java 开发中的国产顶级框架 Solon 已经基于 MCP SDK 在进行 Solon AI MCP 框架开发了,本文将使用 Solon AI …

公司内部自建知识共享的方式分类、详细步骤及表格总结,分为开源(对外公开)和闭源(仅限内部),以及公共(全员可访问)和内部(特定团队/项目组)四个维度

以下是公司内部自建知识共享的方式分类、详细步骤及表格总结,分为开源(对外公开)和闭源(仅限内部),以及公共(全员可访问)和内部(特定团队/项目组)四个维度&am…

Oracle 19c部署之初始化实例(三)

上一篇文章中,我们已经完成了数据库软件安装,接下来我们需要进行实例初始化工作。 一、初始化实例的两种方式 1.1 图形化初始化实例 描述:图形化初始化实例是通过Oracle的Database Configuration Assistant (DBCA)工具完成的。用户通过一系…

医疗设备预测性维护合规架构:从法规遵循到技术实现的深度解析

在医疗行业数字化转型加速推进的当下,医疗设备预测性维护已成为提升设备可用性、保障医疗安全的核心技术。然而,该技术的有效落地必须建立在严格的合规框架之上。医疗设备直接关乎患者生命健康,其维护过程涉及医疗法规、数据安全、质量管控等…

Openfeign的最佳实践

文章目录 问题引入一、继承的方式1. 建立独立的Moudle服务2. 服务调用方继承jar包中的接口3. 直接注入继承后的接口进行使用 二、抽取的方式1. 建立独立的Moudle服务2.服务调用方依赖注入 问题引入 openfeign接口的实现和服务提供方的controller非常相似,例如&…

Buildroot编译过程中下载源码失败

RK3588编译一下recovery,需要把buildroot源码编译一遍。遇到好几个文件都下载失败,如下所示 pm-utils 1.4.1这个包下载失败,下载地址http://pm-utils.freedesktop.org/releases 解决办法,换个网络用windows浏览器下载后&#xff…