Contents
- Step 1: Build the ExecutionJobVertex nodes
- Step 2: Create ExecutionEdges and connect them according to the distribution pattern
- Summary
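A JobGraph consists of JobVertex nodes and IntermediateDataSet objects (intermediate result data sets). A JobVertex represents a transformation applied to the data, and an IntermediateDataSet is the intermediate result that a JobVertex produces: it is both the output of the upstream JobVertex and the input of the downstream JobVertex.
The ExecutionGraph is generated by the JobManager from the JobGraph and can be understood as the "parallelized" form of the JobGraph. It consists of ExecutionJobVertex nodes (each containing several ExecutionVertex child nodes) and IntermediateResult objects (each containing several IntermediateResultPartition objects).
Building the ExecutionGraph essentially means converting every JobVertex into an ExecutionJobVertex. The key idea is "splitting" by parallelism: each JobVertex is expanded into one ExecutionVertex per parallel subtask.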
/**
* Initializes the ExecutionGraph
*/
public static ExecutionGraph buildGraph(
@Nullable ExecutionGraph prior,
JobGraph jobGraph,
Configuration jobManagerConfig,
ScheduledExecutorService futureExecutor,
Executor ioExecutor,
SlotProvider slotProvider,
ClassLoader classLoader,
CheckpointRecoveryFactory recoveryFactory,
Time rpcTimeout,
RestartStrategy restartStrategy,
MetricGroup metrics,
BlobWriter blobWriter,
Time allocationTimeout,
Logger log,
ShuffleMaster<?> shuffleMaster,
JobMasterPartitionTracker partitionTracker,
FailoverStrategy.Factory failoverStrategyFactory) throws JobExecutionException, JobException {
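// Part of the code omitted...
/**
* Core: create the ExecutionJobVertex nodes and, based on parallelism, the corresponding number of ExecutionVertex nodes
*/
executionGraph.attachJobGraph(sortedTopology);
// Part of the code omitted...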
}
The method iterates over the JobVertex nodes. For each one it creates the corresponding ExecutionJobVertex and then connects it "logically" to its upstream results.
/**
* For each JobVertex in the topologically sorted List<JobVertex>, create one ExecutionJobVertex and, based on parallelism, the corresponding number of ExecutionVertex nodes
*/
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
// Part of the code omitted...
// Iterate over the topologically sorted List<JobVertex>
for (JobVertex jobVertex : topologiallySorted) {
if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
this.isStoppable = false;
}
/**
* Core: create one ExecutionJobVertex for each JobVertex
* (internally this creates one ExecutionVertex per parallel subtask, plus the matching IntermediateResultPartition objects)
*/
ExecutionJobVertex ejv = new ExecutionJobVertex(
this,
jobVertex,
1,
maxPriorAttemptsHistoryLength,
rpcTimeout,
globalModVersion,
createTimestamp);
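/**
* (In Flink 1.13 the ExecutionEdge concept was optimized away and replaced by ConsumedPartitionGroup and ConsumerVertexGroup)
* Core: connect every IntermediateResultPartition of each upstream IntermediateResult to ExecutionEdges, following the distribution pattern.
* "Connecting" here means that each side holds a reference to the other in a member field, which links the two
*/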
ejv.connectToPredecessors(this.intermediateResults);
// Part of the code omitted...
}
// Part of the code omitted...
}
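The loop above depends on the vertices arriving in topological order: a downstream vertex can only be connected if the results of its upstream vertices are already registered. The following is a minimal sketch of that dependency, not Flink code. The `Vertex` class, the string result IDs, and the `attach` helper are all hypothetical, and the sketch assumes that a vertex registers its own results only after it has been connected to its predecessors (that registration sits in the part of `attachJobGraph` omitted from the excerpt).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified model; none of these types are Flink classes. */
final class AttachOrderSketch {

    /** A vertex that produces one result and consumes zero or more upstream results. */
    static final class Vertex {
        final String name;
        final String producedResultId;
        final List<String> consumedResultIds;

        Vertex(String name, String producedResultId, List<String> consumedResultIds) {
            this.name = name;
            this.producedResultId = producedResultId;
            this.consumedResultIds = consumedResultIds;
        }
    }

    /** Visits vertices in order: resolve inputs from results registered so far, then register the own output. */
    static List<String> attach(List<Vertex> sortedVertices) {
        Map<String, String> producerByResultId = new HashMap<>();
        List<String> connections = new ArrayList<>();
        for (Vertex vertex : sortedVertices) {
            for (String resultId : vertex.consumedResultIds) {
                String producer = producerByResultId.get(resultId);
                if (producer == null) {
                    // Same failure mode as the "No previous intermediate result found" check
                    throw new IllegalStateException("No previous intermediate result found for ID " + resultId);
                }
                connections.add(producer + " -> " + vertex.name);
            }
            producerByResultId.put(vertex.producedResultId, vertex.name);
        }
        return connections;
    }

    /** Source -> FlatMap -> Sink, listed in topological order. */
    static List<Vertex> sampleJob() {
        return Arrays.asList(
                new Vertex("Source", "r1", Collections.<String>emptyList()),
                new Vertex("FlatMap", "r2", Collections.singletonList("r1")),
                new Vertex("Sink", "r3", Collections.singletonList("r2")));
    }

    public static void main(String[] args) {
        System.out.println(attach(sampleJob()));

        // Visiting the same vertices in reverse order fails on the first lookup
        List<Vertex> reversed = new ArrayList<>(sampleJob());
        Collections.reverse(reversed);
        try {
            attach(reversed);
        } catch (IllegalStateException e) {
            System.out.println("Out-of-order attach failed: " + e.getMessage());
        }
    }
}
```

Processing in sorted order is what lets a single pass both create the nodes and wire them together.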
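Step 1: Build the ExecutionJobVertex nodes
Each time a JobVertex is visited, the constructor below is called. It creates one IntermediateResult per IntermediateDataSet that the JobVertex produces in the JobGraph, and one ExecutionJobVertex together with its ExecutionVertex child nodes (one per unit of the JobVertex's parallelism).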
/**
* Called while iterating over the JobVertex nodes: builds the ExecutionJobVertex (one per JobVertex) and creates its ExecutionVertex child nodes
* Key idea: "splitting" by parallelism
*/
public ExecutionJobVertex(
ExecutionGraph graph,
JobVertex jobVertex,
int defaultParallelism,
int maxPriorAttemptsHistoryLength,
Time timeout,
long initialGlobalModVersion,
long createTimestamp) throws JobException {
if (graph == null || jobVertex == null) {
throw new NullPointerException();
}
this.graph = graph;
this.jobVertex = jobVertex;
// Parallelism configured on the JobVertex
int vertexParallelism = jobVertex.getParallelism();
// Fall back to the default parallelism if the JobVertex does not specify a positive value
int numTaskVertices = vertexParallelism > 0 ? vertexParallelism : defaultParallelism;
// Max parallelism configured on the JobVertex
final int configuredMaxParallelism = jobVertex.getMaxParallelism();
this.maxParallelismConfigured = (VALUE_NOT_SET != configuredMaxParallelism);
// Set the max parallelism: the configured value if present, otherwise a default derived from the parallelism
setMaxParallelismInternal(maxParallelismConfigured ?
configuredMaxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism(numTaskVertices));
// Throw if the parallelism exceeds the max parallelism
if (numTaskVertices > maxParallelism) {
throw new JobException(
String.format("Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",
jobVertex.getName(),
numTaskVertices,
maxParallelism));
}
// The final parallelism of this vertex
this.parallelism = numTaskVertices;
this.resourceProfile = ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO);
// Array holding the ExecutionVertex nodes of this ExecutionJobVertex
this.taskVertices = new ExecutionVertex[numTaskVertices];
// OperatorIDs of all StreamOperators in this JobVertex (that is, in the OperatorChain produced by chaining)
this.operatorIDs = Collections.unmodifiableList(jobVertex.getOperatorIDs());
this.userDefinedOperatorIds = Collections.unmodifiableList(jobVertex.getUserDefinedOperatorIDs());
this.inputs = new ArrayList<>(jobVertex.getInputs().size());
this.slotSharingGroup = jobVertex.getSlotSharingGroup();
this.coLocationGroup = jobVertex.getCoLocationGroup();
if (coLocationGroup != null && slotSharingGroup == null) {
throw new JobException("Vertex uses a co-location constraint without using slot sharing");
}
// Allocate the IntermediateResult array, sized by the number of IntermediateDataSets (outputs) this JobVertex produces; it is filled in below
// The array has exactly as many slots as the JobVertex has IntermediateDataSets
// Example: 3 outputs (IntermediateDataSets) give 3 IntermediateResults, each with N IntermediateResultPartitions (N = parallelism)
this.producedDataSets = new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];
/**
* Create one IntermediateResult for each of the N IntermediateDataSets of this JobVertex
*/
for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);
// Create the IntermediateResult and store it at the matching index of the array
this.producedDataSets[i] = new IntermediateResult(
result.getId(),
this,
// Parallelism of the JobVertex
numTaskVertices,
result.getResultType());
}
/**
* Create one ExecutionVertex per unit of parallelism (a JobVertex with parallelism 3 gets 3 ExecutionVertex nodes)
*/
for (int i = 0; i < numTaskVertices; i++) {
// Core: create the ExecutionVertex
ExecutionVertex vertex = new ExecutionVertex(
this,
i,
producedDataSets,
timeout,
initialGlobalModVersion,
createTimestamp,
maxPriorAttemptsHistoryLength);
// Store the new ExecutionVertex at its index in the array
this.taskVertices[i] = vertex;
}
for (IntermediateResult ir : this.producedDataSets) {
if (ir.getNumberOfAssignedPartitions() != parallelism) {
throw new RuntimeException("The intermediate result's partitions were not correctly assigned.");
}
}
try {
@SuppressWarnings("unchecked")
InputSplitSource<InputSplit> splitSource = (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();
if (splitSource != null) {
Thread currentThread = Thread.currentThread();
ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
currentThread.setContextClassLoader(graph.getUserClassLoader());
try {
// Split the input according to the parallelism (the number of ExecutionVertex nodes), passed here as the minimum number of splits
inputSplits = splitSource.createInputSplits(numTaskVertices);
if (inputSplits != null) {
splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
}
} finally {
currentThread.setContextClassLoader(oldContextClassLoader);
}
}
else {
inputSplits = null;
}
}
catch (Throwable t) {
throw new JobException("Creating the input splits caused an error: " + t.getMessage(), t);
}
}
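First, a fixed-size IntermediateResult array is allocated according to the number of IntermediateDataSets the current JobVertex produces, because each IntermediateDataSet in the JobGraph corresponds one-to-one to an IntermediateResult in the ExecutionGraph.
Next, again following the number of IntermediateDataSets, that many IntermediateResults are instantiated and stored in the array just allocated.
Then, following the parallelism of the JobVertex, that many ExecutionVertex child nodes are instantiated and stored one by one at their index in the ExecutionVertex array (the ExecutionJobVertex constructor allocates this array with a capacity equal to the parallelism).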
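One detail of the constructor that the walkthrough does not expand on is what happens when no max parallelism is configured: the value is derived from the parallelism by `KeyGroupRangeAssignment.computeDefaultMaxParallelism`. The sketch below is a hypothetical re-implementation for illustration only. The rounding rule (1.5 times the parallelism, rounded up to a power of two) and the bounds of 128 and 32768 reflect my understanding of Flink's defaults and should be checked against the Flink version in use; `NOT_SET` and `resolveMaxParallelism` are names invented here.

```java
/** Hypothetical re-implementation for illustration; not Flink's KeyGroupRangeAssignment. */
final class MaxParallelismSketch {

    // Assumed bounds (believed to match Flink's defaults; verify against your version)
    static final int LOWER_BOUND = 1 << 7;  // 128
    static final int UPPER_BOUND = 1 << 15; // 32768
    // Hypothetical marker for "max parallelism not configured"
    static final int NOT_SET = -1;

    /** Rounds a positive int up to the next power of two (returns x itself if it already is one). */
    static int roundUpToPowerOfTwo(int x) {
        x = x - 1;
        x |= x >> 1;
        x |= x >> 2;
        x |= x >> 4;
        x |= x >> 8;
        x |= x >> 16;
        return x + 1;
    }

    /** Default max parallelism: 1.5 x parallelism, rounded up to a power of two, clamped to the bounds. */
    static int computeDefaultMaxParallelism(int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        int candidate = roundUpToPowerOfTwo(parallelism + parallelism / 2);
        return Math.min(Math.max(candidate, LOWER_BOUND), UPPER_BOUND);
    }

    /** Mirrors the constructor's logic: pick configured or default, then reject parallelism above it. */
    static int resolveMaxParallelism(int parallelism, int configuredMax) {
        int max = configuredMax != NOT_SET ? configuredMax : computeDefaultMaxParallelism(parallelism);
        if (parallelism > max) {
            throw new IllegalArgumentException(
                    "parallelism (" + parallelism + ") is higher than the max parallelism (" + max + ")");
        }
        return max;
    }

    public static void main(String[] args) {
        System.out.println(computeDefaultMaxParallelism(2));
        System.out.println(computeDefaultMaxParallelism(100));
        System.out.println(resolveMaxParallelism(4, 8));
    }
}
```

Under these assumptions, small jobs all end up with the lower bound, so the check in the constructor only fails when a max parallelism is configured explicitly and set too low.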
/**
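* Creates one ExecutionVertex per unit of the JobVertex's parallelism:
* 1. Each time an ExecutionVertex is created, one IntermediateResultPartition is created for every IntermediateResult
* 2. The information needed to deploy the Task is wrapped in an Execution so that it can be sent to the TaskExecutor (which starts the Task from it)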
*/
public ExecutionVertex(
ExecutionJobVertex jobVertex,
int subTaskIndex,
IntermediateResult[] producedDataSets,
Time timeout,
long initialGlobalModVersion,
long createTimestamp,
int maxPriorExecutionHistoryLength) {
this.jobVertex = jobVertex;
this.subTaskIndex = subTaskIndex;
this.executionVertexId = new ExecutionVertexID(jobVertex.getJobVertexId(), subTaskIndex);
this.taskNameWithSubtask = String.format("%s (%d/%d)",
jobVertex.getJobVertex().getName(), subTaskIndex + 1, jobVertex.getParallelism());
// Ordered map (LinkedHashMap) sized by the number of IntermediateResults this vertex produces
this.resultPartitions = new LinkedHashMap<>(producedDataSets.length, 1);
// A JobVertex with parallelism 3 creates 3 ExecutionVertex nodes in a loop. The ExecutionVertex child nodes of an ExecutionJobVertex
// correspond one-to-one to the IntermediateResultPartitions of each IntermediateResult, so each time an ExecutionVertex is created,
// every IntermediateResult is given 1 IntermediateResultPartition
for (IntermediateResult result : producedDataSets) {
// Create the IntermediateResultPartition for this IntermediateResult (it receives the output of this ExecutionVertex)
IntermediateResultPartition irp = new IntermediateResultPartition(result, this, subTaskIndex);
result.setPartition(subTaskIndex, irp);
// Store the new IntermediateResultPartition in the LinkedHashMap
resultPartitions.put(irp.getPartitionId(), irp);
}
// Allocate the two-dimensional ExecutionEdge array: the first dimension has one slot per input (JobEdge) of the JobVertex, and each slot will hold the ExecutionEdges for that input
this.inputEdges = new ExecutionEdge[jobVertex.getJobVertex().getInputs().size()][];
this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);
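// An Execution wraps the information needed to deploy the Task.
// After the JobMaster has obtained a Slot on a TaskManager, Execution#deploy() performs the deployment: it sends the wrapped information to the TaskExecutor via RPC.
// The TaskExecutor wraps it in a Task object and starts that Task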
this.currentExecution = new Execution(
getExecutionGraph().getFutureExecutor(),
this,
0,
initialGlobalModVersion,
createTimestamp,
timeout);
CoLocationGroup clg = jobVertex.getCoLocationGroup();
if (clg != null) {
this.locationConstraint = clg.getLocationConstraint(subTaskIndex);
}
else {
this.locationConstraint = null;
}
getExecutionGraph().registerExecution(currentExecution);
this.timeout = timeout;
this.inputSplits = new ArrayList<>();
}
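While the ExecutionVertex child nodes of an ExecutionJobVertex are being created, each ExecutionVertex corresponds one-to-one to an IntermediateResultPartition in every IntermediateResult. So each time an ExecutionVertex is instantiated, 1 IntermediateResultPartition is prepared for each IntermediateResult.
Next, the first dimension of the ExecutionEdge array is sized by the number of input edges (JobEdges) of the JobVertex. The ExecutionEdges created later are stored in it.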
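The bookkeeping of step 1 can be condensed into a small stand-alone model. This is only a sketch: `ParallelVertex`, `Subtask` and `Result` are hypothetical stand-ins for ExecutionJobVertex, ExecutionVertex and IntermediateResult, and partitions are reduced to plain integers. It shows the three allocations described above and the consistency check that every result ends up with one partition per subtask.

```java
import java.util.Arrays;

/** Hypothetical miniature of step 1; these are not Flink classes. */
final class SplitParallelismSketch {

    /** Stand-in for IntermediateResult: one partition slot per producer subtask. */
    static final class Result {
        final int[] partitionOwner; // partitionOwner[i] = producing subtask, or -1 while unassigned
        int assignedPartitions;

        Result(int numPartitions) {
            partitionOwner = new int[numPartitions];
            Arrays.fill(partitionOwner, -1);
        }

        void setPartition(int index, int ownerSubtask) {
            if (partitionOwner[index] == -1) {
                assignedPartitions++;
            }
            partitionOwner[index] = ownerSubtask;
        }
    }

    /** Stand-in for ExecutionVertex. */
    static final class Subtask {
        final int index;
        final String[][] inputEdges; // first dimension: one slot per input, filled in step 2

        Subtask(int index, Result[] produced, int numInputs) {
            this.index = index;
            // Creating a subtask adds one partition to EVERY produced result
            for (Result result : produced) {
                result.setPartition(index, index);
            }
            this.inputEdges = new String[numInputs][];
        }
    }

    /** Stand-in for ExecutionJobVertex. */
    static final class ParallelVertex {
        final Result[] produced;
        final Subtask[] subtasks;

        ParallelVertex(int parallelism, int numOutputs, int numInputs) {
            produced = new Result[numOutputs];
            for (int i = 0; i < numOutputs; i++) {
                produced[i] = new Result(parallelism);
            }
            subtasks = new Subtask[parallelism];
            for (int i = 0; i < parallelism; i++) {
                subtasks[i] = new Subtask(i, produced, numInputs);
            }
            for (Result result : produced) {
                if (result.assignedPartitions != parallelism) {
                    throw new IllegalStateException("partitions were not correctly assigned");
                }
            }
        }

        int totalPartitions() {
            int total = 0;
            for (Result result : produced) {
                total += result.assignedPartitions;
            }
            return total;
        }
    }

    public static void main(String[] args) {
        // Parallelism 2, 1 output, 1 input
        ParallelVertex flatMap = new ParallelVertex(2, 1, 1);
        System.out.println("subtasks=" + flatMap.subtasks.length
                + ", results=" + flatMap.produced.length
                + ", partitions=" + flatMap.totalPartitions());
    }
}
```

The total number of partitions is always the number of outputs multiplied by the parallelism, which is the relationship the final check in the ExecutionJobVertex constructor enforces.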
Step 2: Create ExecutionEdges and connect them according to the distribution pattern
/**
* After its parallelism has been "split", a JobVertex has one ExecutionJobVertex containing N ExecutionVertex nodes (N = parallelism).
* Each IntermediateResult accordingly has the same number of IntermediateResultPartitions. This method connects every IntermediateResultPartition
* of each upstream IntermediateResult to ExecutionEdges, following the specified distribution pattern
*/
public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets) throws JobException {
// The input edges of this JobVertex
List<JobEdge> inputs = jobVertex.getInputs();
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("Connecting ExecutionJobVertex %s (%s) to %d predecessors.", jobVertex.getID(), jobVertex.getName(), inputs.size()));
}
// Iterate over all input edges (JobEdges) of this JobVertex
for (int num = 0; num < inputs.size(); num++) {
// Get one JobEdge
JobEdge edge = inputs.get(num);
if (LOG.isDebugEnabled()) {
if (edge.getSource() == null) {
LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",
num, jobVertex.getID(), jobVertex.getName(), edge.getSourceId()));
} else {
LOG.debug(String.format("Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",
num, jobVertex.getID(), jobVertex.getName(), edge.getSource().getProducer().getID(), edge.getSource().getProducer().getName()));
}
}
// The IntermediateResult corresponding to the upstream IntermediateDataSet of this JobEdge
IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
if (ires == null) {
throw new JobException("Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "
+ edge.getSourceId());
}
// Record the IntermediateResult as an input of this ExecutionJobVertex
this.inputs.add(ires);
int consumerIndex = ires.registerConsumer();
// Loop over the parallel subtasks (one ExecutionVertex was created per unit of parallelism when the ExecutionJobVertex was built)
for (int i = 0; i < parallelism; i++) {
// Take the ExecutionVertex from the array
ExecutionVertex ev = taskVertices[i];
// Connect the IntermediateResultPartitions of this IntermediateResult to this ExecutionVertex through ExecutionEdges, following the distribution pattern
ev.connectSource(num, ires, edge, consumerIndex);
}
}
}
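The method iterates over all input edges (JobEdges) of the JobVertex. For each JobEdge it looks up the upstream IntermediateDataSet, and from that the corresponding IntermediateResult. This upstream IntermediateResult is then used to create the ExecutionEdges.
To do so, all IntermediateResultPartitions of the upstream IntermediateResult are fetched first, and the ExecutionEdges are then created according to the distribution pattern.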
/**
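* Called once per parallel subtask of the JobVertex; determines how many ExecutionEdges to create and creates them
*/
public void connectSource(int inputNumber, IntermediateResult source, JobEdge edge, int consumerNumber) {
// Distribution pattern of this JobEdge
final DistributionPattern pattern = edge.getDistributionPattern();
// The IntermediateResultPartition array of the upstream IntermediateResult of the current ExecutionJobVertex
final IntermediateResultPartition[] sourcePartitions = source.getPartitions();
// Array that will hold the ExecutionEdges
ExecutionEdge[] edges;
// Create the ExecutionEdges according to the distribution pattern
switch (pattern) {
case POINTWISE:
// Pointwise: each subtask connects to one upstream partition or a subset of them (1:1 when both parallelisms are equal)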
edges = connectPointwise(sourcePartitions, inputNumber);
break;
case ALL_TO_ALL:
// Cartesian product: this ExecutionVertex gets one ExecutionEdge to every IntermediateResultPartition of the IntermediateResult
edges = connectAllToAll(sourcePartitions, inputNumber);
break;
default:
throw new RuntimeException("Unrecognized distribution pattern.");
}
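// Store the ExecutionEdges in the two-dimensional array;
// its first dimension was sized by the number of JobVertex inputs when the ExecutionVertex was created
inputEdges[inputNumber] = edges;
for (ExecutionEdge ee : edges) {
// The source is the IntermediateResultPartition; register this edge as one of its consumers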
ee.getSource().addConsumer(ee, consumerNumber);
}
}
Taking ALL_TO_ALL as an example:
/**
* Distribution pattern: ALL_TO_ALL
*/
private ExecutionEdge[] connectAllToAll(IntermediateResultPartition[] sourcePartitions, int inputNumber) {
// Size the ExecutionEdge array by the number of IntermediateResultPartitions in the upstream IntermediateResult
ExecutionEdge[] edges = new ExecutionEdge[sourcePartitions.length];
// Create one ExecutionEdge for each IntermediateResultPartition of the upstream IntermediateResult
for (int i = 0; i < sourcePartitions.length; i++) {
// Get the i-th IntermediateResultPartition of the upstream IntermediateResult
IntermediateResultPartition irp = sourcePartitions[i];
// Create the ExecutionEdge and store it in the ExecutionEdge[] array
edges[i] = new ExecutionEdge(irp, this, inputNumber);
}
return edges;
}
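Only ALL_TO_ALL is walked through here. For contrast, the sketch below shows one way a POINTWISE mapping can be computed: each consumer subtask reads a contiguous slice of the upstream partitions, which degenerates to 1:1 when both sides have the same parallelism. This is a simplified approximation written for this article, not Flink's `connectPointwise`, whose exact arithmetic for uneven ratios may assign partitions slightly differently. The class name `PointwiseSketch` and the method `sourcesFor` are hypothetical.

```java
import java.util.Arrays;

/** Hypothetical, simplified pointwise mapping; not Flink's connectPointwise implementation. */
final class PointwiseSketch {

    /**
     * Returns the indices of the upstream partitions read by one consumer subtask.
     * Every subtask reads at least one partition; partitions are handed out in contiguous slices.
     */
    static int[] sourcesFor(int subtaskIndex, int parallelism, int numSources) {
        if (parallelism <= 0 || numSources <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException("invalid arguments");
        }
        int start = (int) ((long) subtaskIndex * numSources / parallelism);
        int end = (int) ((long) (subtaskIndex + 1) * numSources / parallelism);
        // With fewer sources than consumers the slice can be empty; share the partition at 'start' instead
        end = Math.max(end, start + 1);
        int[] sources = new int[end - start];
        for (int i = 0; i < sources.length; i++) {
            sources[i] = start + i;
        }
        return sources;
    }

    /** ALL_TO_ALL for comparison: every subtask reads every upstream partition. */
    static int[] allToAll(int numSources) {
        int[] sources = new int[numSources];
        for (int i = 0; i < numSources; i++) {
            sources[i] = i;
        }
        return sources;
    }

    public static void main(String[] args) {
        // 4 upstream partitions, 2 consumer subtasks
        for (int subtask = 0; subtask < 2; subtask++) {
            System.out.println("pointwise subtask " + subtask + " reads "
                    + Arrays.toString(sourcesFor(subtask, 2, 4)));
            System.out.println("all-to-all subtask " + subtask + " reads "
                    + Arrays.toString(allToAll(4)));
        }
    }
}
```

The difference in edge count is the practical point: with S upstream partitions and P consumer subtasks, ALL_TO_ALL creates S x P edges, while this pointwise mapping creates roughly max(S, P).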
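Summary
Suppose the JobVertex currently being processed is FlatMap. It is first converted into an ExecutionJobVertex:
- 1. Allocate the fixed-size IntermediateResult array: FlatMap has only 1 output, so the array has size 1
- 2. Create the IntermediateResult: FlatMap has only 1 output, so exactly 1 IntermediateResult is created and stored in the array
- 3. Create the ExecutionVertex child nodes: FlatMap has parallelism 2, so the for loop instantiates 2 ExecutionVertex nodes
While the two loop iterations create the 2 ExecutionVertex child nodes for FlatMap, each iteration instantiates 1 IntermediateResultPartition for every IntermediateResult of the current ExecutionJobVertex. FlatMap has only 1 IntermediateResult, so that IntermediateResult ends up with 2 IntermediateResultPartitions, one per iteration.
Next, the array that will hold the ExecutionEdges is allocated: FlatMap has only 1 input edge, so the first dimension of this ExecutionEdge array has size 1.
At this point the conversion from JobVertex to ExecutionJobVertex is complete. The next step is to create the ExecutionEdges and establish the connections, based on the upstream input edges of the current JobVertex and their distribution pattern.
FlatMap has only one input edge, so this single JobEdge is used to look up the corresponding IntermediateDataSet and, from it, the matching IntermediateResult. That is the IntermediateResult of Source, which serves as FlatMap's upstream IntermediateResult.
Building the ExecutionEdges: FlatMap has parallelism 2, so the method that builds ExecutionEdges is called 2 times. The upstream IntermediateResult of FlatMap has 1 IntermediateResultPartition, so each call builds an ExecutionEdge array of size 1, instantiates 1 ExecutionEdge, and stores it in that array.
"Connecting" simply means establishing a relationship: when an ExecutionEdge is instantiated, it holds, as member fields, both the upstream IntermediateResultPartition and the downstream ExecutionVertex child node that it connects.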
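The wiring in this example can be replayed with a few plain objects. The sketch below is a hypothetical model, not Flink code: `Partition`, `Subtask` and `Edge` stand in for IntermediateResultPartition, ExecutionVertex and ExecutionEdge, and it assumes a Source with parallelism 1 feeding a FlatMap with parallelism 2 over an ALL_TO_ALL edge. It shows that a connection is nothing more than an edge object holding references to both ends, plus each end keeping a reference to the edge.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical miniature of step 2; these are not Flink classes. */
final class EdgeWiringSketch {

    /** Stand-in for IntermediateResultPartition. */
    static final class Partition {
        final String name;
        final List<Edge> consumers = new ArrayList<>();

        Partition(String name) {
            this.name = name;
        }
    }

    /** Stand-in for ExecutionVertex. */
    static final class Subtask {
        final String name;
        final Edge[][] inputEdges; // [input number][edges for that input]

        Subtask(String name, int numInputs) {
            this.name = name;
            this.inputEdges = new Edge[numInputs][];
        }
    }

    /** Stand-in for ExecutionEdge: the connection is just these two references. */
    static final class Edge {
        final Partition source;
        final Subtask target;
        final int inputNumber;

        Edge(Partition source, Subtask target, int inputNumber) {
            this.source = source;
            this.target = target;
            this.inputNumber = inputNumber;
        }
    }

    /** One edge from every upstream partition to the given consumer subtask. */
    static void connectAllToAll(Subtask consumer, int inputNumber, Partition[] sourcePartitions) {
        Edge[] edges = new Edge[sourcePartitions.length];
        for (int i = 0; i < sourcePartitions.length; i++) {
            edges[i] = new Edge(sourcePartitions[i], consumer, inputNumber);
        }
        consumer.inputEdges[inputNumber] = edges;
        for (Edge edge : edges) {
            edge.source.consumers.add(edge);
        }
    }

    /** Source (parallelism 1) feeding FlatMap (parallelism 2); both parallelisms are assumptions. */
    static Subtask[] wireFlatMapExample() {
        Partition[] sourceResult = {new Partition("Source result, partition 0")};
        Subtask[] flatMap = {new Subtask("FlatMap (1/2)", 1), new Subtask("FlatMap (2/2)", 1)};
        for (Subtask subtask : flatMap) {
            connectAllToAll(subtask, 0, sourceResult);
        }
        return flatMap;
    }

    public static void main(String[] args) {
        for (Subtask subtask : wireFlatMapExample()) {
            for (Edge edge : subtask.inputEdges[0]) {
                System.out.println(edge.source.name + " -> " + edge.target.name);
            }
        }
    }
}
```

Both FlatMap subtasks end up with one edge each, and the single Source partition ends up with two consumers, which matches the counts in the walkthrough above.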