0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)

news2025/7/28 6:00:21

大纲

  • map
  • reduce
  • 完整代码
  • 参考资料

在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中,我们发现如果窗口内元素个数没有达到窗口大小时,计算个数的函数是不会被调用的。如下图中红色部分
在这里插入图片描述
那么有没有办法让上图中(B,2)和(D,5)也会被计算呢?
这就可以使用本节介绍的时间滚动窗口。它不依赖于窗口中元素的个数,而是窗口的时间,即窗口时间到了,计算就会进行。
我们稍微修改下《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》的例子,让元素集中在“A”上。

map

class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
        print(*inputs, window)
        return [(key,  len([e for e in inputs]))]


word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),
                   ("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mappging
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    # keying
    keyed=source.key_by(lambda i: i[0]) 

reduce

    # reducing
    reduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \
                    .apply(SumWindowFunction(),
                        Types.TUPLE([Types.STRING(), Types.INT()]))
        
    # # define the sink
    reduced.print()

    # submit for execution
    env.execute()

这儿我们的Window使用的是滚动时间窗口,其中参数Time.milliseconds(2)是指窗口时长,即2毫秒一个窗口。
我们运行多次代码可以得到不同的结果

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) TimeWindow(start=1698771761164, end=1698771761166)
(A,12)
(‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771761166, end=1698771761168)
(A,8)

在这里插入图片描述

或者

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) TimeWindow(start=1698771731386, end=1698771731388)
(A,16)
(‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771731388, end=1698771731390)
(A,4)

在这里插入图片描述

或者

(‘A’, 2) (‘A’, 1) (‘A’, 4) (‘A’, 3) (‘A’, 6) (‘A’, 5) (‘A’, 7) (‘A’, 8) (‘A’, 9) (‘A’, 10) (‘A’, 11) (‘A’, 12) (‘A’, 13) (‘A’, 14) (‘A’, 15) (‘A’, 16) (‘A’, 17) (‘A’, 18) (‘A’, 19) (‘A’, 20) TimeWindow(start=1698771714992, end=1698771714994)
(A,20)

在这里插入图片描述

可以发现结果并不稳定。但是可以发现,每个元素都参与了计算,而不像个数滚动窗口那样部分数据没有被触发计算。

完整代码

from typing import Iterable
import time
from pyflink.common import Types, Time
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import TimeWindow, TumblingProcessingTimeWindows
   
class SumWindowFunction(WindowFunction[tuple, tuple, str, TimeWindow]):
    def apply(self, key: str, window: TimeWindow, inputs: Iterable[tuple]):
        print(*inputs, window)
        return [(key,  len([e for e in inputs]))]


word_count_data = [("A",2),("A",1),("A",4),("A",3),("A",6),("A",5),("A",7),("A",8),("A",9),("A",10),
                   ("A",11),("A",12),("A",13),("A",14),("A",15),("A",16),("A",17),("A",18),("A",19),("A",20)]

def word_count():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    # write all the data to one file
    env.set_parallelism(1)

    source_type_info = Types.TUPLE([Types.STRING(), Types.INT()])
    # define the source
    # mappging
    source = env.from_collection(word_count_data, source_type_info)
    # source.print()

    # keying
    keyed=source.key_by(lambda i: i[0]) 
    
    # reducing
    reduced=keyed.window(TumblingProcessingTimeWindows.of(Time.milliseconds(2))) \
                    .apply(SumWindowFunction(),
                        Types.TUPLE([Types.STRING(), Types.INT()]))
        
    # # define the sink
    reduced.print()

    # submit for execution
    env.execute()

if __name__ == '__main__':
    word_count()

参考资料

  • https://nightlies.apache.org/flink/flink-docs-master/api/python/reference/pyflink.datastream/api/pyflink.datastream.window.TumblingProcessingTimeWindows.html#pyflink.datastream.window.TumblingProcessingTimeWindows

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1156522.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

一次不接受ElasticSearch官方建议导致的事故

记录一下 一次Elasticsearch集群事故分析、排查、处理 背景介绍 事故发生的ElasticSearch集群共有7台机器: 127.0.204.193127.0.204.194127.0.204.195127.0.220.73127.0.220.74127.0.220.220127.0.220.221 其中193、194、195的机器配置一样,具体如下&…

百度地图直接用的封装好的--自用vue的(每次项目都要有百度地图,还是搞个封装的差不多的以后可以直接拿来用)

自用的封装好的,有弹窗,轨迹回放,画点画地图 完整代码使用 百度地图的官方文档 百度地图必须的三个引用 完整代码 <template><AButton style"background-color: #3ba7ea;color: white;width: 100px;float: right" click"buttonClick">轨迹回放…

图书馆书目推荐数据分析与可视化

目 录 摘 要 I ABSTRACT II 目 录 II 第1章 绪论 1 1.1背景及意义 1 1.2 国内外研究概况 1 1.3 研究的内容 1 第2章 相关技术 3 2.1 nodejs简介 4 2.2 express框架介绍 6 2.4 MySQL数据库 4 第3章 系统分析 5 3.1 需求分析 5 3.2 系统可行性分析 5 3.2.1技术可行性&#xff1a;…

瑞萨e2studio(28)----SPI 驱动WS2812灯珠

瑞萨e2studio.28--SPI 驱动WS2812灯珠 概述视频教学样品申请芯片级联方法数据传输时序新建工程软件准备保存工程路径芯片配置开始SPI配置SPI属性配置时钟配置SPI配置CPHA配置代码hal_entry.cws2812.cws2812.h 概述 本文介绍了如何使用瑞萨RA微控制器&#xff0c;结合E2STUDIO…

基于热交换算法的无人机航迹规划-附代码

基于热交换算法的无人机航迹规划 文章目录 基于热交换算法的无人机航迹规划1.热交换搜索算法2.无人机飞行环境建模3.无人机航迹规划建模4.实验结果4.1地图创建4.2 航迹规划 5.参考文献6.Matlab代码 摘要&#xff1a;本文主要介绍利用热交换算法来优化无人机航迹规划。 1.热交换…

【设计模式】第22节:行为型模式之“状态模式”

一、简介 状态模式一般用来实现状态机&#xff0c;而状态机常用在游戏、工作流引擎等系统开发中。不过&#xff0c;状态机的实现方式有多种&#xff0c;除了状态模式&#xff0c;比较常用的还有分支逻辑法和查表法。该模式允许对象内部状态改变使改变它的行为。 二、适用场景…

【NI-DAQmx入门】计数器

1.计数器的作用 NI产品的计数器一般来说兼容TTL信号&#xff0c;定义如下&#xff1a;0-0.8V为逻辑低电平&#xff0c;2~5V为高电平&#xff0c;0.8-2V为高阻态&#xff0c;最大上升下降时间为50ns。 计数器可以感测上升沿&#xff08;从逻辑低到逻辑高的转变&#xff09;和下降…

【电源专题】POE连接方式与功率等级划分

在文章【电源专题】什么是POE&#xff1f;中我们讲到了&#xff1a;PoE&#xff08;Power over Ethernet&#xff09;是指通过网线传输电力的一种技术&#xff0c;借助现有以太网口通过网线同时为终端设备&#xff08;如&#xff1a;IP电话、AP、IP摄像头等&#xff09;进行数据…

web:[GYCTF2020]Blacklist

题目 点开靶机&#xff0c;页面显示为 查看源码 没有其他线索 先提交1试一下 猜测是sql注入&#xff0c;先测试 同时注意到url 提交为3-1&#xff0c;发现页面回显为空白 可以判断为字符型注入 输入select&#xff0c;看是否存在回显 回显了黑名单限制的关键字 但是发现没有…

第五章 I/O管理 九、磁盘的结构

目录 一、概念 二、磁盘的物理地址 1、定义&#xff1a; 2、图像&#xff1a; 如何读取一个“块”&#xff1a; 三、磁盘的分类 四、总结 一、概念 磁盘是由多个盘片和读写磁头组成的&#xff0c;每个盘片都有自己的读写磁头。盘片表面被划分成许多同心圆的磁道&#xff…

并发编程-CPU缓存架构详解 Disruptor的高性能设计方案

1.CPU缓存架构详解 1.1 CPU高速缓存概念 CPU缓存即高速缓冲存储器&#xff0c;是位于CPU与主内存间的一种容量较小但速度很高的存储器。CPU高 速缓存可以分为一级缓存&#xff0c;二级缓存&#xff0c;部分高端CPU还具有三级缓存&#xff0c;每一级缓存中所储存的全部数 据都…

深入了解 RocketMQ:高性能消息中间件

二、RocketMQ基本概念 2.1 消息模型&#xff08;Message Model&#xff09; RocketMQ主要由Producer、Broker、Consumer三部分组成&#xff0c;其中Producer负责生产消息&#xff0c;Consumer负责消费消息&#xff0c;Broker负责存储消息。Broker在实际部署过程中对应一台服务…

硬件知识积累 RS422接口

1. RS422 基本介绍 EIA-422&#xff08;过去称为RS-422&#xff09;是一系列的规定采用4线&#xff0c;全双工&#xff0c;差分传输&#xff0c;多点通信的数据传输协议。它采用平衡传输采用单向/非可逆&#xff0c;有使能端或没有使能端的传输线。和RS-485不同的是EIA-422不允…

高精度5米分辨率DEM数字高程数据

​5米分辨率DEM/DSM(无控)&#xff0c;以多颗高分辨率卫星数据为原始数据&#xff0c;基于智能立体模型构建与点云密集匹配&#xff0c;利用网络分布式与多核并行计算技术&#xff0c;三维点云融合与地形提取技术&#xff0c;辅以智能化的人机交互编辑等手段&#xff0c;处理和…

Android开发知识学习——Kotlin基础

函数声明 声明函数要用用 fun 关键字&#xff0c;就像声明类要用 class 关键字一样 「函数参数」的「参数类型」是在「参数名」的右边 函数的「返回值」在「函数参数」右边使用 : 分隔&#xff0c;没有返回值时可以省略 声明没有返回值的函数&#xff1a; fun main(){println…

阿里云发布通义千问2.0,性能超GPT-3.5,加速追赶GPT-4

10月31日&#xff0c;阿里云正式发布千亿级参数大模型通义千问2.0。在10个权威测评中&#xff0c;通义千问2.0综合性能超过GPT-3.5&#xff0c;正在加速追赶GPT-4。当天&#xff0c;通义千问APP在各大手机应用市场正式上线&#xff0c;所有人都可通过APP直接体验最新模型能力。…

聊一聊B端产品和C端产品的区别

To C 和 To B 的产品究竟有什么区别&#xff1f;难道仅仅只是使用对象和买单者不一样嘛&#xff1f;刚入行的产品经理是不是傻傻分不清楚&#xff1f;做产品经理这么久的你是否思考过这个问题&#xff1f; 作为一名产品经理&#xff0c;也设计过To B 和 To C的产品&#xff0c…

13年测试老鸟,软件测试经验总结分享,这几年你走了多少坑...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 1、测试阶段划分 …

个性化医疗:数字孪生的未来之路

数字孪生技术已经成为医疗领域的一项重要创新&#xff0c;为医疗保障提供了全新的可能性。它基于数学、物理和计算机科学原理&#xff0c;通过创建数字化模型和仿真来模拟生物系统和医疗设备。 1. 个性化治疗 数字孪生技术可创建患者的个性化模型&#xff0c;以更好地了解疾病…

Python 自动化(十六)静态文件处理

准备工作 将不同day下的代码分目录管理&#xff0c;方便后续复习查阅 (testenv) [rootlocalhost projects]# ls day01 day02 (testenv) [rootlocalhost projects]# mkdir day03 (testenv) [rootlocalhost projects]# cd day03 (testenv) [rootlocalhost day03]# django-admi…