【Flink】基本转换算子使用之fliter、flatMap,键控流转换算子和分布式转换算子

news2025/8/9 13:02:56

文章目录

  • 一 Flink DataStream API
    • 1 基本转换算子的使用
      • (1)fliter
        • a 使用匿名类实现
        • b 使用外部类函数实现
        • b 使用flatMap实现
      • (2)flatMap
        • a 使用匿名类实现
        • b 使用匿名函数实现
    • 2 键控流转换算子
      • (1) keyBy
      • (2)滚动聚合
      • (3)reduce
      • (4)案例
    • 3 分布式转换算子
      • (1)Random
      • (2)Round-Robin
      • (3)Rescale
      • (4)Broadcast(常用)
      • (5)Global
      • (6)Custom

一 Flink DataStream API

1 基本转换算子的使用

基本转换算子的定义:作用在数据流中的每一条单独的数据上的算子。基本转换算子会针对流中的每一个单独的事件做处理,也就是说每一个输入数据会产生一个输出数据。单值转换,数据的分割,数据的过滤,都是基本转换操作的典型例子。

(1)fliter

在这里插入图片描述

a 使用匿名类实现

public class Example3 {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        stream.filter(r -> r.user.equals("Marry")).print();

        stream
                .filter(new FilterFunction<Event>() {
                    @Override
                    public boolean filter(Event event) throws Exception {
                        return event.user.equals("Marry");
                    }
                })
                .print();

        env.execute();
    }

b 使用外部类函数实现

stream
        .filter(new MyFilter())
        .print();
public static class MyFilter implements FilterFunction<Event>{
    @Override
    public boolean filter(Event event) throws Exception {
        return event.user.equals("Marry");
    }
}

b 使用flatMap实现

stream
        .flatMap(new FlatMapFunction<Event, Event>() {
            @Override
            public void flatMap(Event event, Collector<Event> collector) throws Exception {
                if(event.user.equals("Marry"))
                    collector.collect(event);
            }
        })
        .print();

输入一条元素,输出1个结果使用map,输出0 或 1 个结果使用filter,针对每一个条数据输出0 1 或者多个结果使用flatmap。

(2)flatMap

a 使用匿名类实现

public class Example4 {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 将white直接输出,将black复制,将gray过滤
        DataStreamSource<String> stream = env.fromElements("white", "black", "gray");

        // 当每一条数据进入到flatMap算子时,就会触发flatMap的调用
        // 在flink中程序只是定义了一个有向无环图,需要事件去驱动它的运行
        stream
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String s, Collector<String> collector) throws Exception {
                        if(s.equals("white")){
                            collector.collect(s);
                        }else if(s.equals("black")){
                            collector.collect(s);
                            collector.collect(s);
                        }
                    }
                })
                .print();

        env.execute();
    }
}

b 使用匿名函数实现

stream
        .flatMap((String s,Collector<String> collector) -> {
            if(s.equals("white")){
                collector.collect(s);
            }else if(s.equals("black")) {
                collector.collect(s);
                collector.collect(s);
            }
        })
        // Collector<String>会被擦除
        .returns(Types.STRING)
        .print();

2 键控流转换算子

很多流处理程序的一个基本要求就是要能对数据进行分组,分组后的数据共享某一个相同的属性。DataStream API 提供了一个叫做 KeyedStream 的抽象,此抽象会从逻辑上对 DataStream 进行分区,分区后的数据拥有同样的 Key 值,分区后的流互不相关。

针对 KeyedStream 的状态转换操作可以读取数据或者写入数据到当前事件 Key 所对应的状态中。这表明拥有同样 Key 的所有事件都可以访问同样的状态,也就是说所以这些事件可以一起处理。KeyedStream 可以使用 map,flatMap 和 filter 算子来处理。

在这里插入图片描述

DataStream KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同key的元素,在内部以hash的形式实现的。从逻辑上分区去对这些数据进行处理,物理上的位置无关紧要。不过最终同一个Key中的数据一定在一个任务槽中,这样会出现数据倾斜的问题。

(1) keyBy

keyBy 通过指定 key 来将 DataStream 转换成 KeyedStream。基于不同的 key,流中的事件将被分配到不同的分区中去。所有具有相同 key 的事件将会在接下来的操作符的同一个子任务槽中进行处理。拥有不同 key 的事件可以在同一个任务中处理。但是算子只能访问当前事件的 key 所对应的状态。keyBy() 方法接收一个参数,这个参数指定了 key 或者 keys,有很多不同的方法来指定 key。

如之前使用的匿名类方式,针对每一条数据指定key。

	KeyedStream<WordWithCount, String> keyedStream = mappedStream
        // 第一个泛型:流中元素的泛型
        // 第二个泛型:key的泛型
        .keyBy(new KeySelector<WordWithCount, String>() {
            public String getKey(WordWithCount value) throws Exception {
                return value.word;
            }
        });

只要存在分组,就一定存在聚合,所以提出了滚动聚合的概念。

(2)滚动聚合

滚动聚合算子由 KeyedStream 调用,并生成一个聚合以后的 DataStream,例如:sum,minimum,maximum。一个滚动聚合算子会为每一个观察到的 key 保存一个聚合的值。针对每一个输入事件,算子将会更新保存的聚合结果,并发送一个带有更新后的值的事件到下游算子。滚动聚合不需要用户自定义函数,但需要接受一个参数,这个参数指定了在哪一个字段上面做聚合操作。DataStream API 提供了以下滚动聚合方法。

  • sum():在输入流上对指定的字段做滚动相加操作。
  • min():在输入流上对指定的字段求最小值。
  • max():在输入流上对指定的字段求最大值。
  • minBy():在输入流上针对指定字段求最小值,并返回包含当前观察到的最小值的事件。
  • maxBy():在输入流上针对指定字段求最大值,并返回包含当前观察到的最大值的事件。

滚动聚合算子无法组合起来使用,每次计算只能使用一个单独的滚动聚合算子。

如以下例子按照key进行分组并聚合:

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    DataStreamSource<Tuple2<Integer, Integer>> stream = env
            .fromElements(
                    Tuple2.of(1, 3),
                    Tuple2.of(1, 4),
                    Tuple2.of(2, 3)
            );

    // 逻辑上进行分流
    KeyedStream<Tuple2<Integer, Integer>, Integer> keyedStream = stream.keyBy(r -> r.f0);

    // 针对第一个位置进行聚合
    keyedStream.sum(1).print();

    env.execute();
}

输出结果也同样体现出滚动聚合的效果:

(1,3)
(1,7)
(2,3)

(3)reduce

reduce 算子是滚动聚合的泛化实现。它将一个 ReduceFunction 应用到了一个 KeyedStream 上面去。reduce 算子将会把每一个输入事件和当前已经 reduce 出来的值做聚合计算。reduce 操作不会改变流的事件类型。输出流数据类型和输入流数据类型是一样的。

reduce 函数可以通过实现接口 ReduceFunction 来创建一个类。ReduceFunction 接口定义了 reduce() 方法,此方法接收两个输入事件,输出一个相同类型的事件。

如下同样可以实现sum的功能。

keyedStream
        .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
    @Override
    public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
        return Tuple2.of(value1.f0,value1.f1 + value2.f1);
    }
})
        .print();

(4)案例

求整数的平均值。

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env
            .addSource(new SourceFunction<Integer>() {
                private boolean running = true;
                private Random random = new Random();
                @Override
                public void run(SourceContext<Integer> ctx) throws Exception {
                    while (running){
                        ctx.collect(random.nextInt(10));
                        Thread.sleep(100);
                    }
                }

                @Override
                public void cancel() {
                    running = false;
                }
            })
            .map(r -> Tuple2.of(r,1))
            .returns(Types.TUPLE(Types.INT,Types.INT))
            // reduce必须在keyBy之后使用
            // 如果想在一条流上直接使用滚动聚合
            // 将所有数据shuffle到同一个逻辑分区
            .keyBy(r -> true)
            .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() {
                @Override
                public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception {
                    return Tuple2.of(value1.f0 + value2.f0,value1.f1 + value2.f1);
                }
            })
            .map(new MapFunction<Tuple2<Integer, Integer>, Object>() {
                @Override
                public Object map(Tuple2<Integer, Integer> value) throws Exception {
                    return (double) value.f0 / value.f1;
                }
            })
            .print();

    env.execute();
}

总结:滚动聚合的要点在于每一个Key都有自己的累加器(状态变量),一条数据来到处理完成之后就丢弃了,向下游发送的数据是累加器中的数据,这样就不需要将所有的数据都保存下来,节省内存空间,性能高于批处理。

scala中为什么会出现伪递归:在纯正的函数式编程中是没有循环的,那么如何实现循环的功能呢?使用递归!那么使用递归又带来了一个问题,递归的栈会超过内存,造成内存溢出Stack Overflow,那么伪递归用新来的栈去覆盖原有的栈,栈的深度不变,所以可以使用伪递归来模拟循环,伪递归当中有累加器的存在。

3 分布式转换算子

分区操作对应于之前的“数据交换策略”。这些操作定义了事件如何分配到不同的任务中去。当使用 DataStream API 来编写程序时,系统将自动的选择数据分区策略,然后根据操作符的语义和设置的并行度将数据路由到正确的地方去。

有些时候,当需要在应用程序的层面控制分区策略,或者自定义分区策略时。例如,如果我们知道会发生数据倾斜,那么想要针对数据流做负载均衡,将数据流平均发送到接下来的操作符中去。又或者,应用程序的业务逻辑可能需要一个算子所有的并行任务都需要接收同样的数据。再或者,需要自定义分区策略的时候。

keyBy() 方法不同于分布式转换算子。所有的分布式转换算子将产生 DataStream 数据类型。而 keyBy() 产生的类型是 KeyedStream,它拥有自己的 keyed state。

综上,分布式转换算子可以对数据进行物理分区,也就是说可以将数据分配到不同的任务槽中。

(1)Random

Random随机数据交换由 DataStream.shuffle() 方法实现。shuffle 方法将数据随机的分配到下游算子的并行任务中去,可以将数据分配到不同的任务槽中。

	env
                .fromElements(1,2,3,4).setParallelism(1)
                .shuffle()
                .print("shuffle: ").setParallelism(2);

运行结果如下:第一任务槽中数据为1和3,第二个任务槽中数据为2和4。

shuffle: :1> 1
shuffle: :1> 3
shuffle: :2> 4
shuffle: :2> 2

(2)Round-Robin

rebalance() 方法使用 Round-Robin 负载均衡算法将输入流平均分配到随后的并行运行的任务中去。

	env
                .fromElements(1,2,3,4).setParallelism(1)
                .rebalance()
                .print("rebanlance: ").setParallelism(2);

(3)Rescale

rescale()方法使用的也是round-robin算法,但只会将数据发送到接下来的并行运行的任务中的一部分任务中。本质上,当发送者任务数量和接收者任务数量不一样时,rescale分区策略提供了一种轻量级的负载均衡策略。如果接收者任务的数量是发送者任务的数量的倍数时,rescale 操作将会效率更高。

rebalance() 和 rescale() 的根本区别在于任务之间连接的机制不同。rebalance() 将会针对所有发送者任务和所有接收者任务之间建立通信通道,而 rescale() 仅仅针对每一个任务和下游算子的一部分子并行任务之间建立通信通道。

两者的示意图如下:

在这里插入图片描述

(4)Broadcast(常用)

broadcast() 方法将输入流的所有数据复制并发送到下游算子的所有并行任务中去。

	env
        .fromElements(1,2,3,4).setParallelism(1)
        .broadcast()
        .print("broadcast: ").setParallelism(2);

(5)Global

global() 方法将所有的输入流数据都发送到下游算子的第一个并行任务中去。这个操作需要很谨慎,因为将所有数据发送到同一个 task,将会对应用程序造成很大的压力。

(6)Custom

当 Flink 提供的分区策略都不适用时,我们可以使用 partitionCustom() 方法来自定义分区策略。这个方法接收一个 Partitioner 对象,这个对象需要实现分区逻辑以及定义针对流的哪一个字段或者 key 来进行分区。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/34101.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mysql InnoDB存储引擎的锁机制

目录 前言 1. 锁的分类 1.1 实现方式 1.2 锁的粒度 2. 查询操作加锁方式 2.1 一致性非锁定读 2.2 一致性锁定读 3. 锁的算法 4. 锁的升级 5. 死锁 6.总结 前言 锁机制的目的是最大程度提高数据库的并发访问&#xff0c;另一方面确保可以以一致的方式读取和修改数据。…

Camunda工作流引擎简记

本文转载自玩转Camunda之实战篇-赶紧收藏起来吧_哔哩哔哩_bilibili 其中部分内容&#xff0c;经过本人修改 一、工作流相关介绍 BPM(BusinessProcessManagement)&#xff0c;业务流程管理是一种管理原则&#xff0c;通常也可以代指BPMS(BusinessProcessManagementSuite)&#…

火山引擎 DataTester 应用故事:一个A/B测试,将产品DAU提升了数十万

更多技术交流、求职机会&#xff0c;欢迎关注字节跳动数据平台微信公众号&#xff0c;回复【1】进入官方交流群 疫情让线下的需求大量转移到线上&#xff0c;催生出了远程办公、网络授课、线上健身等新的生态现象。如何更好地为用户服务&#xff0c;提升用户体验&#xff0c;成…

计算机的组成

文章目录五大部件1) 输入设备2) 存储器3) CPU&#xff08;中央处理器&#xff09;4) 输出设备五大部件 所有类型的计算机&#xff0c;其本质都是接收用户输入的原始数据&#xff0c;并将其加工、处理成对用户有用的数据&#xff0c;它们都支持执行如表 1 所示的 5 项基本操作。…

SpringCloud Sentinel 使用

哈喽~大家好&#xff0c;SpringCloud Sentinel 使用。 &#x1f947;个人主页&#xff1a;个人主页​​​​​ &#x1f948; 系列专栏&#xff1a;【微服务】 &#x1f949;推荐专栏&#xff1a; JavaEE框架 目录 一、前言 1、什么是Sentinel&…

时间复杂度和空间复杂度【一学就会】

目录 &#x1f947;1.算法效率 &#x1f50e;2.时间复杂度 &#x1f4d7;2.1 大O渐进表示法 &#x1f4d8;2.2 时间复杂度的练习&#xff08;没有说明即最坏情况&#xff09; &#x1f511;3.空间复杂度 &#x1f308;如何评价一个代码呢&#xff1f;它的效率高不高&#…

美创科技发布数据安全综合评估系统|推进安全评估高效开展

数字化深入的今天&#xff0c;数据价值和风险相伴相生&#xff0c;让数据要素发挥更大价值&#xff0c;提高风险预见预判&#xff0c;数据安全评估日益紧迫和必要。《数据安全法》提出&#xff1a;“重要数据处理者应对其数据处理活动定期开展风险评估&#xff0c;并向有关主管…

精彩回顾|关系网络赋能银行数字化转型的应用与实践

本文是根据11月10日Galaxybase图创课堂&#xff1a;乘金融科技之风&#xff1a;关系网络赋能银行数字化转型的应用与实践整理&#xff0c;错过的小伙伴们可以观看回放&#xff1a;https://uao.so/pct862806 精彩回顾 近年&#xff0c;知识图谱的重要性和实际应用逐步呈上升趋…

十大排序(总结+算法实现)

十大排序&#xff08;总结算法实现&#xff09; 十大排队的性能分析 冒泡排序 使用冒泡排序&#xff0c;时间复杂度为O(n^2),空间复杂度为O(1) 像气泡一样从低往上浮现 vector<int> bubbleSort(vector<int>nums) {int lengthnums.size();for(int i0;i<lengt…

PreScan快速入门到精通第三十七讲PreScan中使用深度摄像机

深度相机提供了一个 "相机"图像,其中包含深度值,而不是颜色。它提供的地面真实数据可用于校准或验证立体相机的深度计算。 对象配置 系统选项卡 变量 描述 …

antd——a-tree-select 树形选择控件 与 a-cascader 级联选择器 的对比——技能提升

在遇到 省市区多级联动数据的时候&#xff0c;经常会用到的就是 a-cascader级联选择器。 1.级联选择器的使用方法 1.1 需要指定数据源——options 数据结构是 对象数组嵌套——value/label/children <a-cascader :options"options" placeholder"Please sele…

SpringBoot3 正式发布:有哪些新玩法?

SpringBoot 3.0现已正式发布&#xff0c;可以在Maven Central中找到。 这是自四年前发布2.0以来的第一个主要版本。 它也是SpringBoot的第一个GA版本&#xff0c;提供了对Spring Framework 6.0和GraalVM的支持。 一文详解&#xff5c;从JDK8飞升到JDK17&#xff0c;再到未来…

Tesla M40 下Ubuntu anaconda pycharm安装

显卡&#xff1a;Tesla M40 24GB (2张&#xff09; 显卡驱动版本(推荐)&#xff1a;470.57.02 cuda版本&#xff1a;11.4 安装前需要&#xff01;&#xff01;&#xff01;&#xff01;&#xff01;禁用nouveau驱动&#xff01;&#xff01;&#xff01;&#xff01;&#x…

php中通用的excel导出方法实例

一.普遍导出方法 excel导出的方法网上有很多&#xff0c;在crm或是oa系统中导出excel是常有的事&#xff0c;做过的此功能人都知道&#xff0c;其主要操作其实是循环数据列表&#xff0c;然后一格一格地添加数据到固定的单元格中。只要做好了一次&#xff0c;其后只要复制相关…

外汇天眼:2022 年世界杯已经开始,但这将如何影响外汇交易?

关于 2022 年世界杯 2022年世界杯于2022年11月20日在卡塔尔拉开帷幕&#xff0c;将持续到2022年12月18日。2022年国际足联世界杯是由国际足联成员协会的男子国家队和第22届国际足联世界杯举办的国际足球锦标赛。这是第一次在阿拉伯世界举办的世界杯。 关于中东 尽管经历了疫情…

MCE | KRAS 突变型肺癌耐药性探索

KRAS 是一种致癌基因&#xff0c;编码 KRAS 蛋白 (一种 small GTPase 转导蛋白)。KRAS 通过结合 GTP/GDP 控制其活跃状态&#xff0c;进而控制其信号传递和下游级联反应。致癌性 KRAS 突变会破坏 GTPase 活性&#xff0c;进而使 KRAS 蛋白锁定在活跃状态下&#xff0c;从而使启…

【面试题】JS基础-异步

1. 异步 1.1 为什么要异步&#xff1f; JS是单线程语言&#xff0c;只能同时做一件事。JS和DOM渲染共用同一个线程&#xff0c;因为JS可修改DOM结构。当遇到等待的情况时&#xff0c;例如网络请求、定时任务&#xff0c;程序不能卡住。所以需要异步来解决JS单线程等待的问题&…

学完Spring框架回头再来看反射你会发现真的不一样

文章目录前言一.什么是反射&#xff1f;二.如何实现反射&#xff1f;2.1java.lang.Class2.2通过反射创建对象2.3通过反射获取类成员三.反射的性能四.反射是如何破坏单例模式的&#xff1f;五.如何避免单例模式被反射破坏&#xff1f;前言 你还记得你的Spring入门案例吗&#x…

查询利器—索引

目录 索引的优缺点 常见索引分类 MySQL数据操作的宏观过程 认识磁盘 正式理解索引结构 采用B树的原因 聚簇索引与非聚簇索引 索引操作 索引创建原则 索引的优缺点 优点&#xff1a;提高一个海量数据的检索速度 缺点&#xff1a;查询速度的提高是以插入、更新、删除…

pdf生成:wkhtmltopdf

wkhtmltopdf是开源&#xff08;LGPLv3&#xff09;命令行工具&#xff0c;使用Qt WebKit渲染引擎将HTML渲染为PDF和各种图像格式。这些完全以“headless”模式运行&#xff0c;不需要显示或显示服务wkhtmltoimage。建议&#xff1a; 不要将wkhtmltopdf与任何不受信任的HTML一起…