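Using flatMap and filter as examples, this post covers which interfaces you must implement for these basic Flink transformations, where the Flink runtime calls your implementations, and how the classes behind these transformation functions relate to one another.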
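- I. To create a basic transformation function, extend AbstractRichFunction and implement the function-specific interface
- 1. RichFlatMapFunction
- 2. RichFilterFunction
- II. Adding the flatMap and filter implementations to a Flink job
- III. How the Flink runtime calls the flatMap and filter implementations
- IV. Class relationship diagrams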
I. To create a basic transformation function, extend AbstractRichFunction and implement the function-specific interface
1. RichFlatMapFunction
@Public
public abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction implements FlatMapFunction<IN, OUT> {
private static final long serialVersionUID = 1L;
public RichFlatMapFunction() {
}
// Subclasses must implement this method
public abstract void flatMap(IN var1, Collector<OUT> var2) throws Exception;
}
An implementation class only needs to extend RichFlatMapFunction and implement the flatMap method.
2. RichFilterFunction
@Public
public abstract class RichFilterFunction<T> extends AbstractRichFunction implements FilterFunction<T> {
private static final long serialVersionUID = 1L;
public RichFilterFunction() {
}
// Subclasses must implement this method
public abstract boolean filter(T var1) throws Exception;
}
An implementation class only needs to extend RichFilterFunction and implement the filter method.
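To make the two contracts above concrete, here is a minimal, self-contained sketch. It does not use Flink: `Collector`, `FlatMapFunction`, `FilterFunction`, `AbstractRichFunction`, `RichFlatMapFunction`, and `RichFilterFunction` are simplified stand-ins declared inside the example (the real Flink types carry more, for instance `throws Exception` on the methods), and `SplitWords` and `NonEmpty` are hypothetical user classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RichFunctionSketch {
    // Simplified stand-ins for the Flink types named above (assumptions, not Flink's real definitions).
    interface Collector<T> { void collect(T record); }
    interface FlatMapFunction<IN, OUT> { void flatMap(IN value, Collector<OUT> out); }
    interface FilterFunction<T> { boolean filter(T value); }
    static abstract class AbstractRichFunction { /* the real class adds more; left empty here */ }
    static abstract class RichFlatMapFunction<IN, OUT> extends AbstractRichFunction
            implements FlatMapFunction<IN, OUT> { }
    static abstract class RichFilterFunction<T> extends AbstractRichFunction
            implements FilterFunction<T> { }

    // Hypothetical user function: emits zero or more tokens per input line.
    static class SplitWords extends RichFlatMapFunction<String, String> {
        @Override
        public void flatMap(String line, Collector<String> out) {
            for (String word : line.split(" ")) {
                out.collect(word);
            }
        }
    }

    // Hypothetical user function: keeps only non-empty strings.
    static class NonEmpty extends RichFilterFunction<String> {
        @Override
        public boolean filter(String value) {
            return !value.isEmpty();
        }
    }

    // Splits every line, then keeps only the tokens the filter accepts.
    static List<String> run(List<String> lines) {
        SplitWords splitter = new SplitWords();
        NonEmpty nonEmpty = new NonEmpty();
        List<String> result = new ArrayList<>();
        for (String line : lines) {
            // The collector handed to flatMap forwards each token to the filter.
            splitter.flatMap(line, word -> {
                if (nonEmpty.filter(word)) {
                    result.add(word);
                }
            });
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("hello  flink", "filter me")));
    }
}
```

The double space in the first input line produces an empty token, which is exactly the kind of record the filter drops.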
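II. Adding the flatMap and filter implementations to a Flink job
This is typically done with code like the following:
DataStream<Row> dataStream = ...; // the data stream coming from a source
dataStream.flatMap(/* an instance of a RichFlatMapFunction subclass */);
dataStream.filter(/* an instance of a RichFilterFunction subclass */);
III. How the Flink runtime calls the flatMap and filter implementations
Start with the DataStream.flatMap method: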
@Public
public class DataStream<T> {
protected final Transformation<T> transformation;
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes((FlatMapFunction)this.clean(flatMapper), this.getType(), Utils.getCallLocationName(), true);
return this.flatMap(flatMapper, outType);
}
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
return this.transform("Flat Map", outputType, (OneInputStreamOperator)(new StreamFlatMap((FlatMapFunction)this.clean(flatMapper))));
}
}
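The wrapping step in `flatMap` can be illustrated without Flink. In the sketch below, `MiniStream` and `FlatMapOperator` are hypothetical stand-ins, not Flink classes, and `BiConsumer`/`Consumer` stand in for the function and collector types. The only point it tries to show is that the API call does not run the user function: it wraps the function in an operator and records that operator under a name ("Flat Map" in the source above).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class JobGraphSketch {
    // Hypothetical operator: wraps a user function but does not call it at build time.
    static class FlatMapOperator<IN, OUT> {
        final BiConsumer<IN, Consumer<OUT>> userFunction;

        FlatMapOperator(BiConsumer<IN, Consumer<OUT>> userFunction) {
            this.userFunction = userFunction;
        }
    }

    // Hypothetical stream handle: each transformation call only records a named step.
    static class MiniStream<T> {
        final List<String> steps;      // names of the recorded transformations, in order
        final List<Object> operators;  // the operator objects created for those steps

        MiniStream(List<String> steps, List<Object> operators) {
            this.steps = steps;
            this.operators = operators;
        }

        <R> MiniStream<R> flatMap(BiConsumer<T, Consumer<R>> flatMapper) {
            return transform("Flat Map", new FlatMapOperator<T, R>(flatMapper));
        }

        // Returns a new handle with one more step; the receiver is left unchanged.
        <R> MiniStream<R> transform(String name, Object operator) {
            List<String> newSteps = new ArrayList<>(steps);
            newSteps.add(name);
            List<Object> newOperators = new ArrayList<>(operators);
            newOperators.add(operator);
            return new MiniStream<R>(newSteps, newOperators);
        }
    }

    public static void main(String[] args) {
        BiConsumer<String, Consumer<String>> splitter = (line, out) -> {
            for (String word : line.split(" ")) {
                out.accept(word);
            }
        };
        MiniStream<String> source = new MiniStream<String>(new ArrayList<>(), new ArrayList<>());
        MiniStream<String> words = source.flatMap(splitter);
        System.out.println(words.steps);
    }
}
```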
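The StreamFlatMap constructor takes the implementation class as its argument, and the resulting StreamFlatMap is the OneInputStreamOperator passed to transform.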
@Internal
public class StreamFlatMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT> {
private static final long serialVersionUID = 1L;
private transient TimestampedCollector<OUT> collector;
public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
super(flatMapper);
this.chainingStrategy = ChainingStrategy.ALWAYS;
}
public void open() throws Exception {
super.open();
this.collector = new TimestampedCollector(this.output);
}
public void processElement(StreamRecord<IN> element) throws Exception {
this.collector.setTimestamp(element);
// This calls userFunction, a field of the parent class: the flatMapper passed to the constructor
((FlatMapFunction)this.userFunction).flatMap(element.getValue(), this.collector);
}
}
The parent-class constructor below assigns userFunction to a field of AbstractUdfStreamOperator, so when a subclass calls userFunction it is calling this same object.
@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> implements OutputTypeConfigurable<OUT> {
private static final long serialVersionUID = 1L;
protected final F userFunction;
public AbstractUdfStreamOperator(F userFunction) {
this.userFunction = Objects.requireNonNull(userFunction);
this.checkUdfCheckpointingPreconditions();
}
}
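The effect of the constructor above can be reproduced in a few lines of plain Java. `UdfOperatorBase` and `MapOperator` are hypothetical stand-ins, not Flink classes, and `java.util.function.Function` stands in for the user function type. The base class stores the function once in a `protected final` field after a null check, and the subclass later calls whatever instance was passed in.

```java
import java.util.Objects;
import java.util.function.Function;

class UserFunctionFieldSketch {
    // Hypothetical base class: keeps the user function in a field visible to subclasses.
    static abstract class UdfOperatorBase<F> {
        protected final F userFunction;

        UdfOperatorBase(F userFunction) {
            // Reject null at construction time rather than at the first record.
            this.userFunction = Objects.requireNonNull(userFunction);
        }
    }

    // Hypothetical subclass: delegates every element to the stored function.
    static class MapOperator extends UdfOperatorBase<Function<String, String>> {
        MapOperator(Function<String, String> mapper) {
            super(mapper);
        }

        String processElement(String element) {
            return userFunction.apply(element);
        }
    }

    public static void main(String[] args) {
        MapOperator operator = new MapOperator(String::toUpperCase);
        System.out.println(operator.processElement("flink")); // prints FLINK
    }
}
```

Because the field is typed with the generic parameter `F`, the subclass can call the function without a cast.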
As a result, whatever StreamFlatMap does with userFunction is done on the user's subclass of RichFlatMapFunction.
filter works the same way:
@Internal
public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> {
private static final long serialVersionUID = 1L;
public StreamFilter(FilterFunction<IN> filterFunction) {
super(filterFunction);
this.chainingStrategy = ChainingStrategy.ALWAYS;
}
public void processElement(StreamRecord<IN> element) throws Exception {
if (((FilterFunction)this.userFunction).filter(element.getValue())) {
this.output.collect(element);
}
}
}
Both StreamFilter and StreamFlatMap extend AbstractUdfStreamOperator and implement the OneInputStreamOperator interface. In other words, the two operators share the same parent class and the same interface.
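The shared shape of the two operators can be sketched in plain Java. Everything below is a hypothetical stand-in rather than Flink code: `OneInputOperator` plays the role of the common interface, `UdfOperator` the common parent that holds `userFunction`, and `Consumer` stands in for the operator output. Only `processElement` differs between `FlatMapOp` and `FilterOp`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;

class OperatorChainSketch {
    // Stand-in for the common interface: one record in, results pushed downstream.
    interface OneInputOperator<IN, OUT> {
        void processElement(IN element, Consumer<OUT> output);
    }

    // Stand-in for the common parent: holds the user function.
    static abstract class UdfOperator<F> {
        protected final F userFunction;

        UdfOperator(F userFunction) {
            this.userFunction = userFunction;
        }
    }

    static class FlatMapOp<IN, OUT> extends UdfOperator<BiConsumer<IN, Consumer<OUT>>>
            implements OneInputOperator<IN, OUT> {
        FlatMapOp(BiConsumer<IN, Consumer<OUT>> flatMapper) {
            super(flatMapper);
        }

        @Override
        public void processElement(IN element, Consumer<OUT> output) {
            // The user function decides how many records to emit.
            userFunction.accept(element, output);
        }
    }

    static class FilterOp<T> extends UdfOperator<Predicate<T>> implements OneInputOperator<T, T> {
        FilterOp(Predicate<T> predicate) {
            super(predicate);
        }

        @Override
        public void processElement(T element, Consumer<T> output) {
            // Forward the record unchanged only if the predicate accepts it.
            if (userFunction.test(element)) {
                output.accept(element);
            }
        }
    }

    // Feeds each line through the flat map, whose output feeds the filter.
    static List<String> run(List<String> lines) {
        OneInputOperator<String, String> split = new FlatMapOp<String, String>((line, out) -> {
            for (String word : line.split(" ")) {
                out.accept(word);
            }
        });
        OneInputOperator<String, String> longWords = new FilterOp<String>(word -> word.length() > 3);
        List<String> result = new ArrayList<>();
        for (String line : lines) {
            split.processElement(line, word -> longWords.processElement(word, result::add));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("apache flink job", "ok stream")));
    }
}
```

Since both operators are used through the same interface, the calling loop does not need to know which transformation it is driving.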
IV. Class relationship diagrams
[Class diagram: RichFlatMapFunction]
[Class diagram: RichFilterFunction]
The two diagrams above show that RichFlatMapFunction and RichFilterFunction both derive from the same parent class, AbstractRichFunction.
[Class diagram: StreamFlatMap]
[Class diagram: StreamFilter]
The diagrams above also make it clear that StreamFlatMap and StreamFilter share the same parent class and interface; only their processElement implementations differ.