flink任务提交与执行-StreamGraph和JobGr

1.StreamGraph

StreamGraph结构是通过StreamGraphGenerator转换executionEnvironment的transform集合而来的

StreamGraphStructure.png

图一

1.StreamGraph的构成

  • StreamGraph主要包含了用于构建JobGraph执行的所有必要信息,同时使用有向无环图来展示流的拓扑信息
  • StreamGraph包含了StreamNode,每个StreamNode代表了流处理系统中的一个操作,如Source,flatMap,KeyedAgg,Sink以及该操作的相关配置信息,例如最大并行maxParallelism,managedMemoryWeight内存分配权重.
  • StreamEdge是Streaming流的拓扑信息的边,连接两个StreamNode,一个StreamNode可以有多个出边,入边。该边主要是由上游(tranformation)转换操作的InputID和当前(tranformation)转换操作的tranformId相连,构成StreamGraph对应的边,主要标识了数据的流向.StreamEdge包含了sideoutput,StreamPartitioner,outputtag等参数
  • StreamGraph的还包含了timeCharacteristic、executionConfig、checkponitConfig,stateBackend等其他信息
  • StreamGraph在创建时,不设计数据的处理,仅仅对上下游数据之间进行描述,因此节点被设定为virtual形式

2. JobGraph

当StreamExecutionEnvironment.execute开始启动job,运行至StreamExecutionEnvironment.Async(StreamGraph)時,开始构建JobGraph。使用Jobgraph兼容离线OptimizedPlan和流式作业StreamGraph,让不同类型的作业都可以在同一套集群运行。

  • JobGraph代表了能够被JobManager所接收的底层的数据类型,每个JobGraph是由vertices(顶点)以及intermidiate results(中间结果)构成的一个DAG图。vertices代表了具体操作,intermidiate results代表了中间数据集

JobGraph实例图.png

图二

1. JobGraph数据结构

  • JobID:当前Job的ID
  • taskVertices:是一个map,存储了当前JobGraph的所有Vertice
  • scheduleMode:当前Job启用的Task调度模式,流式作业中默认的调度模式是EAGER类型
  • snapshotSetting:checkpoint的配置信息
  • savepointRestoreSetting:恢复任务的savepoint配置信息
  • userJars:当前作业依赖JAR包的地址,在作业提交集群的过程中,这些JAR包会通过网络上传到运行时的BlobServer中,在应用程序中的Task执行时会将相关JAR安排下载到TaskManager本地路径,并加载到Task线程所在的UserClassLoader中
  • userArtifacts: 当前作业需要使用的自定义文件,并通过DistributedCacheEntry表示
  • userJarBlobKeys:将JAR包上传到BlobServer之后,返回的BlobKey地址信息会存储在该集合中
  • classpath:当前作业对应的classpath信息

2. JobVertex主要结构

JobVertex是taskVertices对象Map 的键值对的值。主要包含了JobEdge和IntermidateDataSet等结构以及与节点相关的配置和运行资源等信息。

JobVertex结构图.png

图三

  • JobVertexID:当前的JobVertex节点的ID信息
  • idAlternatives:Vertex中的替代ID,如果JobVertex重复了,可以使用该列表中的ID
  • operatorIDs:包含当前Vertex中所有Operator的唯一ID集合
  • results:当前Vertex对应的中间结果数据集,通过IntermediateDataSet表示
  • inputs:当前Vertex所有的输入边集合,通过JobEdge表示
  • parallelism:当前Vertex节点对应的并行度参数
  • invokableClassname: 当前Vertex对应的invokableClassName,即Operator对应的ClassName,在Task运行的过程中,会通过invakableClassName进行对应Operator的初始化与执行
  • slotSharingGroup:Vertex中Slot共享组配置,该配置运行Job节点中的SubTask运行在同一个Slot中
  • inputDependencyConstraint:表示当前Vertex节点中Task被调度的输入依赖约束,其中ANY表示一旦有输入节点可以消费,就立即对当前JobVertex的Task进行调度执行。ALL表示只有当所有的输入节点全部可消费时候,才能调度当前节点对应的Task任务

3. JobEdge主要结构

JobEdge代表一个jobGraph的边(通信管道),是从IntermidiateDataSet指向job vertex,同时,JobEdge的连接方式主要由DistributionPattern来决定

JobEdge结构图.png

图四

  • JobVertex target:位于target端,处于下游的JobVertex
  • DistributionPattern distributionPattern : 定义了producing sub task和consuming sub tasks的连接方式,有ALL_TO_ALL(全连接)和POINTWISE(点对点)两种方式
  • IntermediateDataSet source:位于source端的IntermediateDataSet,从JobVertex上游输出的中间结果

4.IntermidiateDataSet

IntermidiateDataSet是由Operatpr如source或者其他operator产生的数据集。如图五所示,由如下几个参数

  • JobVertex producer:产生Data set的Operation,为IntermidiateDataSet的上游输入,由JobVertex
  • List consumers : 下游输出,是一个包含JobEdge的集合

因此在jobGraph中JobVertex和JobEdge不是直接关联的,是通过IntermidateDataset连接的。因此IntermidiateDataSet代表了JobVertex的输出,IntermediateDataset的个数和JobVertex对应的StreamNode的输出并行度相同,是一个或多个

IntermediateDataSet结构图.png

图五

  1. StreamGraph和JobGraph的异同点

  2. 生成时间段

  • Streamgraph的生成,是由StreamExecutionEnvironment.execute启动,具体由StreamGraphGenerator.generate()生成。由tranformation转换而成
  • JobGraph的生成,是由StreamExecutionEnvironment.execute启动,具体由PipelineExecutorde的实现类FlinkPipelineTranslationUtil.getJobGraph()方法来生成JobGraph,由StreamGraph转换而成
  1. 内部构成
  • StreamGraph主要包含了涉及上层组件的一些指定,例如StateBackend,TimeCharacteristic,ScheduleMode任务调度模式。同时,拥有StreamNode,StreamEdge节点,StreamNode代表了一个算子,如Source,Sink,FlatMap等。StreamEdge,主要连接两个StreamNode,包含了partitioner,sideoutputtag
  • JobGraph主要是从StreamGraph的基础上生成而来,同时针对StreamGraph进行了一下优化操作,比如利用operatorchain将多个算子进行合并,在同一个线程上调度。JobVertex相对于StreamNode来说,有可能包含了一个或者多个StreamNode。例如通过blockchain,多个StreamNode融合成一个JobVertex。同时,加入了IntermidiateDataSet作为上下游的一个管道。

参考文献

Flink设计与实现:核心原理与源码解析
Flink内核原理与实现