Flink 详解(六):源码篇 Ⅰ
- 55、Flink 作业的提交流程?
- 56、Flink 作业提交分为几种方式?
- 57、Flink JobGraph 是在什么时候生成的?
- 58、那在 JobGraph 提交集群之前都经历哪些过程?
- 59、看你提到 PipeExecutor,它有哪些实现类?
- 60、Local 提交模式有啥特点,怎么实现的?
- 61、远程提交模式都有哪些?
- 62、Standalone 模式简单介绍一下?
- 63、yarn 集群提交方式介绍一下?
- 64、yarn - session 模式特点?
- 65、yarn - per - job 模式特点?
- 66、yarn - application 模式特点?
- 67、yarn - session 提交流程详细介绍一下?
- 68、yarn - per - job 提交流程详细介绍一下?
55、Flink 作业的提交流程?
Flink 的提交流程:
- 在
Flink Client中,通过反射启动jar中的main函数,生成 Flink StreamGraph 和 JobGraph,将 JobGraph 提交给 Flink 集群。 - Flink 集群收到 JobGraph(
JobManager收到)后,将 JobGraph 翻译成 ExecutionGraph,然后开始调度,启动成功之后开始消费数据。
总结来说:Flink 核心执行流程,对用户 API 的调用可以转为 StreamGraph → JobGraph → ExecutionGraph。
56、Flink 作业提交分为几种方式?
Flink 的作业提交分为两种方式:
- Local 方式:即本地提交模式,直接在 IDEA 运行代码。
- 远程提交方式:分为
standalone方式、yarn方式、K8s方式。其中,yarn方式又分为三种提交模式:yarn-per-job模式、yarn-session模式、yarn-application模式。
57、Flink JobGraph 是在什么时候生成的?
StreamGraph、JobGraph 全部是在 Flink Client 客户端生成的,即提交集群之前生成,原理图如下:

58、那在 JobGraph 提交集群之前都经历哪些过程?
- 用户通过启动 Flink 集群,使用命令行提交作业,运行
flink run -c WordCount xxx.jar。 - 运行命令行后,会通过
run脚本调用CliFrontend入口,CliFrontend会触发用户提交的jar文件中的main方法,然后交给PipelineExecuteor的execute方法,最终根据提交的模式选择触发一个具体的PipelineExecutor执行。 - 根据具体的
PipelineExecutor执行,将对用户的代码进行编译生成 StreamGraph,经过优化后生成 Jobgraph。
具体流程图如下:

59、看你提到 PipeExecutor,它有哪些实现类?
PipeExecutor 在 Flink 中被叫做 流水线执行器,它是一个接口,是 Flink Client 生成 JobGraph 之后,将作业提交给集群的重要环节。前面说过,作业提交到集群有好几种方式,最常用的是 yarn 方式,yarn 方式包含
3
3
3 种提交模式,主要使用 session 模式,per-job 模式。application 模式中 JobGraph 是在集群中生成。
所以 PipeExecutor 的实现类如下图所示:(在代码中按 CTRL+H 就会出来)

除了上述红框的两种模式外,在 IDEA 环境中运行 Flink MiniCluster 进行调试时,使用 LocalExecutor。
60、Local 提交模式有啥特点,怎么实现的?
Local 是在本地 IDEA 环境中运行的提交方式。不上集群。主要用于调试,原理图如下:

-
Flink 程序由
JobClient进行提交。 -
JobClient将作业提交给JobManager。 -
JobManager负责协调资源分配和作业执行。资源分配完成后,任务将提交给相应的TaskManager。 -
TaskManager启动一个线程开始执行,TaskManager会向JobManager报告状态更改,如开始执 行,正在进行或者已完成。 -
作业执行完成后,结果将发送回客户端。
源码分析:通过 Flink 1.12.2 1.12.2 1.12.2 源码进行分析的。
(1)创建获取对应的 StreamExecutionEnvironment 对象:LocalStreamEnvironment。
调用 StreamExecutionEnvironment 对象的 execute 方法。





(2)获取 StreamGraph。

(3)执行具体的 PipeLineExecutor 得到 localExecutorFactory。

(4) 获取 JobGraph。
根据 localExecutorFactory 的实现类 LocalExecutor 生成 JobGraph。

上面这部分全部是在 Flink Client 生成的。由于是使用 Local 模式提交,所以接下来将创建 MiniCluster 集群,由 miniCluster.submitJob 指定要提交的 jobGraph。
(5)实例化 MiniCluster 集群。

(6)返回 JobClient 客户端。
在上面执行 miniCluster.submitJob 将 JobGraph 提交到本地集群后,会返回一个 JobClient 客户端,该 JobClient 包含了应用的一些详细信息,包括 JobID、应用的状态等等。最后返回到代码执行的上一层,对应类为 StreamExecutionEnvironment。

以上就是 Local 模式的源码执行过程。
61、远程提交模式都有哪些?
远程提交方式:分为 Standalone 方式、Yarn 方式、K8s 方式。
- Standalone:包含
session模式。 - Yarn 方式 分为三种提交模式:
yarn-per-job模式、yarn-Session模式、yarn-application模式。 - K8s 方式:包含
session模式。
62、Standalone 模式简单介绍一下?
Standalone 模式为 Flink 集群的 单机版提交方式,只使用一个节点进行提交,常用 Session 模式。

提交命令如下:
bin/flink run org.apache.flink.WordCount xxx.jar
Client客户端提交任务给JobManager。JobManager负责申请任务运行所需要的资源并管理任务和资源。JobManager分发任务给TaskManager执行。TaskManager定期向JobManager汇报状态。
63、yarn 集群提交方式介绍一下?
通过 yarn 集群提交分为 3 3 3 种提交方式:
session模式per-job模式application模式
64、yarn - session 模式特点?
提交命令如下:
./bin/flink run -t yarn-session \
-Dyarn.application.id=application_XXXX_YY xxx.jar
yarn-session 模式:所有作业共享集群资源,隔离性差,JM 负载瓶颈,main 方法在客户端执行。适合执行时间短,频繁执行的短任务,集群中的所有作业只有一个 JobManager,另外,Job 被随机分配给 TaskManager。
特点:session-cluster 模式需要先启动集群,然后再提交作业,接着会向 Yarn 申请一块空间后,资源永远保持不变。如果资源满了,下一个作业就无法提交,只能等到 Yarn 中的其中一个作业执行完成后,释放了资源,下个作业才会正常提交。所有作业共享 Dispatcher 和 ResourceManager,共享资源,适合规模小执行时间短的作业。

65、yarn - per - job 模式特点?
提交命令:
./bin/flink run -t yarn-per-job --detached xxx.jar
yarn-per-job 模式:每个作业单独启动集群,隔离性好,JM 负载均衡,main 方法在客户端执行。在 per-job 模式下,每个 Job 都有一个 JobManager,每个 TaskManager 只有单个 Job。
特点:一个任务会对应一个 Job,每提交一个作业会根据自身的情况,都会单独向 Yarn 申请资源,直到作业执行完成,一个作业的失败与否并不会影响下一个作业的正常提交和运行。独享 Dispatcher 和 ResourceManager,按需接受资源申请。适合规模大长时间运行的作业。

66、yarn - application 模式特点?
提交命令如下:
./bin/flink run-application -t yarn-application xxx.jar
yarn-application 模式:每个作业单独启动集群,隔离性好,JM 负载均衡,main 方法在 JobManager 上执行。
在 yarn-per-job 和 yarn-session 模式下,客户端都需要执行以下三步,即:
- 获取作业所需的依赖项;
- 通过执行环境分析并取得逻辑计划,即
StreamGraph→JobGraph; - 将依赖项和 JobGraph 上传到集群中。

只有在这些都完成之后,才会通过 env.execute() 方法触发 Flink 运行时真正地开始执行作业。如果所有用户都在同一个客户端上提交作业,较大的依赖会消耗更多的带宽,而较复杂的作业逻辑翻译成 JobGraph 也需要吃掉更多的 CPU 和内存,客户端的资源反而会成为瓶颈。
为了解决它,社区在传统部署模式的基础上实现了 Application 模式。原本需要客户端做的三件事被转移到了 JobManager 里,也就是说 main() 方法在集群中执行(入口点位于 ApplicationClusterEntryPoint),客户端只需要负责发起部署请求了。

综上所述,Flink 社区比较推荐使用 yarn-per-job 或者 yarn-application 模式进行提交应用。
67、yarn - session 提交流程详细介绍一下?
提交流程图如下:

1、启动集群
Flink Client向Yarn ResourceManager提交任务信息。Flink Client将应用配置(Flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户 Jar 文件、JobGraph 对象等)上传至分布式存储 HDFS 中。Flink Client向Yarn ResourceManager提交任务信息。
- Yarn 启动 Flink 集群,做
2
2
2 步操作:
- 通过
Yarn Client向Yarn ResourceManager提交 Flink 创建集群的申请,Yarn ResourceManager分配 Container 资源,并通知对应的NodeManager上启动一个ApplicationMaster(每提交一个 Flink Job 就会启动一个ApplicationMaster),ApplicationMaster会包含当前要启动的JobManager和 Flink 自己内部要使用的ResourceManager。 - 在
JobManager进程中运行YarnSessionClusterEntryPoint作为集群启动的入口。初始化Dispatcher,Flink 自己内部要使用的ResourceManager,启动相关 RPC 服务,等待Flink Client通过 Rest 接口提交 JobGraph。
- 通过
2、作业提交
-
Flink Client通过 Rest 向Dispatcher提交编译好的 JobGraph。Dispatcher是 Rest 接口,不负责实际的调度、指定工作。 -
Dispatcher收到 JobGraph 后,为作业创建一个JobMaster,将工作交给JobMaster,JobMaster负责作业调度,管理作业和 Task 的生命周期,构建 ExecutionGraph(JobGraph 的并行化版本,调度层最核心的数据结构)。
以上两步执行完后,作业进入调度执行阶段。
3、作业调度执行
-
JobMaster向ResourceManager申请资源,开始调度 ExecutionGraph。 -
ResourceManager将资源请求加入等待队列,通过心跳向YarnResourceManager申请新的 Container 来启动TaskManager进程。 -
YarnResourceManager启动,然后从 HDFS 加载 Jar 文件等所需相关资源,在容器中启动TaskManager,TaskManager启动TaskExecutor。 -
TaskManager启动后,向ResourceManager注册,并把自己的 Slot 资源情况汇报给ResourceManager。 -
ResourceManager从等待队列取出 Slot 请求,向TaskManager确认资源可用情况,并告知TaskManager将 Slot 分配给哪个JobMaster。 -
TaskManager向JobMaster回复自己的一个 Slot 属于你这个任务,JobMaser会将 Slot 缓存到 SlotPool。 -
JobMaster调度 Task 到TaskMnager的 Slot 上执行。
68、yarn - per - job 提交流程详细介绍一下?
提交命令如下:
./bin/flink run -t yarn-per-job --detached xxx.jar
提交流程图如下所示:

1、启动集群
Flink Client向Yarn ResourceManager提交任务信息。Flink Client将应用配置(Flink-conf.yaml、logback.xml、log4j.properties)和相关文件(Flink Jar、配置类文件、用户 Jar 文件、JobGraph 对象等)上传至分布式存储 HDFS 中。Flink Client向Yarn ResourceManager提交任务信息。
- Yarn 启动 Flink 集群,做
2
2
2 步操作。
- 通过
Yarn Client向Yarn ResourceManager提交 Flink 创建集群的申请,Yarn ResourceManager分配 Container 资源,并通知对应的NodeManager上启动一个ApplicationMaster(每提交一个 Flink Job 就会启动一个ApplicationMaster),ApplicationMaster会包含当前要启动的JobManager和 Flink 自己内部要使用的ResourceManager。 - 在
JobManager进程中运行YarnJobClusterEntryPoint作为集群启动的入口。初始化Dispatcher,Flink 自己内部要使用的ResourceManager,启动相关 RPC 服务,等待Flink Client通过 Rest 接口提交 JobGraph。
- 通过
2、作业提交
ApplicationMaster启动Dispatcher,Dispatcher启动ResourceManager和JobMaster(该步和 Session 不同,JobMaster是由Dispatcher拉起,而不是 Client 传过来的)。JobMaster负责作业调度,管理作业和 Task 的生命周期,构建 ExecutionGraph(JobGraph 的并行化版本,调度层最核心的数据结构)。
以上两步执行完后,作业进入调度执行阶段。
3、作业调度执行
-
JobMaster向ResourceManager申请 Slot 资源,开始调度 ExecutionGraph。 -
ResourceManager将资源请求加入等待队列,通过心跳向YarnResourceManager申请新的 Container 来启动TaskManager进程。 -
YarnResourceManager启动,然后从 HDFS 加载 Jar 文件等所需相关资源,在容器中启动TaskManager。 -
TaskManager在内部启动TaskExecutor。 -
TaskManager启动后,向ResourceManager注册,并把自己的 Slot 资源情况汇报给ResourceManager。 -
ResourceManager从等待队列取出 Slot 请求,向TaskManager确认资源可用情况,并告知TaskManager将 Slot 分配给哪个JobMaster。 -
TaskManager向JobMaster回复自己的一个 Slot 属于你这个任务,JobMaser会将 Slot 缓存到 SlotPool。 -
JobMaster调度 Task 到TaskMnager的 Slot 上执行。



















