AggregateFunction结合自定义触发器实现点击率计算

news2025/5/13 8:49:09

背景:

接上一篇文章,ProcessWindowFunction 结合自定义触发器会有状态过大的问题,本文就使用AggregateFunction结合自定义触发器来实现,这样就不会导致状态过大的问题了

AggregateFunction结合自定义触发器实现

在这里插入图片描述
flink对于每个窗口只需要维护一个状态:不像ProcessWindowFunction那样需要把窗口内收到的所有消息都作为状态存储起来

在这里插入图片描述
完整代码参见:

package wikiedits.func;


import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;



public class AggregateFunctionAndTiggerDemo {

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 使用处理时间
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new FsStateBackend("file:///D:/tmp/flink/checkpoint/aggregatetrigger"));

        // 并行度为1
        env.setParallelism(1);
        // 设置数据源,一共三个元素
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                int xxxNum = 0;
                int yyyNum = 0;
                for (int i = 1; i < Integer.MAX_VALUE; i++) {
                    // 只有XXX和YYY两种name
                    String name = (0 == i % 2) ? "XXX" : "YYY";
                    // 更新aaa和bbb元素的总数
                    if (0 == i % 2) {
                        xxxNum++;
                    } else {
                        yyyNum++;
                    }
                    // 使用当前时间作为时间戳
                    long timeStamp = System.currentTimeMillis();
                    // 将数据和时间戳打印出来,用来验证数据
                    if (xxxNum % 2000 == 0) {
                        System.out.println(String.format("source,%s, %s,    XXX total : %d,    YYY total : %d\n", name,
                                time(timeStamp), xxxNum, yyyNum));
                    }
                    // 发射一个元素,并且戴上了时间戳
                    ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);
                    // 每发射一次就延时1秒
                    Thread.sleep(1);
                }
            }

            @Override
            public void cancel() {}
        });

        // 将数据用5秒的滚动窗口做划分,再用ProcessWindowFunction
        SingleOutputStreamOperator<Tuple2<String, Integer>> mainDataStream = dataStream
                // 以Tuple2的f0字段作为key,本例中实际上key只有aaa和bbb两种
                .keyBy(value -> value.f0)
                // 5秒一次的滚动窗口
                .timeWindow(Time.minutes(5))
                // 10s触发一次计算,更新统计结果
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                // 统计每个key当前窗口内的元素数量,然后把key、数量、窗口起止时间整理成字符串发送给下游算子
                .aggregate(
                        new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {

                            // 1、初始值
                            // 定义累加器初始值
                            @Override
                            public Tuple2<String, Integer> createAccumulator() {
                                return new Tuple2<String, Integer>("", 0);
                            }

                            // 2、累加
                            // 定义累加器如何基于输入数据进行累加
                            @Override
                            public Tuple2<String, Integer> add(Tuple2<String, Integer> value,
                                    Tuple2<String, Integer> accumulator) {
                                accumulator.f0 = value.f0;
                                accumulator.f1 += value.f1;
                                return accumulator;
                            }

                            // 3、合并
                            // 定义累加器如何和State中的累加器进行合并
                            @Override
                            public Tuple2<String, Integer> merge(Tuple2<String, Integer> acc1,
                                    Tuple2<String, Integer> acc2) {
                                acc1.f1 += acc2.f1;
                                return acc1;
                            }

                            // 4、输出
                            // 定义如何输出数据
                            @Override
                            public Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {
                                return accumulator;
                            }
                        });

        // 打印结果,通过分析打印信息,检查ProcessWindowFunction中可以处理所有key的整个窗口的数据
        mainDataStream.print();

        env.execute("processfunction demo : processwindowfunction");

    }



    public static String time(long timeStamp) {
        return new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(timeStamp));
    }



}

通过这种方式我们就可以做到统计某个页面一天内至今为止的点击率,每10s输出一次点击率的结果,并且不会引起状态膨胀的问题

参考文献:
https://www.cnblogs.com/Springmoon-venn/p/13667023.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/980179.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

前端使用elementui开发后台管理系统的常用功能(持续更新)

前言&#xff1a;本次的文章完全是自己开发中遇到的一些问题&#xff0c;经过不断的修改终于完成的一些功能&#xff0c;当个快捷的查看手册吧~ elementui开发后台管理系统常用功能 高级筛选的封装elementui的表格elementui的表格实现跨页多选回显elementui的表单elementui的日…

VIRTIO-BLK代码分析(3)数据流处理

VIRTIO-BLK整个过程数据流如下所示&#xff1a; IO请求发送过程 虚拟机中通过FIO等下发IO请求&#xff0c;IO请求通过VFS/filesystem&#xff0c;然后到BLOCK层&#xff0c;传递给virtio-blk驱动&#xff0c;virtio-blk驱动通过virtio_queue_rq()下发IO请求&#xff0c;并通过v…

视频过大如何压缩变小?想学的小伙伴不要错过机会

视频过大如何压缩变小&#xff1f;在当今社交媒体和直播平台的时代&#xff0c;视频内容的传播已经广泛而频繁&#xff0c;我们每天不仅刷视频打发无聊的时间&#xff0c;也会向一些自媒体平台分享自己拍摄的视频。如今人们对视频画质的要求不断提高&#xff0c;因此大多数下载…

武汉凯迪正大—断路器特性测试仪

一、凯迪正大高压开关测试仪产品概述 KDGK-F 断路器机械特性测试仪可用于各电压等级的真空、六氟化硫、少油、多油等电力系统高压开关的机械特性参数测试与测量。测量数据稳定&#xff0c;抗干扰性强&#xff0c;可在500KV等级及以下电站做实验&#xff0c;接线方便&#xff0…

知识图谱(2)词汇挖掘与实体识别

实体是指文本中的词汇或者短语&#xff08;比如"中药"&#xff09;&#xff0c;但不是所有词汇都是实体&#xff08;比如"新鲜的"&#xff09;&#xff0c;因此&#xff0c;从非结构化的文本构建知识图谱涉及两个基本步骤&#xff1a; 挖掘尽可能多的高质…

初次安装Pytorch过程

第一次安装Pytorch&#xff0c;刚开始安装的时候装错了CUDA的版本号 这里最高支持12.2.138&#xff0c; 但是我装了一个12.2.140的CUDA&#xff0c;导致不兼容我在测试时发现 import torch# if torch.cuda.is_available(): # print("GPU可用") # else: # p…

Xilinx FPGA 超温关机保护

在UG480文档&#xff0c;有关于FPGA芯片热管理的介绍。 首先需要理解XADC中的 Over Temperature&#xff08;OT&#xff09;和User Temperature的关系。片上温度测量用于关键温度警告&#xff0c;也支持自动关机&#xff0c;以防止设备被永久损坏。片上温度测量在预配置和自动关…

python flask框架 debug功能

从今天开始&#xff0c;准备整理一些基础知识&#xff0c;分享给需要的人吧 先整理个flask的debug功能&#xff0c;首先列举一下debug加与不加的区别&#xff0c;然后再上代码和图看看差异 区别&#xff1a; &#xff08;1&#xff09;加了debug后&#xff0c;修改js&#xf…

【电源专题】接地的类型

在工作和生活中我们往往都会碰到接地的概念,随着社会的发展不同的时期接地有着不同的意义。 其中包括从安全方面看,有物理接地和电气接地,物理接地是为了防雷连接到大地并提供浪涌电流路径,电气接地是为了保护人身安全而将电气设备外壳接接地的一种操作。从参考电位上看,…

异步编程 - 03 线程池ThreadPoolExecutor原理剖析源码详解

文章目录 线程池ThreadPoolExecutor原理剖析线程池类图结构成员变量ctl线程池的主要状态线程池的参数提交任务到线程池原理解析public void execute(Runnable command)public Future<?>submit(Runnable task)public Future submit(Runnable task&#xff0c;T result) 线…

【精品】商品规格 数据库表 设计

特点 同一类商品的规格项分组一样同一类商品的规格项一样不同商品的规格值是不同的 规格参数规格组规格项&#xff1a;规格值本博客对应的SQL文件下载地址&#xff1a;https://download.csdn.net/download/lianghecai52171314/88306884 方案一 数据库设计 查询17号商品的规…

PY32F003F18的中断线

PY32F003F18的中断线有30条&#xff0c;其中Line20~Line28保留不用&#xff0c;见下图&#xff1a; Line0~Line8需要配置&#xff0c;选择引脚和中断线连接&#xff0c;和GPIO引脚对应&#xff0c;Line9~Line15和PA9~PA15一一对应。见下图&#xff1a; 外部中断选择寄存器1(EXT…

国际慈善日 | 追寻大爱无疆,拓世科技集团的公益之路

每年的9月5日&#xff0c;是联合国大会正式选定的国际慈善日。这一天的设立&#xff0c;旨在通过提高公众对慈善活动的意识&#xff0c;鼓励慈善公益活动通过各种形式在全球范围内得到增强和发展。这是一个向慈善公益事业致敬的日子&#xff0c;同时也是呼吁全球团结一致共同发…

Swagger简介

一.导语&#xff1a; 相信无论是前端还是后端开发&#xff0c;都或多或少地被接口文档折磨过。前端经常抱怨后端给的接口文档与实际情况不一致。后端又觉得编写及维护接口文档会耗费不少精力&#xff0c;经常来不及更新。其实无论是前端调用后端&#xff0c;还是后端调用后端&…

尼尔森IQ :2023年中国商业养老服务供需洞察白皮书

核心观点 随着我国人口年龄结构发生巨大转变&#xff0c;老龄化问题成为未来较长时间内持续面临的挑战&#xff0c;积极应对老龄化已上升为国家战略&#xff0c;有效的、高质量的养老服务体系亟待建设。本章通过人口数据揭示我国老龄化进程&#xff0c;总结围绕养老领域出台的…

“银河护卫队总部”放大招!Milvus 核心组件再升级,主打就是一个低延迟、高准确度

熟悉我们的朋友都知道&#xff0c;在 Milvus 和 Zilliz Cloud 中&#xff0c;有一个至关重要的组件——Knowhere。 Knowhere 是什么&#xff1f;如果把向量数据库整体看作漫威银河护卫队宇宙&#xff0c;那么 Knowhere 就是名副其实的总部&#xff0c;它的主要功能是对向量精确…

linux 进程管理命令

进程管理命令 查看进程命令 ps命令 显示系统上运行的进程列表 # 查看系统中所有正在运行的系统ps aux# 获取占用内存资源最多的10个进程&#xff0c;可以使用如下命令组合&#xff1a;ps aux|head -1;ps aux|grep -v PID|sort -rn -k 4|head# 获取占用CPU资源最多的10个进程&am…

成本控制与电子元器件采购:实现效益的关键因素

成本控制在电子元器件采购中至关重要&#xff0c;它直接影响了组织的盈利能力和竞争力。以下是实现成本效益的关键因素&#xff1a; 供应商谈判&#xff1a; 成本控制的第一步是与供应商进行有效的谈判。这包括谈判价格、交货条件、质量标准和其他合同条款。有时长期合同可以帮…

【数学建模竞赛】优化类赛题常用算法解析

优化类建模 问题理解和建模&#xff1a;首先&#xff0c;需要深入理解问题&#xff0c;并将问题抽象为数学模型。这包括确定问题的目标函数、约束条件和决策变量。 模型分析和求解方法选择&#xff1a;对建立的数学模型进行分析&#xff0c;可以使用数学工具和方法&#xff0c;…

绘制钻头芯厚变化图

import numpy as np import matplotlib.pyplot as plt posnp.array([0.05,0.5,0.97,3]) data_m1np.array([0.088,0.093,0.098,0.116]) data_m2data_m1-0.01 data_m3data_m1-0.02 fig plt.figure(figsize(5, 4)) plt.rcParams[xtick.direction] in # 将x周的刻度线方向设置向…