Contents
- Overview
- StreamGraph core objects
- StreamGraph generation process
Overview
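In Flink, a StreamGraph is the logical representation of a data flow: it describes how the stream transformations of a Flink job are to be executed. The StreamGraph is the basis from which the Flink runtime derives the execution plan.
An application written with the DataStream API is first converted into Transformations, which are then mapped to a StreamGraph. The client converts the StreamGraph into a JobGraph and submits the JobGraph to the Flink cluster. The cluster converts the JobGraph into an ExecutionGraph, after which the job enters the scheduling and execution phase.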
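StreamGraph core objects
- StreamNode
A StreamNode is a node in the StreamGraph and is converted from a Transformation. Put simply, one StreamNode represents one operator. Logically, the StreamGraph contains both physical and virtual StreamNodes. A StreamNode can have multiple inputs and multiple outputs.
A physical StreamNode eventually becomes a physical operator. A virtual StreamNode is attached to a StreamEdge.
- StreamEdge
A StreamEdge is an edge in the StreamGraph that connects two StreamNodes. A StreamNode can have multiple outgoing and incoming edges. A StreamEdge carries information such as the side output, the partitioner, and the output field selection.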
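To make the node and edge relationship concrete, here is a minimal sketch in plain Java. It is a toy model, not Flink's real StreamNode or StreamEdge classes: the class names `ToyStreamGraph`, `Node`, and `Edge`, the string labels for partitioners, and the `late-data` tag are all assumptions made for illustration. It shows a node with several outgoing edges, and a partitioning step that appears only as metadata on an edge rather than as a node of its own.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the two StreamGraph building blocks (hypothetical, not Flink's classes). */
class ToyStreamGraph {

    /** An edge carries routing metadata: a partitioner label and an optional side-output tag. */
    static final class Edge {
        final int sourceId;
        final int targetId;
        final String partitioner;   // illustrative labels such as "FORWARD" or "HASH"
        final String sideOutputTag; // null for the main output

        Edge(int sourceId, int targetId, String partitioner, String sideOutputTag) {
            this.sourceId = sourceId;
            this.targetId = targetId;
            this.partitioner = partitioner;
            this.sideOutputTag = sideOutputTag;
        }
    }

    /** A node stands for one operator and keeps its incoming and outgoing edges. */
    static final class Node {
        final int id;
        final String operatorName;
        final List<Edge> inEdges = new ArrayList<>();
        final List<Edge> outEdges = new ArrayList<>();

        Node(int id, String operatorName) {
            this.id = id;
            this.operatorName = operatorName;
        }
    }

    private final Map<Integer, Node> nodes = new LinkedHashMap<>();

    Node addNode(int id, String operatorName) {
        Node node = new Node(id, operatorName);
        nodes.put(id, node);
        return node;
    }

    /** Registers the edge on both endpoints so each node sees its in and out edges. */
    Edge addEdge(int sourceId, int targetId, String partitioner, String sideOutputTag) {
        Edge edge = new Edge(sourceId, targetId, partitioner, sideOutputTag);
        nodes.get(sourceId).outEdges.add(edge);
        nodes.get(targetId).inEdges.add(edge);
        return edge;
    }

    Node getNode(int id) {
        return nodes.get(id);
    }

    /** Builds Source -> Map -> Sink, plus a side output from Map to a second sink. */
    static ToyStreamGraph example() {
        ToyStreamGraph graph = new ToyStreamGraph();
        graph.addNode(1, "Source");
        graph.addNode(2, "Map");
        graph.addNode(3, "Sink");
        graph.addNode(4, "LateDataSink");
        graph.addEdge(1, 2, "FORWARD", null);
        // The partitioning step between Map and Sink adds no node of its own;
        // in this toy model it is visible only as the edge's partitioner label.
        graph.addEdge(2, 3, "HASH", null);
        graph.addEdge(2, 4, "FORWARD", "late-data");
        return graph;
    }

    public static void main(String[] args) {
        ToyStreamGraph graph = example();
        System.out.println("Map out-edges: " + graph.getNode(2).outEdges.size());
    }
}
```

Keeping routing details on the edge is what lets a virtual node disappear from the final graph: only the physical operators remain as nodes.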
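StreamGraph generation process
The StreamGraph is generated in the Flink client. On submission, the client invokes the main method of the Flink application, the user's business logic is assembled into a pipeline of Transformations, and the final call to StreamExecutionEnvironment.execute() triggers construction of the StreamGraph.
The StreamGraph is therefore generated before the job is submitted. The entry point is in StreamExecutionEnvironment: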
@Internal
public StreamGraph getStreamGraph() {
    return this.getStreamGraph(this.getJobName());
}

@Internal
public StreamGraph getStreamGraph(String jobName) {
    return this.getStreamGraph(jobName, true);
}

@Internal
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
    StreamGraph streamGraph = this.getStreamGraphGenerator().setJobName(jobName).generate();
    if (clearTransformations) {
        this.transformations.clear();
    }
    return streamGraph;
}

private StreamGraphGenerator getStreamGraphGenerator() {
    if (this.transformations.size() <= 0) {
        throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
    } else {
        RuntimeExecutionMode executionMode = (RuntimeExecutionMode)this.configuration.get(ExecutionOptions.RUNTIME_MODE);
        return (new StreamGraphGenerator(this.transformations, this.config, this.checkpointCfg, this.getConfiguration()))
            .setRuntimeExecutionMode(executionMode)
            .setStateBackend(this.defaultStateBackend)
            .setChaining(this.isChainingEnabled)
            .setUserArtifacts(this.cacheFile)
            .setTimeCharacteristic(this.timeCharacteristic)
            .setDefaultBufferTimeout(this.bufferTimeout);
    }
}
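Two behaviours of this entry point are easy to miss: it refuses to build a graph when no operators have been registered, and by default it clears the registered transformations once the graph is built. The sketch below reproduces only those two behaviours in plain Java. `ToyEnvironment`, `addOperator`, `buildGraph`, and `pendingTransformations` are hypothetical names, and a list of strings stands in for the real graph.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy stand-in for the environment (hypothetical names, not Flink's API). */
class ToyEnvironment {
    private final List<String> transformations = new ArrayList<>();

    void addOperator(String name) {
        transformations.add(name);
    }

    /** Mirrors the empty-topology guard and the optional clearing step. */
    List<String> buildGraph(boolean clearTransformations) {
        if (transformations.isEmpty()) {
            throw new IllegalStateException(
                "No operators defined in streaming topology. Cannot execute.");
        }
        // A snapshot of the registered operators plays the role of the graph here.
        List<String> graph = new ArrayList<>(transformations);
        if (clearTransformations) {
            transformations.clear();
        }
        return graph;
    }

    int pendingTransformations() {
        return transformations.size();
    }

    public static void main(String[] args) {
        ToyEnvironment env = new ToyEnvironment();
        env.addOperator("source");
        env.addOperator("sink");
        System.out.println("graph size: " + env.buildGraph(true).size());
        System.out.println("pending after build: " + env.pendingTransformations());
    }
}
```

In this toy model, calling `buildGraph(true)` a second time without registering new operators throws, because the first call already emptied the list.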
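The StreamGraph is actually built in StreamGraphGenerator. Its generate() method walks the registered Transformations and, for each one, transform() first recursively translates its inputs, so the traversal effectively traces back from the SinkTransformation (the output) toward the SourceTransformation. The StreamGraph is built as the traversal proceeds, as the following listing shows: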
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by FernFlower decompiler)
//
package org.apache.flink.streaming.api.graph;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.cache.DistributedCache.DistributedCacheEntry;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.graph.TransformationTranslator.Context;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionInternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.sorted.state.BatchExecutionStateBackend;
import org.apache.flink.streaming.api.transformations.BroadcastStateTransformation;
import org.apache.flink.streaming.api.transformations.CoFeedbackTransformation;
import org.apache.flink.streaming.api.transformations.FeedbackTransformation;
import org.apache.flink.streaming.api.transformations.KeyedBroadcastStateTransformation;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.LegacySourceTransformation;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.api.transformations.ReduceTransformation;
import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.api.transformations.TimestampsAndWatermarksTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
import org.apache.flink.streaming.api.transformations.UnionTransformation;
import org.apache.flink.streaming.api.transformations.WithBoundedness;
import org.apache.flink.streaming.runtime.translators.BroadcastStateTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.KeyedBroadcastStateTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.LegacySinkTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.LegacySourceTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.MultiInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.OneInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.PartitionTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.ReduceTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SideOutputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SinkTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.SourceTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.TimestampsAndWatermarksTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.TwoInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.UnionTransformationTranslator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Internal
public class StreamGraphGenerator {
private static final Logger LOG = LoggerFactory.getLogger(StreamGraphGenerator.class);
public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = 128;
public static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC;
public static final String DEFAULT_JOB_NAME = "Flink Streaming Job";
public static final String DEFAULT_SLOT_SHARING_GROUP = "default";
private final List<Transformation<?>> transformations;
private final ExecutionConfig executionConfig;
private final CheckpointConfig checkpointConfig;
private final ReadableConfig configuration;
private StateBackend stateBackend;
private boolean chaining;
private Collection<Tuple2<String, DistributedCacheEntry>> userArtifacts;
private TimeCharacteristic timeCharacteristic;
private String jobName;
private SavepointRestoreSettings savepointRestoreSettings;
private long defaultBufferTimeout;
private RuntimeExecutionMode runtimeExecutionMode;
private boolean shouldExecuteInBatchMode;
private static final Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> translatorMap;
protected static Integer iterationIdCounter;
private StreamGraph streamGraph;
private Map<Transformation<?>, Collection<Integer>> alreadyTransformed;
public static int getNewIterationNodeId() {
iterationIdCounter--;
return iterationIdCounter;
}
public StreamGraphGenerator(List<Transformation<?>> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig) {
this(transformations, executionConfig, checkpointConfig, new Configuration());
}
public StreamGraphGenerator(List<Transformation<?>> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig, ReadableConfig configuration) {
this.chaining = true;
this.timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
this.jobName = "Flink Streaming Job";
this.savepointRestoreSettings = SavepointRestoreSettings.none();
this.defaultBufferTimeout = -1L;
this.runtimeExecutionMode = RuntimeExecutionMode.STREAMING;
this.transformations = (List)Preconditions.checkNotNull(transformations);
this.executionConfig = (ExecutionConfig)Preconditions.checkNotNull(executionConfig);
this.checkpointConfig = new CheckpointConfig(checkpointConfig);
this.configuration = (ReadableConfig)Preconditions.checkNotNull(configuration);
}
public StreamGraphGenerator setRuntimeExecutionMode(RuntimeExecutionMode runtimeExecutionMode) {
this.runtimeExecutionMode = (RuntimeExecutionMode)Preconditions.checkNotNull(runtimeExecutionMode);
return this;
}
public StreamGraphGenerator setStateBackend(StateBackend stateBackend) {
this.stateBackend = stateBackend;
return this;
}
public StreamGraphGenerator setChaining(boolean chaining) {
this.chaining = chaining;
return this;
}
public StreamGraphGenerator setUserArtifacts(Collection<Tuple2<String, DistributedCacheEntry>> userArtifacts) {
this.userArtifacts = userArtifacts;
return this;
}
public StreamGraphGenerator setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
this.timeCharacteristic = timeCharacteristic;
return this;
}
public StreamGraphGenerator setDefaultBufferTimeout(long defaultBufferTimeout) {
this.defaultBufferTimeout = defaultBufferTimeout;
return this;
}
public StreamGraphGenerator setJobName(String jobName) {
this.jobName = jobName;
return this;
}
public void setSavepointRestoreSettings(SavepointRestoreSettings savepointRestoreSettings) {
this.savepointRestoreSettings = savepointRestoreSettings;
}
public StreamGraph generate() {
this.streamGraph = new StreamGraph(this.executionConfig, this.checkpointConfig, this.savepointRestoreSettings);
this.shouldExecuteInBatchMode = this.shouldExecuteInBatchMode(this.runtimeExecutionMode);
this.configureStreamGraph(this.streamGraph);
this.alreadyTransformed = new HashMap();
Iterator var1 = this.transformations.iterator();
while(var1.hasNext()) {
Transformation<?> transformation = (Transformation)var1.next();
this.transform(transformation);
}
StreamGraph builtStreamGraph = this.streamGraph;
this.alreadyTransformed.clear();
this.alreadyTransformed = null;
this.streamGraph = null;
return builtStreamGraph;
}
private void configureStreamGraph(StreamGraph graph) {
Preconditions.checkNotNull(graph);
graph.setChaining(this.chaining);
graph.setUserArtifacts(this.userArtifacts);
graph.setTimeCharacteristic(this.timeCharacteristic);
graph.setJobName(this.jobName);
if (this.shouldExecuteInBatchMode) {
if (this.checkpointConfig.isCheckpointingEnabled()) {
LOG.info("Disabled Checkpointing. Checkpointing is not supported and not needed when executing jobs in BATCH mode.");
this.checkpointConfig.disableCheckpointing();
}
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED);
graph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST);
this.setDefaultBufferTimeout(-1L);
this.setBatchStateBackendAndTimerService(graph);
} else {
graph.setStateBackend(this.stateBackend);
graph.setScheduleMode(ScheduleMode.EAGER);
if (this.checkpointConfig.isApproximateLocalRecoveryEnabled()) {
this.checkApproximateLocalRecoveryCompatibility();
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED_APPROXIMATE);
} else {
graph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
}
}
}
private void checkApproximateLocalRecoveryCompatibility() {
Preconditions.checkState(!this.checkpointConfig.isUnalignedCheckpointsEnabled(), "Approximate Local Recovery and Unaligned Checkpoint can not be used together yet");
}
private void setBatchStateBackendAndTimerService(StreamGraph graph) {
boolean useStateBackend = (Boolean)this.configuration.get(ExecutionOptions.USE_BATCH_STATE_BACKEND);
boolean sortInputs = (Boolean)this.configuration.get(ExecutionOptions.SORT_INPUTS);
Preconditions.checkState(!useStateBackend || sortInputs, "Batch state backend requires the sorted inputs to be enabled!");
if (useStateBackend) {
LOG.debug("Using BATCH execution state backend and timer service.");
graph.setStateBackend(new BatchExecutionStateBackend());
graph.setTimerServiceProvider(BatchExecutionInternalTimeServiceManager::create);
} else {
graph.setStateBackend(this.stateBackend);
}
}
private boolean shouldExecuteInBatchMode(RuntimeExecutionMode configuredMode) {
boolean existsUnboundedSource = this.existsUnboundedSource();
Preconditions.checkState(configuredMode != RuntimeExecutionMode.BATCH || !existsUnboundedSource, "Detected an UNBOUNDED source with the '" + ExecutionOptions.RUNTIME_MODE.key() + "' set to 'BATCH'. This combination is not allowed, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "' to STREAMING or AUTOMATIC");
if (Preconditions.checkNotNull(configuredMode) != RuntimeExecutionMode.AUTOMATIC) {
return configuredMode == RuntimeExecutionMode.BATCH;
} else {
return !existsUnboundedSource;
}
}
private boolean existsUnboundedSource() {
return this.transformations.stream().anyMatch((transformation) -> {
return this.isUnboundedSource(transformation) || transformation.getTransitivePredecessors().stream().anyMatch(this::isUnboundedSource);
});
}
private boolean isUnboundedSource(Transformation<?> transformation) {
Preconditions.checkNotNull(transformation);
return transformation instanceof WithBoundedness && ((WithBoundedness)transformation).getBoundedness() != Boundedness.BOUNDED;
}
private Collection<Integer> transform(Transformation<?> transform) {
if (this.alreadyTransformed.containsKey(transform)) {
return (Collection)this.alreadyTransformed.get(transform);
} else {
LOG.debug("Transforming " + transform);
if (transform.getMaxParallelism() <= 0) {
int globalMaxParallelismFromConfig = this.executionConfig.getMaxParallelism();
if (globalMaxParallelismFromConfig > 0) {
transform.setMaxParallelism(globalMaxParallelismFromConfig);
}
}
transform.getOutputType();
TransformationTranslator<?, Transformation<?>> translator = (TransformationTranslator)translatorMap.get(transform.getClass());
Collection transformedIds;
if (translator != null) {
transformedIds = this.translate(translator, transform);
} else {
transformedIds = this.legacyTransform(transform);
}
if (!this.alreadyTransformed.containsKey(transform)) {
this.alreadyTransformed.put(transform, transformedIds);
}
return transformedIds;
}
}
private Collection<Integer> legacyTransform(Transformation<?> transform) {
Collection transformedIds;
if (transform instanceof FeedbackTransformation) {
transformedIds = this.transformFeedback((FeedbackTransformation)transform);
} else {
if (!(transform instanceof CoFeedbackTransformation)) {
throw new IllegalStateException("Unknown transformation: " + transform);
}
transformedIds = this.transformCoFeedback((CoFeedbackTransformation)transform);
}
if (transform.getBufferTimeout() >= 0L) {
this.streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
} else {
this.streamGraph.setBufferTimeout(transform.getId(), this.defaultBufferTimeout);
}
if (transform.getUid() != null) {
this.streamGraph.setTransformationUID(transform.getId(), transform.getUid());
}
if (transform.getUserProvidedNodeHash() != null) {
this.streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
}
if (!this.streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled() && transform instanceof PhysicalTransformation && transform.getUserProvidedNodeHash() == null && transform.getUid() == null) {
throw new IllegalStateException("Auto generated UIDs have been disabled but no UID or hash has been assigned to operator " + transform.getName());
} else {
if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
this.streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
}
this.streamGraph.setManagedMemoryUseCaseWeights(transform.getId(), transform.getManagedMemoryOperatorScopeUseCaseWeights(), transform.getManagedMemorySlotScopeUseCases());
return transformedIds;
}
}
private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {
if (this.shouldExecuteInBatchMode) {
throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name());
} else if (iterate.getFeedbackEdges().size() <= 0) {
throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
} else {
List<Transformation<?>> inputs = iterate.getInputs();
Preconditions.checkState(inputs.size() == 1);
Transformation<?> input = (Transformation)inputs.get(0);
List<Integer> resultIds = new ArrayList();
Collection<Integer> inputIds = this.transform(input);
resultIds.addAll(inputIds);
if (this.alreadyTransformed.containsKey(iterate)) {
return (Collection)this.alreadyTransformed.get(iterate);
} else {
Tuple2<StreamNode, StreamNode> itSourceAndSink = this.streamGraph.createIterationSourceAndSink(iterate.getId(), getNewIterationNodeId(), getNewIterationNodeId(), iterate.getWaitTime(), iterate.getParallelism(), iterate.getMaxParallelism(), iterate.getMinResources(), iterate.getPreferredResources());
StreamNode itSource = (StreamNode)itSourceAndSink.f0;
StreamNode itSink = (StreamNode)itSourceAndSink.f1;
this.streamGraph.setSerializers(itSource.getId(), (TypeSerializer)null, (TypeSerializer)null, iterate.getOutputType().createSerializer(this.executionConfig));
this.streamGraph.setSerializers(itSink.getId(), iterate.getOutputType().createSerializer(this.executionConfig), (TypeSerializer)null, (TypeSerializer)null);
resultIds.add(itSource.getId());
this.alreadyTransformed.put(iterate, resultIds);
List<Integer> allFeedbackIds = new ArrayList();
Iterator var10 = iterate.getFeedbackEdges().iterator();
while(var10.hasNext()) {
Transformation<T> feedbackEdge = (Transformation)var10.next();
Collection<Integer> feedbackIds = this.transform(feedbackEdge);
allFeedbackIds.addAll(feedbackIds);
Iterator var13 = feedbackIds.iterator();
while(var13.hasNext()) {
Integer feedbackId = (Integer)var13.next();
this.streamGraph.addEdge(feedbackId, itSink.getId(), 0);
}
}
String slotSharingGroup = this.determineSlotSharingGroup((String)null, allFeedbackIds);
if (slotSharingGroup == null) {
slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
}
itSink.setSlotSharingGroup(slotSharingGroup);
itSource.setSlotSharingGroup(slotSharingGroup);
return resultIds;
}
}
}
private <F> Collection<Integer> transformCoFeedback(CoFeedbackTransformation<F> coIterate) {
if (this.shouldExecuteInBatchMode) {
throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name());
} else {
Tuple2<StreamNode, StreamNode> itSourceAndSink = this.streamGraph.createIterationSourceAndSink(coIterate.getId(), getNewIterationNodeId(), getNewIterationNodeId(), coIterate.getWaitTime(), coIterate.getParallelism(), coIterate.getMaxParallelism(), coIterate.getMinResources(), coIterate.getPreferredResources());
StreamNode itSource = (StreamNode)itSourceAndSink.f0;
StreamNode itSink = (StreamNode)itSourceAndSink.f1;
this.streamGraph.setSerializers(itSource.getId(), (TypeSerializer)null, (TypeSerializer)null, coIterate.getOutputType().createSerializer(this.executionConfig));
this.streamGraph.setSerializers(itSink.getId(), coIterate.getOutputType().createSerializer(this.executionConfig), (TypeSerializer)null, (TypeSerializer)null);
Collection<Integer> resultIds = Collections.singleton(itSource.getId());
this.alreadyTransformed.put(coIterate, resultIds);
List<Integer> allFeedbackIds = new ArrayList();
Iterator var7 = coIterate.getFeedbackEdges().iterator();
while(var7.hasNext()) {
Transformation<F> feedbackEdge = (Transformation)var7.next();
Collection<Integer> feedbackIds = this.transform(feedbackEdge);
allFeedbackIds.addAll(feedbackIds);
Iterator var10 = feedbackIds.iterator();
while(var10.hasNext()) {
Integer feedbackId = (Integer)var10.next();
this.streamGraph.addEdge(feedbackId, itSink.getId(), 0);
}
}
String slotSharingGroup = this.determineSlotSharingGroup((String)null, allFeedbackIds);
itSink.setSlotSharingGroup(slotSharingGroup);
itSource.setSlotSharingGroup(slotSharingGroup);
return Collections.singleton(itSource.getId());
}
}
private Collection<Integer> translate(TransformationTranslator<?, Transformation<?>> translator, Transformation<?> transform) {
Preconditions.checkNotNull(translator);
Preconditions.checkNotNull(transform);
List<Collection<Integer>> allInputIds = this.getParentInputIds(transform.getInputs());
if (this.alreadyTransformed.containsKey(transform)) {
return (Collection)this.alreadyTransformed.get(transform);
} else {
String slotSharingGroup = this.determineSlotSharingGroup(transform.getSlotSharingGroup(), (Collection)allInputIds.stream().flatMap(Collection::stream).collect(Collectors.toList()));
Context context = new StreamGraphGenerator.ContextImpl(this, this.streamGraph, slotSharingGroup, this.configuration);
return this.shouldExecuteInBatchMode ? translator.translateForBatch(transform, context) : translator.translateForStreaming(transform, context);
}
}
private List<Collection<Integer>> getParentInputIds(@Nullable Collection<Transformation<?>> parentTransformations) {
List<Collection<Integer>> allInputIds = new ArrayList();
if (parentTransformations == null) {
return allInputIds;
} else {
Iterator var3 = parentTransformations.iterator();
while(var3.hasNext()) {
Transformation<?> transformation = (Transformation)var3.next();
allInputIds.add(this.transform(transformation));
}
return allInputIds;
}
}
private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
if (specifiedGroup != null) {
return specifiedGroup;
} else {
String inputGroup = null;
Iterator var4 = inputIds.iterator();
while(var4.hasNext()) {
int id = (Integer)var4.next();
String inputGroupCandidate = this.streamGraph.getSlotSharingGroup(id);
if (inputGroup == null) {
inputGroup = inputGroupCandidate;
} else if (!inputGroup.equals(inputGroupCandidate)) {
return "default";
}
}
return inputGroup == null ? "default" : inputGroup;
}
}
static {
DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> tmp = new HashMap();
tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator());
tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator());
tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator());
tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator());
tmp.put(SourceTransformation.class, new SourceTransformationTranslator());
tmp.put(SinkTransformation.class, new SinkTransformationTranslator());
tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator());
tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator());
tmp.put(UnionTransformation.class, new UnionTransformationTranslator());
tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator());
tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator());
tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator());
tmp.put(TimestampsAndWatermarksTransformation.class, new TimestampsAndWatermarksTransformationTranslator());
tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator());
tmp.put(KeyedBroadcastStateTransformation.class, new KeyedBroadcastStateTransformationTranslator());
translatorMap = Collections.unmodifiableMap(tmp);
iterationIdCounter = 0;
}
private static class ContextImpl implements Context {
private final StreamGraphGenerator streamGraphGenerator;
private final StreamGraph streamGraph;
private final String slotSharingGroup;
private final ReadableConfig config;
public ContextImpl(StreamGraphGenerator streamGraphGenerator, StreamGraph streamGraph, String slotSharingGroup, ReadableConfig config) {
this.streamGraphGenerator = (StreamGraphGenerator)Preconditions.checkNotNull(streamGraphGenerator);
this.streamGraph = (StreamGraph)Preconditions.checkNotNull(streamGraph);
this.slotSharingGroup = (String)Preconditions.checkNotNull(slotSharingGroup);
this.config = (ReadableConfig)Preconditions.checkNotNull(config);
}
public StreamGraph getStreamGraph() {
return this.streamGraph;
}
public Collection<Integer> getStreamNodeIds(Transformation<?> transformation) {
Preconditions.checkNotNull(transformation);
Collection<Integer> ids = (Collection)this.streamGraphGenerator.alreadyTransformed.get(transformation);
Preconditions.checkState(ids != null, "Parent transformation \"" + transformation + "\" has not been transformed.");
return ids;
}
public String getSlotSharingGroup() {
return this.slotSharingGroup;
}
public long getDefaultBufferTimeout() {
return this.streamGraphGenerator.defaultBufferTimeout;
}
public ReadableConfig getGraphGeneratorConfig() {
return this.config;
}
}
}
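The core of the listing above is the interplay between generate(), transform(), and the alreadyTransformed map: inputs are translated before the operator that consumes them, and each operator is translated only once. The following plain Java sketch isolates that idea. It is a simplified model under stated assumptions: `ToyGraphGenerator` and `Op` are hypothetical names, each operator maps to a single integer ID rather than a collection of IDs, and translators, slot sharing groups, and batch mode are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy version of the memoized, inputs-first traversal (hypothetical names). */
class ToyGraphGenerator {

    /** Stand-in for a Transformation: a name plus its upstream inputs. */
    static final class Op {
        final String name;
        final List<Op> inputs;

        Op(String name, Op... inputs) {
            this.name = name;
            this.inputs = Arrays.asList(inputs);
        }
    }

    // Plays the role of alreadyTransformed: operator -> node ID assigned to it.
    private final Map<Op, Integer> alreadyTransformed = new HashMap<>();
    // Records the order in which nodes are created.
    private final List<String> creationOrder = new ArrayList<>();

    /** Translates the inputs first, then the operator itself; repeats are skipped. */
    int transform(Op op) {
        Integer known = alreadyTransformed.get(op);
        if (known != null) {
            return known;
        }
        for (Op input : op.inputs) {
            transform(input);
        }
        int id = creationOrder.size() + 1;
        creationOrder.add(op.name);
        alreadyTransformed.put(op, id);
        return id;
    }

    /** Walks every registered operator, as generate() does in the listing. */
    List<String> generate(List<Op> registered) {
        for (Op op : registered) {
            transform(op);
        }
        return Collections.unmodifiableList(creationOrder);
    }

    static List<String> example() {
        Op source = new Op("source");
        Op map = new Op("map", source);
        Op sinkA = new Op("sinkA", map);
        Op sinkB = new Op("sinkB", map);
        // In this toy setup the source is not registered directly;
        // it is reached only by tracing back from its consumers.
        return new ToyGraphGenerator().generate(Arrays.asList(map, sinkA, sinkB));
    }

    public static void main(String[] args) {
        System.out.println(example()); // prints [source, map, sinkA, sinkB]
    }
}
```

Although `map` is reachable three times (once directly and once from each sink), it is created only once, which is the job the alreadyTransformed lookup does at the top of transform().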