一、Flink中的基本合流操作
在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍,对应的 API 也更加丰富。
二、联合(Union)
最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
在代码中,我们只要基于 DataStream 直接调用.union()方法,传入其他 DataStream 作为参数,就可以实现流的联合了;得到的依然是一个 DataStream:
stream1.union(stream2, stream3, ...)
注意:union()的参数可以是多个 DataStream,所以联合操作可以实现多条流的合并。
代码实现:我们可以用下面的代码做一个简单测试:
package com.flink.DataStream.UnionStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class FlinkUnionStream {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecutionEnvironment.setParallelism(1);
SingleOutputStreamOperator<Integer> source1 = streamExecutionEnvironment
.socketTextStream("localhost", 1111)
.map(a -> Integer.parseInt(a));
SingleOutputStreamOperator<Integer> source2 = streamExecutionEnvironment
.socketTextStream("localhost", 2222)
.map(a -> Integer.parseInt(a));
DataStreamSource<String> source3 = streamExecutionEnvironment.fromElements("3", "4", "5");
DataStream<Integer> unionResult = source1.union(source2, source3.map(Integer::valueOf));
unionResult.print();
streamExecutionEnvironment.execute();
}
}
三、连接(Connect)
为了处理更加灵活,连接操作允许流的数据类型不同。但我们知道一个DataStream中的数据只能有唯一的类型,所以连接得到的结果并不是DataStream,而是一个“连接流”。连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。所以整体上来,两条流的连接就像是“一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到同一个DataStream中。