Flink算子
一、基础转换算子最常用这类算子用于对数据流进行基础的格式转换、过滤、映射是处理数据的第一步。1. map一对一转换作用将数据流中的每个元素转换为另一个元素输入 1 个输出 1 个。场景字段提取、格式转换如字符串转对象。java运行// 示例提取订单金额并转为Double类型 DataStreamDouble amountStream orderStream .map(line - { // 按逗号拆分每行数据 String[] fields line.split(,); // 提取第3个字段金额并转为Double return Double.parseDouble(fields[2]); }); // 输出100.0, 200.0, 150.0, 300.02. flatMap一对多转换作用将一个元素转换为 0 个、1 个或多个元素输入 1 个输出多个。场景数据拆分如一行拆多行、脏数据过滤。java运行// 示例拆分订单信息为Tuple2用户ID, 金额并过滤支付失败的订单 DataStreamTuple2String, Double userAmountStream orderStream .flatMap(new FlatMapFunctionString, Tuple2String, Double() { Override public void flatMap(String line, CollectorTuple2String, Double out) throws Exception { String[] fields line.split(,); String userId fields[1]; double amount Double.parseDouble(fields[2]); String status fields[3]; // 只保留支付成功的订单 if (pay_success.equals(status)) { out.collect(Tuple2.of(userId, amount)); } } }); // 输出(user_01,100.0), (user_01,150.0), (user_03,300.0)3. filter数据过滤作用根据条件筛选出符合要求的元素。场景脏数据过滤、业务规则筛选如只保留大额订单。java运行// 示例过滤出金额大于200的成功订单 DataStreamString highAmountStream orderStream .filter(line - { String[] fields line.split(,); double amount Double.parseDouble(fields[2]); String status fields[3]; // 条件支付成功 且 金额200 return pay_success.equals(status) amount 200; }); // 输出order_004,user_03,300,pay_success二、聚合算子核心统计聚合算子需结合keyBy使用先分组再聚合是实时统计的核心。1. keyBy数据分组作用按指定字段将数据流分组类似 SQL 的 GROUP BY是聚合的前提。注意keyBy返回KeyedStream只能在 KeyedStream 上执行聚合。java运行// 示例按用户ID分组 KeyedStreamTuple2String, Double, String keyedStream userAmountStream // 按Tuple2的第一个字段用户ID分组 .keyBy(tuple - tuple.f0);2. sum/avg/max/min基础聚合作用对分组后的数据进行求和、平均值、最大值、最小值计算。场景实时统计用户累计消费、订单最大金额等。java运行// 示例统计每个用户的累计消费金额 DataStreamTuple2String, Double sumStream keyedStream // 对Tuple2的第二个字段金额求和 .sum(1); // 输出(user_01,100.0) → (user_01,250.0) → (user_03,300.0) // 示例统计每个用户的平均消费金额 DataStreamTuple2String, Double avgStream keyedStream .avg(1); // 输出(user_01,100.0) → (user_01,125.0) → (user_03,300.0)3. reduce自定义聚合作用自定义聚合逻辑比 sum/avg 更灵活支持增量聚合。场景复杂统计如累计金额 订单数。java运行// 示例统计每个用户的累计金额和订单数Tuple3用户ID, 累计金额, 订单数 DataStreamTuple3String, Double, Integer reduceStream keyedStream .reduce((t1, t2) - { // t1历史聚合结果t2新到来的元素 String userId t1.f0; double totalAmount t1.f1 t2.f1; // 累计金额 int orderCount 1 (t1.f2 null ? 0 : t1.f2); // 订单数 return Tuple3.of(userId, totalAmount, orderCount); }, () - Tuple3.of(, 0.0, 0)); // 初始值 // 输出(user_01,100.0,1) → (user_01,250.0,2) → (user_03,300.0,1)三、窗口算子实时统计核心Flink 是流式计算窗口用于将无限流切分为有限的 “批次” 进行统计结合keyBy使用。1. 滚动窗口Tumbling Window作用窗口大小固定无重叠如每 5 分钟统计一次。场景固定周期统计如每小时用户消费总额。java运行// 示例5秒滚动窗口统计每个用户的消费总额 DataStreamTuple2String, Double tumblingWindowStream keyedStream // 5秒滚动窗口 .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 对金额求和 .sum(1);2. 滑动窗口Sliding Window作用窗口大小固定有重叠如每 2 分钟统计最近 5 分钟的数据。场景高频统计如实时监控每 10 秒统计最近 1 分钟的订单量。java运行// 示例滑动窗口窗口5秒滑动2秒 DataStreamTuple2String, Double slidingWindowStream keyedStream .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2))) .sum(1);3. 会话窗口Session Window作用按用户会话划分窗口如用户连续操作 30 秒内为一个会话。场景用户行为分析如统计用户一次会话内的消费金额。java运行// 示例会话窗口超时时间3秒无操作3秒则窗口关闭 DataStreamTuple2String, Double sessionWindowStream keyedStream .window(ProcessingTimeSessionWindows.withGap(Time.seconds(3))) .sum(1);四、连接 / 拆分算子1. union合并同类型数据流作用将多个同类型的数据流合并为一个字段结构必须完全一致。场景合并多来源的同类型数据如多个省份的订单流。java运行// 模拟第二个订单流 DataStreamTuple2String, Double orderStream2 env.fromElements( Tuple2.of(user_02, 180.0), Tuple2.of(user_03, 50.0) ); // 合并两个数据流 DataStreamTuple2String, Double unionStream userAmountStream.union(orderStream2);2. connect连接不同类型数据流作用连接两个不同类型的数据流支持自定义处理逻辑。场景关联补充数据如订单流 用户信息流。java运行// 模拟用户信息流用户ID, 用户名 DataStreamTuple2String, String userInfoStream env.fromElements( Tuple2.of(user_01, 张三), Tuple2.of(user_02, 李四) ); // 连接订单流和用户信息流 ConnectedStreamsTuple2String, Double, Tuple2String, String connectedStreams userAmountStream.connect(userInfoStream) // 按用户ID分组两个流的关联键 .keyBy(t1 - t1.f0, t2 - t2.f0); // 处理连接后的数据流关联用户名和消费金额 DataStreamString resultStream connectedStreams .map(new CoMapFunctionTuple2String, Double, Tuple2String, String, String() { Override public String map1(Tuple2String, Double order) throws Exception { // 处理订单流暂时无用户名先返回默认值 return order.f0 ,未知用户, order.f1; } Override public String map2(Tuple2String, String user) throws Exception { // 处理用户信息流暂时无金额先返回默认值 return user.f0 , user.f1 ,0.0; } });总结基础转换map一对一、flatMap一对多、filter过滤是数据预处理的核心几乎所有 Flink 任务都会用到聚合统计先keyBy分组再用sum/avg/reduce做聚合是实时统计的基础窗口核心滚动窗口无重叠、滑动窗口有重叠、会话窗口按会话是流式统计的关键需结合业务场景选择。
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2431347.html
如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!