文章目录
- 1.OperatorChain的设计与实现
- 2.OperatorChain的创建和初始化
- 3.创建RecordWriterOutput
1.OperatorChain的设计与实现
OperatorChain的大致逻辑
在JobGraph对象的创建过程中,将链化可以
连在一起的算子
,常见的有StreamMap、StreamFilter等类型的算子。OperatorChain中的所有算子都会被运行在同一个Task实例中
。StreamTaskNetworkOutput会将接入的数据元素
写入算子链的HeadOperator
中,从而开启整个OperatorChain的数据处理。
OperatorChain的Output组件:将数据发送到下游
如图所示,在OperatorChain中通过
Output组件
将上下游算子相连,当上游算子数据处理完毕后,会通过Output组件发送到下游的算子中继续处理。
OperatorChain的collect():收集处理完的数据
如图所示,OperatorChain内部定义了WatermarkGaugeExposingOutput接口,且该接口分别继承了Output和Collector接口。Collector接口提供了collect()方法,用于收集处理完的数据。
OperatorChain的Output接口:也能输出Watermark和LatencyMarker等事件
Output接口提供了emitWatermark()、emitLatencyMarker()等方法,用于对Collector接口进行拓展,使得Output接口实现类可以输出Watermark和LatencyMarker等事件。WatermarkGaugeExposingOutput接口则提供了获取WatermarkGauge的方法,用于监控最新的Watermark。
OperatorChain内部定义了不同的WatermarkGaugeExposingOutput接口实现类。
- RecordWriterOutput:用于输出OperatorChain中尾端算子处理完成的数据,借助RecordWriter组件
将数据元素写入网络
。- ChainingOutput/CopyingChainingOutput:适用于
上下游算子连接在一起
且上游算子属于单输出类型的情况。- BroadcastingOutputCollector/CopyingBroadcastingOutputCollector:上游算子是多输出类型但上下游算子之间的Selector为空时,创建广播类型的BroadcastingOutputCollector。
- DirectedOutput/CopyingDirectedOutput:上游算子是多输出类型且Selector不为空时,创建DirectedOutput或CopyingDirectedOutput
连接上下游算子
。
例子:收集数据并通过Output发数据数据到下游
例如在WordCount的程序中定义flatMap()方法时,会调用Collector.collect()方法收集数据元素,每个算子在定义的函数或使用Output接口的实现类中,完成了
上游算子向下游算子发送数据元素的操作
。
2.OperatorChain的创建和初始化
接下来我们看OperatorChain的初始化过程,如下代码,OperatorChain的构造器包含如下逻辑。
- 创建StreamOperator(即算子)实例,这里StreamOperator会封装为StreamOperatorFactory并存储在StreamGraph结构中。
- 获取算子之间的链接配置。chainedConfigs的配置决定了算子之间Output接口的具体实现。
- 遍历当前作业所有节点的
输出边
,并构建RecordWriterOutput组件,最终通过RecordWriterOutput组件将数据元素输出到网络中。- 创建OperatorChain内部算子之间的上下游连接,完成OperatorChain内部
上下游算子之间的数据传输
。- 单独创建headOperator。
headOperator
是OperatorChain的头部节点,创建完成后将headOperator暴露到StreamTask实例,供DataOutput接口实现类调用
。- 如果OperatorChain构建失败,则关闭实例,防止出现内存泄漏。
public OperatorChain(
StreamTask<OUT, OP> containingTask,
RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>>
recordWriterDelegate
) {
// 获取当前StreamTask的userCodeClassloader
final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
// 获取StreamConfig
final StreamConfig configuration = containingTask.getConfiguration();
// 获取StreamOperatorFactory
StreamOperatorFactory<OUT> operatorFactory =
configuration.getStreamOperatorFactory(userCodeClassloader);
// 读取chainedConfigs
Map<Integer, StreamConfig> chainedConfigs =
configuration.getTransitiveChainedTaskConfigsWithSelf(userCodeClassloader);
// 根据StreamEdge创建RecordWriterOutput组件
List<StreamEdge> outEdgesInOrder =
configuration.getOutEdgesInOrder(userCodeClassloader);
Map<StreamEdge, RecordWriterOutput<?>> streamOutputMap =
new HashMap<>(outEdgesInOrder.size());
this.streamOutputs = new RecordWriterOutput<?>[outEdgesInOrder.size()];
boolean success = false;
try {
for (int i = 0; i < outEdgesInOrder.size(); i++) {
StreamEdge outEdge = outEdgesInOrder.get(i);
// 为每个输出边创建RecordWriterOutput
RecordWriterOutput<?> streamOutput = createStreamOutput(
recordWriterDelegate.getRecordWriter(i),
outEdge,
chainedConfigs.get(outEdge.getSourceId()),
containingTask.getEnvironment());
this.streamOutputs[i] = streamOutput;
streamOutputMap.put(outEdge, streamOutput);
}
// 创建OperatorChain内部算子之间的连接
List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
this.chainEntryPoint = createOutputCollector(
containingTask,
configuration,
chainedConfigs,
userCodeClassloader,
streamOutputMap,
allOps,
containingTask.getMailboxExecutorFactory());
if (operatorFactory != null) {
WatermarkGaugeExposingOutput<StreamRecord<OUT>> output =
getChainEntryPoint();
// 创建headOperator
headOperator = StreamOperatorFactoryUtil.createOperator(
operatorFactory,
containingTask,
configuration,
output);
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK,
output.getWatermarkGauge());
} else {
headOperator = null;
}
allOps.add(headOperator);
this.allOperators = allOps.toArray(new StreamOperator<?>[allOps.size()]);
success = true;
}
finally {
// 如果创建不成功,则关闭StreamOutputs中的RecordWriterOutput
// 这里防止内存泄漏
if (!success) {
for (RecordWriterOutput<?> output : this.streamOutputs) {
if (output != null) {
output.close();
}
}
}
}
}
OperatorChain作用小结
当OperatorChain创建完成后,就能正常
接收StreamTaskInput中的数据元素了
。在OperatorChain内部算子之间进行数据传递和处理,最终通过RecordWriterOutput
组件将处理完成的数据元素发送到网络中,供下游的Task实例使用。
对于OperatorChain内部Output接口的实现,这里暂不展开。
3.创建RecordWriterOutput
RecordWriterOutput用于将数据输出到网络指定位置。
OperatorChain.createStreamOutput()逻辑如下:
- 获取输出边的OutputTag标签,判断当前Stream节点输出边是否为
旁路输出
,即在DataStream API中是否使用了旁路输出的相关方法。- 返回RecordWriterOutput。RecordWriterOutput中包含RecordWriter组件,最终会通过
RecordWriter
将算子链处理完成的数据写入网络。
private RecordWriterOutput<OUT> createStreamOutput(
RecordWriter<SerializationDelegate<StreamRecord<OUT>>> recordWriter,
StreamEdge edge,
StreamConfig upStreamConfig,
Environment taskEnvironment) {
// 获取OutputTag
OutputTag sideOutputTag = edge.getOutputTag();
// 获取数据序列化器TypeSerializer
TypeSerializer outSerializer = null;
// 如果StreamEdge指定了OutputTag
if (edge.getOutputTag() != null) {
// 则进行边路输出
outSerializer = upStreamConfig.getTypeSerializerSideOut(
edge.getOutputTag(), taskEnvironment.getUserClassLoader());
} else {
// 正常输出
outSerializer =
upStreamConfig.getTypeSerializerOut(taskEnvironment.getUserClassLoader());
}
// 返回创建的RecordWriterOutput实例
return new RecordWriterOutput<>(recordWriter, outSerializer, sideOutputTag, this);
}
StreamRecord将数据输出的逻辑
在RecordWriterOutput.collect()方法中定义了StreamRecord数据的输出逻辑,实际上是
调用pushToRecordWriter()方法将数据写入RecordWriter,最终通过RecordWriter组件进行数据元素的网络输出
。
public void collect(StreamRecord<OUT> record) {
if (this.outputTag != null) {
return;
}
pushToRecordWriter(record);
}
pushToRecordWriter发送数据
- 调用serializationDelegate.setInstance()方法,对接入的数据元素进行序列化操作,将数据元素转换成二进制格式。
- 调用recordWriter.emit()方法通过RecordWriter组件将serializationDelegate中序列化后的二进制数据输出到下游网络中。
//RecordWriterOutput.pushToRecordWriter()
private <X> void pushToRecordWriter(StreamRecord<X> record) {
serializationDelegate.setInstance(record);
try {
recordWriter.emit(serializationDelegate);
}
catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
}