Demystifying Flink's Four Execution Graphs (Part 1): StreamGraph and JobGraph


Flink Task Scheduling and Execution

1. Graph Concepts

The execution graphs in Flink can be divided into four layers: StreamGraph -> JobGraph -> ExecutionGraph -> physical execution graph.

StreamGraph: the initial graph generated from the code the user writes with the Stream API. It represents the topology of the program.

JobGraph: produced by optimizing the StreamGraph; this is the data structure submitted to the JobManager. The main optimization is chaining multiple eligible nodes together into a single node, which reduces the serialization/deserialization/transfer cost of moving data between nodes.

ExecutionGraph: the JobManager generates the ExecutionGraph from the JobGraph. The ExecutionGraph is the parallelized version of the JobGraph and the core data structure of the scheduling layer.

Physical execution graph: the "graph" formed after the JobManager schedules the job based on the ExecutionGraph and deploys Tasks on the individual TaskManagers. It is not an actual data structure.

For example, the evolution through the four execution graphs for the SocketTextStreamWordCount example with parallelism 2 (the Source has parallelism 1) is shown in the figure below:

public static void main(String[] args) throws Exception {
    // check the input
    final ParameterTool params = ParameterTool.fromArgs(args);
    ...
    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // get input data
    DataStream<String> text = env.socketTextStream(params.get("hostname"), params.getInt("port"), '\n', 0);

    DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word, 1)
            text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .keyBy(0)
                    .sum(1);

    counts.print();

    // execute program
    env.execute("WordCount from SocketTextStream Example");
}

Terminology:

1) StreamGraph: the initial graph generated from the code written with the Stream API.
(1) StreamNode: the class representing an operator, carrying all of its related properties, such as parallelism and incoming/outgoing edges.
(2) StreamEdge: an edge connecting two StreamNodes.

2) JobGraph: the optimized StreamGraph, the data structure submitted to the JobManager.
(1) JobVertex: after optimization, multiple eligible StreamNodes may be chained together into one JobVertex, i.e., a JobVertex contains one or more operators. A JobVertex's inputs are JobEdges and its outputs are IntermediateDataSets.
(2) IntermediateDataSet: represents the output of a JobVertex, i.e., the data set produced by its operators. Its producer is a JobVertex and its consumers are JobEdges.
(3) JobEdge: a data transfer channel in the job graph. Its source is an IntermediateDataSet and its target is a JobVertex, i.e., data flows from an IntermediateDataSet through the JobEdge to the target JobVertex.

3) ExecutionGraph: generated by the JobManager from the JobGraph; the parallelized version of the JobGraph and the core data structure of the scheduling layer.
(1) ExecutionJobVertex: corresponds one-to-one to a JobVertex in the JobGraph. Each ExecutionJobVertex has as many ExecutionVertices as its parallelism.
(2) ExecutionVertex: one parallel subtask of an ExecutionJobVertex. Its inputs are ExecutionEdges and its outputs are IntermediateResultPartitions.
(3) IntermediateResult: corresponds one-to-one to an IntermediateDataSet in the JobGraph. An IntermediateResult contains as many IntermediateResultPartitions as the operator's parallelism.
(4) IntermediateResultPartition: one output partition of an ExecutionVertex. Its producer is an ExecutionVertex and its consumers are one or more ExecutionEdges.
(5) ExecutionEdge: an input of an ExecutionVertex. Its source is an IntermediateResultPartition and its target is an ExecutionVertex; both source and target are single.
(6) Execution: one attempt to execute an ExecutionVertex. When a failure occurs, or when data needs to be recomputed, an ExecutionVertex may have multiple ExecutionAttemptIDs; an Execution is uniquely identified by its ExecutionAttemptID. Task deployment and task status updates between the JobManager and the TaskManagers use the ExecutionAttemptID to address the message receiver.

A few points follow from these concepts:
Since each JobVertex may have multiple IntermediateDataSets, each ExecutionJobVertex may have multiple IntermediateResults, and therefore each ExecutionVertex may also contain multiple IntermediateResultPartitions. The main role of ExecutionEdge is to connect an ExecutionVertex with an IntermediateResultPartition, representing the link between them.

4) Physical execution graph: the "graph" formed after the JobManager schedules the job based on the ExecutionGraph and deploys Tasks on the individual TaskManagers; it is not an actual data structure.
(1) Task: once an Execution is scheduled, the corresponding Task is started in the assigned TaskManager. A Task wraps an operator carrying the user's execution logic.
(2) ResultPartition: the data produced by one Task; corresponds one-to-one to an IntermediateResultPartition in the ExecutionGraph.
(3) ResultSubpartition: a subpartition of a ResultPartition. Each ResultPartition contains multiple ResultSubpartitions; their number is determined by the number of downstream consuming Tasks and the DistributionPattern.
(4) InputGate: the input wrapper of a Task; corresponds one-to-one to a JobEdge in the JobGraph. Each InputGate consumes one or more ResultPartitions.
(5) InputChannel: each InputGate contains one or more InputChannels; each corresponds one-to-one to an ExecutionEdge in the ExecutionGraph and is also connected one-to-one to a ResultSubpartition, i.e., one InputChannel receives the output of one ResultSubpartition.
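You can see the first of these graphs for a concrete job without submitting anything. A minimal sketch, assuming the env and pipeline from the WordCount example above: getExecutionPlan() returns the StreamGraph as a JSON string, which the Flink plan visualizer can render.

// Print the StreamGraph as JSON instead of executing the job.
// Call this after the pipeline has been defined on env, before env.execute().
System.out.println(env.getExecutionPlan());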
2. StreamGraph Generation on the Client

The call chain from StreamExecutionEnvironment.execute() in the user code is: execute(getJobName()) -> execute(getStreamGraph(jobName)) -> getStreamGraph(jobName, true).

StreamExecutionEnvironment.java

public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
    StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
    if (clearTransformations) {
        this.transformations.clear();
    }
    return streamGraph;
}

public StreamGraph generate() {
    streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
    shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
    configureStreamGraph(streamGraph);

    alreadyTransformed = new HashMap<>();

    for (Transformation<?> transformation : transformations) {
        transform(transformation);
    }

    final StreamGraph builtStreamGraph = streamGraph;

    alreadyTransformed.clear();
    alreadyTransformed = null;
    streamGraph = null;

    return builtStreamGraph;
}

A key field here is List<Transformation<?>> transformations. A Transformation represents an operation that creates a new DataStream from one or more DataStreams. Under the hood, a DataStream is simply a Transformation describing how that DataStream came to be. Common transformations on a DataStream include map, flatMap, filter, and so on. These transformations build up a tree of StreamTransformations, and that tree is converted into the StreamGraph. Taking map as an example, let's trace what ends up in List<Transformation<?>> transformations:

DataStream.java

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    // extract the mapper's return type via Java reflection
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(), Utils.getCallLocationName(), true);
    return map(mapper, outType);
}

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
    // returns a new DataStream; StreamMap is an implementation of StreamOperator
    return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}

public <R> SingleOutputStreamOperator<R> transform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        OneInputStreamOperator<T, R> operator) {
    return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}

protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    // the new transformation is connected to the current DataStream's transformation, building up a tree
    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
            this.transformation,
            operatorName,
            operatorFactory,
            outTypeInfo,
            environment.getParallelism());

    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

    // every transformation is stored in the env; when execute is called, this list is traversed to build the StreamGraph
    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

From the code above we can see that the map transformation wraps the user-defined MapFunction into the StreamMap operator, wraps the StreamMap into a OneInputTransformation, and finally stores that transformation in the env. When env.execute is called, the transformations collection is traversed to construct the StreamGraph. The layered structure is shown in the figure below.

Note that not every StreamTransformation is converted into a physical operation at the runtime layer. Some are purely logical concepts, such as union, split/select, and partition. The transformation tree shown in the figure below is optimized at runtime into the operation graph beneath it.

The information from union, split/select (removed in 1.12), and partition is written into the edge from Source to Map. The source code confirms this: UnionTransformation, SplitTransformation (removed in 1.12), SelectTransformation (removed in 1.12), and PartitionTransformation contain no concrete operation and therefore have no StreamOperator member field, whereas essentially all other StreamTransformation subclasses do.
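As a minimal illustration of such a logical-only transformation (a sketch; the stream contents are arbitrary): union contributes a UnionTransformation to the list, but since it carries no StreamOperator, no runtime task is created for it.

DataStream<String> a = env.fromElements("hello");
DataStream<String> b = env.fromElements("world");
// union only records which inputs feed downstream; at runtime the two
// sources feed the Map operator directly through its input edges
a.union(b).map(String::toUpperCase).print();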
Next, let's analyze the source code that generates the StreamGraph:

StreamExecutionEnvironment.java -> generator() -> transform()

// Converts each transformation into the StreamNodes and StreamEdges of the StreamGraph.
// The return value is the set of ids for this transform, usually of size 1 (except for FeedbackTransformation).
private Collection<Integer> transform(Transformation<?> transform) {
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    LOG.debug("Transforming " + transform);

    if (transform.getMaxParallelism() <= 0) {
        // if the max parallelism hasn't been set, then first use the job wide max parallelism
        // from the ExecutionConfig.
        int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
        if (globalMaxParallelismFromConfig > 0) {
            transform.setMaxParallelism(globalMaxParallelismFromConfig);
        }
    }

    // call at least once to trigger exceptions about MissingTypeInfo
    transform.getOutputType();

    @SuppressWarnings("unchecked")
    final TransformationTranslator<?, Transformation<?>> translator =
            (TransformationTranslator<?, Transformation<?>>) translatorMap.get(transform.getClass());

    Collection<Integer> transformedIds;
    if (translator != null) {
        transformedIds = translate(translator, transform);
    } else {
        transformedIds = legacyTransform(transform);
    }

    // need this check because the iterate transformation adds itself before
    // transforming the feedback edges
    if (!alreadyTransformed.containsKey(transform)) {
        alreadyTransformed.put(transform, transformedIds);
    }

    return transformedIds;
}

private Collection<Integer> translate(
        final TransformationTranslator<?, Transformation<?>> translator,
        final Transformation<?> transform) {
    checkNotNull(translator);
    checkNotNull(transform);

    final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());

    // the recursive call might have already transformed this
    if (alreadyTransformed.containsKey(transform)) {
        return alreadyTransformed.get(transform);
    }

    final String slotSharingGroup = determineSlotSharingGroup(
            transform.getSlotSharingGroup(),
            allInputIds.stream()
                    .flatMap(Collection::stream)
                    .collect(Collectors.toList()));

    final TransformationTranslator.Context context = new ContextImpl(
            this, streamGraph, slotSharingGroup, configuration);

    return shouldExecuteInBatchMode
            ? translator.translateForBatch(transform, context)
            : translator.translateForStreaming(transform, context);
}

SimpleTransformationTranslator.java

public Collection<Integer> translateForStreaming(final T transformation, final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);

    final Collection<Integer> transformedIds =
            translateForStreamingInternal(transformation, context);
    configure(transformation, context);

    return transformedIds;
}

AbstractOneInputTransformationTranslator.java

protected Collection<Integer> translateInternal(
        final Transformation<OUT> transformation,
        final StreamOperatorFactory<OUT> operatorFactory,
        final TypeInformation<IN> inputType,
        @Nullable final KeySelector<IN, ?> stateKeySelector,
        @Nullable final TypeInformation<?> stateKeyType,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(operatorFactory);
    checkNotNull(inputType);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();
    final String slotSharingGroup = context.getSlotSharingGroup();
    final int transformationId = transformation.getId();
    final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

    // add the StreamNode
    streamGraph.addOperator(
            transformationId,
            slotSharingGroup,
            transformation.getCoLocationGroupKey(),
            operatorFactory,
            inputType,
            transformation.getOutputType(),
            transformation.getName());

    if (stateKeySelector != null) {
        TypeSerializer<?> keySerializer = stateKeyType.createSerializer(executionConfig);
        streamGraph.setOneInputStateKey(transformationId, stateKeySelector, keySerializer);
    }

    int parallelism = transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
            ? transformation.getParallelism()
            : executionConfig.getParallelism();
    streamGraph.setParallelism(transformationId, parallelism);
    streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

    final List<Transformation<?>> parentTransformations = transformation.getInputs();
    checkState(
            parentTransformations.size() == 1,
            "Expected exactly one input transformation but found " + parentTransformations.size());

    // add the StreamEdge
    for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
        streamGraph.addEdge(inputId, transformationId, 0);
    }

    return Collections.singleton(transformationId);
}

This function first recursively transforms the upstream transformations of the given transform, ensuring the upstream ones have all been converted. It then constructs the StreamNode for this transform and finally connects it to its upstream transforms, constructing the StreamEdges.
Finally, let's look at how logical transformations (partition, union, and the like) are handled. Below is the source of the partition translation:

PartitionTransformationTranslator.java

protected Collection<Integer> translateForStreamingInternal(
        final PartitionTransformation<OUT> transformation,
        final Context context) {
    return translateInternal(transformation, context);
}

private Collection<Integer> translateInternal(
        final PartitionTransformation<OUT> transformation,
        final Context context) {
    checkNotNull(transformation);
    checkNotNull(context);

    final StreamGraph streamGraph = context.getStreamGraph();

    final List<Transformation<?>> parentTransformations = transformation.getInputs();
    checkState(
            parentTransformations.size() == 1,
            "Expected exactly one input transformation but found " + parentTransformations.size());
    final Transformation<?> input = parentTransformations.get(0);

    List<Integer> resultIds = new ArrayList<>();

    for (Integer inputId : context.getStreamNodeIds(input)) {
        // generate a new virtual id
        final int virtualId = Transformation.getNewNodeId();
        // add a virtual partition node; no StreamNode is created
        streamGraph.addVirtualPartitionNode(
                inputId,
                virtualId,
                transformation.getPartitioner(),
                transformation.getShuffleMode());
        resultIds.add(virtualId);
    }
    return resultIds;
}

The partition conversion creates no concrete StreamNode or StreamEdge; instead it adds a virtual node. When the partition's downstream transform (e.g., a map) adds its edges (by calling StreamGraph.addEdge), the partition information is written into those edges.
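For context on where such a PartitionTransformation comes from, here is a sketch based on the DataStream sources (abridged; exact bodies may differ slightly across versions): user-facing partitioning calls like shuffle() simply wrap the current transformation in a PartitionTransformation carrying the chosen partitioner.

// DataStream.java (abridged sketch): shuffle() picks the partitioner...
public DataStream<T> shuffle() {
    return setConnectionType(new ShufflePartitioner<T>());
}

// ...and setConnectionType wraps the upstream transformation in a PartitionTransformation,
// which the translator above later turns into a virtual partition node
protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
    return new DataStream<>(this.getExecutionEnvironment(),
            new PartitionTransformation<>(this.getTransformation(), partitioner));
}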
Continuing the earlier map flow:

AbstractOneInputTransformationTranslator.java -> translateInternal()

public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
    addEdgeInternal(upStreamVertexID,
            downStreamVertexID,
            typeNumber,
            null,
            new ArrayList<String>(),
            null,
            null);
}

private void addEdgeInternal(Integer upStreamVertexID,
        Integer downStreamVertexID,
        int typeNumber,
        StreamPartitioner<?> partitioner,
        List<String> outputNames,
        OutputTag outputTag,
        ShuffleMode shuffleMode) {

    // if the upstream is a side output, recurse and pass along the side-output information
    if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;
        if (outputTag == null) {
            outputTag = virtualSideOutputNodes.get(virtualId).f1;
        }
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, null, outputTag, shuffleMode);
    // if the upstream is a partition, recurse and pass along the partitioner information
    } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
        int virtualId = upStreamVertexID;
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
        if (partitioner == null) {
            partitioner = virtualPartitionNodes.get(virtualId).f1;
        }
        shuffleMode = virtualPartitionNodes.get(virtualId).f2;
        addEdgeInternal(upStreamVertexID, downStreamVertexID, typeNumber, partitioner, outputNames, outputTag, shuffleMode);
    } else {
        // actually build the StreamEdge
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        // If no partitioner was specified and the parallelism of upstream and downstream
        // operator matches use forward partitioning, use rebalance otherwise.
        // if no partitioner is specified, choose forward or rebalance partitioning for the edge
        if (partitioner == null && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner = new ForwardPartitioner<Object>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }

        // sanity check: forward partitioning requires the same parallelism upstream and downstream
        if (partitioner instanceof ForwardPartitioner) {
            if (upstreamNode.getParallelism() != downstreamNode.getParallelism()) {
                throw new UnsupportedOperationException("Forward partitioning does not allow " +
                        "change of parallelism. Upstream operation: " + upstreamNode +
                        " parallelism: " + upstreamNode.getParallelism() +
                        ", downstream operation: " + downstreamNode +
                        " parallelism: " + downstreamNode.getParallelism() +
                        " You must use another partitioning strategy, such as broadcast, rebalance, shuffle or global.");
            }
        }

        if (shuffleMode == null) {
            shuffleMode = ShuffleMode.UNDEFINED;
        }

        // create the StreamEdge
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, partitioner, outputTag, shuffleMode);

        // add the StreamEdge to the upstream node's outputs and the downstream node's inputs
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}

A worked example: the following program reads lines from a Source, splits them into words, filters them, and prints the result; it contains the logical transformation shuffle (random partitioning). Let's analyze how its StreamGraph is generated.

DataStream<String> text = env.socketTextStream(hostName, port);
text.flatMap(new LineSplitter()).shuffle().filter(new HelloFilter()).print();

First, a transformation tree is built in the env, held in a List<Transformation<?>>. Its structure is shown in the figure below, where the * symbols are input pointers referring to the upstream transformation, forming a transformation tree. Then StreamGraphGenerator.generate(env, transformations) is called to produce the StreamGraph. Each transformation is visited by bottom-up recursion, so the processing order is Source -> FlatMap -> Shuffle -> Filter -> Sink.

As the figure above shows:
1) Source is processed first, producing the Source StreamNode.
2) FlatMap is processed next, producing the FlatMap StreamNode, and a StreamEdge is created connecting the upstream Source to FlatMap. Since their parallelisms differ (1:4), the edge uses Rebalance partitioning.
3) Shuffle is processed next. Being a logical transformation, it produces no actual node; its partitioner information is stashed in virtualPartitionNodes.
4) When Filter is processed, the Filter StreamNode is created. Its upstream is the shuffle, so the shuffle's upstream, FlatMap, is found and a StreamEdge is created connecting it to Filter, with the ShufflePartitioner information written into that StreamEdge.
5) Finally Sink is processed: the Sink StreamNode is created, and a StreamEdge connects it to the upstream Filter. Since their parallelisms match (4:4), Forward partitioning is chosen.

The resulting StreamGraph can then be inspected visually in the UI.
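The default-partitioner choice in steps 2) and 5) is easy to reproduce. A minimal sketch (the host, port, and parallelism values are illustrative) of which partitioner addEdgeInternal picks when the user specifies none:

env.setParallelism(4);
DataStream<String> src = env.socketTextStream("localhost", 9999); // the socket source is non-parallel (parallelism 1)
src.map(String::toUpperCase)   // 1 -> 4: parallelisms differ, so RebalancePartitioner
   .filter(s -> !s.isEmpty())  // 4 -> 4: parallelisms match, so ForwardPartitioner
   .print();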
3. JobGraph Generation on the Client

The StreamGraph-to-JobGraph conversion also happens on the Client and does three main things: it converts StreamNodes into JobVertices, converts StreamEdges into JobEdges, and creates IntermediateDataSets between JobEdges and JobVertices to connect them.

Starting from the point where the Yarn client application has been created, let's look at the logic inside execute (taking yarn-per-job as the example):

AbstractJobClusterExecutor.java

public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration, @Nonnull final ClassLoader userCodeClassloader) throws Exception {
    final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
    ... ...
}

PipelineExecutorUtils.java

public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws MalformedURLException {
    checkNotNull(pipeline);
    checkNotNull(configuration);

    final ExecutionConfigAccessor executionConfigAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
    final JobGraph jobGraph = FlinkPipelineTranslationUtil
            .getJobGraph(pipeline, configuration, executionConfigAccessor.getParallelism());
    ... ...
}

FlinkPipelineTranslationUtil.java

public static JobGraph getJobGraph(
        Pipeline pipeline,
        Configuration optimizerConfiguration,
        int defaultParallelism) {

    FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);

    return pipelineTranslator.translateToJobGraph(pipeline,
            optimizerConfiguration,
            defaultParallelism);
}

StreamGraphTranslator.java

public JobGraph translateToJobGraph(
        Pipeline pipeline,
        Configuration optimizerConfiguration,
        int defaultParallelism) {
    checkArgument(pipeline instanceof StreamGraph, "Given pipeline is not a DataStream StreamGraph.");

    StreamGraph streamGraph = (StreamGraph) pipeline;
    return streamGraph.getJobGraph(null);
}

StreamGraph.java

public JobGraph getJobGraph(@Nullable JobID jobID) {
    return StreamingJobGraphGenerator.createJobGraph(this, jobID);
}

StreamingJobGraphGenerator.java

public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
    return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
}
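The same translation can also be triggered directly from user code, which is convenient for checking what the generator produced. A minimal sketch (note that getStreamGraph(jobName) consumes the environment's transformation list, so call it instead of, not in addition to, execute()):

// Build the JobGraph locally and inspect the chained vertices it contains.
StreamGraph streamGraph = env.getStreamGraph("WordCount");
JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.getVertices().forEach(v ->
        System.out.println(v.getName() + "  parallelism=" + v.getParallelism()));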
Let's look at the relevant fields of the core class StreamingJobGraphGenerator:

public class StreamingJobGraphGenerator {
    ... ...
    private StreamGraph streamGraph;
    // id -> JobVertex
    private Map<Integer, JobVertex> jobVertices;
    private JobGraph jobGraph;
    // ids of the JobVertices already built
    private Collection<Integer> builtVertices;
    // physical edges (excluding edges inside chains), in creation order
    private List<StreamEdge> physicalEdgesInOrder;
    // chain information, used to build the OperatorChain at deployment: startNodeId -> (currentNodeId -> StreamConfig)
    private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
    // configuration of every node: id -> StreamConfig
    private Map<Integer, StreamConfig> vertexConfigs;
    // name of every node: id -> chainedName
    private Map<Integer, String> chainedNames;

    private final Map<Integer, ResourceSpec> chainedMinResources;
    private final Map<Integer, ResourceSpec> chainedPreferredResources;
    private final Map<Integer, InputOutputFormatContainer> chainedInputOutputFormats;
    private final StreamGraphHasher defaultStreamGraphHasher;
    private final List<StreamGraphHasher> legacyStreamGraphHashers;

    // constructor; the essential input is the StreamGraph
    public StreamingJobGraphGenerator(StreamGraph streamGraph) {
        this.streamGraph = streamGraph;
    }
}

The core logic: generating the JobGraph from the StreamGraph:

private JobGraph createJobGraph() {
    preValidate();

    // make sure that all vertices start immediately
    // in streaming mode, the schedule mode starts all vertices together
    jobGraph.setScheduleMode(streamGraph.getScheduleMode());
    jobGraph.enableApproximateLocalRecovery(streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    // Traverse the StreamGraph breadth-first and generate a hash id for each StreamNode,
    // guaranteeing that the hashes stay the same across submissions as long as the topology is unchanged
    Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

    // the most important call: generates the JobVertices, JobEdges, etc.,
    // chaining as many nodes together as possible
    setChaining(hashes, legacyHashes);

    // serialize each JobVertex's incoming edge set into its StreamConfig
    // (the outgoing edge sets were already written during setChaining)
    setPhysicalEdges();

    // assign each JobVertex its SlotSharingGroup based on the group name,
    // and set up CoLocationGroups for the heads and tails of Iterations
    setSlotSharingAndCoLocation();

    setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
            id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());

    // configure checkpointing
    configureCheckpointing();

    jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

    JobGraphUtils.addUserArtifactEntries(streamGraph.getUserArtifacts(), jobGraph);

    // set the ExecutionConfig last when it has been finalized
    try {
        // serialize the StreamGraph's ExecutionConfig into the JobGraph's configuration
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    } catch (IOException e) {
        throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
                " This indicates that non-serializable types (like custom serializers) were registered");
    }

    return jobGraph;
}

The member fields of StreamingJobGraphGenerator all exist to assist in producing the final JobGraph.

A unique hash id is generated for every node; as long as a node does not change across submissions (including its parallelism, upstream, downstream, and so on), this id stays the same. It is mainly used for failure recovery. StreamNode.id cannot serve this purpose, because it comes from a static counter starting at 1, so the same job can end up with different ids. The two jobs in the following examples are exactly the same, yet the sources get different ids:

// Example 1: A.id=1, B.id=2
DataStream<String> A = ...
DataStream<String> B = ...
A.union(B).print();

// Example 2: A.id=2, B.id=1
DataStream<String> B = ...
DataStream<String> A = ...
A.union(B).print();
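As a practical aside (not part of the quoted sources): when a job must keep restoring from savepoints while its topology evolves, the operator hash can be pinned explicitly with DataStream's uid(), which the hasher prefers over the generated value. A small sketch against the earlier WordCount pipeline:

text.flatMap(new Tokenizer()).uid("tokenizer")  // stable operator id across submissions
    .keyBy(0)
    .sum(1).uid("word-counter");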
Now for the key chaining logic:

// build node chains starting from the sources
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
    // we separate out the sources that run as inputs to another operator (chained inputs)
    // from the sources that needs to run as the main (head) operator.
    final Map<Integer, OperatorChainInfo> chainEntryPoints = buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
    final Collection<OperatorChainInfo> initialEntryPoints = new ArrayList<>(chainEntryPoints.values());

    // iterate over a copy of the values, because this map gets concurrently modified
    // build the node chains starting from the sources
    for (OperatorChainInfo info : initialEntryPoints) {
        createChain(
                info.getStartNodeId(),
                1, // operators start at position 1 because 0 is for chained source inputs
                info,
                chainEntryPoints);
    }
}
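Whether createChain (below) is allowed to merge two nodes also depends on user-facing switches. A short sketch (the operators are purely illustrative, and the two knobs are independent alternatives) of what influences the chaining decision:

// job-wide switch: nothing gets chained in this job
env.disableOperatorChaining();

// per-operator switches on an otherwise chainable pipeline:
text.flatMap(new Tokenizer())
        .startNewChain()                // force a chain boundary before this operator
        .filter(t -> t.f1 > 0)
        .disableChaining()              // keep this operator out of any chain
        .print();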
// build a node chain and return the current node's physical out-edges
// startNodeId != currentNodeId means currentNode is an internal node of the chain
private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {

    Integer startNodeId = chainInfo.getStartNodeId();
    if (!builtVertices.contains(startNodeId)) {
        // transitional set of out-edges used to create the final JobEdges; note it excludes edges inside the chain
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);

        // split the current node's out-edges into chainable and non-chainable
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        // recursive calls
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(
                    createChain(chainable.getTargetId(), chainIndex + 1, chainInfo, chainEntryPoints));
        }

        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            createChain(
                    nonChainable.getTargetId(),
                    1, // operators start at position 1 because 0 is for chained source inputs
                    chainEntryPoints.computeIfAbsent(
                            nonChainable.getTargetId(),
                            (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                    chainEntryPoints);
        }

        // generate the display name of the current node, e.g. "Keyed Aggregation -> Sink: Unnamed"
        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs, Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
        chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

        OperatorID currentOperatorId = chainInfo.addNodeToChain(currentNodeId, chainedNames.get(currentNodeId));

        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }

        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId).addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

        // if the current node is the start node, create the JobVertex directly and return its StreamConfig;
        // otherwise create an empty StreamConfig first.
        // createJobVertex creates the JobVertex from the StreamNode and returns an empty StreamConfig
        StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, chainInfo)
                : new StreamConfig(new Configuration());

        // set the JobVertex's StreamConfig; essentially serializes the StreamNode's configuration into the StreamConfig,
        // including the serializers, the StreamOperator, checkpoint settings, and so on
        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs, chainInfo.getChainedSources());

        if (currentNodeId.equals(startNodeId)) {
            // the current node is the start of a chain
            // (a node that is not inside any chain is also marked as a chain start)
            config.setChainStart();
            config.setChainIndex(chainIndex);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());

            // connect the chain head (headOfChain) with all of its transitive out-edges
            for (StreamEdge edge : transitiveOutEdges) {
                // a JobEdge is built from the StreamEdge, creating the IntermediateDataSet
                connect(startNodeId, edge);
            }
            ... ...
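For completeness, here is an abridged sketch (an assumption based on Flink 1.12-era sources, not quoted verbatim) of the heart of connect(): the downstream JobVertex registers the upstream chain head's output as its input data set, which is what materializes the JobEdge and the IntermediateDataSet.

// StreamingJobGraphGenerator#connect, heavily abridged sketch
JobEdge jobEdge = downStreamVertex.connectNewDataSetAsInput(
        headVertex,                       // the JobVertex at the head of the upstream chain
        DistributionPattern.ALL_TO_ALL,   // POINTWISE instead for forward/rescale-style partitioners
        resultPartitionType);             // e.g. PIPELINED_BOUNDED for a streaming exchange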
