目录
前言
Source
FlatMap
KeyBy
sum
总结
前言
以下面的WordCount为例
package com.wlh.p1;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class HelloWorld {
public static void main(String[] args) throws Exception {
//
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.socketTextStream("localhost", 7777)
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] split = s.split(" ");
for (String word : split) {
collector.collect(Tuple2.of(word, 1));
}
}
})
.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
return stringIntegerTuple2.f0;
}
})
.sum(1)
.print();
//
env.execute();
}
}
上面是一个简单的WordCount程序,理解Flink之前,需要理解好Flink中的一些核心抽象概念
如下图所示,主要为3个:
(1)Transformation
(2)StreamOperator
(3)User-Defined Function
Transformation指的是在DateStream之间转换的操作,比如上面WordCount例子中的flatMap,它其实就对应着一个Transformation,表示从某个DataStream转换为另一个DataStream对应的Transformation。
以WordCount为例,先看一下对应的transformations,上述任务对应的transformations是一个list,list包含3个元素,但是元素对应的transformation的id是2/4/5。具体这些transformation是如何产生的是本文的重点。
Source
我们根据代码一步步跟进看一下
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
当获取到流式执行环境后(如果在本地获取的是Local的执行环境),StreamExecutionEnvironment中会存在一个成员变量transformations,初始化为空集合。
env.socketTextStream("localhost", 7777)
跟进代码后,
可以看到new SocketTextStreamFunction
这就是上面说的User-Defined Function,跟进该方法,可以看到确实是继承了Function接口
跟进addSource(
new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), "Socket Stream");
这里有一个比较关键的参数Boundedness.CONTINUOUS_UNBOUNDED
表示该source是一个无界流,事实也是如此,socket流当然是无界的。
跟进 addSource(function, sourceName, typeInfo, Boundedness.CONTINUOUS_UNBOUNDED);
注意TypeInformation<OUT> resolvedTypeInfo =
getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
TypeInformation是Flink中类型系统的核心类,该方法的内部逻辑是通过java提供的Type系统来提取的。debug跟进一下类型的提取
baseClass是SourceFunction;clazz是具体的实现类SocketTextStreamFunction。通过new TypeExtractor()对象提取该function具体的输出类型信息。
跟进new TypeExtractor()
.privateCreateTypeInfo(baseClass, clazz, returnParamPos, in1Type, in2Type);查看具体的类型提取代码,通过反射获取,关于反射获取具体的类型,自行学习了解。
获取到SocketTextStreamFunction的输出类型后,继续跟进 addSource(function, sourceName, typeInfo, Boundedness.CONTINUOUS_UNBOUNDED);
刚刚已经获取输出类型之后,继续跟进后续代码
boolean isParallel = function instanceof ParallelSourceFunction; // 判断该source是不是可以并行的source,很明显这里的isParallel是false
clean(function); // 该行在做闭包清理/检查。如果不通过会报类似异常"Object " + obj + " is not serializable"
final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
通过StreamSource的继承关系可以看出,StreamSource其实是开头提到的二号人物StreamOperator,至此,User-Defined Function和StreamOperator都已经出现了,并且它们的关系是StreamOperator中包含User-Defined Function,和开头图示一致。
return new DataStreamSource<>(
this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
最终,构建了Transformation,并且将operator传入,至此Transformation、StreamOperator、User-Defined Function都已经出现在视野中。
注意:在看DataStreamSource的继承关系时,会看到它继承自SingleOutputStreamOperator,这里是我觉得Flink命名不太好的地方,SingleOutputStreamOperator会被误认为是StreamOperator,但其实不是,SingleOutputStreamOperator是继承自DataStream的,并且在注释中明确说明SingleOutputStreamOperator是transformation。
跟进DataStream,会看到DataStream中封装了environment环境和通过env.socketTextStream("localhost", 7777)定义的第一个transformation。后面的api操作,如flatMap等,都是基于该DataStream进行操作了。
FlatMap
在上面DataStream的基础上将后续的api都介绍一下,跟进.flatMap
形参flatMapper,即为用户在编程时自定义的function,代码逻辑很清晰,依然是先获取输出的类型信息
跟进return flatMap(flatMapper, outType);
StreamFlatMap是StreamOperator的子类
跟进return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
跟进return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
首先new OneInputTransformation,创建一个新的Transformation,在构建的时候,传入了this.transformation,就是上面的LegacySourceTransformation。
new SingleOutputStreamOperator(environment, resultTransform); 创建一个新的DataStream,作为这一步API操作的返回值。
跟进getExecutionEnvironment().addOperator(resultTransform);
将transformation添加进env环境的transformations集合中,这个集合在未来会遍历生成StreamGraph。
KeyBy
在上面DataStream的基础上将后续的api都介绍一下,跟进.keyBy
直接new KeyedStream,跟进
获取key的类型信息,继续跟进构造方法
new PartitionTransformation创建了一个Transformation,将当前Datastream的Transformation作为PartitionTransformation的输入,并且将用户自定义的keySelector封装进KeyGroupStreamPartitioner。
继续跟进后,由于KeyedStream继承自DataStream,同样的,将env和当前的transformation封装进去。
至此KeyedStream构建完成,它的内容如下,
sum
在上面KeyedStream的基础上,继续跟进代码
SumAggregator<T> extends AggregationFunction<T>
SumAggerator就是User-Defined Function
跟进return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));
方法的签名为AggeragationFunction
跟进return reduce(aggregate).name("Keyed Aggregation");
这里出现了transformation
getExecutionEnvironment().addOperator(reduce); 这一行代码在之前说过,将该transformation加入到env的transformations中。
这里重点来看一下ReduceTransformation
构造方法如下:包含了聚合算子必须的一些信息,reducer是聚合的函数,input是之前的transformation,keySelector是分组key的提取方式。
值得注意的一行代码updateManagedMemoryStateBackendUseCase(true);
这里是在设置状态后端,这里第一次提到了状态的概念,状态是Flink得以流行的重要原因之一,有状态的流式计算。
这里是WordCount例子中的最后一步了,和前面的算子都非常类似,看到这里应该是可以举一反三了。
熟悉的味道,创建Function
创建StreamOperator
在创建DateStream时,创建了transformation
总结
本文介绍了Flink是如何将用户的api转换为Transformation,这是Flink的核心抽象,DataStream是面向用户的,Transformation并不面向用户,在Flink触发执行时,transformation会被Flink转换为更贴近底层执行的各种有向无环图,即常说的DAG。