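Flink StreamGraph Generation Process

Table of Contents

    • Overview
    • StreamGraph Core Objects
    • StreamGraph Generation Process

Overview

In Flink, the StreamGraph is the logical representation of a data stream: it describes how the stream transformations of a Flink job are to be executed. The StreamGraph is the basis from which the Flink runtime generates the execution plan.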
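An application written with the DataStream API is first converted into Transformations, which are then mapped to a StreamGraph. The conversion from StreamGraph to JobGraph happens on the client. After the JobGraph is submitted to the Flink cluster, the cluster converts it into an ExecutionGraph, and the job then enters the scheduling and execution phase.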

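StreamGraph Core Objects

  • StreamNode
    A StreamNode is a node in the StreamGraph and is converted from a Transformation. It can simply be understood as representing one operator. Logically, a StreamGraph contains both physical and virtual StreamNodes. A StreamNode can have multiple inputs and multiple outputs.
    A physical StreamNode eventually becomes a physical operator. A virtual StreamNode is attached to a StreamEdge.
  • StreamEdge
    A StreamEdge is an edge in the StreamGraph and connects two StreamNodes. A StreamNode can have multiple outgoing and incoming edges. A StreamEdge carries information such as the side output, the partitioner, and the field-selection output.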
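
The relationship between physical nodes, virtual nodes, and edges can be illustrated with a small self-contained model. This is only a sketch under stated assumptions: `MiniNode`, `MiniEdge`, `VirtualPartitionNode`, and `MiniGraph` are hypothetical names, not Flink classes, and only a partitioner is modeled. It shows a virtual node creating no node of its own and ending up as information on the edge between two physical nodes.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MiniStreamGraphSketch {

    /** A physical node: one operator with any number of incoming and outgoing edges. */
    static final class MiniNode {
        final int id;
        final String operatorName;
        final List<MiniEdge> inEdges = new ArrayList<>();
        final List<MiniEdge> outEdges = new ArrayList<>();

        MiniNode(int id, String operatorName) {
            this.id = id;
            this.operatorName = operatorName;
        }
    }

    /** An edge between two physical nodes; it carries what a virtual node contributed. */
    static final class MiniEdge {
        final int sourceId;
        final int targetId;
        final String partitioner; // null when no partitioner was specified

        MiniEdge(int sourceId, int targetId, String partitioner) {
            this.sourceId = sourceId;
            this.targetId = targetId;
            this.partitioner = partitioner;
        }
    }

    /** A virtual node: it only remembers its real upstream node and a partitioner. */
    static final class VirtualPartitionNode {
        final int upstreamId;
        final String partitioner;

        VirtualPartitionNode(int upstreamId, String partitioner) {
            this.upstreamId = upstreamId;
            this.partitioner = partitioner;
        }
    }

    /** Hypothetical, heavily simplified graph; not Flink's StreamGraph. */
    static final class MiniGraph {
        final Map<Integer, MiniNode> nodes = new HashMap<>();
        final Map<Integer, VirtualPartitionNode> virtualNodes = new HashMap<>();

        void addNode(int id, String operatorName) {
            nodes.put(id, new MiniNode(id, operatorName));
        }

        void addVirtualPartitionNode(int upstreamId, int virtualId, String partitioner) {
            virtualNodes.put(virtualId, new VirtualPartitionNode(upstreamId, partitioner));
        }

        void addEdge(int upstreamId, int downstreamId) {
            addEdge(upstreamId, downstreamId, null);
        }

        private void addEdge(int upstreamId, int downstreamId, String partitioner) {
            VirtualPartitionNode virtual = virtualNodes.get(upstreamId);
            if (virtual != null) {
                // Resolve the virtual node: connect to its real upstream and keep the partitioner.
                String resolved = partitioner != null ? partitioner : virtual.partitioner;
                addEdge(virtual.upstreamId, downstreamId, resolved);
                return;
            }
            MiniEdge edge = new MiniEdge(upstreamId, downstreamId, partitioner);
            nodes.get(upstreamId).outEdges.add(edge);
            nodes.get(downstreamId).inEdges.add(edge);
        }
    }

    /** Builds source(1) -> [virtual rebalance(2)] -> map(3) and describes the result. */
    public static String describeExample() {
        MiniGraph graph = new MiniGraph();
        graph.addNode(1, "source");
        graph.addVirtualPartitionNode(1, 2, "rebalance"); // id 2 is virtual
        graph.addNode(3, "map");
        graph.addEdge(2, 3); // the downstream node connects to the virtual node
        MiniEdge edge = graph.nodes.get(3).inEdges.get(0);
        return graph.nodes.size() + " physical nodes; edge " + edge.sourceId
                + " -> " + edge.targetId + " via " + edge.partitioner;
    }

    public static void main(String[] args) {
        System.out.println(describeExample());
    }
}
```

In this model the graph ends up with two physical nodes and a single edge from node 1 to node 3 that carries the `rebalance` partitioner; the virtual node with id 2 never appears as a node.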

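StreamGraph Generation Process

The StreamGraph is generated in the Flink client. When a job is submitted, the client invokes the main method of the Flink application, and the user's business logic is assembled into a pipeline of Transformations. The final call to StreamExecutionEnvironment.execute() triggers construction of the StreamGraph.
The StreamGraph is therefore generated before the job is submitted. The entry point for generating it is in StreamExecutionEnvironment: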

    @Internal
    public StreamGraph getStreamGraph() {
        return this.getStreamGraph(this.getJobName());
    }

    @Internal
    public StreamGraph getStreamGraph(String jobName) {
        return this.getStreamGraph(jobName, true);
    }

    @Internal
    public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
        StreamGraph streamGraph = this.getStreamGraphGenerator().setJobName(jobName).generate();
        if (clearTransformations) {
            this.transformations.clear();
        }

        return streamGraph;
    }

    private StreamGraphGenerator getStreamGraphGenerator() {
        if (this.transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        } else {
            RuntimeExecutionMode executionMode = (RuntimeExecutionMode)this.configuration.get(ExecutionOptions.RUNTIME_MODE);
            return (new StreamGraphGenerator(this.transformations, this.config, this.checkpointCfg, this.getConfiguration()))
                    .setRuntimeExecutionMode(executionMode)
                    .setStateBackend(this.defaultStateBackend)
                    .setChaining(this.isChainingEnabled)
                    .setUserArtifacts(this.cacheFile)
                    .setTimeCharacteristic(this.timeCharacteristic)
                    .setDefaultBufferTimeout(this.bufferTimeout);
        }
    }
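
The `clearTransformations` flag in the snippet above has a visible consequence: once the transformation list has been cleared, a second attempt to build a graph hits the "No operators defined" check. The following is a minimal sketch of that behavior only. `MiniEnvironment` is a hypothetical stand-in for StreamExecutionEnvironment, and the "graph" is just a copy of the list.

```java
import java.util.ArrayList;
import java.util.List;

public class ClearTransformationsSketch {

    /** Hypothetical stand-in for StreamExecutionEnvironment; only the list handling is modeled. */
    static final class MiniEnvironment {
        private final List<String> transformations = new ArrayList<>();

        void addOperator(String name) {
            transformations.add(name);
        }

        /** Follows the shape of getStreamGraph(jobName, clearTransformations): build, then optionally clear. */
        List<String> getStreamGraph(boolean clearTransformations) {
            if (transformations.isEmpty()) {
                throw new IllegalStateException(
                        "No operators defined in streaming topology. Cannot execute.");
            }
            // The "graph" here is only a snapshot of the registered operators.
            List<String> graph = new ArrayList<>(transformations);
            if (clearTransformations) {
                transformations.clear();
            }
            return graph;
        }
    }

    /** Returns true if a second graph can still be generated after the first call. */
    public static boolean canGenerateTwice(boolean clearTransformations) {
        MiniEnvironment env = new MiniEnvironment();
        env.addOperator("source");
        env.addOperator("sink");
        env.getStreamGraph(clearTransformations);
        try {
            env.getStreamGraph(clearTransformations);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("clear=false, second call works: " + canGenerateTwice(false));
        System.out.println("clear=true, second call works: " + canGenerateTwice(true));
    }
}
```

Because the one-argument and no-argument overloads in the original code pass `true`, they behave like the `clear=true` case of this sketch.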

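The StreamGraph is actually built in StreamGraphGenerator. generate() walks the registered transformations, and transform() translates the inputs of each transformation before the transformation itself, so the graph is effectively traced back from the SinkTransformation (the output) to the SourceTransformation and is built during that traversal. The listing below shows the decompiled class: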
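
Before the full listing, it may help to see the traversal idea in isolation. The sketch below is an assumption-laden simplification: `MiniTransformation` and `MiniGenerator` are hypothetical names, and "translating" a transformation only records its name. It shows the two properties that matter: inputs are handled before the transformation that consumes them, and a cache (named after `alreadyTransformed` in the listing) ensures each transformation is handled once even when two sinks share an upstream.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TransformTraversalSketch {

    /** Hypothetical stand-in for a Transformation: an id, a name, and its inputs. */
    static final class MiniTransformation {
        final int id;
        final String name;
        final List<MiniTransformation> inputs;

        MiniTransformation(int id, String name, MiniTransformation... inputs) {
            this.id = id;
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    /** Hypothetical generator: records the order in which nodes would be created. */
    static final class MiniGenerator {
        private final Map<MiniTransformation, Integer> alreadyTransformed = new HashMap<>();
        final List<String> creationOrder = new ArrayList<>();

        int transform(MiniTransformation transformation) {
            Integer knownId = alreadyTransformed.get(transformation);
            if (knownId != null) {
                return knownId; // already translated, reuse the result
            }
            // Translate all inputs first, so upstream nodes exist before this one.
            for (MiniTransformation input : transformation.inputs) {
                transform(input);
            }
            creationOrder.add(transformation.name);
            alreadyTransformed.put(transformation, transformation.id);
            return transformation.id;
        }
    }

    /** Two sinks share one map and one source; only the sinks are handed to the generator. */
    public static List<String> runExample() {
        MiniTransformation source = new MiniTransformation(1, "source");
        MiniTransformation map = new MiniTransformation(2, "map", source);
        MiniTransformation sinkA = new MiniTransformation(3, "sinkA", map);
        MiniTransformation sinkB = new MiniTransformation(4, "sinkB", map);

        MiniGenerator generator = new MiniGenerator();
        for (MiniTransformation sink : Arrays.asList(sinkA, sinkB)) {
            generator.transform(sink);
        }
        return generator.creationOrder;
    }

    public static void main(String[] args) {
        System.out.println(runExample());
    }
}
```

Starting from the two sinks, the recorded order is source, map, sinkA, sinkB: the shared upstream part is visited from both sinks but created only once.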

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//

package org.apache.flink.streaming.api.graph;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.graph.TransformationTranslator.Context;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend;
import org.apache.flink.streaming.api.transformations.BroadcastStateTransformation;
import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
import org.apache.flink.streaming.api.transformations.KeyedBroadcastStateTransformation;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.api.transformations.ReduceTransformation;
import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.TimestampsAndWatermarksTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.streaming.api.transformations.WithBoundedness;
import org.apache.flink.streaming.runtime.translators.BroadcastStateTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.KeyedBroadcastStateTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.LegacySinkTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.MultiInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.PartitionTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.ReduceTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SideOutputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SinkTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SourceTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.TimestampsAndWatermarksTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.TwoInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.UnionTransformationTranslator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class StreamGraphGenerator {
    private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);
    public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 128;
    public static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC;
    public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
    public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
    private final List<Transformation<?>> transformations;
    private final ExecutionConfig executionConfig;
    private final CheckpointConfig checkpointConfig;
    private final ReadableConfig configuration;
    private StateBackend stateBackend;
    private boolean chaining;
    private Collection<Tuple2<String, DistributedCacheEntry>> userArtifacts;
    private TimeCharacteristic timeCharacteristic;
    private String jobName;
    private SavepointRestoreSettings savepointRestoreSettings;
    private long defaultBufferTimeout;
    private RuntimeExecutionMode runtimeExecutionMode;
    private boolean shouldExecuteInBatchMode;
    private static final Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> translatorMap;
    protected static Integer iterationIdCounter;
    private StreamGraph streamGraph;
    private Map<Transformation<?>, Collection<Integer>> alreadyTransformed;

    public static int getNewIterationNodeId() {
        // The counter starts at 0 and only counts down, so iteration source/sink nodes get negative IDs.
        iterationIdCounter = iterationIdCounter - 1;
        return iterationIdCounter;
    }

    public StreamGraphGenerator(List<Transformation<?>> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig) {
        this(transformations, executionConfig, checkpointConfig, new Configuration());
    }

    public StreamGraphGenerator(List<Transformation<?>> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig, ReadableConfig configuration) {
        this.chaining = true;
        this.timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
        this.jobName = "Flink Streaming Job";
        this.savepointRestoreSettings = SavepointRestoreSettings.none();
        this.defaultBufferTimeout = -1L;
        this.runtimeExecutionMode = RuntimeExecutionMode.STREAMING;
        this.transformations = (List)Preconditions.checkNotNull(transformations);
        this.executionConfig = (ExecutionConfig)Preconditions.checkNotNull(executionConfig);
        this.checkpointConfig = new CheckpointConfig(checkpointConfig);
        this.configuration = (ReadableConfig)Preconditions.checkNotNull(configuration);
    }

    public StreamGraphGenerator setRuntimeExecutionMode(RuntimeExecutionMode runtimeExecutionMode) {
        this.runtimeExecutionMode = (RuntimeExecutionMode)Preconditions.checkNotNull(runtimeExecutionMode);
        return this;
    }

    public StreamGraphGenerator setStateBackend(StateBackend stateBackend) {
        this.stateBackend = stateBackend;
        return this;
    }

    public StreamGraphGenerator setChaining(boolean chaining) {
        this.chaining = chaining;
        return this;
    }

    public StreamGraphGenerator setUserArtifacts(Collection<Tuple2<String, DistributedCacheEntry>> userArtifacts) {
        this.userArtifacts = userArtifacts;
        return this;
    }

    public StreamGraphGenerator setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
        this.timeCharacteristic = timeCharacteristic;
        return this;
    }

    public StreamGraphGenerator setDefaultBufferTimeout(long defaultBufferTimeout) {
        this.defaultBufferTimeout = defaultBufferTimeout;
        return this;
    }

    public StreamGraphGenerator setJobName(String jobName) {
        this.jobName = jobName;
        return this;
    }

    public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) {
        this.savepointRestoreSettings = savepointRestoreSettings;
    }

    public StreamGraph generate() {
        this.streamGraph = new StreamGraph(this.executionConfig, this.checkpointConfig, this.savepointRestoreSettings);
        this.shouldExecuteInBatchMode = this.shouldExecuteInBatchMode(this.runtimeExecutionMode);
        this.configureStreamGraph(this.streamGraph);
        this.alreadyTransformed = new HashMap<>();

        // Translate every registered transformation; transform() handles the inputs of each one first.
        for (Transformation<?> transformation : this.transformations) {
            this.transform(transformation);
        }

        // Return the finished graph and drop the generator's references to the intermediate state.
        StreamGraph builtStreamGraph = this.streamGraph;
        this.alreadyTransformed.clear();
        this.alreadyTransformed = null;
        this.streamGraph = null;
        return builtStreamGraph;
    }

    private void configureStreamGraph(StreamGraph graph) {
        Preconditions.checkNotNull(graph);
        graph.setChaining(this.chaining);
        graph.setUserArtifacts(this.userArtifacts);
        graph.setTimeCharacteristic(this.timeCharacteristic);
        graph.setJobName(this.jobName);
        if (this.shouldExecuteInBatchMode) {
            if (this.checkpointConfig.isCheckpointingEnabled()) {
                LOG.info("Disabled Checkpointing. Checkpointing is not supported and not needed when executing jobs in BATCH mode.");
                this.checkpointConfig.disableCheckpointing();
            }

            graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED);
            graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
            this.setDefaultBufferTimeout(-1L);
            this.setBatchStateBackendAndTimerService(graph);
        } else {
            graph.setStateBackend(this.stateBackend);
            graph.setScheduleMode(ScheduleMode.EAGER);
            if (this.checkpointConfig.isApproximateLocalRecoveryEnabled()) {
                this.checkApproximateLocalRecoveryCompatibility();
                graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
            } else {
                graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
            }
        }

    }

    private void checkApproximateLocalRecoveryCompatibility() {
        Preconditions.checkState(!this.checkpointConfig.isUnalignedCheckpointsEnabled(), "Approximate Local Recovery and Unaligned Checkpoint can not be used together yet");
    }

    private void setBatchStateBackendAndTimerService(StreamGraph graph) {
        boolean useStateBackend = (Boolean)this.configuration.get(ExecutionOptions.USE_BATCH_STATE_BACKEND);
        boolean sortInputs = (Boolean)this.configuration.get(ExecutionOptions.SORT_INPUTS);
        Preconditions.checkState(!useStateBackend || sortInputs, "Batch state backend requires the sorted inputs to be enabled!");
        if (useStateBackend) {
            LOG.debug("Using BATCH execution state backend and timer service.");
            graph.setStateBackend(new BatchExecutionStateBackend());
            graph.setTimerServiceProvider(BatchExecutionInternalTimeServiceManager::create);
        } else {
            graph.setStateBackend(this.stateBackend);
        }

    }

    private boolean shouldExecuteInBatchMode(RuntimeExecutionMode configuredMode) {
        boolean existsUnboundedSource = this.existsUnboundedSource();
        Preconditions.checkState(configuredMode != RuntimeExecutionMode.BATCH || !existsUnboundedSource, "Detected an UNBOUNDED source with the '" + ExecutionOptions.RUNTIME_MODE.key() + "' set to 'BATCH'. This combination is not allowed, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "' to STREAMING or AUTOMATIC");
        if (Preconditions.checkNotNull(configuredMode) != RuntimeExecutionMode.AUTOMATIC) {
            return configuredMode == RuntimeExecutionMode.BATCH;
        } else {
            return !existsUnboundedSource;
        }
    }

    private boolean existsUnboundedSource() {
        return this.transformations.stream().anyMatch((transformation) -> {
            return this.isUnboundedSource(transformation) || transformation.getTransitivePredecessors().stream().anyMatch(this::isUnboundedSource);
        });
    }

    private boolean isUnboundedSource(Transformation<?> transformation) {
        Preconditions.checkNotNull(transformation);
        return transformation instanceof WithBoundedness && ((WithBoundedness)transformation).getBoundedness() != Boundedness.BOUNDED;
    }

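    private Collection<Integer> transform(Transformation<?> transform) {
        // Each transformation is translated at most once; later visits reuse the cached node IDs.
        if (this.alreadyTransformed.containsKey(transform)) {
            return (Collection)this.alreadyTransformed.get(transform);
        } else {
            LOG.debug("Transforming " + transform);
            // Fall back to the global max parallelism when the transformation does not set its own.
            if (transform.getMaxParallelism() <= 0) {
                int globalMaxParallelismFromConfig = this.executionConfig.getMaxParallelism();
                if (globalMaxParallelismFromConfig > 0) {
                    transform.setMaxParallelism(globalMaxParallelismFromConfig);
                }
            }

            // The return value is unused; the call is made for its side effects only.
            transform.getOutputType();
            // Look up a translator by the concrete transformation class; feedback (iteration)
            // transformations have no translator and take the legacy path.
            TransformationTranslator<?, Transformation<?>> translator = (TransformationTranslator)translatorMap.get(transform.getClass());
            Collection<Integer> transformedIds;
            if (translator != null) {
                transformedIds = this.translate(translator, transform);
            } else {
                transformedIds = this.legacyTransform(transform);
            }

            // Feedback transformations register themselves during translation, hence the second check.
            if (!this.alreadyTransformed.containsKey(transform)) {
                this.alreadyTransformed.put(transform, transformedIds);
            }

            return transformedIds;
        }
    }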

    private Collection<Integer> legacyTransform(Transformation<?> transform) {
        Collection transformedIds;
        if (transform instanceof FeedbackTransformation) {
            transformedIds = this.transformFeedback((FeedbackTransformation)transform);
        } else {
            if (!(transform instanceof CoFeedbackTransformation)) {
                throw new IllegalStateException("Unknown transformation: " + transform);
            }

            transformedIds = this.transformCoFeedback((CoFeedbackTransformation)transform);
        }

        if (transform.getBufferTimeout() >= 0L) {
            this.streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        } else {
            this.streamGraph.setBufferTimeout(transform.getId(), this.defaultBufferTimeout);
        }

        if (transform.getUid() != null) {
            this.streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }

        if (transform.getUserProvidedNodeHash() != null) {
            this.streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }

        if (!this.streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled() && transform instanceof PhysicalTransformation && transform.getUserProvidedNodeHash() == null && transform.getUid() == null) {
            throw new IllegalStateException("Auto generated UIDs have been disabled but no UID or hash has been assigned to operator " + transform.getName());
        } else {
            if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
                this.streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
            }

            this.streamGraph.setManagedMemoryUseCaseWeights(transform.getId(), transform.getManagedMemoryOperatorScopeUseCaseWeights(), transform.getManagedMemorySlotScopeUseCases());
            return transformedIds;
        }
    }

    private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {
        if (this.shouldExecuteInBatchMode) {
            throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name());
        } else if (iterate.getFeedbackEdges().size() <= 0) {
            throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
        } else {
            List<Transformation<?>> inputs = iterate.getInputs();
            Preconditions.checkState(inputs.size() == 1);
            Transformation<?> input = (Transformation)inputs.get(0);
            List<Integer> resultIds = new ArrayList();
            Collection<Integer> inputIds = this.transform(input);
            resultIds.addAll(inputIds);
            if (this.alreadyTransformed.containsKey(iterate)) {
                return (Collection)this.alreadyTransformed.get(iterate);
            } else {
                Tuple2<StreamNode, StreamNode> itSourceAndSink = this.streamGraph.createIterationSourceAndSink(iterate.getId(), getNewIterationNodeId(), getNewIterationNodeId(), iterate.getWaitTime(), iterate.getParallelism(), iterate.getMaxParallelism(), iterate.getMinResources(), iterate.getPreferredResources());
                StreamNode itSource = (StreamNode)itSourceAndSink.f0;
                StreamNode itSink = (StreamNode)itSourceAndSink.f1;
                this.streamGraph.setSerializers(itSource.getId(), (TypeSerializer)null, (TypeSerializer)null, iterate.getOutputType().createSerializer(this.executionConfig));
                this.streamGraph.setSerializers(itSink.getId(), iterate.getOutputType().createSerializer(this.executionConfig), (TypeSerializer)null, (TypeSerializer)null);
                resultIds.add(itSource.getId());
                this.alreadyTransformed.put(iterate, resultIds);
                List<Integer> allFeedbackIds = new ArrayList();
                Iterator var10 = iterate.getFeedbackEdges().iterator();

                while(var10.hasNext()) {
                    Transformation<T> feedbackEdge = (Transformation)var10.next();
                    Collection<Integer> feedbackIds = this.transform(feedbackEdge);
                    allFeedbackIds.addAll(feedbackIds);
                    Iterator var13 = feedbackIds.iterator();

                    while(var13.hasNext()) {
                        Integer feedbackId = (Integer)var13.next();
                        this.streamGraph.addEdge(feedbackId, itSink.getId(), 0);
                    }
                }

                String slotSharingGroup = this.determineSlotSharingGroup((String)null, allFeedbackIds);
                if (slotSharingGroup == null) {
                    slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
                }

                itSink.setSlotSharingGroup(slotSharingGroup);
                itSource.setSlotSharingGroup(slotSharingGroup);
                return resultIds;
            }
        }
    }

    private <F> Collection<Integer> transformCoFeedback(CoFeedbackTransformation<F> coIterate) {
        if (this.shouldExecuteInBatchMode) {
            throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name());
        } else {
            Tuple2<StreamNode, StreamNode> itSourceAndSink = this.streamGraph.createIterationSourceAndSink(coIterate.getId(), getNewIterationNodeId(), getNewIterationNodeId(), coIterate.getWaitTime(), coIterate.getParallelism(), coIterate.getMaxParallelism(), coIterate.getMinResources(), coIterate.getPreferredResources());
            StreamNode itSource = (StreamNode)itSourceAndSink.f0;
            StreamNode itSink = (StreamNode)itSourceAndSink.f1;
            this.streamGraph.setSerializers(itSource.getId(), (TypeSerializer)null, (TypeSerializer)null, coIterate.getOutputType().createSerializer(this.executionConfig));
            this.streamGraph.setSerializers(itSink.getId(), coIterate.getOutputType().createSerializer(this.executionConfig), (TypeSerializer)null, (TypeSerializer)null);
            Collection<Integer> resultIds = Collections.singleton(itSource.getId());
            this.alreadyTransformed.put(coIterate, resultIds);
            List<Integer> allFeedbackIds = new ArrayList();
            Iterator var7 = coIterate.getFeedbackEdges().iterator();

            while(var7.hasNext()) {
                Transformation<F> feedbackEdge = (Transformation)var7.next();
                Collection<Integer> feedbackIds = this.transform(feedbackEdge);
                allFeedbackIds.addAll(feedbackIds);
                Iterator var10 = feedbackIds.iterator();

                while(var10.hasNext()) {
                    Integer feedbackId = (Integer)var10.next();
                    this.streamGraph.addEdge(feedbackId, itSink.getId(), 0);
                }
            }

            String slotSharingGroup = this.determineSlotSharingGroup((String)null, allFeedbackIds);
            itSink.setSlotSharingGroup(slotSharingGroup);
            itSource.setSlotSharingGroup(slotSharingGroup);
            return Collections.singleton(itSource.getId());
        }
    }

    private Collection<Integer> translate(TransformationTranslator<?, Transformation<?>> translator, Transformation<?> transform) {
        Preconditions.checkNotNull(translator);
        Preconditions.checkNotNull(transform);
        List<Collection<Integer>> allInputIds = this.getParentInputIds(transform.getInputs());
        if (this.alreadyTransformed.containsKey(transform)) {
            return (Collection)this.alreadyTransformed.get(transform);
        } else {
            String slotSharingGroup = this.determineSlotSharingGroup(transform.getSlotSharingGroup(), (Collection)allInputIds.stream().flatMap(Collection::stream).collect(Collectors.toList()));
            Context context = new StreamGraphGenerator.ContextImpl(this, this.streamGraph, slotSharingGroup, this.configuration);
            return this.shouldExecuteInBatchMode ? translator.translateForBatch(transform, context) : translator.translateForStreaming(transform, context);
        }
    }

    private List<Collection<Integer>> getParentInputIds(@Nullable Collection<Transformation<?>> parentTransformations) {
        List<Collection<Integer>> allInputIds = new ArrayList();
        if (parentTransformations == null) {
            return allInputIds;
        } else {
            Iterator var3 = parentTransformations.iterator();

            while(var3.hasNext()) {
                Transformation<?> transformation = (Transformation)var3.next();
                allInputIds.add(this.transform(transformation));
            }

            return allInputIds;
        }
    }

    private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
        if (specifiedGroup != null) {
            return specifiedGroup;
        } else {
            String inputGroup = null;
            Iterator var4 = inputIds.iterator();

            while(var4.hasNext()) {
                int id = (Integer)var4.next();
                String inputGroupCandidate = this.streamGraph.getSlotSharingGroup(id);
                if (inputGroup == null) {
                    inputGroup = inputGroupCandidate;
                } else if (!inputGroup.equals(inputGroupCandidate)) {
                    return "default";
                }
            }

            return inputGroup == null ? "default" : inputGroup;
        }
    }

    static {
        DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
        Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> tmp = new HashMap();
        tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator());
        tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator());
        tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator());
        tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator());
        tmp.put(SourceTransformation.class, new SourceTransformationTranslator());
        tmp.put(SinkTransformation.class, new SinkTransformationTranslator());
        tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator());
        tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator());
        tmp.put(UnionTransformation.class, new UnionTransformationTranslator());
        tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator());
        tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator());
        tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator());
        tmp.put(TimestampsAndWatermarksTransformation.class, new TimestampsAndWatermarksTransformationTranslator());
        tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator());
        tmp.put(KeyedBroadcastStateTransformation.class, new KeyedBroadcastStateTransformationTranslator());
        translatorMap = Collections.unmodifiableMap(tmp);
        iterationIdCounter = 0;
    }

    private static class ContextImpl implements Context {
        private final StreamGraphGenerator streamGraphGenerator;
        private final StreamGraph streamGraph;
        private final String slotSharingGroup;
        private final ReadableConfig config;

        public ContextImpl(StreamGraphGenerator streamGraphGenerator, StreamGraph streamGraph, String slotSharingGroup, ReadableConfig config) {
            this.streamGraphGenerator = (StreamGraphGenerator)Preconditions.checkNotNull(streamGraphGenerator);
            this.streamGraph = (StreamGraph)Preconditions.checkNotNull(streamGraph);
            this.slotSharingGroup = (String)Preconditions.checkNotNull(slotSharingGroup);
            this.config = (ReadableConfig)Preconditions.checkNotNull(config);
        }

        public StreamGraph getStreamGraph() {
            return this.streamGraph;
        }

        public Collection<Integer> getStreamNodeIds(Transformation<?> transformation) {
            Preconditions.checkNotNull(transformation);
            Collection<Integer> ids = (Collection)this.streamGraphGenerator.alreadyTransformed.get(transformation);
            Preconditions.checkState(ids != null, "Parent transformation \"" + transformation + "\" has not been transformed.");
            return ids;
        }

        public String getSlotSharingGroup() {
            return this.slotSharingGroup;
        }

        public long getDefaultBufferTimeout() {
            return this.streamGraphGenerator.defaultBufferTimeout;
        }

        public ReadableConfig getGraphGeneratorConfig() {
            return this.config;
        }
    }
}
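
Two decision rules in the listing above are easy to miss among the plumbing: whether the job runs in batch mode (`shouldExecuteInBatchMode`) and which slot sharing group a node receives (`determineSlotSharingGroup`). The sketch below restates both as plain functions so they can be tried in isolation. It is an illustration under assumptions, not Flink code: the `Mode` enum stands in for RuntimeExecutionMode, the unbounded-source check is passed in as a boolean, and the `groupOfNode` map stands in for the lookup on the StreamGraph.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class GeneratorDecisionSketch {

    /** Hypothetical stand-in for RuntimeExecutionMode. */
    public enum Mode { STREAMING, BATCH, AUTOMATIC }

    /** Same rule as shouldExecuteInBatchMode in the listing, with the source check passed in. */
    public static boolean shouldExecuteInBatchMode(Mode configured, boolean existsUnboundedSource) {
        if (configured == Mode.BATCH && existsUnboundedSource) {
            throw new IllegalStateException("BATCH mode cannot be combined with an unbounded source");
        }
        if (configured != Mode.AUTOMATIC) {
            return configured == Mode.BATCH;
        }
        // AUTOMATIC: batch only when every source is bounded.
        return !existsUnboundedSource;
    }

    /** Same rule as determineSlotSharingGroup, with the node-to-group lookup passed in as a map. */
    public static String determineSlotSharingGroup(
            String specifiedGroup, Collection<Integer> inputIds, Map<Integer, String> groupOfNode) {
        if (specifiedGroup != null) {
            return specifiedGroup; // an explicit setting always wins
        }
        String inputGroup = null;
        for (int id : inputIds) {
            String candidate = groupOfNode.get(id);
            if (inputGroup == null) {
                inputGroup = candidate;
            } else if (!inputGroup.equals(candidate)) {
                return "default"; // inputs disagree, fall back to the default group
            }
        }
        return inputGroup == null ? "default" : inputGroup;
    }

    public static void main(String[] args) {
        Map<Integer, String> groupOfNode = new HashMap<>();
        groupOfNode.put(1, "a");
        groupOfNode.put(2, "a");
        groupOfNode.put(3, "b");

        System.out.println(shouldExecuteInBatchMode(Mode.AUTOMATIC, false));
        System.out.println(determineSlotSharingGroup(null, Arrays.asList(1, 2), groupOfNode));
        System.out.println(determineSlotSharingGroup(null, Arrays.asList(1, 3), groupOfNode));
        System.out.println(determineSlotSharingGroup(null, Collections.<Integer>emptyList(), groupOfNode));
    }
}
```

With the sample map, inputs 1 and 2 agree on group `a`, so the node inherits `a`; inputs 1 and 3 disagree, so the node falls back to `default`, as does a node without inputs.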
