基于Scala实现Flink的三种基本时间窗口操作

news2025/6/9 8:07:47

目录

代码结构

代码解析

(1) 主程序入口

(2) 窗口联结(Window Join)

(3) 间隔联结(Interval Join)

(4) 窗口同组联结(CoGroup)

(5) 执行任务

代码优化

(1) 时间戳分配

(2) 窗口大小

(3) 输出格式

(4) 并行度

优化后的代码


 

这段代码展示了 Apache Flink 中三种不同的流联结操作:窗口联结(Window Join)间隔联结(Interval Join) 和 窗口同组联结(CoGroup)。以下是对代码的详细解析和说明: 

代码结构

  • 包声明package transformplus
    定义了代码所在的包。

  • 导入依赖
    导入了 Flink 相关类库,包括流处理 API、窗口分配器、时间语义等。

  • WindowJoin 对象
    主程序入口,包含三种流联结操作的实现。

package transformplus

import java.lang

import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import source.Event

/**
 *
 * @PROJECT_NAME: flink1.13
 * @PACKAGE_NAME: transformplus
 * @author: 赵嘉盟-HONOR
 * @data: 2023-12-05 12:05
 * @DESCRIPTION
 *
 */
object WindowJoin {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //TODO 窗口联结(join)
    val stream1 = env.fromElements(
      ("a", 1000L),
      ("b", 1000L),
      ("a", 2000L),
      ("b", 6000L)
    ).assignAscendingTimestamps(_._2)
    val stream2 = env.fromElements(
      ("a", 3000L),
      ("b", 3000L),
      ("a", 4000L),
      ("b", 8000L)
    ).assignAscendingTimestamps(_._2)

    stream1.join(stream2)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .apply((e1,e2)=>e1+"->"+e2)
      .print("Join")

    //TODO 间隔联结:用户行为事件联系(intervalJoin)
    // 订单事件流
    val orderStream: DataStream[(String, String, Long)] = env
      .fromElements(
        ("Mary", "order-1", 5000L),
        ("Alice", "order-2", 5000L),
        ("Bob", "order-3", 20000L),
        ("Alice", "order-4", 20000L),
        ("Cary", "order-5", 51000L)
      ).assignAscendingTimestamps(_._3)
    // 点击事件流
    val pvStream: DataStream[Event] = env
      .fromElements(
        Event("Bob", "./cart", 2000L),
        Event("Alice", "./prod?id=100", 3000L),
        Event("Alice", "./prod?id=200", 3500L),
        Event("Bob", "./prod?id=2", 2500L),
        Event("Alice", "./prod?id=300", 36000L),
        Event("Bob", "./home", 30000L),
        Event("Bob", "./prod?id=1", 23000L),
        Event("Bob", "./prod?id=3", 33000L)
      ).assignAscendingTimestamps(_.timestamp)

    orderStream.keyBy(_._1)
      .intervalJoin(pvStream.keyBy(_.user))
      .between(Time.seconds(-5),Time.seconds(10))
      .process(new ProcessJoinFunction[(String,String,Long),Event,String] {
        override def processElement(in1: (String, String, Long), in2: Event, context: ProcessJoinFunction[(String, String, Long), Event, String]#Context, collector: Collector[String]): Unit = {
          collector.collect(in1+"=>"+in2)
        }
      }).print("intervalJoin")


    //TODO 窗口同组联结: coGroup(iterable)
    stream1.coGroup(stream2)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .apply(new CoGroupFunction[(String,Long),(String,Long),String] {
        override def coGroup(iterable: lang.Iterable[(String, Long)], iterable1: lang.Iterable[(String, Long)], collector: Collector[String]): Unit = {
          collector.collect(iterable+"=>"+iterable1)
        }
      }).print("coGroup")
    env.execute("windowJoin")
  }
}

代码解析

(1) 主程序入口
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)
  • 创建 Flink 流处理环境 StreamExecutionEnvironment,并设置并行度为 1。
(2) 窗口联结(Window Join)
val stream1 = env.fromElements(
  ("a", 1000L),
  ("b", 1000L),
  ("a", 2000L),
  ("b", 6000L)
).assignAscendingTimestamps(_._2)

val stream2 = env.fromElements(
  ("a", 3000L),
  ("b", 3000L),
  ("a", 4000L),
  ("b", 8000L)
).assignAscendingTimestamps(_._2)

stream1.join(stream2)
  .where(_._1)
  .equalTo(_._1)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .apply((e1, e2) => e1 + "->" + e2)
  .print("Join")
  • 数据流:定义了两个流 stream1 和 stream2,分别包含键值对 (String, Long)
  • 时间戳分配:使用 assignAscendingTimestamps 方法为事件分配时间戳。
  • 窗口联结
    • 使用 join 方法将两个流按键(_._1)联结。
    • 使用 TumblingEventTimeWindows 定义 5 秒的滚动窗口。
    • 使用 apply 方法将匹配的事件对拼接成字符串并输出。
(3) 间隔联结(Interval Join)
val orderStream: DataStream[(String, String, Long)] = env
  .fromElements(
    ("Mary", "order-1", 5000L),
    ("Alice", "order-2", 5000L),
    ("Bob", "order-3", 20000L),
    ("Alice", "order-4", 20000L),
    ("Cary", "order-5", 51000L)
  ).assignAscendingTimestamps(_._3)

val pvStream: DataStream[Event] = env
  .fromElements(
    Event("Bob", "./cart", 2000L),
    Event("Alice", "./prod?id=100", 3000L),
    Event("Alice", "./prod?id=200", 3500L),
    Event("Bob", "./prod?id=2", 2500L),
    Event("Alice", "./prod?id=300", 36000L),
    Event("Bob", "./home", 30000L),
    Event("Bob", "./prod?id=1", 23000L),
    Event("Bob", "./prod?id=3", 33000L)
  ).assignAscendingTimestamps(_.timestamp)

orderStream.keyBy(_._1)
  .intervalJoin(pvStream.keyBy(_.user))
  .between(Time.seconds(-5), Time.seconds(10))
  .process(new ProcessJoinFunction[(String, String, Long), Event, String] {
    override def processElement(in1: (String, String, Long), in2: Event, context: ProcessJoinFunction[(String, String, Long), Event, String]#Context, collector: Collector[String]): Unit = {
      collector.collect(in1 + "=>" + in2)
    }
  }).print("intervalJoin")
  • 数据流:定义了两个流 orderStream(订单事件)和 pvStream(点击事件)。
  • 时间戳分配:为事件分配时间戳。
  • 间隔联结
    • 使用 intervalJoin 方法将两个流按键(_._1 和 user)联结。
    • 使用 between 方法定义时间间隔(前 5 秒到后 10 秒)。
    • 使用 process 方法将匹配的事件对拼接成字符串并输出。
(4) 窗口同组联结(CoGroup)
stream1.coGroup(stream2)
  .where(_._1)
  .equalTo(_._1)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .apply(new CoGroupFunction[(String, Long), (String, Long), String] {
    override def coGroup(iterable: lang.Iterable[(String, Long)], iterable1: lang.Iterable[(String, Long)], collector: Collector[String]): Unit = {
      collector.collect(iterable + "=>" + iterable1)
    }
  }).print("coGroup")
  • 窗口同组联结
    • 使用 coGroup 方法将两个流按键(_._1)联结。
    • 使用 TumblingEventTimeWindows 定义 5 秒的滚动窗口。
    • 使用 apply 方法将匹配的事件集合拼接成字符串并输出。
(5) 执行任务
env.execute("windowJoin")
  • 启动 Flink 流处理任务,任务名称为 windowJoin

代码优化

(1) 时间戳分配
  • assignAscendingTimestamps 方法假设事件时间戳是严格递增的。如果时间戳可能乱序,应使用 assignTimestampsAndWatermarks 方法:

    java

    stream1.assignTimestampsAndWatermarks(
      WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event: (String, Long), timestamp: Long) => event._2)
    )
(2) 窗口大小
  • 窗口大小(5 秒)可能不适合所有场景。应根据实际需求调整窗口大小。
(3) 输出格式
  • 输出格式较为简单,可以优化为更易读的形式:

    java

    collector.collect(s"Order: ${in1._2}, Click: ${in2.url}")
(4) 并行度
  • 并行度设置为 1,可能影响性能。可以根据集群资源调整并行度:

    java

    env.setParallelism(4)

优化后的代码

以下是优化后的完整代码:

package transformplus

import java.lang
import java.time.Duration

import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import source.Event

object WindowJoin {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)

    // 窗口联结
    val stream1 = env.fromElements(
      ("a", 1000L),
      ("b", 1000L),
      ("a", 2000L),
      ("b", 6000L)
    ).assignTimestampsAndWatermarks(
      WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event: (String, Long), timestamp: Long) => event._2)
    )

    val stream2 = env.fromElements(
      ("a", 3000L),
      ("b", 3000L),
      ("a", 4000L),
      ("b", 8000L)
    ).assignTimestampsAndWatermarks(
      WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
        .withTimestampAssigner((event: (String, Long), timestamp: Long) => event._2)
    )

    stream1.join(stream2)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .apply((e1, e2) => s"${e1._1} (${e1._2}) -> ${e2._1} (${e2._2})")
      .print("Join")

    // 间隔联结
    val orderStream: DataStream[(String, String, Long)] = env
      .fromElements(
        ("Mary", "order-1", 5000L),
        ("Alice", "order-2", 5000L),
        ("Bob", "order-3", 20000L),
        ("Alice", "order-4", 20000L),
        ("Cary", "order-5", 51000L)
      ).assignTimestampsAndWatermarks(
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
          .withTimestampAssigner((event: (String, String, Long), timestamp: Long) => event._3)
      )

    val pvStream: DataStream[Event] = env
      .fromElements(
        Event("Bob", "./cart", 2000L),
        Event("Alice", "./prod?id=100", 3000L),
        Event("Alice", "./prod?id=200", 3500L),
        Event("Bob", "./prod?id=2", 2500L),
        Event("Alice", "./prod?id=300", 36000L),
        Event("Bob", "./home", 30000L),
        Event("Bob", "./prod?id=1", 23000L),
        Event("Bob", "./prod?id=3", 33000L)
      ).assignTimestampsAndWatermarks(
        WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5))
          .withTimestampAssigner((event: Event, timestamp: Long) => event.timestamp)
      )

    orderStream.keyBy(_._1)
      .intervalJoin(pvStream.keyBy(_.user))
      .between(Time.seconds(-5), Time.seconds(10))
      .process(new ProcessJoinFunction[(String, String, Long), Event, String] {
        override def processElement(in1: (String, String, Long), in2: Event, context: ProcessJoinFunction[(String, String, Long), Event, String]#Context, collector: Collector[String]): Unit = {
          collector.collect(s"Order: ${in1._2}, Click: ${in2.url}")
        }
      }).print("intervalJoin")

    // 窗口同组联结
    stream1.coGroup(stream2)
      .where(_._1)
      .equalTo(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      .apply(new CoGroupFunction[(String, Long), (String, Long), String] {
        override def coGroup(iterable: lang.Iterable[(String, Long)], iterable1: lang.Iterable[(String, Long)], collector: Collector[String]): Unit = {
          collector.collect(s"Stream1: ${iterable.toString}, Stream2: ${iterable1.toString}")
        }
      }).print("coGroup")

    env.execute("windowJoin")
  }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2405140.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

c++对halcon的动态链接库dll封装及调用(细细讲)

七个部分(是个大工程) 一,halcon封装函数导出cpp的内容介绍 二,c++中对halcon环境的配置 三,在配置环境下验证halcon代码 四,dll项目创建+环境配置 五,编辑dll及导出 六,调用打包好的动态链接库的配置 七,进行测试 一,halcon的封装及导出cpp的介绍 1,我这里…

【优选算法】分治

一&#xff1a;颜色分类 class Solution { public:void sortColors(vector<int>& nums) {// 三指针法int n nums.size();int left -1, right n, i 0;while(i < right){if(nums[i] 0) swap(nums[left], nums[i]);else if(nums[i] 2) swap(nums[--right], num…

【图片识别改名】如何批量将图片按图片上文字重命名?自动批量识别图片文字并命名,基于图片文字内容改名,WPF和京东ocr识别的解决方案

应用场景 在日常工作和生活中&#xff0c;我们经常会遇到需要对大量图片进行重命名的情况。例如&#xff0c;设计师可能需要根据图片内容为设计素材命名&#xff0c;文档管理人员可能需要根据扫描文档中的文字对图片进行分类命名。传统的手动重命名方式效率低下且容易出错&…

RabbitMQ 的高可用性

RabbitMQ 是比较有代表性的&#xff0c;因为是基于主从&#xff08;非分布式&#xff09;做高可用的RabbitMQ 有三种模式&#xff1a;单机模式、普通集群模式、镜像集群模式。 单机模式 单机模式,生产几乎不用。 普通集群模式&#xff08;无高可用性&#xff09; 普通集群模…

AI架构师修炼之道

1 AI时代的架构革命 与传统软件开发和软件架构师相比&#xff0c;AI架构师面临着三重范式转换&#xff1a; 1.1 技术维度&#xff0c;需处理异构算力调度与模型生命周期管理的复杂性&#xff1b; 1.2 系统维度&#xff0c;需平衡实时性与资源约束的矛盾&#xff1b; 1.3 价…

iview组件库:当后台返回到的数据与使用官网组件指定的字段不匹配时,进行修改某个属性名再将response数据渲染到页面上的处理

1、需求导入 当存在前端需要的数据的字段渲染到表格或者是一些公共的表格组件展示数据时的某个字段名与后台返回的字段不一致时&#xff0c;那么需要前端进行稍加处理&#xff0c;而不能直接this.list res.data;这样数据是渲染不出来的。 2、后台返回的数据类型 Datalist(pn) …

服务器 | Centos 9 系统中,如何部署SpringBoot后端项目?

系列文章目录 虚拟机 | Ubuntu 安装流程以及界面太小问题解决 虚拟机 | Ubuntu图形化系统&#xff1a; open-vm-tools安装失败以及实现文件拖放 虚拟机 | Ubuntu操作系统&#xff1a;su和sudo理解及如何处理忘记root密码 文章目录 系列文章目录前言一、环境介绍二、 使用syst…

(2025)Windows修改JupyterNotebook的字体,使用JetBrains Mono

(JetBrains Mono字体未下载就配置,这种情况我不知道能不能行,没做过实验,因为我电脑已经下载了,不可能删了那么多字体做实验,我的建议是下载JetBrains Mono字体,当你使用VsCode配置里面的JetBrains字体也很有用) 首先参考该文章下载字体到电脑上 VSCode 修改字体为JetBrains …

小番茄C盘清理:专业高效的电脑磁盘清理工具

在使用电脑的过程中&#xff0c;我们常常会遇到系统盘空间不足、磁盘碎片过多、垃圾文件堆积等问题&#xff0c;这些问题不仅会导致电脑运行缓慢&#xff0c;还可能引发系统崩溃。为了解决这些问题&#xff0c;小番茄C盘清理应运而生。它是一款专业的C盘清理软件&#xff0c;能…

AUTOSAR实战教程--标准协议栈实现DoIP转DoCAN的方法

目录 软件架构 关键知识点 第一:PDUR的缓存作用 第二:CANTP的组包拆包功能 第三:流控帧的意义 配置过程 步骤0:ECUC模块中PDU创建 步骤1:SoAD模块维持不变 步骤2:DoIP模块为Gateway功能添加Connection ​步骤3:DoIP模块为Gateway新增LA/TA/SA ​步骤4:PDUR模…

【MySQL系列】MySQL 导出表数据到文件

博客目录 一、使用 SELECT INTO OUTFILE 语句基本语法参数详解注意事项实际示例 二、使用 mysqldump 工具基本语法常用选项实际示例 三、使用 MySQL Workbench 导出导出步骤高级选项 四、其他导出方法1. 使用 mysql 命令行客户端2. 使用 LOAD DATA INFILE 的逆向操作3. 使用编程…

vue3:十五、管理员管理-页面搭建

一、页面效果 实现管理员页面,完成管理员对应角色的中文名称显示,实现搜索栏,表格基本增删改查,分页等功能 二、修改问题 1、修改搜索框传递参数问题 (1)问题图示 如下图,之前搜索后,传递的数据不直接是一个value值,而是如下图的格式 查询可知这里传递的数据定义的是…

基于51单片机的红外防盗及万年历仿真

目录 具体实现功能 设计介绍 资料内容 全部内容 资料获取 具体实现功能 具体功能&#xff1a; &#xff08;1&#xff09;实时显示年、月、日、时、分、秒、星期信息&#xff1b; &#xff08;2&#xff09;红外传感器&#xff08;仿真中用按键模拟&#xff09;检测是否有…

【飞腾AI加固服务器】全国产化飞腾+昇腾310+PCIe Switch的AI大模型服务器解决方案

以下是全国产化飞腾AI加固服务器采用飞腾昇腾PCIe Switch解决方案&#xff1a; &#x1f5a5;️ 一、硬件架构亮点 ‌国产算力双擎‌ ‌飞腾处理器‌&#xff1a;搭载飞腾FT2000/64核服务器级CPU&#xff08;主频1.8-2.2GHz&#xff09;&#xff0c;支持高并发任务与复杂计算&a…

应用层协议:HTTPS

目录 HTTPS&#xff1a;超文本传输安全协议 1、概念 2、通信过程及关键技术 2.1 通信过程 1> TLS握手协商&#xff08;建立安全通道&#xff09; 2> 加密数据传输 2.2 关键技术 1> 对称加密算法 2> 非对称加密 3> 对称加密和非对称加密组合 4> 数…

【ArcGIS技巧】—村庄规划规划用地规划状态字段生成工具

"国土空间规划后续也是走向数据治理&#xff0c;数据建库已经是涉及到城市规划、建筑、市政、农业、地理信息、测绘等等方方面面。不得不说以后数据库建设跟维护&#xff0c;是很多专业的必修课。小编就湖南省的村庄规划建库过程中规划用地用海中规划状态字段写了个小工具…

【PCIe总线】-- inbound、outbound配置

PCI、PCIe相关知识整理汇总 【PCIe总线】 -- PCI、PCIe相关实现 由之前的PCIe基础知识可知&#xff0c;pcie的组成有&#xff1a;RC&#xff08;根节点&#xff09;、siwtch&#xff08;pcie桥&#xff09;、EP&#xff08;设备&#xff09;。 RC和EP&#xff0c;以及EP和EP能…

分布式锁实战:Redisson vs. Redis 原生指令的性能对比

分布式锁实战&#xff1a;Redisson vs. Redis 原生指令的性能对比 引言 在DIY主题模板系统中&#xff0c;用户可自定义聊天室的背景、图标、动画等元素。当多个运营人员或用户同时修改同一模板时&#xff0c;若没有锁机制&#xff0c;可能出现“甲修改了背景色&#xff0c;乙…

react+taro 开发第五个小程序,解决拼音的学习

1.找一个文件夹 cmd 2.taro init 3.vscode 找开该文件夹cd help-letters 如&#xff1a;我的是(base) PS D:\react\help-letters> pnpm install 4.先编译一下吧。看下开发者工具什么反应。 pnpm dev:weapp 5.开始规则。我用cursor就是不成功。是不是要在这边差不多了&…

kafka(windows)

目录 介绍 下载 配置 测试 介绍 Kafka是一个分布式流媒体平台&#xff0c;类似于消息队列或企业信息传递系统。 下载 Kafka对于Zookeeper是强依赖&#xff0c;所以安装Kafka之前必须先安装zookeeper 官网&#xff1a;Apache Kafka 下载此安装包并解压 配置 新建log…