窗口机制
- tumble(滚动窗口)
- hop(滑动窗口)
- session(会话窗口)
- cumulate(渐进式窗口)
- Over(聚合窗口)
滚动窗口(tumble)
概念
滚动窗口 (tumble): 窗口大小 = 滑动距离。
它的窗口是紧密排布的,中间没有任何的数据重复和丢失。
案例 - SQL
--创建表
-- Demo source for the tumbling-window examples, read through the 'socket'
-- connector as CSV. row_time is a computed column used as the event-time
-- attribute; its watermark is declared with a zero-second delay.
CREATE TABLE source_table (
    user_id     STRING,
    price       BIGINT,
    -- raw event timestamp; presumably epoch seconds since it is fed to FROM_UNIXTIME -- confirm
    `timestamp` BIGINT,
    row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
    WATERMARK FOR row_time AS row_time - INTERVAL '0' SECOND
) WITH (
    'connector' = 'socket',
    'hostname'  = 'node1',
    'port'      = '9999',
    'format'    = 'csv'
);
--语法
tumble(row_time,时间间隔),比如,如下的sql
tumble(row_time,interval '5' second),每隔5秒滚动一次。
--业务查询逻辑(传统方式)
-- Group-window form: for each user and each 5-second tumbling window, emit the
-- row count (pv) and the price total. The window bounds are cast to STRING,
-- passed through UNIX_TIMESTAMP and multiplied by 1000.
SELECT
    user_id,
    COUNT(*)   AS pv,
    SUM(price) AS sum_price,
    UNIX_TIMESTAMP(CAST(TUMBLE_START(row_time, INTERVAL '5' SECOND) AS STRING)) * 1000 AS window_start,
    UNIX_TIMESTAMP(CAST(TUMBLE_END(row_time, INTERVAL '5' SECOND) AS STRING)) * 1000   AS window_end
FROM source_table
GROUP BY
    user_id,
    TUMBLE(row_time, INTERVAL '5' SECOND);
运行结果如下:
案例 - DataStream API
package day04;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
/**
 * Event-time tumbling-window demo.
 * Reads comma-separated lines (id,price,ts) from a socket, keys them by id and
 * sums the price field over 5-second tumbling event-time windows.
 */
public class Demo01_TumbleWindow {
    public static void main(String[] args) throws Exception {
        // 1. Streaming execution environment, parallelism fixed to 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Source: text lines from the socket at node1:9999.
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);

        // 3.1 Split each line on "," into a Tuple3 of (String id, Integer price, Long ts).
        //     The explicit returns(...) supplies the tuple type for the lambda.
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> parsed = source
                .map(line -> {
                    String[] fields = line.split(",");
                    return Tuple3.of(fields[0], Integer.valueOf(fields[1]), Long.parseLong(fields[2]));
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT, Types.LONG));

        // 3.2 Assign event-time timestamps and watermarks.
        //     WatermarkStrategy offers four factories:
        //       forMonotonousTimestamps  - monotonically increasing timestamps
        //       forBoundedOutOfOrderness - tolerates out-of-order (late) records
        //       forGenerator             - custom watermark generator
        //       noWatermarks             - no watermarks
        WatermarkStrategy<Tuple3<String, Integer, Long>> strategy = WatermarkStrategy
                .<Tuple3<String, Integer, Long>>forMonotonousTimestamps()
                // The assigner marks f2 (ts) as the event-time field; it is multiplied
                // by 1000, presumably converting seconds to milliseconds -- confirm against the input.
                .withTimestampAssigner((element, recordTimestamp) -> element.f2 * 1000L);
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> mapAndWatermarkStream =
                parsed.assignTimestampsAndWatermarks(strategy);
        mapAndWatermarkStream.print("源数据:");

        // 3.3 Key the stream by id (f0).
        KeyedStream<Tuple3<String, Integer, Long>, String> keyedStream =
                mapAndWatermarkStream.keyBy(record -> record.f0);

        // 3.4 Assign 5-second tumbling event-time windows.
        // 3.5 Sum tuple field 1 (price) inside each window.
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> windowSums = keyedStream
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum(1);

        // 3.6 For readable output, reduce Tuple3<String,Integer,Long> to Tuple2<String,Integer> (id, sum).
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = windowSums
                .map(summed -> Tuple2.of(summed.f0, summed.f1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 4. Output the aggregated records to stderr.
        result.printToErr("聚合后的数据:");

        // 5. Start the streaming job.
        env.execute();
    }
}
运行结果如下:
SQL案例TVF写法
--语法,跟3个参数:
--参数1:表名
--参数2:表中事件时间列
--参数3:窗口大小
from table(tumble(table source_table,descriptor(row_time),interval '5' second))
--业务逻辑
-- Window TVF form of the tumbling-window aggregation: the query reads
-- window_start / window_end from the TUMBLE(...) result and groups on them
-- together with user_id to get the price total per user per 5-second window.
SELECT
    user_id,
    UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 AS window_start,
    UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000   AS window_end,
    SUM(price) AS sum_price
FROM TABLE(
    TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '5' SECOND)
)
GROUP BY
    window_start,
    window_end,
    user_id;
--window_start,window_end不是关键字,而是窗口TVF(如TUMBLE)自动追加到结果中的列,可以直接在SELECT和GROUP BY中使用
--对照前面的方式:
-- Same aggregation written with the group-window functions, for comparison:
-- the window is declared in GROUP BY via TUMBLE(...) and its bounds are read
-- back with TUMBLE_START / TUMBLE_END.
SELECT
    user_id,
    COUNT(*)   AS pv,
    SUM(price) AS sum_price,
    UNIX_TIMESTAMP(CAST(TUMBLE_START(row_time, INTERVAL '5' SECOND) AS STRING)) * 1000 AS window_start,
    UNIX_TIMESTAMP(CAST(TUMBLE_END(row_time, INTERVAL '5' SECOND) AS STRING)) * 1000   AS window_end
FROM source_table
GROUP BY
    user_id,
    TUMBLE(row_time, INTERVAL '5' SECOND);
运行结果如下:
滑动窗口(hop)
概念
滑动窗口 :滑动距离 不等于 窗口大小。
(1)如果滑动距离小于窗口大小,则会产生数据重复
(2)如果滑动距离等于窗口大小,这就是滚动窗口
(3)如果滑动距离大于窗口大小,则会产生数据丢失(不考虑)
案例 - SQL
--创建表
-- Demo source for the sliding-window examples, read through the 'socket'
-- connector as CSV. row_time is the computed event-time column and carries a
-- watermark with a zero-second delay.
CREATE TABLE source_table (
    user_id     STRING,
    price       BIGINT,
    -- raw event timestamp; presumably epoch seconds since it is fed to FROM_UNIXTIME -- confirm
    `timestamp` BIGINT,
    row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
    WATERMARK FOR row_time AS row_time - INTERVAL '0' SECOND
) WITH (
    'connector' = 'socket',
    'hostname'  = 'node1',
    'port'      = '9999',
    'format'    = 'csv'
);
--语法
hop(事件时间列,滑动间隔,窗口大小)
hop(row_time,interval '2' second, interval '5' second)
--业务SQL
-- Group-window form of the sliding window: price total per user over
-- 5-second windows that slide every 2 seconds.
-- HOP / HOP_START / HOP_END take (time column, slide, size) in that order.
SELECT
    user_id,
    UNIX_TIMESTAMP(CAST(HOP_START(row_time, INTERVAL '2' SECOND, INTERVAL '5' SECOND) AS STRING)) * 1000 AS window_start,
    UNIX_TIMESTAMP(CAST(HOP_END(row_time, INTERVAL '2' SECOND, INTERVAL '5' SECOND) AS STRING)) * 1000   AS window_end,
    SUM(price) AS sum_price
FROM source_table
GROUP BY
    user_id,
    HOP(row_time, INTERVAL '2' SECOND, INTERVAL '5' SECOND);  -- slides once every 2 seconds
运行结果如下:
SQL案例TVF写法
--语法
--table:表名
--descriptor:事件时间列
--滑动距离:interval '2' second
--窗口大小:interval '6' second(注意:TVF写法做窗口聚合时,窗口大小需要是滑动距离的整数倍,所以下面的业务SQL用的是6秒而不是5秒)
from table(hop(table 表名,descriptor(事件时间列),滑动间隔,窗口大小))
from table(hop(table source_table,descriptor(row_time),interval '2' second,interval '6' second))
--业务SQL
-- Window TVF form of the sliding window: price total per user per hopping
-- window, grouped on the window_start / window_end columns of the HOP(...) result.
-- NOTE(review): the size here is 6 seconds while the group-window version uses
-- 5 seconds. This looks deliberate -- the TVF-based aggregation presumably
-- requires the size to be an integral multiple of the 2-second slide -- confirm
-- before "aligning" the two queries.
SELECT
    user_id,
    UNIX_TIMESTAMP(CAST(window_start AS STRING)) * 1000 AS window_start,
    UNIX_TIMESTAMP(CAST(window_end AS STRING)) * 1000   AS window_end,
    SUM(price) AS sum_price
FROM TABLE(
    HOP(
        TABLE source_table,
        DESCRIPTOR(row_time),
        INTERVAL '2' SECOND,   -- slide
        INTERVAL '6' SECOND    -- size
    )
)
GROUP BY
    window_start,
    window_end,
    user_id;
运行结果如下:
案例 - DataStream API
package day04;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * Event-time sliding-window demo.
 * Reads comma-separated lines (id,price,ts) from a socket, keys them by id and
 * sums the price field over 5-second windows that slide every 2 seconds.
 */
public class Demo02_SlideWindow {

    /** Splits one "id,price,ts" line on commas into a (String id, Integer price, Long ts) tuple. */
    private static Tuple3<String, Integer, Long> parseLine(String line) {
        String[] fields = line.split(",");
        return Tuple3.of(fields[0], Integer.valueOf(fields[1]), Long.parseLong(fields[2]));
    }

    public static void main(String[] args) throws Exception {
        // 1. Streaming execution environment, parallelism fixed to 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Source: text lines from the socket at node1:9999.
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);

        // 3.1 Map every line to a Tuple3; returns(...) supplies the tuple type explicitly.
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> mapStream = source
                .map(Demo02_SlideWindow::parseLine)
                .returns(Types.TUPLE(Types.STRING, Types.INT, Types.LONG));

        // 3.2 Attach timestamps and watermarks (monotonous timestamps).
        //     WatermarkStrategy offers four factories:
        //       forMonotonousTimestamps  - monotonically increasing timestamps
        //       forBoundedOutOfOrderness - tolerates late / out-of-order records
        //       forGenerator             - custom watermark generator
        //       noWatermarks             - no watermarks
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> watermarks = mapStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Tuple3<String, Integer, Long>>forMonotonousTimestamps()
                        // f2 (ts) times 1000 is used as the event time; presumably
                        // seconds to milliseconds -- confirm against the input.
                        .withTimestampAssigner((element, recordTimestamp) -> element.f2 * 1000L));
        watermarks.print("源数据:");

        // 3.3 Key by id (f0).
        // 3.4 Sliding event-time windows; SlidingEventTimeWindows.of takes
        //     (window size, slide interval): size 5 seconds, slide 2 seconds.
        // 3.5 Sum tuple field 1 (price) inside each window.
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> sumStream = watermarks
                .keyBy(record -> record.f0)
                .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(2)))
                .sum(1);

        // 3.6 Reduce Tuple3 to Tuple2 (id and the summed value).
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = sumStream
                .map(summed -> Tuple2.of(summed.f0, summed.f1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 4. Output the aggregated records to stderr.
        result.printToErr("聚合后的数据:");

        // 5. Start the streaming job.
        env.execute();
    }
}
运行结果如下:
会话窗口(session)
概念
会话窗口:在一个会话周期内,窗口的数据会累积,超过会话周期就会触发窗口的计算,同时开辟下一个新窗口。
注意:
新数据的事件时间与当前窗口内最后一条数据的事件时间之差达到或超过窗口间隔(即水印越过"最后一条数据的事件时间 + 窗口间隔")时,才会触发当前窗口的计算。
案例 - SQL
--创建表
-- Demo source for the session-window examples, read through the 'socket'
-- connector as CSV. row_time is the computed event-time column; the watermark
-- trails it by zero seconds.
CREATE TABLE source_table (
    user_id     STRING,
    price       BIGINT,
    -- raw event timestamp; presumably epoch seconds since it is fed to FROM_UNIXTIME -- confirm
    `timestamp` BIGINT,
    row_time AS TO_TIMESTAMP(FROM_UNIXTIME(`timestamp`)),
    WATERMARK FOR row_time AS row_time - INTERVAL '0' SECOND
) WITH (
    'connector' = 'socket',
    'hostname'  = 'node1',
    'port'      = '9999',
    'format'    = 'csv'
);
--语法
session(事件时间列,窗口间隔)
session(row_time,interval '5' second)
--业务SQL
-- Session-window aggregation (group-window form): price total per user per
-- session, where the session gap is 5 seconds. Window bounds are read back
-- with SESSION_START / SESSION_END.
SELECT
    user_id,
    UNIX_TIMESTAMP(CAST(SESSION_START(row_time, INTERVAL '5' SECOND) AS STRING)) * 1000 AS window_start,
    UNIX_TIMESTAMP(CAST(SESSION_END(row_time, INTERVAL '5' SECOND) AS STRING)) * 1000   AS window_end,
    SUM(price) AS sum_price
FROM source_table
GROUP BY
    user_id,
    SESSION(row_time, INTERVAL '5' SECOND);
运行结果如下:
案例 - DataStream API
package day04;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
/**
 * Event-time session-window demo.
 * Reads comma-separated lines (id,price,ts) from a socket, keys them by id and
 * sums the price field over session windows with a 5-second gap.
 */
public class Demo03_SessionWindow {
    public static void main(String[] args) throws Exception {
        // 1. Streaming execution environment, parallelism fixed to 1.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Source: text lines from the socket at node1:9999.
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);

        // 3.1 Split each line on "," into (String id, Integer price, Long ts).
        //     returns(...) supplies the tuple type for the lambda.
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> mapStream = source
                .map(line -> {
                    String[] fields = line.split(",");
                    return Tuple3.of(fields[0], Integer.valueOf(fields[1]), Long.parseLong(fields[2]));
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT, Types.LONG));

        // 3.2 Attach timestamps and watermarks (monotonous timestamps).
        //     f2 (ts) times 1000 is used as the event time; presumably seconds to
        //     milliseconds -- confirm against the input.
        SerializableTimestampAssigner<Tuple3<String, Integer, Long>> eventTimeOf =
                (element, recordTimestamp) -> element.f2 * 1000L;
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> watermarks = mapStream
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(eventTimeOf));
        watermarks.print("源数据:");

        // 3.3 Key by id (f0).
        // 3.4 Event-time session windows with a gap of 5 seconds.
        // 3.5 Sum tuple field 1 (price) inside each window.
        SingleOutputStreamOperator<Tuple3<String, Integer, Long>> sumStream = watermarks
                .keyBy(record -> record.f0)
                .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
                .sum(1);

        // 3.6 Reduce Tuple3 to Tuple2 (id and the summed value).
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = sumStream
                .map(summed -> Tuple2.of(summed.f0, summed.f1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // 4. Output the aggregated records to stderr.
        result.printToErr("聚合后的结果:");

        // 5. Start the streaming job.
        env.execute();
    }
}
运行结果如下:
渐进式窗口(cumulate)
概念
渐进式窗口:在固定时间内,结果是单调递增的。比如周期性累计求某些指标。
如下图所示:
案例 -TVF方式
--语法
--1.source:表名
--2.事件时间列
--3.时间间隔,每隔多久把窗口内的数据统计一次
--4.窗口大小,窗口的时间长度
from table(cumulate(table source,descriptor(事件时间列),时间间隔,窗口大小))
--建表
-- Generated demo source for the CUMULATE example: the 'datagen' connector
-- produces 10 rows per second with user_id and money each drawn from 1..100000.
CREATE TABLE source_table (
    -- user id
    user_id BIGINT,
    -- amount of money (the original comment read "user", presumably a slip)
    money   BIGINT,
    -- event-time attribute, taken from CURRENT_TIMESTAMP
    row_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    -- watermark declared with a zero-second delay
    WATERMARK FOR row_time AS row_time - INTERVAL '0' SECOND
) WITH (
    'connector'          = 'datagen',
    'rows-per-second'    = '10',
    'fields.user_id.min' = '1',
    'fields.user_id.max' = '100000',
    'fields.money.min'   = '1',
    'fields.money.max'   = '100000'
);
--业务SQL
-- Cumulating-window aggregation: total money and distinct user count per
-- (window_start, window_end) pair produced by CUMULATE with a 5-second step
-- and a 30-second maximum window size. The bounds are round-tripped through
-- UNIX_TIMESTAMP / FROM_UNIXTIME for display.
SELECT
    FROM_UNIXTIME(UNIX_TIMESTAMP(CAST(window_start AS STRING))) AS window_start,
    FROM_UNIXTIME(UNIX_TIMESTAMP(CAST(window_end AS STRING)))   AS window_end,
    SUM(money)              AS sum_money,
    COUNT(DISTINCT user_id) AS count_distinct_id
FROM TABLE(
    CUMULATE(
        TABLE source_table,
        DESCRIPTOR(row_time),
        INTERVAL '5' SECOND,    -- step
        INTERVAL '30' SECOND    -- maximum window size
    )
)
GROUP BY
    window_start,
    window_end;
--上面的cumulate窗口翻译如下:
--以30秒为一个周期,每隔5秒输出一次从当前周期起点累计到此刻的数据(总金额,用户量);一个周期结束后,从下一个周期重新开始累计
运行结果如下:
补充:
1.渐进式窗口没有普通SQL的写法。
2.渐进式窗口没有DataStream API。
Over 窗口
聚合窗口,用的不多。
Over 窗口分为两类:
- 时间区间范围
- 行数
时间区间范围
--语法
--range:范围,
--between ... and ... : 在...之间
--INTERVAL '1' HOUR PRECEDING:一小时前
--CURRENT ROW:当前行
RANGE BETWEEN 起始时间范围 AND CURRENT ROW
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
--建表SQL
-- Generated demo source for the time-range OVER example: the 'datagen'
-- connector produces 1 row per second; order_id and product are drawn from
-- 1..2 and amount from 1..10.
CREATE TABLE source_table (
    order_id BIGINT,
    product  BIGINT,
    amount   BIGINT,
    -- event-time attribute, taken from CURRENT_TIMESTAMP
    order_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0' SECOND
) WITH (
    'connector'           = 'datagen',
    'rows-per-second'     = '1',
    'fields.order_id.min' = '1',
    'fields.order_id.max' = '2',
    'fields.amount.min'   = '1',
    'fields.amount.max'   = '10',
    'fields.product.min'  = '1',
    'fields.product.max'  = '2'
);
--业务SQL
-- Time-range OVER aggregation: for every input row, emit the running total of
-- amount for the same product over the preceding hour of event time.
SELECT
    product,
    order_time,
    amount,
    SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        -- Frame: rows of the same product whose order_time falls within the
        -- 1 hour before the current row, up to and including the current row.
        -- (The previous comment said "30 seconds", which contradicted the frame.)
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
    ) AS one_hour_prod_amount_sum
FROM source_table;
运行结果如下:
行数
--语法
--ROWS:行数
--行数 preceding:往前多少行
--CURRENT ROW:当前行
ROWS BETWEEN 行数 PRECEDING AND CURRENT ROW
--统计最近100行到当前行的指标
ROWS BETWEEN 100 PRECEDING AND CURRENT ROW
对比前面的时间区间语法:
RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
--创建表
-- Generated demo source for the row-count OVER example: the 'datagen'
-- connector produces 1 row per second; order_id, amount and product are each
-- drawn from 1..2.
CREATE TABLE source_table (
    order_id BIGINT,
    product  BIGINT,
    amount   BIGINT,
    -- event-time attribute, taken from CURRENT_TIMESTAMP
    order_time AS CAST(CURRENT_TIMESTAMP AS TIMESTAMP(3)),
    WATERMARK FOR order_time AS order_time - INTERVAL '0' SECOND
) WITH (
    'connector'           = 'datagen',
    'rows-per-second'     = '1',
    'fields.order_id.min' = '1',
    'fields.order_id.max' = '2',
    'fields.amount.min'   = '1',
    'fields.amount.max'   = '2',
    'fields.product.min'  = '1',
    'fields.product.max'  = '2'
);
--业务SQL
-- Row-count OVER aggregation: for every input row, emit the running total of
-- amount for the same product over a fixed number of preceding rows.
SELECT
    product,
    order_time,
    amount,
    SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        -- Frame: the current row plus up to 100 preceding rows of the same
        -- product. (The previous comment said "5 rows", which contradicted the frame.)
        ROWS BETWEEN 100 PRECEDING AND CURRENT ROW
    -- NOTE(review): the alias below is carried over from the time-range example;
    -- this frame is row-based, not one hour. It is kept unchanged so the output
    -- column name stays the same -- consider renaming.
    ) AS one_hour_prod_amount_sum
FROM source_table;
运行结果如下:
补充:
1.Over窗口没有传统窗口的SQL写法。
2.Over窗口没有DataStream API的写法。