Flink Process Function

news2025/7/14 3:45:25

处理函数: ProcessFunction: 含有状态流的特性

处理函数面对的是数据流中的最基本元素: 数据事件 event, 状态 state, 时间 time
在这里插入图片描述

文章目录

      • 1.基本处理函数 ProcessFunction
        • 1.1 处理函数的功能和使用
        • 1.2 ProcessFunction 解析
      • 2.处理函数的分类
        • 2.1 按键分区处理函数 KeyedProcessFunction
        • 2.2 窗口处理函数 ProcessWindowFunction
      • 3.TopN案例
        • 3.1 ProcessAllWindowFunctionTopN
        • 3.2 KeyedProcessWindowFunctionTopN

1.基本处理函数 ProcessFunction

1.1 处理函数的功能和使用

处理函数提供了一个“定时服务”(TimerService),我们可以通过它访问流中的事件(event)、时间戳(timestamp)、水位线(watermark),甚至可以注册定时事件。

在这里插入图片描述

继承了AbstractRichFunction, 可以获取到访问状态state和运行信息。

在这里插入图片描述

ProcessFunction 函数有点像 FlatMapFunction 的升级版。可以实现 Map、Filter、FlatMap 的所有功能。很明显,处理函数非常强大,能够做很多之前做不到的事情。

1.2 ProcessFunction 解析

抽象类 ProcessFunction 继承了 AbstractRichFunction

内部单独定义了两个方法:一个是必须要实现的抽象方法.processElement(), 另一个是非抽象方法.onTimer()

public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
 ...
	public abstract void processElement(I value, Context ctx, Collector<O> out)throws Exception;
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
...
}

processElement(): 处理元素, 输入数据值 value, 上下文 ctx, 以及“收集器”(Collector)out。

  • value: 当前流中的输入元素, 也就是正在处理的数据, 类型与流中数据类型一致。
  • ctx: 类型是 ProcessFunction 中定义的内部抽象类 Context
public abstract class Context {
 // 时间戳
 public abstract Long timestamp();
 // 定时服务
 public abstract TimerService timerService();
 // 侧输出流
 public abstract <X> void output(OutputTag<X> outputTag, X value);
}

onTimer(): 定时器服务, 用来触发定时器

public class ProcessTimeTimerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        environment.addSource(new ClickSource())
                .keyBy(data -> data.user)
                .process(new KeyedProcessFunction<String, Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        long curTs = ctx.timerService().currentProcessingTime();
                        out.collect(ctx.getCurrentKey() + "数据到达, 到达时间:" + new Timestamp(curTs));
                        ctx.timerService().registerProcessingTimeTimer(curTs + 10 * 1000L);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect(ctx.getCurrentKey() + " 定时触发, 时间为:" + new Timestamp(timestamp));
                    }
                })
                .print();

        environment.execute();
    }
}

在这里插入图片描述

2.处理函数的分类

  1. ProcessFunction
  2. KeyedProcessFunction
  3. ProcessWindowFunction
  4. ProcessAllWindowFunction
  5. CoProcessFunction
  6. ProcessJoinFunction
  7. BroadcastProcessFunction
  8. KeyedBroadcastProcessFunction

2.1 按键分区处理函数 KeyedProcessFunction

KeyedProcessFunction 可以看作是ProcessFunction的扩展, 我们只要基于 keyBy 之后的 KeyedStream, 直接调用.process()方法。

在这里插入图片描述

public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction 
{
...
	public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
	public abstract class Context {...}
...
}

与 ProcessFunction 的定义几乎完全一样

使用处理时间定时器的具体示例

1.处理时间的定时器案例

public class ProcessTimeTimerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        environment.addSource(new ClickSource())
                .keyBy(data -> data.user)
                .process(new KeyedProcessFunction<String, Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        long curTs = ctx.timerService().currentProcessingTime();
                        out.collect(ctx.getCurrentKey() + "数据到达, 到达时间:" + new Timestamp(curTs));
                        ctx.timerService().registerProcessingTimeTimer(curTs + 10 * 1000L);
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect(ctx.getCurrentKey() + " 定时触发, 时间为:" + new Timestamp(timestamp));
                    }
                })
                .print();

        environment.execute();
    }
}

在这里插入图片描述

2.事件事件的定时器

public class EventTimeTimerTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                )
                .keyBy(data -> data.url)
                .process(new KeyedProcessFunction<String, Event, String>() {
                    @Override
                    public void processElement(Event value, Context ctx, Collector<String> out) throws Exception {
                        long currTs = ctx.timerService().currentWatermark();

                    }
                })
                .print();

        environment.execute();
    }
}

2.2 窗口处理函数 ProcessWindowFunction

除了KeyedProcessFunction,另外一大类常用的处理函数,就是基于窗口的ProcessWindowFunction 和 ProcessAllWindowFunction

stream.keyBy( t -> t.f0 )
 .window( TumblingEventTimeWindows.of(Time.seconds(10)) )
 .process(new MyProcessWindowFunction())

ProcessWindowFunction 是WindowedStream调用process()

ProcessWindowFunction 既是处理函数又是全窗口函数

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
 extends AbstractRichFunction {
...
public abstract void process(
 KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
public void clear(Context context) throws Exception {}
public abstract class Context implements java.io.Serializable {...}
}

Context 为上下文内部类

public abstract class Context implements java.io.Serializable {
 public abstract W window();
 public abstract long currentProcessingTime();
 public abstract long currentWatermark();
 public abstract KeyedStateStore windowState();
 public abstract KeyedStateStore globalState();
 public abstract <X> void output(OutputTag<X> outputTag, X value);
}

3.TopN案例

  • ProcessAllWindowFunctionTopN
  • KeyedProcessWindowFunctionTopN

3.1 ProcessAllWindowFunctionTopN

方式1

public class ProcessAllWindowTopN {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        SingleOutputStreamOperator<Event> dataStream = environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event event, long l) {
                                return event.timestamp;
                            }
                        }));

        dataStream.map(new MapFunction<Event, String>() {
            @Override
            public String map(Event value) throws Exception {
                return value.url;
            }
        })
                .windowAll(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
                    @Override
                    public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                        HashMap<String, Long> urlCountMap = new HashMap<>();
                        // 遍历窗口中数据,将浏览量保存到一个 HashMap 中
                        for (String url : elements) {
                            if (urlCountMap.containsKey(url)) {
                                long count = urlCountMap.get(url);
                                urlCountMap.put(url, count + 1L);
                            } else {
                                urlCountMap.put(url, 1L);
                            }
                        }
                        ArrayList<Tuple2<String, Long>> mapList = new
                                ArrayList<Tuple2<String, Long>>();

                        // 将浏览量数据放入 ArrayList,进行排序
                        for (String key : urlCountMap.keySet()) {
                            mapList.add(Tuple2.of(key, urlCountMap.get(key)));
                        }
                        mapList.sort(new Comparator<Tuple2<String, Long>>() {
                            @Override
                            public int compare(Tuple2<String, Long> o1, Tuple2<String,
                                    Long> o2) {
                                return o2.f1.intValue() - o1.f1.intValue();
                            }
                        });

                        // 取排序后的前两名,构建输出结果
                        StringBuilder result = new StringBuilder();

                        result.append("========================================\n");
                        for (int i = 0; i < 2; i++) {
                            Tuple2<String, Long> temp = mapList.get(i);
                            String info = "浏览量 No." + (i + 1) +
                                    " url:" + temp.f0 +
                                    " 浏览量:" + temp.f1 +
                                    " 窗 口 结 束 时 间 : " + new
                                    Timestamp(context.window().getEnd()) + "\n";
                            result.append(info);
                        }
                        result.append("========================================\n");
                        out.collect(result.toString());

                    }
                })
                .print();

        environment.execute();
    }
}

不区分 url 链接, 全部的访问数据收集起来, 统一统计计算, 不用keyBy(), 直接开窗。

利用HashMap 保存 url 访问, 最后转为ArrayList, 然后排序, 取出前两名。

3.2 KeyedProcessWindowFunctionTopN

TopN: 方式2

public class TopNKeyedProcessFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.setParallelism(1);

        SingleOutputStreamOperator<Event> dataStream = environment.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        })
                );

        SingleOutputStreamOperator<UrlCountView> urlCountStream = dataStream.keyBy(data -> data.url)
                .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());

        urlCountStream.keyBy(data -> data.end)
                .process(new TopN(2))
                .print();


        environment.execute();
    }
    public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long>{

        @Override
        public Long createAccumulator() {
            return 0l;
        }

        @Override
        public Long add(Event value, Long accumulator) {
            return accumulator + 1;
        }

        @Override
        public Long getResult(Long accumulator) {
            return accumulator;
        }

        @Override
        public Long merge(Long a, Long b) {
            return null;
        }
    }

    public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlCountView, String, TimeWindow>{

        @Override
        public void process(String s, Context context, Iterable<Long> elements, Collector<UrlCountView> out) throws Exception {
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            out.collect(new UrlCountView(s, elements.iterator().next(), start, end));
        }
    }

    public static class TopN extends KeyedProcessFunction<Long, UrlCountView, String>{
        private Integer count;

        private ListState<UrlCountView> urlViewCountListState;

        public TopN(Integer count){
            this.count = count;
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            urlViewCountListState = getRuntimeContext().getListState(
                    new ListStateDescriptor<UrlCountView>("url-view-count-list",
                            Types.POJO(UrlCountView.class)));
        }

        @Override
        public void processElement(UrlCountView value, Context ctx, Collector<String> out) throws Exception {
            urlViewCountListState.add(value);

            ctx.timerService().registerEventTimeTimer(ctx.getCurrentKey() + 1);
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            // 将数据从列表状态变量中取出,放入 ArrayList,方便排序
            ArrayList<UrlCountView> urlViewCountArrayList = new ArrayList<>();
            for (UrlCountView urlViewCount : urlViewCountListState.get()) {
                urlViewCountArrayList.add(urlViewCount);
            }
            // 清空状态,释放资源
            urlViewCountListState.clear();

            urlViewCountArrayList.sort(new Comparator<UrlCountView>() {
                @Override
                public int compare(UrlCountView o1, UrlCountView o2) {
                    return o2.count.intValue() - o1.count.intValue();
                }
            });

            // 取前两名,构建输出结果
            StringBuilder result = new StringBuilder();
            result.append("========================================\n");
            result.append("窗口结束时间:" + new Timestamp(timestamp - 1) + "\n");
            for (int i = 0; i < this.count; i++) {
                UrlCountView UrlViewCount = urlViewCountArrayList.get(i);
                String info = "No." + (i + 1) + " "
                        + "url:" + UrlViewCount.url + " "
                        + "浏览量:" + UrlViewCount.count + "\n";
                result.append(info);
            }
            result.append("========================================\n");
            out.collect(result.toString());

        }
    }
}

在这里插入图片描述

在这里插入图片描述

========================================
窗口结束时间:2022-12-22 19:17:30.0
No.1 url:./fav 浏览量:2
No.2 url:./home 浏览量:2
========================================

========================================
窗口结束时间:2022-12-22 19:17:35.0
No.1 url:./home 浏览量:4
No.2 url:./fav 浏览量:3
========================================

========================================
窗口结束时间:2022-12-22 19:17:40.0
No.1 url:./home 浏览量:5
No.2 url:./fav 浏览量:2
========================================

这里利用了状态去保存访问次数

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/108880.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

LaTex期刊模板下载与使用

1 LaTex期刊模板下载与使用 接上文介绍了LaTex的下载安装和基本语法使用规则。 上文地址&#xff1a;科研人快速入门LaTex到日常使用&#xff0c;下载安装配置&#xff0c;语法使用说明等 一般来说&#xff0c;LaTeX主要用在论文提交&#xff0c;书籍排版过程中&#xff0c;提…

Kubernetes:Pod

文章目录1、Pod 定义2、Pod 使用2.1、init 容器2.2、容器生命周期处理函数2.3、容器的探测2.3.1、探测机制2.3.2、探测结果2.3.3、探测类型startupProbereadinessProbelivenessProbe2.3.4、案例2.4、测试代码3、Pod 的部署3.1、Deployment3.2、DaemonSets3.3、静态 pod4、参考p…

我国金属包装行业企业数量下降 经济效益整体表现不佳 但亏损额减少

根据观研报告网发布的《中国金属包装市场发展趋势研究与未来投资预测报告&#xff08;2022-2029年&#xff09;》显示&#xff0c;金属包装是指采用金属薄板,针对不同用途制作的各种不同形式的薄壁包装容器,相较于其它包装&#xff0c;金属包装因为其材质特性&#xff0c;比一般…

3DEXPERIENCE平台2023新功能揭秘!Governance云端数据管理解决方案

3DEXPERIENCE平台更新版本已经与大家见面&#xff0c;今天众联亿诚与大家分享Governance新功能。 多年来&#xff0c;我们一直在寻找SOLIDWORKS数据管理的更优解决方案。但就是感觉很艰难&#xff0c;硬件投资是昂贵的&#xff0c;实施是资源密集型的&#xff0c;更重要的是&a…

【TypeScript】TS入门(一)

&#x1f431;个人主页&#xff1a;不叫猫先生 &#x1f64b;‍♂️作者简介&#xff1a;前端领域新星创作者、华为云享专家、阿里云专家博主&#xff0c;专注于前端各领域技术&#xff0c;共同学习共同进步&#xff0c;一起加油呀&#xff01; &#x1f4ab;系列专栏&#xff…

Hook原理

对于会Hook的人来说,Hook其实也就那么回事.对于没有Hook过的人来说,会感觉Hook很高大上(其实也没毛病). 那么今天我们就来探讨一些Hook的原理是什么. 我认为任何Hook都可以分为以下三步(简称WFH): 需要Hook的是什么,在哪里(后面简称Where). 寻找到Hook的地方.(后面简称Find)…

JavaScript基础(15)_数组

对象分为三种&#xff1a;内建对象、宿主对象、自定义对象。 内建对象 内建对象是指由ECMAScript事先提供的、不依赖于宿主环境的对象&#xff0c;这些对象在程序运行之前就已经存在&#xff0c;并可以直接在程序中任何地方任何时候拿来使用。常见的内建对象可以直接通过new调…

【JavaEE】Servlet

努力经营当下&#xff0c;直至未来明朗&#xff01; 文章目录【Servlet】1.0Servlet概述写一个Servlet程序1. 创建项目2. 引入Servlet依赖3. 创建目录结构4. 编写代码5. 打包程序6. 部署程序7. 验证程序【Servlet 2.0】访问出错【小结】追求想要的一定很酷&#xff01; 【Serv…

docker rootless安装

rootless 简介 rootless模式允许以非root用户身份运行Docker守护程序和容器&#xff0c;以减轻守护程序和容器运行时中的潜在漏洞。只要满足先决条件&#xff0c;即使在Docker守护程序安装期间&#xff0c;无根模式也不需要root特权。无根模式是Docker Engine v19.03中引入的一…

【俄罗斯方块】单机游戏-微信小程序项目开发入门

这是一个仿俄罗斯方块小游戏的微信小程序&#xff0c;只需要写一小段代码就实现出来了&#xff0c;有兴趣的同学完全可以自己动手开发&#xff0c;来看看实现过程是怎样的呢&#xff0c;边写边做&#xff0c;一起来回忆小时候玩过的经典俄罗斯方块游戏吧。 文章目录创建小程序页…

certbot生成证书,配置nginx,利用脚本自动续期

踩了大量坑&#xff0c;做下记录。以下适用于博主本人&#xff0c;但是未必会适用于所有人 单域名与泛域名证书生成 sudo certbot certonly --standalone --email 邮箱 -d 域名# 单域名certbot certonly --preferred-challenges dns --manual -d *.baidu.com(修改这里) --ser…

【檀越剑指大厂—Springboot】Springboot高阶

一.整体介绍 1.什么是 Springboot? Springboot 是一个全新的框架&#xff0c;简化 Spring 的初始搭建和开发过程&#xff0c;使用了特定的方式来进行配置&#xff0c;让开发人员不再需要定义样板化的配置。此框架不需要配置 xml&#xff0c;依赖于 maven 这样的构建系统。 …

嵌入式分享合集125

一、多层板PCB设计中电源平面相对地平面要进行内缩&#xff1f; 有一些人绘制的PCB&#xff0c;在GND层和电源层会进行一定程度的内缩设计&#xff0c;那么大家有没有想过为什么要内缩呢。 需要搞清楚这个问题&#xff0c;我们需要来先了解一个知识点&#xff0c;那就是“20H”…

matlab 功率谱分析

谱分析介绍 谱分析是一种用于研究函数的数学方法。在数学中&#xff0c;谱分析的基本概念是将函数分解成不同的频率成分&#xff0c;以便更好地理解其行为。这些频率成分可以表示为正弦或余弦函数的级数和&#xff0c;称为谱线。 谱分析常用于信号处理、音频信息处理和图像处…

Windows系统增强优化工具

计算机系统优化的作用很多&#xff0c;它可以清理WINDOWS临时文件夹中的临时文件&#xff0c;释放硬盘空间&#xff1b;可以清理注册表里的垃圾文件&#xff0c;减少系统错误的产生&#xff1b;它还能加快开机速度&#xff0c;阻止一些程序开机自动执行&#xff1b;还可以加快上…

数据也能开口说话?这次汇报,老板疯狂给我点赞

年底了&#xff0c;大家的工作汇报进行得怎么样了&#xff1f; 是不是少不了各种数据&#xff1f;饼图、柱形图、条形图、折线图、散点图有没有充斥在你的 PPT 中&#xff1f; 我们出版社的数据统计一般截止到 12 月中下旬&#xff0c;所以前两天&#xff0c;我已经做完了年终…

白话说Java虚拟机原理系列【第三章】:类加载器详解

文章目录jvm.dllBootstrapLoader&#xff1a;装载系统类ExtClassLoader&#xff1a;装载扩展类AppClassLoader&#xff1a;装载自定义类双亲委派模型类加载器加载类的方式类加载器特性类加载器加载字节码到JVM的过程自定义/第三方类加载器类加载器加载字节码到哪&#xff1f;Cl…

浅谈冯诺依曼体系,操作系统和进程概念

文章目录浅谈冯诺依曼体系结构和操作系统冯诺依曼体系结构冯诺依曼体系结构图操作系统进程task_struct内容分类进程内核数据结构&#xff08;task_struct)进程对应的磁盘代码查看进程ps 列出系统中运行的进程ps ajx 查看系统中所有运行的进程ps ajx | grep 程序名 &#xff1a;…

【Linux操作系统】——在Ubuntu20.04上安装MySQL数据库

在Ubuntu上安装MySQL MySQL是一个开源数据库管理系统&#xff0c;通常作为流行的LAMP&#xff08;Linux&#xff0c;Apache&#xff0c;MySQL&#xff0c;PHP / Python / Perl&#xff09;堆栈的一部分安装。它使用关系数据库和SQL&#xff08;结构化查询语言&#xff09;来管…

类美团外卖、骑手、类快递取餐柜、整合菜品供应商、前厅、后厨、配送、智能厨电设备的智慧餐饮业务

一种商业模型之类美团外卖、骑手、类快递取餐柜、整合前厅、后厨、智能厨电设备智慧餐饮业务架构 涉及到&#xff1a; 0、基础数据管理 1、菜谱创错 2、菜谱编译 3、菜谱商业化 4、厨电管理 5、后厨管理 6、前厅管理 …