0基础学习PyFlink——时间滑动窗口(Sliding Time Windows)

news2025/7/26 14:22:57

在《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》我们介绍了不会有重复数据的时间滚动窗口。本节我们将介绍存在重复计算数据的时间滑动窗口。
关于滑动窗口,可以先看下《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》。下图就是个数滑动窗口示意图。
在这里插入图片描述
我们看到个数滑动窗口也会因为窗口内数据不够而不被触发。但是时间滑动窗口则可以解决这个问题,我们只要把窗口改成时间类型即可。
相应的代码我们参考《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》,只要把TumblingProcessingTimeWindows改成SlidingProcessingTimeWindows,并增加一个偏移参数(Time.milliseconds(1))即可。这意味着我们将运行一个时间长度为2毫秒,每次递进1毫秒的窗口。

完整代码

from typing import Iterable
import time
from pyflink.common import Types, Time
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import  TimeWindow, SlidingProcessingTimeWindows
   
class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
        print(*inputs, window)
        return [(key,  len([e for e in inputs]))]


word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),
                   ("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mappging
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    # keying
    keyed=source.key_by(lambda i: i[0]) 
    
    # reducing
    reduced=keyed.window(SlidingProcessingTimeWindows.of(Time.milliseconds(2), Time.milliseconds(1))) \
                    .apply(SumWindowFunction(),
                        Types.TUPLE([Types.STRING(), Types.INT()]))
        
    # # define the sink
    reduced.print()

    # submit for execution
    env.execute()

if __name__ == '__main__':
    word_count()

运行结果

运行两次上述代码,我们发现每次都不同,而且有重叠计算的元素。

(‘A’, 2) (‘A’, 1) (‘A’, 4) TimeWindow(start=1698773292650, end=1698773292652)
(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) TimeWindow(start=1698773292651, end=1698773292653)
(A,3)
(A,11)
(‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698773292652, end=1698773292654)
(A,17)

在这里插入图片描述

(‘A’, 2) (‘A’, 1) (‘A’, 4) TimeWindow(start=1698773319933, end=1698773319935)
(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) TimeWindow(start=1698773319934, end=1698773319936)
(A,3)
(A,12)
(‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698773319935, end=1698773319937)
(A,17)

在这里插入图片描述

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/api/python/reference/pyflink.datastream/api/pyflink.datastream.window.SlidingProcessingTimeWindows.html#pyflink.datastream.window.SlidingProcessingTimeWindows

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1156859.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【热带气旋】基本介绍:定义、标准、结构等

热带气旋基本介绍 热带气旋(Tropical Cyclone, TC)1 热带气旋定义2 热带气旋标准2.1 热带低压(Tropical Depression)2.2 热带风暴(Tropical storm)2.3 强热带风暴(Severe tropical storm&#x…

2023 DevFest 开发者大会 | 精彩进行时!

DevFest 是由全球各地的谷歌开发者社区 (Google Developer Groups,GDG) 主导的,为期数月的系列 Google 技术交流活动。DevFest 为参与者提供一个与 Google 员工、GDE 谷歌开发者专家、社区 KOL、行业开发者和问题解决者面对面交流的机会。 今年&#xff…

[SUCTF 2019]EasySQL 1

题目环境: 把你的旗子给我,我会告诉你旗子是不是对的。 判断注入类型1回显结果 不是字符型SQL注入 1回显结果 数字型SQL注入 查所有数据库,采用堆叠注入1;show databases;查看所有数据表1;show tables;尝试爆Flag数据表的字段1;show columns from Flag; …

机器人仿真-gazebo学习笔记(3)URDF和机器人模型

1.URDF简介 URDF(统一机器人麦哦书格式)是ROS中的重要机器人模型描述格式,ROS提供了URDF文件的c解析器,可以解析URDF文件中使用XML格式的机器人模型。 urdf - ROS Wiki 自己查阅ros官方对URDF的介绍其实会强于大部分网上流传的文章。 1.URDF文件常用的…

电脑硬件坏了,如何维修?

在电子设备日益普及的今天,电脑已成为很多人生活和工作中不可或缺的工具,然而在使用过程中很容易遇见电脑故障之类的问题,这些问题十有八九来自硬件,那么针对电脑硬件问题,该如何维修? 一般来说&#xff0c…

深入探究Vue.js生命周期及其应用场景

当谈到Vue.js的生命周期时,我们指的是组件在创建、更新和销毁过程中发生的一系列事件。了解Vue的生命周期对于开发人员来说是至关重要的,因为它们提供了一个机会来执行特定任务,并在不同的阶段处理组件。 Vue的生命周期可以分为八个不同的阶…

项目管理-挣值管理例题-使用SV进度偏差和CV成本偏差来判断进度和成本是否合适

基础概念介绍 CV和SV的计算公式 在财务分析中,常常会用到CV和SV这两个指标。CV是成本偏差,SV是进度偏差。它们的计算公式如下: CV EV - AC SV EV - PV 其中,EV是挣值,AC是实际成本,PV是计划价值。 …

Android Studio中配置Git

安装Git 在安装Android Studio之前,需要先安装Git。可以从Git官网下载并安装Git:https://git-scm.com/downloads 在Android Studio中配置Git 在Android Studio中,依次点击“File” -> “Settings”,在弹出的窗口中选择“Ver…

软件产品如何进行跨浏览器测试?

跨浏览器测试是确保Web应用程序的功能在不同浏览器、浏览器版本和操作系统之间保持一致的过程,从而为其用户提供轻松的用户体验。跨浏览器测试涉及浏览器和操作系统的组合,以测试应用程序的响应能力和兼容性。 一、跨浏览器测试的作用   1、发现兼容性…

Postman测试金蝶云星空Webapi【协同开发云】

文章目录 Postman测试金蝶云星空Webapi【协同开发云】环境说明业务背景大致流程具体操作请求登录接口请求标准接口查看保存提交审核反审核撤销 请求自定义接口参数是字符串参数是实体类单个实体类实体类是集合 其他 Postman测试金蝶云星空Webapi【协同开发云】 环境说明 金蝶…

面向对象【this关键字】

文章目录 this关键字基本作用调用变量调用方法调用构造器this 关键字的限制 this关键字 它在方法(实例方法或非 static 的方法)内部使用,表示调用该方法的对象它在构造器内部使用,表示该构造器正在初始化的对象。 基本作用 引用…

如何将 ruby 打包类似于jdk在另一台相同架构的机器上面开箱即用

需求 目前工作中使用到了ruby作为java 项目的中转语言,但是部署ruby的时候由于环境的不同会出现安装依赖包失败的问题,如何找到一种开箱即用的方式类似于java 中的jdk内置jvm这种方式 解决 TruffleRuby 完美解决问题,TruffleRuby 是使用 T…

5.13.Post方法进行线程切换

在上节课中呢,我向你介绍接口调用过程中啊,曾经看到过post方法。当时我已经向你解释过pose方法呢,就是从一个线程切换到另外一个线程,那整个的过程呢,非常简单,就是从发送线程创建一个消息。塞到接收线程的…

指纹识别之dns

https://ephen.me/2017/dns-tcp/ https://c.biancheng.net/view/6457.html https://www.jianshu.com/p/b483300378af https://www.cnblogs.com/549294286/p/5172448.html wireshark数据包分析 Packet Details Pane(数据包详细信息), 在数据包列表中选择指定数据包,…

nodejs+vue智慧补助系统的设计与实现-计算机毕业设计

随着网络技术的不断发展,多媒体技术应用渐渐的出现在教育领域中,智慧补助系统已经成为教育发展的一个热门话题。 在众多网络开发技术中,nodejs是当前很热门的一种软件,因为它可以进行数据库操作及方便用户控制管理。 在各学校的教…

广播域与冲突域详解

广播域与冲突域详解 一般普遍认为一个HUB(集线器)就是一个冲突域,而使用交换机就可以隔离冲突域。但是无论是HUB 还是交换机它们都具有广播域。HUB 和交换机的区别:同一个 HUB 的所有端口都在同一个广播域和同一个冲突域内的。而…

NIFI1.23.2_最新版_性能优化通用_技巧积累_随时更新---大数据之Nifi工作笔记0063

nifi好用,但是对机器的性能要求也高,如果性能达不到,就会导致,问题发生,比如,队列里显示有内容,但是实际上队列是空的,清也清不掉,只能重启,很麻烦. 关于优化:1.配置前端页面刷新的间隔时间默认30秒,我们可以自己需要看的时候手动刷新我们改成300sec 2.修改CPU阻塞时间,提高CPU…

C++-实现一个简单的菜单程序

C-实现一个简单的菜单程序 1,if-else语句实现1.1,代码实现1.2,功能检测 2,switch语句实现2.1,代码实现2.2,功能检测 1,if-else语句实现 实现一个简单的菜单程序,运行时显示"Men…

节日活动软文怎么写?媒介盒子为您解答

不管是春节、除夕这类传统节日,还是万圣节、情人节这类舶来节日,又或者是双十一、618这类电商节。品牌方只要在节日中举办活动,都能够提升品牌曝光率,还能有效减少运营时间成本提高效率,节日活动软文能够帮助商家宣传活…

Java实验四

要求:设计一个文字字体设置窗体,在该窗体中可以设置要显示文字的字体内容,包括字体名称、字体大小、粗体和斜体等字体风格。并模拟在不同操作系统下的显示效果。添加事件处理机制,要求实现如下功能: 当在文本框中输入…