可以快速搭建一个Flink编写程序
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.17.1 \
-DgroupId=com.zxx.langhuan \
-DartifactId=langhuan-flink \
-Dversion=1.0.0-SNAPSHOT \
-Dpackage=com.zxx.langhuan.flink \
-DinteractiveMode=false
Flink 的 Java DataStream API 可以将任何可序列化的对象转化为流。Flink 自带的序列化器有
- 基本类型,即 String、Long、Integer、Boolean、Array
- 复合类型:Tuples、POJOs 和 Scala case classes
Flink 的原生序列化器可以高效地操作 tuples 和 POJOs。
对于 Java,Flink 自带有 Tuple0
到 Tuple25
类型。
POJOs
如果满足以下条件,Flink 将数据类型识别为 POJO 类型(并允许“按名称”字段引用):
- 该类是公有且独立的(没有非静态内部类)
- 该类有公有的无参构造函数
- 类(及父类)中所有的所有不被 static、transient 修饰的属性要么是公有的(且不被 final 修饰),要么是包含公有的 getter 和 setter 方法,这些方法遵循 Java bean 命名规范。
基本的 stream source
// DataStream API
// fromElements
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Person> flintstones = env.fromElements(
new Person("Fred", 35),
new Person("Wilma", 35),
new Person("Pebbles", 2));
// fromCollection
List<Person> people = new ArrayList<Person>();
people.add(new Person("Fred", 35));
people.add(new Person("Wilma", 35));
people.add(new Person("Pebbles", 2));
DataStream<Person> flintstones = env.fromCollection(people);
// socketTextStream
DataStream<String> lines = env.socketTextStream("localhost", 9999)
// readTextFile
DataStream<String> lines = env.readTextFile("file:///path");
// customSource
DataStreamSource<OUT> out = env.addSource(SourceFunction<OUT> function);
// Table API
// 创建TableEnvironment(表环境)。 创建表环境时,你可以设置作业属性,定义应用的批流模式,以及创建数据源。 我们先创建一个标准的表环境,并选择流式执行器。
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
// 批处理模式
// EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.executeSql("CREATE TABLE transactions (\n" +
" account_id BIGINT,\n" +
" amount BIGINT,\n" +
" transaction_time TIMESTAMP(3),\n" +
" WATERMARK FOR transaction_time AS transaction_time - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 'transactions',\n" +
" 'properties.bootstrap.servers' = 'kafka:9092',\n" +
" 'format' = 'csv'\n" +
")");
tEnv.executeSql("CREATE TABLE spend_report (\n" +
" account_id BIGINT,\n" +
" log_ts TIMESTAMP(3),\n" +
" amount BIGINT\n," +
" PRIMARY KEY (account_id, log_ts) NOT ENFORCED" +
") WITH (\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://mysql:3306/sql-demo',\n" +
" 'table-name' = 'spend_report',\n" +
" 'driver' = 'com.mysql.jdbc.Driver',\n" +
" 'username' = 'sql-demo',\n" +
" 'password' = 'demo-sql'\n" +
")");
Table transactions = tEnv.from("transactions");
report(transactions).executeInsert("spend_report");
支持的connectors
DataStream Connectors | Table API Connectors | |
DataGen | ✅ | ✅ |
Kafka | ✅ | ✅ |
Cassandra | ✅ | |
DynamoDB | ✅ | ✅ |
ElasticSearch | ✅ | ✅ |
Firehose | ✅ | ✅ |
Kinesis | ✅ | ✅ |
MongoDB | ✅ | ✅ |
OpenSearch | ✅ | ✅ |
文件系统 | ✅ | ✅ |
RabbitMQ | ✅ | |
Google Cloud PubSub | ✅ | |
Hybrid Source | ✅ | |
Pulsar | ✅ | |
JDBC | ✅ | ✅ |
Upsert Kafka | ✅ | |
HBase | ✅ | |
✅ | ||
BlackHole | ✅ | |
Hive | ✅ |
场景说明
一个 Flink 集群总是包含一个 JobManager 以及一个或多个 Flink TaskManager。
JobManager 负责处理 Job 提交、 Job 监控以及资源管理。
Flink TaskManager 运行 worker 进程, 负责实际任务 Tasks 的执行,而这些任务共同组成了一个 Flink Job。
有界流:批处理
无界流:流处理
无状态的转换
map()
flatmap()
keyBy()
以下情况,一个类不能作为 key:
- 它是一种 POJO 类,但没有重写 hashCode() 方法而是依赖于 Object.hashCode() 实现。
- 它是任意类的数组。
reduce()
和其他聚合算子
数据流转换
有状态的转换
在 Flink 不参与管理状态的情况下,你的应用也可以使用状态,但 Flink 为其管理状态提供了一些引人注目的特性:
- 本地性: Flink 状态是存储在使用它的机器本地的,并且可以以内存访问速度来获取
- 持久性: Flink 状态是容错的,例如,它可以自动按一定的时间间隔产生 checkpoint,并且在任务失败后进行恢复
- 纵向可扩展性: Flink 状态可以存储在集成的 RocksDB 实例中,这种方式下可以通过增加本地磁盘来扩展空间
- 横向可扩展性: Flink 状态可以随着集群的扩缩容重新分布
- 可查询性: Flink 状态可以通过使用 状态查询 API 从外部进行查询。
Rich Functions
open(Configuration c):
仅在算子初始化时调用一次。可以用来加载一些静态数据,或者建立外部服务的链接等。close()
getRuntimeContext():
为整套潜在有趣的东西提供了一个访问途径,最明显的,它是你创建和访问 Flink 状态的途径。setRuntimeContext()
getIteration
RuntimeContext()
ValueState:要理解这个代表的不仅仅是一个单独的布尔类型变量,而是一个分布式的共享键值存储。(相同的key共享)
value()
- update()
- clear()
Flink 明确支持以下三种时间语义:
-
事件时间(event time): 事件产生的时间,记录的是设备生产(或者存储)事件的时间
-
摄取时间(ingestion time): Flink 读取事件时记录的时间
-
处理时间(processing time): Flink pipeline 中具体算子处理事件的时间
Watermarks: 定义何时停止等待较早的事件
// AssignerWithPunctuatedWatermarks 在每个事件上都可以提供水位线,因此水位线可以更加灵活和精确地根据事件的特性进行生成。
// AssignerWithPeriodicWatermarks 在一定的时间间隔内提供水位线,因此水位线的生成不会频繁,适用于事件的时间戳变化频率较高,而水位线的变化频率较低的情况。
// 在选择使用哪个接口时,可以根据具体的业务需求和事件流的特点来决定。如果事件的时间戳和水位线的变化较为频繁,或者需要更精确的控制,可以选择 AssignerWithPunctuatedWatermarks。如果事件的时间戳变化较为平稳,或者水位线的变化不需要那么频繁,可以选择 AssignerWithPeriodicWatermarks。
// WatermarkStrategy 接口包含以下方法:
// withTimestampAssigner:指定如何从事件中提取事件时间戳(timestamp)。
// withIdleness:配置是否在流处于空闲状态时发出水位线,默认为不发出水位线。
// withIdlenessTimeout:指定流处于空闲状态多久后发出水位线。
// withTimestampAssignerAndIdlenessTimeout:同时指定事件时间戳提取逻辑和流空闲状态的配置。
// 通常,你可以通过静态工厂方法 WatermarkStrategy.forXXX() 来创建特定的水位线策略,其中 XXX 可以是以下几种类型之一:
// forBoundedOutOfOrderness:适用于乱序但有界的事件流,根据最大允许的乱序程度指定固定的延迟时间。
// forMonotonousTimestamps:适用于单调递增的事件流,例如源自某些消息队列的事件流。
// forGenerator:通过自定义的水位线生成器函数来创建水位线策略。
// forPeriodicBoundedOutOfOrderness:适用于周期性有界乱序事件流,指定固定的延迟时间和乱序间隔。
// forEventTime:适用于事件时间已经正确嵌入在事件中的情况,不需要提取时间戳。
DataStream<Event> stream = ...;
WatermarkStrategy<Event> strategy = WatermarkStrategy
.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.timestamp);
DataStream<Event> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(strategy);
Windows
Flink 有一些内置的窗口分配器,如下所示:
- 滚动时间窗口
- 每分钟页面浏览量
TumblingEventTimeWindows.of(Time.minutes(1))
- 滑动时间窗口
- 每10秒钟计算前1分钟的页面浏览量
SlidingEventTimeWindows.of(Time.minutes(1), Time.seconds(10))
- 会话窗口
- 每个会话的网页浏览量,其中会话之间的间隔至少为30分钟
EventTimeSessionWindows.withGap(Time.minutes(30))
以下都是一些可以使用的间隔时间 Time.milliseconds(n)
, Time.seconds(n)
, Time.minutes(n)
, Time.hours(n)
, 和 Time.days(n)
。
窗口应用函数
我们有三种最基本的操作窗口内的事件的选项:
- 像批量处理,
ProcessWindowFunction
会缓存Iterable
和窗口内容,供接下来全量计算; - 或者像流处理,每一次有事件被分配到窗口时,都会调用
ReduceFunction
或者AggregateFunction
来增量计算; - 或者结合两者,通过
ReduceFunction
或者AggregateFunction
预聚合的增量计算结果在触发窗口时, 提供给ProcessWindowFunction
做全量计算。
DataStream<SensorReading> input = ...;
input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.process(new MyWastefulMax());
public static class MyWastefulMax extends ProcessWindowFunction<
SensorReading, // 输入类型
Tuple3<String, Long, Integer>, // 输出类型
String, // 键类型
TimeWindow> { // 窗口类型
@Override
public void process(
String key,
Context context,
Iterable<SensorReading> events,
Collector<Tuple3<String, Long, Integer>> out) {
int max = 0;
for (SensorReading event : events) {
max = Math.max(event.value, max);
}
out.collect(Tuple3.of(key, context.window().getEnd(), max));
}
}
// Context 接口 展示大致如下:
public abstract class Context implements java.io.Serializable {
public abstract W window();
public abstract long currentProcessingTime();
public abstract long currentWatermark();
public abstract KeyedStateStore windowState();
public abstract KeyedStateStore globalState();
}
// windowState 和 globalState 可以用来存储当前的窗口的 key、窗口或者当前 key 的每一个窗口信息。这在一些场景下会很有用,试想,我们在处理当前窗口的时候,可能会用到上一个窗口的信息。
// 增量聚合示例
DataStream<SensorReading> input = ...;
input
.keyBy(x -> x.key)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.reduce(new MyReducingMax(), new MyWindowFunction());
private static class MyReducingMax implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r1 : r2;
}
}
private static class MyWindowFunction extends ProcessWindowFunction<
SensorReading, Tuple3<String, Long, SensorReading>, String, TimeWindow> {
@Override
public void process(
String key,
Context context,
Iterable<SensorReading> maxReading,
Collector<Tuple3<String, Long, SensorReading>> out) {
SensorReading max = maxReading.iterator().next();
out.collect(Tuple3.of(key, context.window().getEnd(), max));
}
}
晚到的事件
-
旁路输出
OutputTag<Event> lateTag = new OutputTag<Event>("late"){};
SingleOutputStreamOperator<Event> result = stream
.keyBy(...)
.window(...)
.sideOutputLateData(lateTag)
.process(...);
DataStream<Event> lateStream = result.getSideOutput(lateTag);
- 指定允许的延迟
stream
.keyBy(...)
.window(...)
.allowedLateness(Time.seconds(10))
.process(...);
如果延迟的事件需要特殊处理,可以从Context中获取水位线时间来做比较
public void processElement(
TaxiFare fare,
Context ctx,
Collector<Tuple3<Long, Long, Float>> out) throws Exception {
long eventTime = fare.getEventTime();
TimerService timerService = ctx.timerService();
if (eventTime <= timerService.currentWatermark()) {
// 事件延迟;其对应的窗口已经触发。
} else {
// 其他处理
}
}
深入了解窗口操作
滑动窗口是通过复制来实现的
滑动窗口分配器可以创建许多窗口对象,并将每个事件复制到每个相关的窗口中。例如,如果您每隔 15 分钟就有 24 小时的滑动窗口,则每个事件将被复制到 4 * 24 = 96 个窗口中。
时间窗口会和时间对齐
仅仅因为我们使用的是一个小时的处理时间窗口并在 12:05 开始运行您的应用程序,并不意味着第一个窗口将在 1:05 关闭。第一个窗口将长 55 分钟,并在 1:00 关闭。
请注意,滑动窗口和滚动窗口分配器所采用的 offset 参数可用于改变窗口的对齐方式。有关详细的信息,请参见 滚动窗口 和 滑动窗口 。
window 后面可以接 window
比如说:
stream
.keyBy(t -> t.key)
.window(<window assigner>)
.reduce(<reduce function>)
.windowAll(<same window assigner>)
.reduce(<same reduce function>);
可能我们会猜测以 Flink 的能力,想要做到这样看起来是可行的(前提是你使用的是 ReduceFunction 或 AggregateFunction ),但不是。
之所以可行,是因为时间窗口产生的事件是根据窗口结束时的时间分配时间戳的。例如,一个小时小时的窗口所产生的所有事件都将带有标记一个小时结束的时间戳。后面的窗口内的数据消费和前面的流产生的数据是一致的。
空的时间窗口不会输出结果
事件会触发窗口的创建。换句话说,如果在特定的窗口内没有事件,就不会有窗口,就不会有输出结果。
延迟事件可能导致延迟合并
会话窗口的实现是基于窗口的一个抽象能力,窗口可以 聚合。会话窗口中的每个数据在初始被消费时,都会被分配一个新的窗口,但是如果窗口之间的间隔足够小,多个窗口就会被聚合。延迟事件可以弥合两个先前分开的会话间隔,从而产生一个虽然有延迟但是更加准确地结果。