0基础学习PyFlink——事件时间和运行时间的窗口

news2025/7/10 10:03:34

大纲

  • 定制策略
  • 运行策略
  • Reduce
  • 完整代码
  • 滑动窗口案例
  • 参考资料

在 《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》一文中,我们使用的是运行时间(Tumbling ProcessingTimeWindows)作为窗口的参考时间:

    reduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \
                    .apply(SumWindowFunction(),
                        Types.TUPLE([Types.STRING(), Types.INT()]))

而得到的结果也是不稳定的。
在这里插入图片描述
这是因为每次运行时,CPU等系统资源的繁忙程度是不一样的,这就影响了最后的运行结果。
为了让结果稳定,我们可以不依赖运行时间(ProcessingTime),而使用不依赖于运行环境,只依赖于数据的事件时间(EventTime)。
一般,我们需要大数据处理的数据,往往存在一个字段用于标志该条数据的“顺序”。这个信息可以是单调递增的ID,也可以是不唯一的时间戳。我们可以将这类信息看做事件发生的时间。
那如何让输入的数据中的“事件时间”参与到窗口时长的计算中呢?这儿就要引入Watermark(水印)的概念。
假如我们把数据看成一张纸上的内容,水印则是这张纸的背景。它并不影响纸上内容的表达,只是系统要用它来做更多的事情。
将数据中表达“顺序”的数据转换成“时间”,我们可以使用水印单调递增时间戳分配器

定制策略

class ElementTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp)-> int:
        return int(value[1])
 ……       
    # define the watermark strategy
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(ElementTimestampAssigner())

for_monotonous_timestamps会分配一个水印单调递增时间戳分配器,然后使用with_timestamp_assigner告知输入数据中“顺序”字段的值。这样系统就会根据这个字段的值生成一个单调递增的时间戳。这个时间戳相对顺序就和输入数据一样,是稳定的。
比如上图中,会分别用2,1,4,3……来计算时间戳。

运行策略

然后对原始数据使用该策略,这样source_with_wartermarks中的数据就包含了时间戳。

source_with_wartermarks=source.assign_timestamps_and_watermarks(watermark_strategy)

Reduce

这次我们使用TumblingEventTimeWindows,即事件时间(EventTime)窗口,而不是运行时间(ProcessingTime)窗口。

     # keying
    keyed=source_with_wartermarks.key_by(lambda i: i[0]) 
    
    # reducing
    reduced=keyed.window(TumblingEventTimeWindows.of(Time.milliseconds(2))) \
                    .apply(SumWindowFunction(),
                        Types.TUPLE([Types.STRING(), Types.INT()]))

(‘E’, 1) TimeWindow(start=0, end=2)
(‘E’, 3) (‘E’, 2) TimeWindow(start=2, end=4)
(‘E’, 4) (‘E’, 5) TimeWindow(start=4, end=6)
(‘E’, 6) (‘E’, 7) TimeWindow(start=6, end=8)
(‘E’, 8) (‘E’, 9) TimeWindow(start=8, end=10)
(‘E’, 10) TimeWindow(start=10, end=12)
(E,1)
(E,2)
(E,2)
(E,2)
(E,2)
(E,1)

多运行几次,结果是稳定输出的。
我们再多关注下TimeWindow中的start和end,它们是不重叠的、步长为2、左闭右开的区间。这个符合滚动窗口特性。

完整代码

from typing import Iterable

from pyflink.common import Types, Time, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow, TumblingProcessingTimeWindows, SlidingProcessingTimeWindows
from pyflink.common.watermark_strategy import TimestampAssigner

class ElementTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp)-> int:
        return int(value[1])
   
class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
        print(*inputs, window)
        return [(key,  len([e for e in inputs]))]


word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5),("E",7),("E",8),("E",9),("E",10)]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mappging
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

     # define the watermark strategy
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(ElementTimestampAssigner())
    
    source_with_wartermarks=source.assign_timestamps_and_watermarks(watermark_strategy)
        
     # keying
    keyed=source_with_wartermarks.key_by(lambda i: i[0]) 
    
    # reducing
    reduced=keyed.window(TumblingEventTimeWindows.of(Time.milliseconds(2))) \
                    .apply(SumWindowFunction(),
                        Types.TUPLE([Types.STRING(), Types.INT()]))
        
    # # define the sink
    reduced.print()

    # submit for execution
    env.execute()

if __name__ == '__main__':
    word_count()

滑动窗口案例

from typing import Iterable

from pyflink.common import Types, Time, WatermarkStrategy
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import SlidingEventTimeWindows, TimeWindow
from pyflink.common.watermark_strategy import TimestampAssigner

class ElementTimestampAssigner(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp)-> int:
        return int(value[1])
   
class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
        print(*inputs, window)
        return [(key,  len([e for e in inputs]))]


word_count_data = [("E",3),("E",1),("E",4),("E",2),("E",6),("E",5),("E",7),("E",8),("E",9),("E",10)]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mappging
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()
    
    # define the watermark strategy
    watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
        .with_timestamp_assigner(ElementTimestampAssigner())
    
    source_with_wartermarks=source.assign_timestamps_and_watermarks(watermark_strategy)
        
     # keying
    keyed=source_with_wartermarks.key_by(lambda i: i[0]) 
    
    # reducing
    reduced=keyed.window(SlidingEventTimeWindows.of(Time.milliseconds(2), Time.milliseconds(1))) \
                    .apply(SumWindowFunction(),
                        Types.TUPLE([Types.STRING(), Types.INT()]))
        
    # # define the sink
    reduced.print()

    # submit for execution
    env.execute()

if __name__ == '__main__':
    word_count()

(‘E’, 1) TimeWindow(start=0, end=2)
(‘E’, 1) (‘E’, 2) TimeWindow(start=1, end=3)
(‘E’, 3) (‘E’, 2) TimeWindow(start=2, end=4)
(‘E’, 3) (‘E’, 4) TimeWindow(start=3, end=5)
(‘E’, 4) (‘E’, 5) TimeWindow(start=4, end=6)
(‘E’, 6) (‘E’, 5) TimeWindow(start=5, end=7)
(‘E’, 6) (‘E’, 7) TimeWindow(start=6, end=8)
(‘E’, 7) (‘E’, 8) TimeWindow(start=7, end=9)
(‘E’, 8) (‘E’, 9) TimeWindow(start=8, end=10)
(‘E’, 9) (‘E’, 10) TimeWindow(start=9, end=11)
(‘E’, 10) TimeWindow(start=10, end=12)
(E,1)
(E,2)
(E,2)
(E,2)
(E,2)
(E,2)
(E,2)
(E,2)
(E,2)
(E,2)
(E,1)

通过TimeWindow的信息,我们看到这是一个步长为1、长度为2左闭右开的窗口。这个符合滑动窗口特点。

在这里插入图片描述

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/dev/datastream/event-time/built_in/
  • https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1158815.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

通过Xpath解析尝试多种方法提取文本

from lxml import etree# XML文档内容 xml_data <root><element attribute"value1">Text 1</element><element attribute"value2">Text 2</element><element attribute"value3">Text 3</element> &…

文件批量改名:字母随机重命名文件,高效又轻松

在日常工作中&#xff0c;我们经常需要处理大量的文件&#xff0c;其中最繁琐的任务之一就是给文件重命名。如果手动一个一个地重命名&#xff0c;不仅耗时而且容易出错。为了解决这个问题&#xff0c;我们可以使用云炫文件管理器批量改名&#xff0c;用字母随机重命名文件&…

猪八戒、程序员客栈、码市哪个更好用?

最近有很多程序员伙伴在用接单平台线上兼职&#xff0c;问题也来了&#xff1a;到底哪个更好用嘞? 选取了几个问的比较多的&#xff1a;猪八戒、程序员客栈、码市。进行了一下简单的比较。 优点: 猪八戒 第一&#xff0c;猪八戒的名气是毋庸置疑的。无论是它成立至今的时间…

【移远QuecPython】EC800M物联网开发板的GPIO流水灯配置

【移远QuecPython】EC800M物联网开发板的GPIO流水灯配置 文章目录 GPIO初始化GPIO配置GPIO流水灯附录&#xff1a;列表的赋值类型和py打包列表赋值BUG复现代码改进优化总结 py打包 GPIO初始化 GPIO库&#xff1a; from machine import Pin初始化函数&#xff1a; class mac…

数据结构笔记(一)绪论

&#x1f600;前言 本人是根据bi站王卓老师视频学习并且做了相关笔记希望可以帮助到大家 &#x1f3e0;个人主页&#xff1a;尘觉主页 &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是尘觉&#xff0c;希望我的文章可以帮助到大家&#xff0c;您的满意是我的动力&a…

PHP 人才招聘管理系统mysql数据库web结构layUI布局apache计算机软件工程网页wamp

一、源码特点 PHP 人才招聘管理系统是一套完善的web设计系统 layUI技术布局 &#xff0c;对理解php编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。 php人才招聘管理系统 代码 https://download.csdn.net/download/qq_4…

Http代理与socks5代理有何区别?如何选择?(一)

了解SOCKS和HTTP代理之间的区别对于优化您的在线活动至关重要&#xff0c;无论您是技术娴熟的个人、现代互联网用户还是企业所有者。在使用代理IP时&#xff0c;您需要先了解这两种协议之间的不同。 一、了解HTTP代理 HTTP&#xff08;超文本传输协议&#xff09;代理专门设计…

RoCEv2网络部署----Mellanox网卡配置

Mellanox 网卡配置RoCEv2步骤&#xff0c; 1. 设置RDMA CM 模式v2 cma_roce_mode -d mlx5_1 -p 1 -m 2 检查RDMA CM的RoCE模式 2. 开启 DCQCN 在priority 3 echo 1 > /sys/class/net/ens1np0/ecn/roce_np/enable/3 echo 1 > /sys/class/net/ens1np0/ecn/roce_rp/enable…

实现右键出现菜单选项功能

文章目录 需求分析需求 实现鼠标右键显示菜单的功能 分析 分析该需求,流程如下 写一个 div 作为右键弹出的菜单选项——> 监听鼠标右键事件——> 得到坐标位置——> 在该位置对写好的 菜单选项 进行展示——> 选择完毕后关闭菜单——> 鼠标左键其他位置 点…

无需使用jadx-gui和mac电脑获取app备案公钥的方法

由于2023年&#xff0c;国家要求上架的app必须备案&#xff0c;因此app备案成为了很多公司迫切的需求。 备案的时候&#xff0c;需要填写app公钥&#xff0c;MD5值等参数&#xff0c;这些参数对于不熟悉加密技术的人来说&#xff0c;简直是无从下手&#xff0c;因为目前的开发…

王道408模拟8套卷(六)

紫色标记是认为有一定的思维难度或重点总结 红色标记是这次模拟做错的 橙色代表自己&#xff0c;对题目的看法和命题的失误之处 蓝色代表自己后续检查时检查出失误并改正的题 分数用时 选择部分 72/8045min25min大题部分40/70110min总分112155min25min&#xff08;检查&#x…

上班族如何做日程自律清单实现逆袭呢?电脑日程管理软件助力高效办公

越来越多的上班族都表示自己每天的工作任务非常多&#xff0c;经常从早忙到晚也无法按时完成工作&#xff0c;导致工作的拖延完成&#xff0c;这应该怎么办呢&#xff1f;其实对于职场人士来说&#xff0c;想要在工作中提升效率&#xff0c;就需要提前做好每天的工作日程安排&a…

概念解析 | 神经网络中的位置编码(Positional Encoding)

注1:本文系“概念解析”系列之一,致力于简洁清晰地解释、辨析复杂而专业的概念。本次辨析的概念是:Positional Encoding 神经网络中的位置编码(Positional Encoding) A Gentle Introduction to Positional Encoding in Transformer Models, Part 1 1.背景介绍 在自然语言处理任…

柯桥专升本学校,自考本科文凭的价值如何?

自考本科文凭的价值如何&#xff1f; 自考本科学历是通过独立学习和考试获得的一种本科学历。对于自考本科学历的价值&#xff0c;很多人感到困惑&#xff0c;那么究竟自考本科学历有多大的价值呢? 首先&#xff0c;在就业市场上&#xff0c;自考本科学历具有一定的竞争力。随…

Python爬虫收集今日热榜数据:聚合全网热点排行榜

pip install websocket-client 废话不多说数据展示&#xff1a; 代码&#xff1a; 创建工作簿和工作表 # 创建工作簿和工作表 workbook openpyxl.Workbook() sheet workbook.active sheet.title 实时热榜 设置标题行 titles ["序号", "平台", &qu…

【Cocos新手进阶】使用cocos 的预制体创建动态的滚动框组件。

本篇文章主要讲解&#xff0c;使用cocos 游戏引擎制作动态生成的滚动框实例教程。 日期&#xff1a;2023年11月1日 作者&#xff1a;任聪聪 引擎版本&#xff1a;2.4.3 至 2.4.11 关于预制体的说明和概念 cocos中的预制体的作用是能够让你使用数据的形式进行控制界面的变化&am…

阿里云双11活动正式启动,2核2G云服务器1年99元,新老用户均同享!

2023年阿里云双11活动已经正式启动了&#xff0c;这次阿里云可算是拿出了十足的诚意&#xff0c;推出了一款特价云服务器&#xff0c;2核2G3M云服务器1年99元&#xff0c;续费不涨价&#xff0c;新老用户同享&#xff01; 一、阿里云双十一活动入口 活动地址&#xff1a;传送门…

Aigtek的ATA-8000射频功率放大器对比进口品牌TC

一、公司介绍 中国安泰&#xff1a;西安安泰电子科技有限公司是国内专业从事测量仪器研发生产和销售的高科技企业&#xff0c;公司依托西安交大、西北工业大学组建的科研团队&#xff0c;专注功率放大器、功率信号源等产品为核心的相关行业测试解决方案的研究&#xff0c;拥有国…

【Leetcode】【消失的数字】【C语言】

方法一&#xff1a;按位异或&#xff08;找单身狗&#xff09; 我们知道&#xff1a;按位异或^操作原则&#xff1a;相同为零&#xff0c;相异为一 所以 0^aa a ^a0 a ^bb ^a int missingNumber(int* nums, int numsSize){ int i 0; int tem1 0,tem20; for (i 0;i < nu…

打字练习软件 Type Fu mac中文版技能介绍

Type Fu mac是一款打字练习和提高打字速度的应用程序。它旨在帮助用户通过练习键盘打字&#xff0c;提高打字准确性和速度。无论您是初学者还是想要提高打字技能的专业人士&#xff0c;Type Fu都是一个很好的选择&#xff01; Type Fu mac采用了一种互动&#xff0c;游戏化的方…