Flink源码解析之:如何根据StreamGraph生成JobGraph

Flink源码解析之:如何根据StreamGraph生成JobGraph

在上一章节中,我们讲解了Flink如何将用户自定义逻辑算子转换成StreamGraph。在生成StreamGraph的过程中,Flink内部没有做任何优化,只是将用户自定义算子和处理流程转换成了StreamGraph的拓扑图来表示。本章我们将会介绍StreamGraph到JobGraph的生成流程,在这一过程中,**Flink内部是做了优化操作的,主要是做了算子的Chain操作,Chain在一起的算子会形成算子链在同一个线程上下文中执行,减少算子间的上下文切换开销以及shuffle开销。**接下来,我们就带大家通过源码来深入理解这一转换流程,并理解Flink优化的算子Chain操作原理。

源码入口与转换图

StreamGraph -> JobGraph的转换核心源码,在包org.apache.flink.streaming.api.graph 下的StreamingJobGraphGenerator,该类的注解也表明了该类的作用:

The StreamingJobGraphGenerator converts a {@link StreamGraph} into a {@link JobGraph}.

StreamGraph的整体转换流程可以参考下图,有一个大致的概念,在阅读完本文后再回来看这张图可能会有更深刻的理解。

在这里插入图片描述

具体实现原理

首先进入到StreamingJobGraphGenerator类的转换入口方法createJobGraph中,源码如下所示:

private JobGraph createJobGraph() {
	// 校验应用参数
    preValidate();
    jobGraph.setJobType(streamGraph.getJobType());

    jobGraph.enableApproximateLocalRecovery(
            streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
	// note: 为每个StreamNode生成一个确定的hash id,如果提交的拓扑结构没有改变,则每次生成的hash id都会是一样的
	// note: 此哈希码基于节点的当前状态以及其所有输入的状态。
    Map<Integer, byte[]> hashes =
            defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // Generate legacy version hashes for backwards compatibility
	// note: 这个设置主要是为了防止 hash 机制变化时出现不兼容的情况
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

	// note: 转换中最重要的方法,合并算子形成算子链、生成jobvertex、连接算子顶点等操作
    setChaining(hashes, legacyHashes);

	// note: note: 将每个 JobVertex 的入边集合也序列化到该 JobVertex 的 StreamConfig 中 (出边集合已经在 setChaining 的时候写入了)
    setPhysicalEdges();

	// note: 为每个 JobVertex 指定所属的 SlotSharingGroup 以及设置 CoLocationGroup
    setSlotSharingAndCoLocation();

    setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
            id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());

	// note: checkpoint相关的配置
    configureCheckpointing();

    // note: 用户的第三方依赖包就是在这里(cacheFile)传给 JobGraph
jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

    final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
            JobGraphUtils.prepareUserArtifactEntries(
                    streamGraph.getUserArtifacts().stream()
                            .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                    jobGraph.getJobID());

    for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
            distributedCacheEntries.entrySet()) {
        jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
    }

    // set the ExecutionConfig last when it has been finalized
    try {
		//note: 将 StreamGraph 的 ExecutionConfig 序列化到 JobGraph 的配置中
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    } catch (IOException e) {
        throw new IllegalConfigurationException(
                "Could not serialize the ExecutionConfig."
                        + "This indicates that non-serializable types (like custom serializers) were registered");
    }

    return jobGraph;
}

该方法的主要实现步骤如下:

  • 首先,在做任何操作之前,它运行preValidate()来验证流图的状态,确保其有效性。
  • 使用traverseStreamGraphAndGenerateHashes 方法生成流图中每个节点的哈希值。此哈希码基于节点的当前状态以及其所有输入的状态。这是为了实现Flink的状态后端,特别是在恢复作业/从保存点启动作业时需要使用这些哈希码来准确找到并恢复节点的状态。
  • setChaining(hashes, legacyHashes) 调用用于为StreamGraph设置操作链。链式操作可以在单个任务中执行多个操作符,以减少数据在任务之间的传输,以此提高性能。
    这里会生成相应的 JobVertex 、JobEdge 、 IntermediateDataSet 对象,JobGraph 的 Graph 在这一步就已经完全构建出来了;
  • setPhysicalEdges() `方法会将每个 JobVertex 的入边集合也序列化到该 JobVertex 的 StreamConfig 中 (出边集合已经在 setChaining 的时候写入了);
  • setSlotSharingAndCoLocation()设置插槽共享和共位,以实现任务之间的资源共享。
  • setManagedMemoryFraction() 设置管理内存的分数,参数包括jobVertices, vertexConfigs, chainedConfigs,以及一些函数,用于根据id获取流节点的权重和使用案例。
  • configureCheckpointing() 配置检查点。检查点是Flink提供的容错机制,可以恢复到特定状态以保证结果的正确性。
  • 最后,将StreamGraph中的ExecutionConfig(执行配置)序列化并设置给JobGraph。如果ExecutionConfig不能被序列化,例如其中含有自定义序列化程序,那么将抛出IllegalConfigurationException异常。

上面的方法中,最重要的就是setChaining方法,该方法中设置了算子的操作链,并生成JobVertex、JobEdge、IntermediateDataSet对象,并将它们连接成JobGraph的拓扑图。

为此,接下来,我们会核心着重讲解该方法。

我们先进入setChaining方法中,探究其源码执行原理。

/**
 * Sets up task chains from the source {@link StreamNode} instances.
 *
 * <p>This will recursively create all {@link JobVertex} instances.
 */
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes) {
    // we separate out the sources that run as inputs to another operator (chained inputs)
    // from the sources that needs to run as the main (head) operator.
    final Map<Integer, OperatorChainInfo> chainEntryPoints =
            buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
    final Collection<OperatorChainInfo> initialEntryPoints =
            chainEntryPoints.entrySet().stream()
                    .sorted(Comparator.comparing(Map.Entry::getKey))
                    .map(Map.Entry::getValue)
                    .collect(Collectors.toList());

    // iterate over a copy of the values, because this map gets concurrently modified
    for (OperatorChainInfo info : initialEntryPoints) {
        createChain(
                info.getStartNodeId(),
                1, // operators start at position 1 because 0 is for chained source inputs
                info,
                chainEntryPoints);
    }
}

上述代码首先执行了buildChainedInputsAndGetHeadInputs方法。该方法有什么作用呢?为了节省篇幅,暂且不列出该方法的源码,只分析其实现原理和作用。以下是该方法的主要步骤:

  1. 该方法内部会遍历所有的源节点,并判断源节点与下游直接连接的节点是否可以合并成一个算子链。
  2. 如果可以合并,则会为当前源节点创建一个StreamConfig对象,并配置源节点在当前算子链中的索引(设置为0,因为是开头)、算子ID、算子名称等元数据信息,并添加到一个chainedSources的Map对象中,该对象包含了每个源节点的上述元数据信息。
  3. 如果可以合并,则会为源节点的下游节点创建一个算子链对象OperatorChainInfo,该对象会包含上面的chainedSources属性,以标识该算子的算子链源节点。并将该下游节点和OperatorChainInfo对象放入chainEntryPoints的Map结构中。
  4. 如果无法合并,则只会将源节点本身创建算子链OperatorChainInfo对象,放入chainEntryPoints的Map结构中。表明从源头的初始算子链,只有其本身。

经过上面的处理后,就可以得到从源节点开始的初始算子链,只包含源节点和其下游节点。chainEntryPoints 映射将包含所有源节点的 OperatorChainInfo,以及可被链接的源节点的紧邻下游节点信息。其中那些可以被链到其他操作符的源节点信息也同时存储在 chainedSources映射中。这个映射将被用于后续的处理。

以下图的StreamGraph拓扑图为例,直观来看,chainedSources对象存储的是可被chain的op1op2源节点的信息。chainEntryPoint存储的是op1op2op3op4,包含源节点及其可链接的邻接下游节点信息。

在这里插入图片描述

有了上述的源节点可链接信息后,便从源节点开始遍历chainEntryPoints的values集合,对每个源节点执行createChain方法。该方法中会递归地对源节点的所有下游节点进行遍历,以判断哪些是可以链接的,哪些是不可链接的,并最终生成JobVertex和JobEdge进行连接。接下来,我们着重来分析该方法流程。

算子是如何Chain在一起的

这一小节,我们来介绍生成JobGraph的一个核心步骤,即算子是如何Chain到一起的。在具体讲解前,先看一下算子Chain的示例图:

在这里插入图片描述

上图可以看到,在StreamGraph中,从KeyedAggregation算子到DataSink算子是forward的分区方式,当Flink判断两者是可以链接到一起时,便会在转换成JobGraph时,将两个算子合并在一个算子链中,生成一个JobVertex。

StreamGraph转换为JobGraph的处理过程主要是在createChain方法中完成的,先来看下这个方法的实现:

private List<StreamEdge> createChain(
        final Integer currentNodeId,
        final int chainIndex,
        final OperatorChainInfo chainInfo,
        final Map<Integer, OperatorChainInfo> chainEntryPoints) {

	// 算子链的起始节点
    Integer startNodeId = chainInfo.getStartNodeId();
    if (!builtVertices.contains(startNodeId)) {

        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

		// 记录能链接在一起的下游边集合和不能链接在一起的下游边集合
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

		// 算子链中当前要处理的StreamNode
        StreamNode currentNode = streamGraph.getStreamNode(currentNodeId);

		// 遍历当前节点的下游节点,判断是否可以chain在一起,并把出边写入相应集合中
        for (StreamEdge outEdge : currentNode.getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

		// 对于能够链接的下游节点,递归继续遍历其出边,重复上面的逻辑。
		// chainIndex表示当前StreamNode在算子链中的索引位置,递归下游节点时需要+1
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(
                    createChain(
                            chainable.getTargetId(),
                            chainIndex + 1,
                            chainInfo,
                            chainEntryPoints));
        }

		// 遍历不可链接的出边集合,并加入到transitiveOutEdges集合中
		// 这个transitiveOutEdges会在递归调用中返回,调用栈中上层调用会使用下面的connect方法将当前StreamNode与下层调用返回的transitiveOutEdges中的出边进行连接(如果不为空的话)
		// 表明当前StreamNode和返回的下游算子链顶点不能合并在同一个算子链中
        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
			// 下游StreamNode会成为新的算子链顶点,只不过这里chainIndex会设置为1
			// 并继续往下递归,构造以该下游StreamNode为顶点开始的算子链
            createChain(
                    nonChainable.getTargetId(),
                    1, // operators start at position 1 because 0 is for chained source inputs
                    chainEntryPoints.computeIfAbsent(
                            nonChainable.getTargetId(),
                            (k) -> chainInfo.newChain(nonChainable.getTargetId())),
                    chainEntryPoints);
        }

		// 记录 chainedName
        chainedNames.put(
                currentNodeId,
                createChainedName(
                        currentNodeId,
                        chainableOutputs,
                 Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
		
		// 计算Chain之后,算子链的minResources
        chainedMinResources.put(
                currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
		// 计算chain之后,算子链的资源上限
        chainedPreferredResources.put(
                currentNodeId,
                createChainedPreferredResources(currentNodeId, chainableOutputs));

        OperatorID currentOperatorId =
                chainInfo.addNodeToChain(
                        currentNodeId,
                        streamGraph.getStreamNode(currentNodeId).getOperatorName());

        if (currentNode.getInputFormat() != null) {
            getOrCreateFormatContainer(startNodeId)
                    .addInputFormat(currentOperatorId, currentNode.getInputFormat());
        }

        if (currentNode.getOutputFormat() != null) {
            getOrCreateFormatContainer(startNodeId)
                    .addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
        }

		// 如果当前节点和startNodeId一致,说明递归过程回到了算子链的起始节点,则直接创建JobVertex,否则先创建一个空的StreamConfig
		// createJobVertex方法就是根据StreamNode创建对应的JobVertex,并返回包含该JobVertex配置的StreamConfig
        StreamConfig config =
                currentNodeId.equals(startNodeId)
                        ? createJobVertex(startNodeId, chainInfo)
                        : new StreamConfig(new Configuration());
		
		// 设置每个顶点的基本属性
        setVertexConfig(
                currentNodeId,
                config,
                chainableOutputs,
                nonChainableOutputs,
                chainInfo.getChainedSources());

		// 如果当前currentNodeId与startNodeId一致,证明当前算子链已经完成
        if (currentNodeId.equals(startNodeId)) {
			// 标识StreamNode为当前算子链的起始节点,设置索引位置
            config.setChainStart();
            config.setChainIndex(chainIndex);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
			// 遍历递归栈中返回的出边集合,如果不为空,则证明存在不能被链接在一起的出边,如果为空,则截止目前递归过程中的算子都是被合并链接在一起。
            for (StreamEdge edge : transitiveOutEdges) {
				// 在connect中构建graph
                connect(startNodeId, edge);
            }

			// 设置当前节点的所有出边
            config.setOutEdgesInOrder(transitiveOutEdges);
            // 将chain中所有子节点的StreamConfig写入到headOfChain节点的CHAINED_TASK_CONFIG配置中。
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {
			// 如果当前节点是算子链中的子节点
            chainedConfigs.computeIfAbsent(
                    startNodeId, k -> new HashMap<Integer, StreamConfig>());

            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
			// 将当前StreamNode的config记录到以startNodeId开头的算子链config中
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(currentOperatorId);

		// 如果chainableOutputs为空,证明达到了当前算子链的结尾
        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }

		// 每次递归调用返回的是transitiveOutEdges
        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
}

上述createChain方法是由StreamGraph转换到JobGraph的核心方法。

该方法首先会遍历这个StreamGraph的source节点的所有下游出边,对于每一个边,它将其分类为"可链接(chainable)“和"不可链接(non-chainable)”,分别添加到chainableOutputs和nonChainableOutputs列表中。在具体的实现里,主要逻辑如下:

  1. 遍历输入节点的所有出边,判断出边对应的下游节点与当前上游节点是否可以链接在一起,判断具体逻辑在isChainable()方法中。并根据出边是否可链接,将其添加到不同的集合中。
  2. 接下来,会分两种情况,分别进行递归操作
    • 对于可以chain的情况,会继续递归该下游节点执行createChain方法,以此递归判断整个StreamGraph拓扑结构中,能够chain在一起的所有算子。同时,会将chainIndex+1,chainIndex用来标识算子在算子链中的索引位置。
    • 如果上下游节点不能被chain在一起,则transitiveOutEdges集合中会添加该下游StreamEdge,该集合会在递归调用中返回,上层递归调用根据该变量即可知悉下游递归调用过程中,不能被链接的出边有哪些,后续会调用connect方法来连接。然后会继续调用createChain执行递归,此时当前下游节点即会成为新算子链的起始节点,不过需要注意的是,这里的起始节点索引设置为1,因为只有source节点在算子链中才会被设置为0。
    • 通过上面两种方式的递归,最终递归调用会沿着StreamGraph的拓扑结构一直调用到Sink节点。
  3. 在每个递归执行完成后,也会有两种情况的判断逻辑:
    • 如果currentNodeIdstartNodeId一致,说明递归已经回退到了当前算子链的起始节点,当前Chain过程已经完成。此时,会先做一些相关配置,比如标识当前StreamNode为这个算子链的起始节点,设置其在算子链中的索引值,设置操作符名称等作用。并遍历下层递归放回的transitiveOutEdges集合,如果该集合不为空,说明下游节点在递归过程中,存在与当前节点无法链接的节点,为此会调用connect方法进行连接。
    • 如果两者不一致,那么证明当前StreamNode只是这个算子链的一部分,因此,只会设置一些当前节点在算子链中的索引值,操作符名称等信息,并会将当前节点和配置放入以startNodeId为键的chainedConfigsMap对象中,表示当前节点为startNodeId开头的算子链中的一部分。

上面就是这个方法的主要实现逻辑,下面会详细把这个方法展开,重点介绍其中的一些方法实现。

如何判断算子是否可以Chain在一起

两个StreamNode是否可以链接在一起是通过isChainable()方法来判断,更为具体的判断逻辑在该方法的isChainableInput方法中,具体的判断逻辑,如下源码所示:

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
	// 获取出边对应的下游StreamNode
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

	// 下游Operator的入边只有一个,如果有多个是无法Chain在一起的,因为它还需要接收其他节点的输入
    return downStreamVertex.getInEdges().size() == 1 && 	isChainableInput(edge, streamGraph);
}


private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
	// 获取当前StreamEdge对应的上下游StreamNode
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

	// 1. 需要对应的slotSharingGroup一样,如果不在同一槽位,意味着仍然需要线程上下文切换,所以无法链接在一起
	// 2. 分区器必须是ForwardPartitioner类型,只有这样才能保证上下游关系一对一
	// 3. 执行模式不能是Batch模式
	// 4. 上下游并发必须一样
	// 5. StreamGraph配置了允许执行算子链接
    if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            && edge.getExchangeMode() != StreamExchangeMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled())) {

        return false;
    }

    // check that we do not have a union operation, because unions currently only work
    // through the network/byte-channel stack.
    // we check that by testing that each "type" (which means input position) is used only once
    for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
        if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
            return false;
        }
    }
    return true;
}

这个方法判断的指标有很多,主要判断逻辑有以下内容:

  1. 需要对应的slotSharingGroup一样,如果不在同一槽位,意味着仍然需要线程上下文切换,所以无法链接在一起
  2. 分区器必须是ForwardPartitioner类型,只有这样才能保证上下游关系一对一
  3. 执行模式不能是Batch模式
  4. 上下游并发必须一样.
  5. StreamGraph配置了允许执行算子链接

基于上述规则判断后,即可判断StreamEdge对应的上下游StreamNode是否可以进行链接。这样的规则也可以指导我们日常中的实际任务开发,比如我们希望算子间能够被链接在一起,提升执行性能的话,就需要让我们的算子间满足上面的规则条件。

创建JobVertex节点

JobVertex 对象的创建是在上面的 createJobVertex() 方法中实现的,这个方法实现比较简单,创建相应的 JobVertex 对象,并把相关的配置信息设置到 JobVertex 对象中就完成了,最终封装在StreamConfig对象中。

connect方法创建JobEdge和IntermediateDataSet对象

上面我们说到,对于不可被链接到同一算子链中的上下游StreamNode是利用connect方法进行连接的,接下来我们就来看看connect方法中执行了什么逻辑,是如何进行算子连接的。

具体的实现源码如下:

private void connect(Integer headOfChain, StreamEdge edge) {

    physicalEdgesInOrder.add(edge);

    Integer downStreamVertexID = edge.getTargetId();
	
	    // 这里 headVertex 指的是 headOfChain 对应的 JobVertex(也是当前 node 对应的 vertex)
    JobVertex headVertex = jobVertices.get(headOfChain);
    JobVertex downStreamVertex = jobVertices.get(downStreamVertexID);

    StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());

    // 当前下游节点的输入边数+1
downStreamConfig.setNumberOfNetworkInputs(downStreamConfig.getNumberOfNetworkInputs() + 1);

    StreamPartitioner<?> partitioner = edge.getPartitioner();

    ResultPartitionType resultPartitionType;
    switch (edge.getExchangeMode()) {
        case PIPELINED:
            resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
            break;
        case BATCH:
            resultPartitionType = ResultPartitionType.BLOCKING;
            break;
        case UNDEFINED:
            resultPartitionType = determineResultPartitionType(partitioner);
            break;
        default:
            throw new UnsupportedOperationException(
                    "Data exchange mode " + edge.getExchangeMode() + " is not supported yet.");
    }

    checkBufferTimeout(resultPartitionType, edge);

    JobEdge jobEdge;
	// 如果当前分区器是点对点的,创建下游节点的上游输入节点和连接
	// 具体的连接逻辑即在connectNewDataSetAsInput方法中
    if (partitioner.isPointwise()) {
        jobEdge =
                downStreamVertex.connectNewDataSetAsInput(
                        headVertex, DistributionPattern.POINTWISE, resultPartitionType);
    } else {
        jobEdge =
                downStreamVertex.connectNewDataSetAsInput(
                        headVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);
    }
    // set strategy name so that web interface can show it.
    jobEdge.setShipStrategyName(partitioner.toString());
    jobEdge.setDownstreamSubtaskStateMapper(partitioner.getDownstreamSubtaskStateMapper());
    jobEdge.setUpstreamSubtaskStateMapper(partitioner.getUpstreamSubtaskStateMapper());

    if (LOG.isDebugEnabled()) {
        LOG.debug(
                "CONNECTED: {} - {} -> {}",
                partitioner.getClass().getSimpleName(),
                headOfChain,
                downStreamVertexID);
    }
}

真正创建JobEdge执行上下游节点连接的地方在downStreamVertex.connectNewDataSetAsInput方法中,进入到该方法中一探究竟:

public JobEdge connectNewDataSetAsInput(
        JobVertex input, DistributionPattern distPattern, ResultPartitionType partitionType) {
	
	// 创建上游节点的输出中间结果集对象
    IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);

	// 创建对应的JobEdge
    JobEdge edge = new JobEdge(dataSet, this, distPattern);
	// 当前对象(downStreamVertex)的inputs属性添加该JobEdge
    this.inputs.add(edge);
	// 上游节点的输出中间结果集添加下游消费者为该edge
    dataSet.addConsumer(edge);
    return edge;
}

上述的代码清晰展示了连接上下游节点的流程,也对应了我们最开始画的StreamGraph到JobGraph转换的拓扑结构图。

在生成JobGraph时,上游节点并不是直接连接到下游节点的,而是存在一个中间结果集对象,表示上游节点的输出,中间结果集再通过JobEdge连接到下游节点。从而构成了JobGraph结构图中的连接关系。

整体的流程可以再次参考文初的转换图,加深对于整体流程的理解和掌握。

JobGraph 的其他配置

执行完 setChaining() 方法后,下面还有几步操作:

  1. setPhysicalEdges(): 将每个 JobVertex 的入边集合也序列化到该 JobVertex 的 StreamConfig 中 (出边集合已经在 setChaining 的时候写入了);
  2. setSlotSharingAndCoLocation(): 为每个 JobVertex 指定所属的 SlotSharingGroup 以及设置 CoLocationGroup;
  3. configureCheckpointing(): checkpoint相关的配置;
  4. JobGraphGenerator.addUserArtifactEntries(): 用户依赖的第三方包就是在这里(cacheFile)传给 JobGraph;

在此不再进行赘述。

至此,StreamGraph转换为JobGraph的具体流程就已经梳理完成了,转换流程中需要重点关注的点就是在这一步会进行算子链的优化,以减少算子间的上下文切换开销以及shuffle开销。有兴趣的可以自己翻阅一下源码,这部分内容虽然看着很长,但只要多看几遍,多debug看看具体的执行流程,基本都可以搞明白一二。

参考:
https://matt33.com/2019/12/09/flink-job-graph-3/
https://www.cnblogs.com/GeQian-hq/p/17880647.html
https://wuchong.me/blog/2016/05/10/flink-internals-how-to-build-jobgraph/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/944865.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker Container 可观测性最佳实践

Docker Container 介绍 Docker Container&#xff08; Docker 容器&#xff09;是一种轻量级、可移植的、自给自足的软件运行环境&#xff0c;它在 Docker 引擎的宿主机上运行。容器在许多方面类似于虚拟机&#xff0c;但它们更轻量&#xff0c;因为它们不需要模拟整个操作系统…

google广告 google分析

这里写自定义目录标题 google广告AFC类型广告AFS类型广告CSE广告RS广告 google分析监听广告点击click事件&#xff08;广告追踪&#xff09; google广告 AFS广告主要是指嵌入在搜索引擎上的广告&#xff0c;用户在进行搜索时看到的广告&#xff0c;与搜索关键词息息相关。 AFC…

【开源免费】基于SpringBoot+Vue.JS网上摄影工作室系统(JAVA毕业设计)

本文项目编号 T 103 &#xff0c;文末自助获取源码 \color{red}{T103&#xff0c;文末自助获取源码} T103&#xff0c;文末自助获取源码 目录 一、系统介绍二、数据库设计三、配套教程3.1 启动教程3.2 讲解视频3.3 二次开发教程 四、功能截图五、文案资料5.1 选题背景5.2 国内…

基于SSM的“电器网上订购系统”的设计与实现(源码+数据库+文档+PPT)

基于SSM的“电器网上订购系统”的设计与实现&#xff08;源码数据库文档PPT) 开发语言&#xff1a;Java 数据库&#xff1a;MySQL 技术&#xff1a;SSM 工具&#xff1a;IDEA/Ecilpse、Navicat、Maven 系统展示 系统首页 商品类型 商品管理 订单展示 商品购物车 登录页面 …

【物联网】给EoRa Pi 烧录Meshtastic

文章目录 一、Meshtastic 是什么&#xff1f;二、Meshtastic 烧录过程1. 在线烧录工具2. 刷机进度 总结 一、Meshtastic 是什么&#xff1f; Meshtastic 是一种基于 LoRa 技术的离网通信平台。它通过低成本、低功耗的无线电设备&#xff0c;实现远距离自组网通信。可在脱离现有…

宝塔-firefox(Docker应用)-构建自己的Web浏览器

安装基础软件 宝塔中安装firefox(Docker应用) 。宝塔中需要先安装docker及docker-composefirefox配置安装 点击firefox应用&#xff0c;选择【安装配置】点击右边绿色按钮&#xff0c;进行安装&#xff0c;这一步等待docker-compose根据你的配置初始化docker应用 等待安装 …

windows 本地node版本快速升级

文章目录 前言一、前置条件二、使用步骤1.查看node 安装位置2.下载指定的node 版本3.下载后进行解压缩4. 删除覆盖原来的node文件夹内容5. 验证 总结 前言 Node.js 是一个开源、跨平台的JavaScript运行时环境&#xff0c;它允许开发者在服务器端运行JavaScript代码。Node.js 基…

HTML4笔记

尚硅谷 一、前序知识 1.认识两位先驱 2.计算机基础知识 3.C/S架构与B/S架构 4.浏览器相关知识 5.网页相关概念 二、HTML简介 1.什么是HTML? 2.相关国际组织(了解) 3.HTML发展历史(了解)** 三、准备工作 1.常用电脑设置 2.安装Chrome浏览器 四、HTML入门 1.HTML初体验 2.H…

跟着逻辑先生学习FPGA-实战篇第二课 6-2 LED灯流水灯实验

** 硬件平台&#xff1a;征战Pro开发板 软件平台&#xff1a;Vivado2018.3 仿真软件&#xff1a;Modelsim10.6d 文本编译器&#xff1a;Notepad** 征战Pro开发板资料 链接:https://pan.baidu.com/s/1AIcnaGBpNLgFT8GG1yC-cA?pwdx3u8 提取码:x3u8 1 知识背景 我们在《LED 灯…

【已解决】Latex中高亮段内命令(如参考文献引用、图、表格)

速览&#xff1a;解决前后图片对比拟解决的问题问题描述Latex高亮的一般做法段内有命令时候的高亮报错 问题原因 解决方案——在导言区为 \cite 等命令“注册”解决方案简要描述详细解释其他情况 速览&#xff1a;解决前后图片对比 解决前&#xff1a; 解决后&#xff1a; …

CSS中的“display“

简单记录一下&#xff0c;看图理解~&#xff08;图片来自于MDN Web&#xff09;

数字图像处理

一 形态学处理 ①二值图像 PS&#xff1a;1&#xff08;255&#xff09;代表的是白 0代表的是黑&#xff08;0就是什么都看不见&#xff0c;就是黑&#xff09; ②灰度图像 ③彩色图像 ④数学形态学基础&#xff1a;是分析几何形状和结构的数学方法&#xff0c;它建立在…

【项目日记(7)】第三层:页缓存的具体实现(上)

目录 前言1. 页缓存的具体结构2. 页缓存分配内存的全过程3. 页缓存分配内存的代码实现4. 优化代码&#xff0c;并完全脱离malloc5. 总结以及代码拓展 前言 在页缓存这一层中&#xff0c;负责给中心缓存分配大块儿的内存&#xff0c;以及合并前后空闲的内存&#xff0c;这一层为…

Python + 深度学习从 0 到 1(03 / 99)

希望对你有帮助呀&#xff01;&#xff01;&#x1f49c;&#x1f49c; 如有更好理解的思路&#xff0c;欢迎大家留言补充 ~ 一起加油叭 &#x1f4a6; 欢迎关注、订阅专栏 【深度学习从 0 到 1】谢谢你的支持&#xff01; ⭐ 神经网络的数据表示 – 张量 你可能对矩阵很熟悉&a…

使用Docker-compose部署SpringCloud项目

docker编写dockfile遇到的问题&#xff1a; 需要在docker-compose.yml文件下执行命令 docker-compose.yml文件格式的问题 1和2处空2格&#xff0c;3处空1格&#xff0c;4为本地配置文件目录&#xff0c;5为docker容器的目录&#xff0c;version为自己安装的docker-compose版本 …

【机器学习】【朴素贝叶斯分类器】从理论到实践:朴素贝叶斯分类器在垃圾短信过滤中的应用

&#x1f31f; 关于我 &#x1f31f; 大家好呀&#xff01;&#x1f44b; 我是一名大三在读学生&#xff0c;目前对人工智能领域充满了浓厚的兴趣&#xff0c;尤其是机器学习、深度学习和自然语言处理这些酷炫的技术&#xff01;&#x1f916;&#x1f4bb; 平时我喜欢动手做实…

Tonghttpserver6.0.1.3 使用整理(by lqw)

文章目录 1.声明2.关于单机版控制台和集中管理控制台3.单机版控制台3.1安装&#xff0c;启动和查看授权信息3.2一些常见的使用问题&#xff08;单机控制台&#xff09;3.3之前使用的是nginx&#xff0c;现在要配nginx.conf上的配置&#xff0c;在THS上如何配置3.4如何配置密码过…

BUUCTF Pwn ciscn_2019_es_2 WP

1.下载 checksec 用IDA32打开 定位main函数 发现了个假的后门函数&#xff1a; 看看vul函数&#xff1a; 使用read读取 想到栈溢出 但是只有48个 只能覆盖EBP和返回地址 长度不够构造 所以使用栈迁移&#xff1a; 栈迁移需要用到leave ret 使用ROPgadget找地址&#xff1a; …

6.若依数据字典

数据字典 维护系统中常见的静态数据&#xff0c;例如&#xff1a;性别、状态等。 好处 不写死在页面上&#xff0c;而是通过数据库来维护&#xff0c;因为如果要修改&#xff0c;则只需要改数据库中的数据即可&#xff0c;不用每个地方都修改了。 字典类型的管理 字典数据的…

JVM学习-内存结构(二)

一、堆 1.定义 2.堆内存溢出问题 1.演示 -Xmx设置堆大小 3.堆内存的诊断 3.1介绍 1&#xff0c;2都是命令行工具&#xff08;可直接在ideal运行时&#xff0c;在底下打开终端&#xff0c;输入命令&#xff09; 1可以拿到Java进程的进程ID&#xff0c;2 jmap只能查询某一个时…