Flink 源码(二十一):Flink 任务调度机制(二)JobGraph

3 JobGraph 在 Client 生成 

StreamGraph 转变成 JobGraph 也是在 Client 完成,主要作了三件事:
⚫ StreamNode 转成 JobVertex。
⚫ StreamEdge 转成 JobEdge。
⚫ JobEdge 和 JobVertex 之间创建 IntermediateDataSet 来连接。
从 1.2.6 接着进行源码分析,看 execute 里的逻辑(yarn-per-job 为例):
AbstractJobClusterExecutor.java
Flink 源码(二十一):Flink 任务调度机制(二)JobGraph
PipelineExecutorUtils.java
Flink 源码(二十一):Flink 任务调度机制(二)JobGraph
FlinkPipelineTranslationUtil.java
Flink 源码(二十一):Flink 任务调度机制(二)JobGraph
StreamGraphTranslator.java
Flink 源码(二十一):Flink 任务调度机制(二)JobGraph
StreamGraph.java
Flink 源码(二十一):Flink 任务调度机制(二)JobGraph
StreamingJobGraphGenerator.java
Flink 源码(二十一):Flink 任务调度机制(二)JobGraph
看一下核心类 StreamingJobGraphGenerator 的相关属性:
Flink 源码(二十一):Flink 任务调度机制(二)JobGraph

 Flink 源码(二十一):Flink 任务调度机制(二)JobGraph

核心逻辑:根据 StreamGraph,生成 JobGraph: 
Flink 源码(二十一):Flink 任务调度机制(二)JobGraph

 Flink 源码(二十一):Flink 任务调度机制(二)JobGraph

 Flink 源码(二十一):Flink 任务调度机制(二)JobGraph

 Flink 源码(二十一):Flink 任务调度机制(二)JobGraph

  StreamingJobGraphGenerator 的成员变量都是为了辅助生成最终的 JobGraph。
  为所有节点生成一个唯一的 hash id,如果节点在多次提交中没有改变(包括并发度、上下游等),那么这个 id 就不会改变,这主要用于故障恢复。
  这里不能用 StreamNode.id 来代替,因为这是一个从 1 开始的静态计数变量,同样的 Job可能会得到不一样的 id,如下代码示例的两个 job 是完全一样的,但是 source 的 id 却不一
样了。 
Flink 源码(二十一):Flink 任务调度机制(二)JobGraph
看一下最关键的 chaining 处理:
Flink 源码(二十一):Flink 任务调度机制(二)JobGraph

 Flink 源码(二十一):Flink 任务调度机制(二)JobGraph

 Flink 源码(二十一):Flink 任务调度机制(二)JobGraph

 Flink 源码(二十一):Flink 任务调度机制(二)JobGraph

 Flink 源码(二十一):Flink 任务调度机制(二)JobGraph

 Flink 源码(二十一):Flink 任务调度机制(二)JobGraph

 Flink 源码(二十一):Flink 任务调度机制(二)JobGraph

  每个 JobVertex 都会对应一个可序列化的 StreamConfig, 用来发送给 JobManager 和TaskManager。最后在 TaskManager 中起 Task 时,需要从这里面反序列化出所需要的配置信息, 其中就包括了含有用户代码的 StreamOperator。
  setChaining 会对 source 调用 createChain 方法,该方法会递归调用下游节点,从而构建出 node chains。createChain 会分析当前节点的出边,根据 Operator Chains 中的 chainable 条
件,将出边分成 chainalbe 和 noChainable 两类,并分别递归调用自身方法。之后会将StreamNode 中的配置信息序列化到 StreamConfig 中。如果当前不是 chain 中的子节点,则会构建 JobVertex 和 JobEdge 相连。如果是 chain 中的子节点,则会将 StreamConfig 添加到该chain 的 config 集合中。一个 node chains,除了 headOfChain node 会生成对应的 JobVertex,其余的 nodes 都是以序列化的形式写入到 StreamConfig 中,并保存到 headOfChain 的CHAINED_TASK_CONFIG 配置项中。直到部署时,才会取出并生成对应的 ChainOperators。