2.生成Transformation

news2025/7/19 8:50:49

目录

前言

Source

FlatMap

KeyBy

sum

总结


前言

以下面的WordCount为例

package com.wlh.p1;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        //
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 7777)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] split = s.split(" ");
                        for (String word : split) {
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        return stringIntegerTuple2.f0;
                    }
                })
                .sum(1)
                .print();

        //
        env.execute();

    }
}

上面是一个简单的WordCount程序,理解Flink之前,需要理解好Flink中的一些核心抽象概念

如下图所示,主要为3个:

(1)Transformation

(2)StreamOperator

(3)User-Defined Function

Transformation指的是在DateStream之间转换的操作,比如上面WordCount例子中的flatMap,它其实就对应着一个Transformation,表示从某个DataStream转换为另一个DataStream对应的Transformation。

以WordCount为例,先看一下对应的transformations,上述任务对应的transformations是一个list,list包含3个元素,但是元素对应的transformation的id是2/4/5。具体这些transformation是如何产生的是本文的重点。

Source

我们根据代码一步步跟进看一下

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

当获取到流式执行环境后(如果在本地获取的是Local的执行环境),StreamExecutionEnvironment中会存在一个成员变量transformations,初始化为空集合。

env.socketTextStream("localhost", 7777)

跟进代码后,

可以看到new SocketTextStreamFunction

这就是上面说的User-Defined Function,跟进该方法,可以看到确实是继承了Function接口

跟进addSource(

        new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), "Socket Stream");

这里有一个比较关键的参数Boundedness.CONTINUOUS_UNBOUNDED

表示该source是一个无界流,事实也是如此,socket流当然是无界的。

跟进 addSource(function, sourceName, typeInfo, Boundedness.CONTINUOUS_UNBOUNDED);

注意TypeInformation<OUT> resolvedTypeInfo =

        getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);

TypeInformation是Flink中类型系统的核心类,该方法的内部逻辑是通过java提供的Type系统来提取的。debug跟进一下类型的提取

baseClass是SourceFunction;clazz是具体的实现类SocketTextStreamFunction。通过new TypeExtractor()对象提取该function具体的输出类型信息。

跟进new TypeExtractor()

        .privateCreateTypeInfo(baseClass, clazz, returnParamPos, in1Type, in2Type);查看具体的类型提取代码,通过反射获取,关于反射获取具体的类型,自行学习了解。

获取到SocketTextStreamFunction的输出类型后,继续跟进 addSource(function, sourceName, typeInfo, Boundedness.CONTINUOUS_UNBOUNDED);

刚刚已经获取输出类型之后,继续跟进后续代码

boolean isParallel = function instanceof ParallelSourceFunction; // 判断该source是不是可以并行的source,很明显这里的isParallel是false

clean(function); // 该行在做闭包清理/检查。如果不通过会报类似异常"Object " + obj + " is not serializable"

final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function); 

通过StreamSource的继承关系可以看出,StreamSource其实是开头提到的二号人物StreamOperator,至此,User-Defined Function和StreamOperator都已经出现了,并且它们的关系是StreamOperator中包含User-Defined Function,和开头图示一致。

return new DataStreamSource<>(

        this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);

最终,构建了Transformation,并且将operator传入,至此Transformation、StreamOperator、User-Defined Function都已经出现在视野中。

注意:在看DataStreamSource的继承关系时,会看到它继承自SingleOutputStreamOperator,这里是我觉得Flink命名不太好的地方,SingleOutputStreamOperator会被误认为是StreamOperator,但其实不是,SingleOutputStreamOperator是继承自DataStream的,并且在注释中明确说明SingleOutputStreamOperator是transformation。

跟进DataStream,会看到DataStream中封装了environment环境和通过env.socketTextStream("localhost", 7777)定义的第一个transformation。后面的api操作,如flatMap等,都是基于该DataStream进行操作了。

FlatMap

在上面DataStream的基础上将后续的api都介绍一下,跟进.flatMap

形参flatMapper,即为用户在编程时自定义的function,代码逻辑很清晰,依然是先获取输出的类型信息

跟进return flatMap(flatMapper, outType);

StreamFlatMap是StreamOperator的子类

跟进return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));

跟进return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));

首先new OneInputTransformation,创建一个新的Transformation,在构建的时候,传入了this.transformation,就是上面的LegacySourceTransformation。

new SingleOutputStreamOperator(environment, resultTransform); 创建一个新的DataStream,作为这一步API操作的返回值。

跟进getExecutionEnvironment().addOperator(resultTransform);

将transformation添加进env环境的transformations集合中,这个集合在未来会遍历生成StreamGraph。

KeyBy

在上面DataStream的基础上将后续的api都介绍一下,跟进.keyBy

直接new KeyedStream,跟进

获取key的类型信息,继续跟进构造方法

new PartitionTransformation创建了一个Transformation,将当前Datastream的Transformation作为PartitionTransformation的输入,并且将用户自定义的keySelector封装进KeyGroupStreamPartitioner。

继续跟进后,由于KeyedStream继承自DataStream,同样的,将env和当前的transformation封装进去。

至此KeyedStream构建完成,它的内容如下,

sum

在上面KeyedStream的基础上,继续跟进代码

 SumAggregator<T> extends AggregationFunction<T> 

SumAggerator就是User-Defined Function

跟进return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));

方法的签名为AggeragationFunction

跟进return reduce(aggregate).name("Keyed Aggregation");

这里出现了transformation

getExecutionEnvironment().addOperator(reduce); 这一行代码在之前说过,将该transformation加入到env的transformations中。

这里重点来看一下ReduceTransformation

构造方法如下:包含了聚合算子必须的一些信息,reducer是聚合的函数,input是之前的transformation,keySelector是分组key的提取方式。

值得注意的一行代码updateManagedMemoryStateBackendUseCase(true);

这里是在设置状态后端,这里第一次提到了状态的概念,状态是Flink得以流行的重要原因之一,有状态的流式计算。

print

这里是WordCount例子中的最后一步了,和前面的算子都非常类似,看到这里应该是可以举一反三了。

熟悉的味道,创建Function

创建StreamOperator

在创建DateStream时,创建了transformation

总结

本文介绍了Flink是如何将用户的api转换为Transformation,这是Flink的核心抽象,DataStream是面向用户的,Transformation并不面向用户,在Flink触发执行时,transformation会被Flink转换为更贴近底层执行的各种有向无环图,即常说的DAG。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2259288.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PCL点云库入门——PCL库可视化之CloudViewer类简单点云信息显示

1、前言 可视化&#xff08;visualization&#xff09;涉及运用计算机图形学和图像处理技术&#xff0c;将数据转换成图像并在屏幕上展示&#xff0c;同时支持交互式处理。在PCL库中&#xff0c;一系列强大的可视化工具可供使用&#xff0c;其中较为流行的包括CloudViewer和PCL…

AI大模型学习笔记|多目标算法梳理、举例

多目标算法学习内容推荐&#xff1a; 1.通俗易懂讲算法-多目标优化-NSGA-II(附代码讲解)_哔哩哔哩_bilibili 2.多目标优化 (python pyomo pareto 最优)_哔哩哔哩_bilibili 学习笔记&#xff1a; 通过网盘分享的文件&#xff1a;多目标算法学习笔记 链接: https://pan.baidu.com…

印闪网络:阿里云数据库MongoDB版助力金融科技出海企业降本增效

客户背景 上海印闪网络科技有限公司&#xff0c;于2017年1月成立&#xff0c;投资方包括红杉资本等多家国际知名风投公司。公司业务聚焦东南亚普惠金融&#xff0c;常年稳居行业头部。创始团队来自腾讯&#xff0c;中国团队主要由运营、风控及产研人员组成&#xff0c;核心成员…

路由引入问题(双点双向路由回馈问题)

简介 总所周知&#xff0c;路由引入import又称路由重分发redistribute&#xff0c;为了解决不同路由协议进程间路由信息不互通而使用的技术&#xff0c;由于不同路由协议的算法、机制、开销等因素的差异&#xff0c;它们之间无法直接交换路由信息。因此&#xff0c;路由引入技…

windows安装gradle

目录 1. gradle的简介2. 安装操作2.1 下载2.2 配置环境变量2.3 测试验证 3. 总结 1. gradle的简介 Gradle 是一个开源的项目自动化构建工具&#xff0c;专注于灵活性和性能。它基于 Apache Ant 和 Apache Maven 的概念&#xff0c;但采用了 Groovy 或 Kotlin 作为领域特定语言…

数据库中的代数运算

这些代数基本运算通常被封装在数据库查询语言中&#xff0c;如SQL中的SELECT、FROM、WHERE等子句&#xff0c;使得用户可以更方便地对数据库进行查询和处理。 下面的介绍基于以下两个关系来说明&#xff1a; 传统的集合运算 并&#xff08;∪&#xff09; 合并两个关系中的元组…

Linux驱动开发(12):中断子系统–按键中断实验

本章我们以按键为例讲解在驱动程序中如何使用中断&#xff0c; 在学习本章之前建议先回顾一下关于中断相关的裸机部分相关章节&#xff0c; 这里主要介绍在驱动中如何使用中断&#xff0c;对于中断的概念及GIC中断控制器相关内容不再进行讲解。 本章配套源码和设备树插件位于“…

智能家居WTR096-16S录放音芯片方案,实现语音播报提示及录音留言功能

前言&#xff1a; 在当今社会的高速运转之下&#xff0c;夜幕低垂之时&#xff0c;许多辛勤工作的父母尚未归家。对于肩负家庭责任的他们而言&#xff0c;确保孩童按时用餐与居家安全成为心头大事。此时&#xff0c;家居留言录音提示功能应运而生&#xff0c;恰似家中的一位无形…

【Qt】信号、槽

目录 一、信号和槽的基本概念 二、connect函数&#xff1a;关联信号和槽 三、自定义信号和槽 1.自定义槽函数 2.自定义信号函数 例子&#xff1a; 四、带参的信号和槽 例子&#xff1a; 五、Q_OBJECT宏 六、断开信号和槽的连接 例子&#xff1a; 一、信号和槽的基本…

Zemax 中的 LED 阵列模型

LED 阵列的光学特性 LED 阵列由多个发光二极管 &#xff08;LED&#xff09; 组成&#xff0c;这些二极管以特定模式或配置排列&#xff0c;以实现均匀照明、更高强度或特定照明特性。这些阵列广泛用于显示器、照明系统、光通信和传感等应用。 LED 阵列的光学特性对于了解它如…

Qt编写区位码gb2312、机内码、国标码————附带详细介绍和编码实现

文章目录 0 背景1 了解编码1.1 ASCII码1.2 机内码、国标码、区位码1.2.1 区位码1.2.2 国标码&#xff08;GB 2312-80&#xff09;1.2.3 汉字机内码&#xff08;GB 2312&#xff09; 1.3 GBK和GB2312的区别2 编码实现2.1 QString数据转QByteArray类型2.1.1 使用QTextCodec2.1.2 …

【Linux系统】—— 权限的概念

【Linux系统】—— 权限的概念 1 权限1.1 什么是权限1.2 为什么要有权限1.3 理解权限 2 文件的权限2.1 文件角色2.2 文件权限2.3 修改文件权限2.3.1 修改目标属性2.3.1.1 字符修改法2.3.1.2 8进制修改法 2.3.2 修改角色 3 文件权限补充知识点3.1 只能修改自己的文件权限3.2 没有…

js:我要在template中v-for循环遍历这个centrerTopdata,我希望自循环前面三个就可以了怎么写

问&#xff1a; 我按在要在template中v-for循环遍历这个centrerTopdata&#xff0c;我希望自循环前面三个就可以了怎么写&#xff1f; 回答&#xff1a; 问&#xff1a; <div v-for"(item, index) in centrerTopdata.slice(0, 3)" :key"index"> d…

016 在路由器上配置 DHCP

配置路由器端口IP地址 将路由器的端口地址配置好&#xff0c; 左边的网络地址是 192.168.1.0 右边的网络地址是 192.168.2.0 配置路由器的DHCP服务 打开命令窗口&#xff0c;进入特权模式 进入全局配置 conf t创建一个DHCP地址池&#xff1b; po1 是地址池的名称&#xf…

使用IP自签名SSL证书

最近需要创建WebSocket服务器并使用SSL证书&#xff0c;由于是内网测试&#xff0c;所以需要使用指定IP的自签SSL证书。 其实笔者前面博文 使用nexus3作为Docker镜像仓库 解决nexus3登录x509: certificate has expired or is not yet valid 中有创建过相应的证书&#xff0c;这…

多模态大模型(二)——用Transformer Encoder和Decoder的方法(BLIP、CoCa、BEiTv3)

文章目录 BLIP: Bootstrapping Language-Image Pre-training for Unified Vision-Language Understanding and Generation 理解、生成我都要&#xff0c;一个很有效的、根据图片生成caption的工具1. BLIP的研究动机2. BLIP的模型结构3. CapFilt Model4. BLIP的训练过程 CoCa: C…

vue季度选择器(antd2.0 版本无此控件,单独写一个)

vue季度选择器 效果显示 效果显示 <template><div><a-popoverplacement"bottom"overlayClassName"season-picker"trigger"click"v-model"showSeason"><template #content><div class"season-picker-b…

基于Spring Boot + Vue的摄影师分享交流社区的设计与实现

博主介绍&#xff1a;java高级开发&#xff0c;从事互联网行业六年&#xff0c;熟悉各种主流语言&#xff0c;精通java、python、php、爬虫、web开发&#xff0c;已经做了多年的设计程序开发&#xff0c;开发过上千套设计程序&#xff0c;没有什么华丽的语言&#xff0c;只有实…

利用GeoWave导入矢量数据到HBase/Accumulo数据库

前言 最近在做有关地理时空大数据的实验&#xff0c;本文将介绍如何利用geowave框架&#xff0c;将矢量数据导入到HBase或Accumulo等NoSQL数据库中。 软件版本&#xff1a; Hadoop: 2.10.2 Zookeeper: 3.6.4 geowave: 1.2.0 Accumulo&#xff1a;1.9.3 HBase: 1.4.0 Ja…

常回家看看之Tcache Stashing Unlink Attack

前言&#xff1a; 在开始了解这个攻击手法的前提&#xff0c;需要先了解一个函数也就是calloc函数&#xff0c;众所周知&#xff0c;当libc版本大于等于2.27的时候会引入tcachebin&#xff0c;而Tcache Stashing Unlink Attack就是发生在2.27版本以上&#xff0c;那么这个和ca…