目录
1、状态概述
1.1 无状态算子
1.2 有状态算子
2、状态分类
编辑 2.1 算子状态
2.1.1 列表状态(ListState)
2.1.2 联合列表状态(UnionListState)
2.1.3 广播状态(BroadcastState)
2.2 按键分区状态
2.2.1 值状态(ValueState)
2.2.2 列表状态(ListState)
2.2.3 Map状态(MapState)
2.2.4 归约状态(ReducingState)
2.2.5 聚合状态(AggregatingState)
2.2.6 状态生存时间(TTL)
3、状态后端(State Backends)
3.1 状态后端的分类(HashMapStateBackend/RocksDB)
3.1.1 哈希表状态后端(HashMapStateBackend)
3.1.2 内嵌RocksDB状态后端(EmbeddedRocksDBStateBackend)
3.2 如何选择正确的状态后端
3.3 状态后端的配置
1、状态概述
1.1 无状态算子
根据当前的输入可以直接转换得到输出结果,这种鼻子就是无状态算子,如map,flatMap,filter
1.2 有状态算子
除当前处理之外,还需要其他处理才能得到计算结果。如聚合算子,窗口算子等
2、状态分类
Flink的状态有两种:托管状态(Managed State)和原始状态(Raw State)。托管状态就是由Flink统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由Flink实现,我们只要调接口就可以;而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。
通常我们采用Flink托管状态来实现需求。
2.1 算子状态
一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽(task slot)。由于不同的slot在计算资源上是物理隔离的,所以Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。
算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。
算子状态的实际应用场景不如Keyed State多,一般用在Source或Sink等与外部系统连接的算子上,或者完全没有key定义的场景。比如Flink的Kafka连接器中,就用到了算子状态。
算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState和BroadcastState。
2.1.1 列表状态(ListState)
与Keyed State中的列表状态的区别是:在算子状态的上下文中,不会按键(key)分别处理状态,所以每一个并行子任务上只会保留一个“列表”(list),也就是当前并行子任务上所有状态项的集合。列表中的状态项就是可以重新分配的最细粒度,彼此之间完全独立。
案例实操:在map算子中计算数据的个数。
public class OperatorListStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
env
.socketTextStream("hadoop102", 7777)
.map(new MyCountMapFunction())
.print();
env.execute();
}
// TODO 1.实现 CheckpointedFunction 接口
public static class MyCountMapFunction implements MapFunction<String, Long>, CheckpointedFunction {
private Long count = 0L;
private ListState<Long> state;
@Override
public Long map(String value) throws Exception {
return ++count;
}
/**
* TODO 2.本地变量持久化:将 本地变量 拷贝到 算子状态中,开启checkpoint时才会调用
*
* @param context
* @throws Exception
*/
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
System.out.println("snapshotState...");
// 2.1 清空算子状态
state.clear();
// 2.2 将 本地变量 添加到 算子状态 中
state.add(count);
}
/**
* TODO 3.初始化本地变量:程序启动和恢复时, 从状态中 把数据添加到 本地变量,每个子任务调用一次
*
* @param context
* @throws Exception
*/
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
System.out.println("initializeState...");
// 3.1 从 上下文 初始化 算子状态
state = context
.getOperatorStateStore()
.getListState(new ListStateDescriptor<Long>("state", Types.LONG));
// 3.2 从 算子状态中 把数据 拷贝到 本地变量
if (context.isRestored()) {
for (Long c : state.get()) {
count += c;
}
}
}
}
}
2.1.2 联合列表状态(UnionListState)
与ListState类似,联合列表状态也会将状态表示为一个列表。它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。
UnionListState的重点就在于“联合”(union)。在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。
如果列表中状态项数量太多,为资源和效率考虑一般不建议使用联合重组的方式。
使用方式同ListState,区别在:getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));
state = context
.getOperatorStateStore()
.getUnionListState(new ListStateDescriptor<Long>("union-state", Types.LONG));
2.1.3 广播状态(BroadcastState)
有时我们希望算子并行子任务都保持同一份“全局”状态,用来做统一的配置和规则设定。
案例实操:水位超过指定的阈值发送告警,阈值可以动态修改。
public class OperatorBroadcastStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);
// 数据流
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction());
// 配置流(用来广播配置)
DataStreamSource<String> configDS = env.socketTextStream("hadoop102", 8888);
// TODO 1. 将 配置流 广播
MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);
BroadcastStream<String> configBS = configDS.broadcast(broadcastMapState);
// TODO 2.把 数据流 和 广播后的配置流 connect
BroadcastConnectedStream<WaterSensor, String> sensorBCS = sensorDS.connect(configBS);
// TODO 3.调用 process
sensorBCS
.process(
new BroadcastProcessFunction<WaterSensor, String, String>() {
/**
* 数据流的处理方法: 数据流 只能 读取 广播状态,不能修改
* @param value
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processElement(WaterSensor value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
// TODO 5.通过上下文获取广播状态,取出里面的值(只读,不能修改)
ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
Integer threshold = broadcastState.get("threshold");
// 判断广播状态里是否有数据,因为刚启动时,可能是数据流的第一条数据先来
threshold = (threshold == null ? 0 : threshold);
if (value.getVc() > threshold) {
out.collect(value + ",水位超过指定的阈值:" + threshold + "!!!");
}
}
/**
* 广播后的配置流的处理方法: 只有广播流才能修改 广播状态
* @param value
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
// TODO 4. 通过上下文获取广播状态,往里面写数据
BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
broadcastState.put("threshold", Integer.valueOf(value));
}
}
)
.print();
env.execute();
}
}
2.2 按键分区状态
而很多有状态的操作(比如聚合、窗口)都是要先做keyBy进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前key有效,所以状态也应该按照key彼此隔离。
它的特点非常鲜明,就是以key为作用范围进行隔离。
需要注意,使用Keyed State必须基于KeyedStream。没有进行keyBy分区的DataStream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问Keyed State。
2.2.1 值状态(ValueState)
public interface ValueState<T> extends State {
T value() throws IOException;
void update(T value) throws IOException;
}
- T value():获取当前状态的值;
- update(T value):对状态进行更新,传入的参数value就是要覆写的状态值。
在具体使用时,为了让运行时上下文清楚到底是哪个状态,我们还需要创建一个“状态描述器”(StateDescriptor)来提供状态的基本信息。例如源码中,ValueState的状态描述器构造方法如下:
public ValueStateDescriptor(String name, Class<T> typeClass) {
super(name, typeClass, null);
}
案例需求:检测每种传感器的水位值,如果连续的两个水位值超过10,就输出报警。
public class KeyedValueStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
);
sensorDS.keyBy(r -> r.getId())
.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
// TODO 1.定义状态
ValueState<Integer> lastVcState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// TODO 2.在open方法中,初始化状态
// 状态描述器两个参数:第一个参数,起个名字,不重复;第二个参数,存储的类型
lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Types.INT));
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
// lastVcState.value(); // 取出 本组 值状态 的数据
// lastVcState.update(); // 更新 本组 值状态 的数据
// lastVcState.clear(); // 清除 本组 值状态 的数据
// 1. 取出上一条数据的水位值(Integer默认值是null,判断)
int lastVc = lastVcState.value() == null ? 0 : lastVcState.value();
// 2. 求差值的绝对值,判断是否超过10
Integer vc = value.getVc();
if (Math.abs(vc - lastVc) > 10) {
out.collect("传感器=" + value.getId() + "==>当前水位值=" + vc + ",与上一条水位值=" + lastVc + ",相差超过10!!!!");
}
// 3. 更新状态里的水位值
lastVcState.update(vc);
}
}
)
.print();
env.execute();
}
2.2.2 列表状态(ListState)
将需要保存的数据,以列表(List)的形式组织起来。在ListState<T>接口中同样有一个类型参数T,表示列表中数据的类型。ListState也提供了一系列的方法来操作状态,使用方式与一般的List非常相似。
- Iterable<T> get():获取当前的列表状态,返回的是一个可迭代类型Iterable<T>;
- update(List<T> values):传入一个列表values,直接对状态进行覆盖;
- add(T value):在状态列表中添加一个元素value;
- addAll(List<T> values):向列表中添加多个元素,以列表values形式传入。
类似地,ListState的状态描述器就叫作ListStateDescriptor,用法跟ValueStateDescriptor完全一致。
案例:针对每种传感器输出最高的3个水位值
public class KeyedListStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
);
sensorDS.keyBy(r -> r.getId())
.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
ListState<Integer> vcListState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
vcListState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vcListState", Types.INT));
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
// 1.来一条,存到list状态里
vcListState.add(value.getVc());
// 2.从list状态拿出来(Iterable), 拷贝到一个List中,排序, 只留3个最大的
Iterable<Integer> vcListIt = vcListState.get();
// 2.1 拷贝到List中
List<Integer> vcList = new ArrayList<>();
for (Integer vc : vcListIt) {
vcList.add(vc);
}
// 2.2 对List进行降序排序
vcList.sort((o1, o2) -> o2 - o1);
// 2.3 只保留最大的3个(list中的个数一定是连续变大,一超过3就立即清理即可)
if (vcList.size() > 3) {
// 将最后一个元素清除(第4个)
vcList.remove(3);
}
out.collect("传感器id为" + value.getId() + ",最大的3个水位值=" + vcList.toString());
// 3.更新list状态
vcListState.update(vcList);
// vcListState.get(); //取出 list状态 本组的数据,是一个Iterable
// vcListState.add(); // 向 list状态 本组 添加一个元素
// vcListState.addAll(); // 向 list状态 本组 添加多个元素
// vcListState.update(); // 更新 list状态 本组数据(覆盖)
// vcListState.clear(); // 清空List状态 本组数据
}
}
)
.print();
env.execute();
}
}
2.2.3 Map状态(MapState)
把一些键值对(key-value)作为状态整体保存起来,可以认为就是一组key-value映射的列表。
MapState提供了操作映射状态的方法,与Map的使用非常类似。
- UV get(UK key):传入一个key作为参数,查询对应的value值;
- put(UK key, UV value):传入一个键值对,更新key对应的value值;
- putAll(Map<UK, UV> map):将传入的映射map中所有的键值对,全部添加到映射状态中;
- remove(UK key):将指定key对应的键值对删除;
- boolean contains(UK key):判断是否存在指定的key,返回一个boolean值。
另外,MapState也提供了获取整个映射相关信息的方法;
- Iterable<Map.Entry<UK, UV>> entries():获取映射状态中所有的键值对;
- Iterable<UK> keys():获取映射状态中所有的键(key),返回一个可迭代Iterable类型;
- Iterable<UV> values():获取映射状态中所有的值(value),返回一个可迭代Iterable类型;
- boolean isEmpty():判断映射是否为空,返回一个boolean值。
案例需求:统计每种传感器每种水位值出现的次数。
public class KeyedMapStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
);
sensorDS.keyBy(r -> r.getId())
.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
MapState<Integer, Integer> vcCountMapState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
vcCountMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("vcCountMapState", Types.INT, Types.INT));
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
// 1.判断是否存在vc对应的key
Integer vc = value.getVc();
if (vcCountMapState.contains(vc)) {
// 1.1 如果包含这个vc的key,直接对value+1
Integer count = vcCountMapState.get(vc);
vcCountMapState.put(vc, ++count);
} else {
// 1.2 如果不包含这个vc的key,初始化put进去
vcCountMapState.put(vc, 1);
}
// 2.遍历Map状态,输出每个k-v的值
StringBuilder outStr = new StringBuilder();
outStr.append("======================================\n");
outStr.append("传感器id为" + value.getId() + "\n");
for (Map.Entry<Integer, Integer> vcCount : vcCountMapState.entries()) {
outStr.append(vcCount.toString() + "\n");
}
outStr.append("======================================\n");
out.collect(outStr.toString());
// vcCountMapState.get(); // 对本组的Map状态,根据key,获取value
// vcCountMapState.contains(); // 对本组的Map状态,判断key是否存在
// vcCountMapState.put(, ); // 对本组的Map状态,添加一个 键值对
// vcCountMapState.putAll(); // 对本组的Map状态,添加多个 键值对
// vcCountMapState.entries(); // 对本组的Map状态,获取所有键值对
// vcCountMapState.keys(); // 对本组的Map状态,获取所有键
// vcCountMapState.values(); // 对本组的Map状态,获取所有值
// vcCountMapState.remove(); // 对本组的Map状态,根据指定key,移除键值对
// vcCountMapState.isEmpty(); // 对本组的Map状态,判断是否为空
// vcCountMapState.iterator(); // 对本组的Map状态,获取迭代器
// vcCountMapState.clear(); // 对本组的Map状态,清空
}
}
)
.print();
env.execute();
}
}
2.2.4 归约状态(ReducingState)
归约逻辑的定义,是在归约状态描述器(ReducingStateDescriptor)中,通过传入一个归约函数(ReduceFunction)来实现的。这里的归约函数,就是我们之前介绍reduce聚合算子时讲到的ReduceFunction,所以状态类型跟输入的数据类型是一样的。
public ReducingStateDescriptor(
String name, ReduceFunction<T> reduceFunction, Class<T> typeClass) {...}
案例:计算每种传感器的水位和
.process(new KeyedProcessFunction<String, WaterSensor, Integer>() {
private ReducingState<Integer> sumVcState;
@Override
public void open(Configuration parameters) throws Exception {
sumVcState = this
.getRuntimeContext()
.getReducingState(new ReducingStateDescriptor<Integer>("sumVcState",Integer::sum,Integer.class));
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<Integer> out) throws Exception {
sumVcState.add(value.getVc());
out.collect(sumVcState.get());
}
})
2.2.5 聚合状态(AggregatingState)
与归约状态非常类似,聚合状态也是一个值,用来保存添加进来的所有数据的聚合结果。与ReducingState不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数(AggregateFunction)来定义的;这也就是之前我们讲过的AggregateFunction,里面通过一个累加器(Accumulator)来表示状态,所以聚合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活。
案例需求:计算每种传感器的平均水位
public class KeyedAggregatingStateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
);
sensorDS.keyBy(r -> r.getId())
.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
AggregatingState<Integer, Double> vcAvgAggregatingState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
vcAvgAggregatingState = getRuntimeContext()
.getAggregatingState(
new AggregatingStateDescriptor<Integer, Tuple2<Integer, Integer>, Double>(
"vcAvgAggregatingState",
new AggregateFunction<Integer, Tuple2<Integer, Integer>, Double>() {
@Override
public Tuple2<Integer, Integer> createAccumulator() {
return Tuple2.of(0, 0);
}
@Override
public Tuple2<Integer, Integer> add(Integer value, Tuple2<Integer, Integer> accumulator) {
return Tuple2.of(accumulator.f0 + value, accumulator.f1 + 1);
}
@Override
public Double getResult(Tuple2<Integer, Integer> accumulator) {
return accumulator.f0 * 1D / accumulator.f1;
}
@Override
public Tuple2<Integer, Integer> merge(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
// return Tuple2.of(a.f0 + b.f0, a.f1 + b.f1);
return null;
}
},
Types.TUPLE(Types.INT, Types.INT))
);
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
// 将 水位值 添加到 聚合状态中
vcAvgAggregatingState.add(value.getVc());
// 从 聚合状态中 获取结果
Double vcAvg = vcAvgAggregatingState.get();
out.collect("传感器id为" + value.getId() + ",平均水位值=" + vcAvg);
// vcAvgAggregatingState.get(); // 对 本组的聚合状态 获取结果
// vcAvgAggregatingState.add(); // 对 本组的聚合状态 添加数据,会自动进行聚合
// vcAvgAggregatingState.clear(); // 对 本组的聚合状态 清空数据
}
}
)
.print();
env.execute();
}
}
2.2.6 状态生存时间(TTL)
在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。
配置状态的TTL时,需要创建一个StateTtlConfig配置对象,然后调用状态描述器的.enableTimeToLive()方法启动TTL功能。
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("my state", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
这里用到了几个配置项:
- .newBuilder()
状态TTL配置的构造器方法,必须调用,返回一个Builder之后再调用.build()方法就可以得到StateTtlConfig了。方法需要传入一个Time作为参数,这就是设定的状态生存时间。
- .setUpdateType()
设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型OnReadAndWrite则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为OnCreateAndWrite。
- .setStateVisibility()
设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能继续存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的NeverReturnExpired是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值。
public class StateTTLDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> sensorDS = env
.socketTextStream("hadoop102", 7777)
.map(new WaterSensorMapFunction())
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, ts) -> element.getTs() * 1000L)
);
sensorDS.keyBy(r -> r.getId())
.process(
new KeyedProcessFunction<String, WaterSensor, String>() {
ValueState<Integer> lastVcState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// TODO 1.创建 StateTtlConfig
StateTtlConfig stateTtlConfig = StateTtlConfig
.newBuilder(Time.seconds(5)) // 过期时间5s
// .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入(更新) 更新 过期时间
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入(更新) 更新 过期时间
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值
.build();
// TODO 2.状态描述器 启用 TTL
ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastVcState", Types.INT);
stateDescriptor.enableTimeToLive(stateTtlConfig);
this.lastVcState = getRuntimeContext().getState(stateDescriptor);
}
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
// 先获取状态值,打印 ==》 读取状态
Integer lastVc = lastVcState.value();
out.collect("key=" + value.getId() + ",状态值=" + lastVc);
// 如果水位大于10,更新状态值 ===》 写入状态
if (value.getVc() > 10) {
lastVcState.update(value.getVc());
}
}
}
)
.print();
env.execute();
}
}
3、状态后端(State Backends)
在Flink中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就叫作状态后端(state backend)。状态后端主要负责管理本地状态的存储方式和位置。
3.1 状态后端的分类(HashMapStateBackend/RocksDB)
Flink中提供了两类不同的状态后端,一种是“哈希表状态后端”(HashMapStateBackend),另一种是“内嵌RocksDB状态后端”(EmbeddedRocksDBStateBackend)。
系统默认的状态后端是HashMapStateBackend。
3.1.1 哈希表状态后端(HashMapStateBackend)
HashMapStateBackend是把状态存放在内存里。具体实现上,哈希表状态后端在内部会直接把状态当作对象(objects),保存在Taskmanager的JVM堆上。
普通的状态,以及窗口中收集的数据和触发器,都会以键值对的形式存储起来,所以底层是一个哈希表(HashMap),这种状态后端也因此得名。
3.1.2 内嵌RocksDB状态后端(EmbeddedRocksDBStateBackend)
RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘。
配置EmbeddedRocksDBStateBackend后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里。
3.2 如何选择正确的状态后端
HashMap和RocksDB两种状态后端最大的区别,就在于本地状态存放在哪里。
HashMapStateBackend是内存计算,读写速度非常快;但是,状态的大小会受到集群可用内存的限制,如果应用的状态随着时间不停地增长,就会耗尽内存资源。
而RocksDB是硬盘存储,所以可以根据可用的磁盘空间进行扩展,所以它非常适合于超级海量状态的存储。不过由于每个状态的读写都需要做序列化/反序列化,而且可能需要直接从磁盘读取数据,这就会导致性能的降低,平均读写性能要比HashMapStateBackend慢一个数量级。
3.3 状态后端的配置
3.3.1 配置默认的状态后端
#flink-conf.yaml
# 默认状态后端
state.backend: hashmap
# 存放检查点的文件路径
# 这里的state.checkpoints.dir配置项,定义了检查点和元数据写入的目录。
state.checkpoints.dir: hdfs://hadoop102:8020/flink/checkpoints
3.3.2 为每个作业(Per-job/Application)单独配置状态后端
通过执行环境设置,HashMapStateBackend。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new HashMapStateBackend());
通过执行环境设置,EmbeddedRocksDBStateBackend。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
需要注意,如果想在IDE中使用EmbeddedRocksDBStateBackend,需要为Flink项目添加依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
</dependency>