Flink水位线-详细说明

news2025/7/11 3:18:00

文章目录

  • 时间语义
    • Flink 中的时间语义?
    • 哪种时间语义更重要?
  • 1. 水位线(Watermark)
    • 1.1 什么是水位线?
    • 1.2 如何生成水位线?
    • 1.3 水位线的传递
    • 1.4 水位线的计算

💎💎💎💎💎

githubgithubgithubgithubgithubgithubgithubgithubgithubgithubgithubgithubgithub

更多资源链接,欢迎访问作者gitee仓库:https://gitee.com/fanggaolei/learning-notes-warehouse/tree/master

时间语义

在理解水位线概念之前我们应该先了解时间语义的内容

Flink 中的时间语义?

image-20221117163355711

1.处理时间(Processing Time)

处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。

2.事件时间(Event Time)

事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。

哪种时间语义更重要?

  实际应用中,数据产生的时间处理的时间可能是完全不同的。很长时间收集起来的数据,处理或许只要一瞬间;也有可能数据量过大、处理能力不足,短时间堆了大量数据处理不完,产生“背压”(back pressure)。

   通常来说,处理时间是我们计算效率的衡量标准,而事件时间会更符合我们的业务计算逻辑。所以更多时候我们使用事件时间;不过处理时间也不是一无是处。对于处理时间而言,由于没有任何附加考虑,数据一来就直接处理,因此这种方式可以让我们的流处理延迟降到最低,效率达到最高。

1. 水位线(Watermark)

image-20221117165859376

1.1 什么是水位线?

   在Flink中,水位线是一种衡量Event Time进展的机制,用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现。 从设备生成实时流事件,到Flink的source,再到多个oparator处理数据,过程中会受到网络延迟、背压等多种因素影响造成数据乱序。

​    具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。 而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了。

image-20221119172614899

1.有序流中的水位线

image-20221118170851128

2.乱序流中的水位线

  这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,主要就是基于数据的产生时间而言的。

image-20221118171008781

  我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线

image-20221118171221753

  如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线

image-20221118171234685

  为了让窗口能够正确收集到迟到的数据,我们也可以等上 2 秒;也就是用当前已有数据的最大时间戳减去 2 秒,就是要插入的水位线的时间戳

image-20221118171246223

3.水位线的特性

⚫ 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据

⚫ 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展

⚫ 水位线是基于数据的时间戳生成的

⚫ 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进

⚫ 水位线可以通过设置延迟,来保证正确处理乱序数据

⚫ 一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据

1.2 如何生成水位线?

1.生成水位线的总体原则

  我们知道,完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。而完美的东西总是可望不可即,==我们只能尽量去保证水位线的正确。如果对结果正确性要求很高、想要让窗口收集到所有数据,==我们该怎么做呢?

  一个字,等。由于网络传输的延迟不确定,为了获取所有迟到数据,我们只能等待更长的时间。作为筹划全局的程序员,我们当然不会傻傻地一直等下去。那到底等多久呢?这就需要对相关领域有一定的了解了。比如,如果我们知道当前业务中事件的迟到时间不会超过 5 秒,那就可以将水位线的时间戳设为当前已有数据的最大时间戳减去 5 秒,相当于设置了 5 秒的延迟等待。

2.水位线生成策略(Watermark Strategies)

import com.fang.chapter05.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class WatermarkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getConfig().setAutoWatermarkInterval(100);

        //从元素中读取数据
         SingleOutputStreamOperator<Event> stream = env.fromElements(
                new Event("Marry", "./home", 1000L),
                new Event("Bob", "./home", 1100L),
                new Event("Marry", "./home", 1000L),
                new Event("Bob", "./prod?id=1", 1000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3200L)
                //有序流的watermark生成
//        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
//        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
//            @Override
//            public long extractTimestamp(Event element, long recordTimestamp) {
//                return element.timestamp;
//            }
//        })  //提取时间戳,生成水位线
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                })
        );
        env.execute();
    }
}

它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间

.assignTimestampsAndWatermarks()

  .assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就
是 所 谓 的 “ 水 位 线 生 成 策 略 ”

  WatermarkStrategy 中 包 含 了 一 个 “ 时 间 戳 分 配器”TimestampAssigner 和一个“水位线生成器”WatermarkGenerator

3.Flink 内置水位线生成器

(1)有序流

stream.assignTimestampsAndWatermarks(
 WatermarkStrategy.<Event>forMonotonousTimestamps()
 .withTimestampAssigner(new SerializableTimestampAssigner<Event>() 
{
 		@Override
		public long extractTimestamp(Event element, long recordTimestamp) 
         {
 			return element.timestamp;
 		   }
 		 })
);

(2)乱序流

  由于乱序流中需要等待迟到数据到齐,所以必须设置一个固定量的延迟时间(Fixed Amount of Lateness)

.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                })
        );

1.3 水位线的传递

  水位线定义的本质了:它表示的是“当前时间之前的数据,都已经到齐了”。这是一种保证,告诉下游任务“只要你接到这个水位线,就代表之后我不会再给你发更早的数据了,你可以放心做统计计算而不会遗漏数据”。所以如果一个任务收到了来自上游并行任务的不同的水位线,说明上游各个分区处理得有快有慢,进度各不相同比如上游有两个并行子任务都发来了水位线,一个是 5 秒,一个是 7 秒;这代表第一个并行任务已经处理完 5 秒之前的所有数据,而第二个并行任务处理到了 7 秒。那这时自己的时钟怎么确定呢?

  当然也要以“这之前的数据全部到齐”为标准。如果我们以较大的水位线 7 秒作为当前时间,那就表示“7 秒前的数据都已经处理完”,这显然不是事实——第一个上游分区才处理到 5 秒,5~7 秒的数据还会不停地发来;而如果以最小的水位线 5 秒作为当前时钟就不会有这个问题了,因为确实所有上游分区都已经处理完,不会再发 5 秒前的数据了。这让我们想到“木桶原理”:所有的上游并行任务就像围成木桶的一块块木板,它们中最短的那一块,决定了我们桶中的水位。

image-20221118174258154

1.4 水位线的计算

  水位线的默认计算公式:水位线 = 观察到的最大事件时间 – 最大延迟时间 – 1 毫秒

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/18602.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C#编程流程控制与集合类型

目录 选择语句 if-else语句 switch语句 集合一览 数组 列表 字典 迭代 for循环 foreach循环 while循环 超越无限 总结 本文主要来自<<C#实践入门>>哈里森.费隆 著&#xff0c;仅用为做笔记。 本章将专注以下主题: 选择语句。使用数组(Array)、字典(…

高级UI——Path测量

前言 在Path在UI体系当中不论是在自定义View还是动画&#xff0c;都占有举足轻重的地位。绘制Path&#xff0c;可以通过Android提供的API&#xff0c;或者是贝塞尔曲线、数学函数、图形组合等等方式&#xff0c;而要获取Path上每一个构成点的坐标&#xff0c;一般需要知道Path…

力扣刷题记录120.1-----718. 最长重复子数组

目录一、题目二、代码三、运行结果一、题目 二、代码 class Solution { public://dp[i][j]表示以 i j为末尾 最长公共子序列int findLength(vector<int>& nums1, vector<int>& nums2) {int i,j;int return_int0;vector<vector<int>> dp(n…

数据可视化模块 Matplotlib详解

本文主要介绍python 数据可视化模块 Matplotlib&#xff0c;并试图对其进行一个详尽的介绍。 通过阅读本文&#xff0c;你可以&#xff1a; 了解什么是 Matplotlib掌握如何用 Matplotlib 绘制各种图形&#xff08;柱状图、饼状图、直方图等&#xff09;掌握如何定制图形的颜色和…

WiFi连接满格信号但是不能上网?

WiFi已经成为人们日常生活中离不开的东西了&#xff0c;不论是手机还是笔记本电脑。但是有时候会遇到WiFi连接满格信号但是无法上网的情况&#xff0c;这是怎么回事呢&#xff1f;下面就和小编一起来看看吧。 WiFi满信号但是无法上网可能是这几个原因&#xff1a; 1、路由器网络…

使用 Docker 快速搭建 Rust 的 Jupyter Notebook

在 Jupyter notebook 上面运行 Python 程序非常&#xff0c;实际上 Jupyter 也支持其他的内核。 我们可以使用 docker 运行一个已经安装好 Rust Conda Jupyter Notebook 的的容器。 如下&#xff1a; docker run --name jupyter-rust -d -p 8899:8899 -v pwd:/opt/noteboo…

JavaScript作用域(作用域概述、变量的作用域、作用域链)、JavaScript预解析(特殊案例)

目录 JavaScript作用域 作用域概述 变量的作用域 作用域链 JavaScript预解析 特殊案例 JavaScript作用域 作用域概述 通常来说&#xff0c;一段程序代码中所用到的名字并不总是有效和可用的&#xff0c;而限定这个名字的可用性的代码范围就是这个名字的作用域。作用域的…

【C语言经典例题】——程序员必须会的经典基础例题(三)

关于C语言的一些基础经典题目放在专栏&#xff1a;[C语言刷题] 小菜坤日常上传gitee代码&#xff1a;https://gitee.com/qi-dunyan ❤❤❤ 个人简介&#xff1a;双一流非科班的一名小白&#xff0c;期待与各位大佬一起努力&#xff01; 推荐网站&#xff1a;cplusplus.com 目录…

LeNet-5学习笔记

LeNet-5 网络结构 输入→卷积&#xff08;C1&#xff09;→池化&#xff08;S2&#xff09;→卷积&#xff08;C3&#xff09;→池化&#xff08;S4&#xff09;→全连接(F5)→全连接&#xff08;F6&#xff09;→输出&#xff08;Output&#xff09; 卷积神经网络的构成 输…

力扣(LeetCode)18. 四数之和(C++)

双指针 快排使 numsnumsnums 正序。 设置四个指针 iii 指向 numsnumsnums 第一个数&#xff0c;jjj 指向 numsnumsnums 第二个数&#xff0c;从前往后枚举 nums[i]nums[i]nums[i] 和 nums[j]nums[j]nums[j] &#xff0c; lll 从 nums[j1]nums[j1]nums[j1] 往后&#xff0c;指…

AI写作文案的技巧:Wordhero AI写作SOP

文案引用自AI Content Hacker Tips 7步成文&#xff1a;2000单词SEO文案写作 | Wordhero AI Editor大更新心态&#xff1a;用AI写作的正确态度 人工智能 (AI) 的兴起导致写作世界发生了一些有趣的变化。许多人现在正在使用人工智能工具来帮助他们写作。一些专家认为&#xff0…

向毕业妥协系列之深度学习笔记(一)浅层神经网络

目录 一.神经网络杂记 二.计算图&#xff08;反向传播求导的几个实例&#xff09; 1.普通式子反向传播求导 2.逻辑回归中的梯度下降 3.m个样本的梯度下降 三.向量化 四.python广播 五.激活函数 六.随机初始化 深度学习系列的文章也可以结合下面的笔记来看&#xff1a;…

java计算机毕业设计装修设计管理系统设计与实现(附源码、数据库)

java计算机毕业设计装修设计管理系统设计与实现&#xff08;附源码、数据库&#xff09; 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xf…

【论文阅读】时序动作检测系列论文精读(2020年)

文章目录1. DBG: Fast Learning of Temporal Action Proposal via Dense Boundary Generator论文目的——拟解决问题、贡献——创新实现流程详细方法2. PBR-Net: Progressive Boundary Refinement Network for Temporal Action Detection论文目的——拟解决问题贡献——创新实现…

08.初级指针

一、指针 指针理解的2个要点&#xff1a; 1. 指针是内存中一个最小单元的编号&#xff0c;也就是地址 2. 平时口语中说的指针&#xff0c;通常指的是指针变量&#xff0c;是用来存放内存地址的变量 总结&#xff1a;指针就是地址&#xff0c;口语中说的指针通常指的是指针变…

VLSI 半定制设计方法 与 全定制设计方法【VLSI】

VLSI 半定制设计方法 与 全定制设计方法【VLSI】VLSI 半定制设计方法1. standard cell 设计方法Standard Cell library设计方法与步骤特点2. 门阵列(gate array)设计方法gate array特点与FPGA的区别PLA3. 门海设计方法(sea-of-gates styles)全定制&#xff1a;无约束设计方法&a…

希望计算机专业同学都知道这些老师

C语言教程——翁凯老师、赫斌 翁恺老师是土生土长的浙大码农&#xff0c;从本科到博士都毕业于浙大计算机系&#xff0c;后来留校教书&#xff0c;一教就是20多年。 翁恺老师的c语言课程非常好&#xff0c;讲解特别有趣&#xff0c;很适合初学者学习。 郝斌老师的思路是以初学…

【UML】活动图Activity Diagram、状态机图State Machine Diagram、顺序图Sequence Diagram

一、活动图 1、简述 活动图和流程图很相似&#xff0c;但是流程图不属于UML图的一种。 类图是一种静态图&#xff0c;属于结构建模&#xff1b;活动图是一个动态图&#xff0c;属于行为建模。 2、元素 2.1 开始、结束、判读、活动、合并 流程图的元素很简单&#xff1a;圆…

[附源码]java毕业设计社区新冠疫情防控网站

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

WebSocket 和 Socket 的区别

WebSocket 和 Socket 的区别就像Java和JavaScript&#xff0c;并没有什么太大的关系&#xff0c;但又不能说完全没关系。可以这么说&#xff1a; 1.命名方面&#xff0c;Socket是一个深入人心的概念&#xff0c;WebSocket借用了这一概念&#xff1b;2.使用方面&#xff0c;完全…