flink重温笔记(十):Flink 高级 API 开发——flink 四大基石之 State(涉及Checkpoint)

Flink学习笔记

前言:今天是学习 flink 的第 10 天啦!学习了 flink 四大基石之 State (状态),主要是解决大数据领域增量计算的效果,能够保存已经计算过的结果数据状态!重点学习了 state 的类型划分和应用,以及 TTL 原理和应用,即数据状态也会过期和定期清除的问题,以及广播流数据的企业应用场景,结合自己实验猜想和代码实践,总结了很多自己的理解和想法,希望和大家多多交流!

Tips:广州回南天色佳,学习 state 意更浓。心随知识飘然去,智慧之舟破浪中。越来越有状态,明天也要继续努力!


文章目录

  • Flink学习笔记
    • 三、Flink 高级 API 开发
      • 3. State
        • 3.1 State 应用场景
        • 3.2 State 类型划分
          • 3.2.1 Keyed State 键控状态
            • (1) 特点
            • (2) 保存的数据结构
            • (3) 案例演示
          • 3.2.1 Operate State 算子状态
            • (1) 特点
            • (2) 保存的数据结构
            • (3) 案例演示
        • 3.3 State TTL 状态有效期
          • 3.3.1 功能用法
            • (1) TTL 的更新策略
            • (2) 状态数据过期但未被清除时
            • (3) 过期数据的清除
          • 3.3.2 案例演示
        • 3.4 Broadcast State
          • 3.4.1 应用场景
          • 3.4.2 注意事项
          • 3.4.3 案例演示
          • 3.4.4 BroadcastState 执行思路梳理

三、Flink 高级 API 开发

3. State

简介:State(状态)是基于 Checkpoint(检查点)来完成状态持久化,在 Checkpoint 之前,State 是在内存中(变量),在 Checkpoint 之后,State 被序列化永久保存,支持存储方式:File,HDFS,S3等。

3.1 State 应用场景
  • (1)去重
  • (2)窗口计算
  • (3)机器学习/深度学习
  • (4)访问历史数据

3.2 State 类型划分
  • 基本类型划分
    • Keyed State(键控状态)
    • Operate State(算子状态)
  • 存在方式划分
    • raw State (原始状态):原始状态
    • managed State(托管状态):Flink 自动管理的 State,实际生产推荐使用
  • 原始状态和托管状态区别:
状态管理方式数据结构使用场景
Managed StateFlink Runtime 管理自动存储,自动恢复内存管理可自动优化Value,List,Map…大多数情况下均可使用
Raw State需要用户自己管理,需要自己序列化字节数组自定义Operator时使用
3.2.1 Keyed State 键控状态
(1) 特点
  • 1- 只能用于 keyby 后的数据流
  • 2- 一个 key 只能属于 一个key State
(2) 保存的数据结构
  • Keyed State 通过 RuntimeContext 访问,这需要 Operator 是一个RichFunction。
  • 1- ValueState:单值状态
  • 2- ListState:列表状态
  • 3- ReducingState:传入reduceFunction,单一状态值
  • 4- MapState<UK, UV>:状态值为 map
(3) 案例演示

例子:词频统计,不要用 sum,而是用 reduce,然后 ValueState

package cn.itcast.day10.state;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-03-03 16:01:44
 * @description TODO:演示 keyedState 的使用
 */
public class KeyedStateDemo {
    public static void main(String[] args) throws Exception {
        //todo 1)创建flink流处理的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //todo 2)开启checkpoint
        env.enableCheckpointing(5000);

        //todo 3)构建数据源
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        //todo 4)单词拆分
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndNum = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] data = line.split(",");
                return Tuple2.of(data[0], Integer.parseInt(data[1]));
            }
        });

        //todo 5)分流操作
        KeyedStream<Tuple2<String, Integer>, String> keyedDataStream = wordAndNum.keyBy(t -> t.f0);

        //todo 6) 聚合操作(自定义state方式实现)
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduceState = keyedDataStream.reduce(new RichReduceFunction<Tuple2<String, Integer>>() {
            // todo 6.1 定义 state 对象
            private ValueState<Tuple2<String, Integer>> valueState = null;

            // 初始化资源
            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // todo 6.2 实例化 state 对象
                valueState = getRuntimeContext().getState(
                        new ValueStateDescriptor<Tuple2<String, Integer>>(
                                "reduceState",
                                TypeInformation.of(
                                        new TypeHint<Tuple2<String, Integer>>() {
                                        })));
            }

            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                // state 存储的历史数据
                // todo 6.3 获取state 对象
                Tuple2<String, Integer> result = valueState.value();
                if (result == null) {
                    result = Tuple2.of(value1.f0, value1.f1);
                }
                Tuple2<String, Integer> resultSum = Tuple2.of(result.f0, result.f1 + value2.f1);
                // 之前没有重写状态:Tuple2.of(value1.f0, value1.f1 + value2.f1);
                // 因为我们这里的 value1 已经从 state 中获取到 result了,所以重写状态的方法是,就用result!
                // todo 6.4 更新 state 对象
                valueState.update(resultSum);
                return resultSum;
            }

            // 释放资源
            @Override
            public void close() throws Exception {
                super.close();
                // 获取 state 的数据
                Tuple2<String, Integer> value = valueState.value();
                System.out.println("=======释放资源的时候打印 state 数据========");
                System.out.println(value);
            }
        });
        //todo 6)打印测试
        reduceState.print();
        //todo 7)运行
        env.execute();
    }
}

结果:

输入:
hadoop,1
hadoop,2

输出:
8> (hadoop,1)
8> (hadoop,3)

总结:

  • 1- state 类似于一个数据库,可以从中获取之前计算过的数据
  • 2- 定义 state 对象,初始值为 null
  • 3- 实例化 state 对象,getRuntimeContext().getState(new ValueStateDescriptor())
  • 4- 获取 state 中的数据:状态.value()
  • 5- 更新 state 中的数据:状态.update(数据流)

3.2.1 Operate State 算子状态
(1) 特点
  • 1- 一个算子状态仅与一个算子实例绑定
  • 2- 算子状态可以用于所有算子,但常见是 Source
(2) 保存的数据结构
  • Operator State 需要自己实现 CheckpointedFunction 或 ListCheckpointed 接口
  • 1- ListState
  • 2- BroadcastState<K,V>
(3) 案例演示

例子:使用 OperatorState 进行演示基于类似于 kafka 消费数据的功能

package cn.itcast.day10.state;

/**
 * @author lql
 * @time 2024-03-03 17:21:50
 * @description TODO:使用OperatorState进行演示基于类似于kafka消费数据的功能
 */

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * 实现步骤:
 * 1)初始化flink流式程序的运行环境
 * 2)设置并行度为1
 * 3)启动checkpoint
 * 4)接入数据源
 * 5)打印测试
 * 6)启动作业
 */
public class OperatorStateDemo {
    public static void main(String[] args) throws Exception {
        // todo 1) 初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // todo 2) 设置并行度为 1
        env.setParallelism(1);

        // todo 3) 启动checkpoint机制
        env.enableCheckpointing(4000);

        // todo 4) 接入数据源
        DataStreamSource<Integer> source  = env.addSource(new MySourceWithState());

        SingleOutputStreamOperator<String> result = source.map(new MapFunction<Integer, String>() {
            @Override
            public String map(Integer integer) throws Exception {
                return integer.toString();
            }
        });

        //TODO 5)打印测试
        result.printToErr();

        //TODO 6)启动作业
        env.execute();
    }

    private static class MySourceWithState extends RichSourceFunction<Integer> implements CheckpointedFunction {
        //定义成员变量是否循环生成数据
        private Boolean isRunning = true;
        private Integer currentCounter = 0;

        //定义ListState保存结果数据。保存offset的累加值
        private ListState<Integer> listState = null;

        /**
         * 将state中的数据持久化存储到文件中(每4秒钟进行一次快照,将state数据存储到hdfs)
         * @param functionSnapshotContext
         * @throws Exception
         */
        @Override
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            System.out.println("调用snapshotState方法。。。。。。。。");

            // 清除历史状态存储的历史数据,this引用对象的成员变量
            this.listState.clear();

            //将最新的累加值添加到状态中
            this.listState.add(this.currentCounter);
        }

        /**
         * 初始化 state 对象
         * @param context
         * @throws Exception
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("调用initializeState方法。。。。。。。。");

            //初始化一个listState
            OperatorStateStore stateStore  = context.getOperatorStateStore();

            listState = stateStore.getListState(
                    new ListStateDescriptor<>(
                            "operator-states",
                            TypeInformation.of(new TypeHint<Integer>() {}))
            );

            // 获取历史数据
            for (Integer counter  : this.listState.get()) {
                //将历史存储的累加值取出来赋值给当前累加值变量
                this.currentCounter = counter;
            }

            //清除状态中存储的历史数据
            this.listState.clear();
        }

        /**
         * 生产数据
         * @param sourceContext
         * @throws Exception
         */
        @Override
        public void run(SourceContext<Integer> sourceContext) throws Exception {
            while (isRunning){
                currentCounter ++;
                sourceContext.collect(currentCounter);

                TimeUnit.SECONDS.sleep(1);
                if (this.currentCounter == 10){
                    System.out.println("手动抛出异常"+(1/0));
                }
            }
        }

        /**
         * 取消生产数据
         */
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

结果:

调用initializeState方法。。。。。。。。
1
2
3
4
调用snapshotState方法。。。。。。。。
5
6
7
8
调用snapshotState方法。。。。。。。。
9
10
调用initializeState方法。。。。。。。。
9
调用snapshotState方法。。。。。。。。
10
    
<==10这里出现异常了,所以保存state中的数据是9,
    初始化方法后恢复 9+1 数据,然后 10 也保存到state中了==>
    
调用initializeState方法。。。。。。。。
10
调用snapshotState方法。。。。。。。。
调用initializeState方法。。。。。。。。
11
12
13
14
调用snapshotState方法。。。。。。。。
15
16
17

总结:

  • 1- 定义数据源要继承 RichSourceFunction 父类,实现 CheckpointedFunction 接口
  • 2- 初始化 listState:getOperatorStateStore.getListState(new ListStateDescriptor<>)
  • 3- 获取历史数据:因为是listState,所以用循环 get()
  • 4- 历史值赋予当前值,清空历史数据 clear()
  • 5- 检查点之后,恢复数据就是 [ 历史数据 + 1 ]

3.3 State TTL 状态有效期

举例子:更新策略着眼于是更新日期是在哪个时候,

​ 而这里设置停留时间 .newBuilder(Time.seconds(1)) 是指状态保存多长时间,

​ 时间一过状态数据就标记过期(设置时间要比 checkpoint 时间长,才能保证 checkpoint 顺利持久化),

清除策略着眼于过期数据清理是在哪个时候

应用场景:使用 flink 进行实时计算中,会遇到一些状态数不断累积,导致状态量越来越大的情形


3.3.1 功能用法
(1) TTL 的更新策略
  • 只在创建和写入时更新:.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

  • 读取和写入时也会更新:.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)

(2) 状态数据过期但未被清除时
  • 不返回过期数据:.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  • 返回未被清除的过期数据:.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
(3) 过期数据的清除
  • 关闭后台自动清理过期数据:disableCleanupInBackground()

  • 全量快照时清理:cleanupFullSnapshot()

    • 这种策略在 RocksDBStateBackend 的增量 checkpoint 模式下无效。
  • 增量快照时清理:.cleanupIncrementally(10, true)

    • 第一个是每次清理时检查状态的条目数,在每个状态访问时触发。

      第二个参数表示是否在处理每条记录时触发清理。

  • 后台自动清理(RocksDB state backend 存储):.cleanupInRocksdbCompactFilter(1000)

    • 默认后台清理策略会每处理 1000 条数据进行一次,

    • 表示在 RocksDB 的compaction过程中,每删除1000个键值对,就会执行一次TTL过期的键值对的清理。

    • RocksDB 的 compaction是一个过程,它会合并多个小的数据库文件(SSTables)成一个大的文件


3.3.2 案例演示

例子1:10s 读取一行数据,checkpoint 60s,state 时间 5 s

package cn.itcast.day10.state;

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.util.Collector;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.concurrent.TimeUnit;

/**
 * @author lql
 * @time 2024-03-04 18:53:52
 * @description TODO
 */
public class StateWordCount {
    public static void main(String[] args) throws Exception {
        // todo 1) 使用工具类将传入的参数解析为对象
        final ParameterTool parameters = ParameterTool.fromArgs(args);

        // todo 2) 初始化环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);

        // todo 3) 将传入的参数解析成参数注册到作业中
        env.getConfig().setGlobalJobParameters(parameters);

        // todo 4) 开启 checkpoint,一致性语义
        env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);

        // todo 5) 接入数据源
        DataStreamSource<String> lines = env.addSource(new SourceFunctionFile());

        lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] data = s.split(" ");
                for (String item : data) {
                    collector.collect(new Tuple2<>(item, 1));
                }
            }
        }).keyBy(t -> t.f0)
                .flatMap(new WordCountFlatMap()).printToErr();

        // todo 6)启动作业
        env.execute();
    }

    private static class SourceFunctionFile extends RichSourceFunction<String> {

        // 定义是否继续生成数据标记
        private Boolean isRunning = true;

        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            BufferedReader bufferedReader = new BufferedReader(new FileReader("D:\\IDEA_Project\\BigData_Java\\flinkbase_pro\\data\\input\\wordcount.txt"));

            while (isRunning) {
                String line = bufferedReader.readLine();
                if (StringUtils.isBlank(line)) {
                    continue;
                }
                sourceContext.collect(line);
                TimeUnit.SECONDS.sleep(10);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    // 自定义聚合
    private static class WordCountFlatMap extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

        // 定义状态
        private ValueState<Tuple2<String, Integer>> valueState = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            //创建ValueStateDescriptor
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("StateWordCount",
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {
                    }));

            //配置stateTTL
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.seconds(5))  // 存活时间
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 创建或更新的时候修改时间
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 永不返回过期的数据
                    .cleanupFullSnapshot() // 全量快照时进行清理
                    .build();

            // 激活 stateTTL
            valueStateDescriptor.enableTimeToLive(ttlConfig);

            // 实例化 valueState 对象
            valueState = getRuntimeContext().getState(valueStateDescriptor);

        }

        @Override
        public void close() throws Exception {
            super.close();
        }

        @Override
        public void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> collector) throws Exception {

            // 获取状态数据
            Tuple2<String, Integer> currentState = valueState.value();

            // 初始化 valueState 数据
            if (currentState == null) {
                currentState = new Tuple2<>(value.f0,0);
            }
            // 累加单词的次数
            Tuple2<String, Integer> newState = new Tuple2<>(currentState.f0, currentState.f1 + value.f1);

            // 更新valueState
            valueState.update(newState);

            // 返回累加后的结果
            collector.collect(newState);
        }
    }
}

数据源:

Total time BUILD SUCCESS
Final Memory Finished at
Total time BUILD SUCCESS
Final Memory Finished at
Total time BUILD SUCCESS
Final Memory Finished at
BUILD SUCCESS
BUILD SUCCESS
BUILD SUCCESS
BUILD SUCCESS
BUILD SUCCESS

结果:没有累加历史值

======110s=====
(Total,1)
(time,1)
(BUILD,1)
(SUCCESS,1)

======210s=====
(Final,1)
(Memory,1)
(Finished,1)
(at,1)
    
======310s=====
(Total,1)
(time,1)
(BUILD,1)
(SUCCESS,1)

例子2:10s 读取一行数据,checkpoint 5s,state 时间 20 s 或者 21s

结果:触发累加历史值

======110s=====
(Total,1)
(time,1)
(BUILD,1)
(SUCCESS,1)

======210s=====
(Final,1)
(Memory,1)
(Finished,1)
(at,1)
    
======310s=====
(Total,2)
(time,2)
(BUILD,2)
(SUCCESS,2)

例子3:10s 读取一行数据,checkpoint 5s,state 时间 19s

结果:没有累加历史值


  • 总结1: 从例子1-3,因为第1个Total距第2个Total 20s,故 state 设置要尽可能大20s,设置太小来不及遇见第二个就过期了。


例子4:10s 读取一行数据,checkpoint 10s,state 时间 20 s

结果:触发累加历史值


例子5:10s 读取一行数据,checkpoint 60s,state 时间 20s

结果:触发累加历史值


例子6:10s 读取一行数据,checkpoint 20s,state 时间 20 s

结果:没有累加历史值


  • 总结2:从例子 4-6,checkpoint 比 state 建议小很多或大很多,不然 [10-20] 有些不能触发,[20-60] 有些能触发。


例子7:10s 读取一行数据,checkpoint 19s,state 时间 20s(之前不能触发)

(此时设置.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) // 返回过期但未被清除的数据)

结果:触发累加历史值


例子8:10s 读取一行数据,checkpoint 60s,state 时间 5 s(之前state小于20s不能触发)

结果:触发累加历史值


例子9:10s 读取一行数据,checkpoint 60s,state 时间 20s(之前可以触发)

(此时设置.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) // 返回过期但未被清除的数据)

结果:触发累加历史值


  • 总结3:从例子7-9,设置 ReturnExpiredIfNotCleanedUp 之前可以触发的不可以触发的,都可以触发了!


总结:

  • 1- state 设置要尽可能大于相同数据间隔s,设置太小来不及遇见第二个就过期了。
  • 2- checkpoint 比 state 建议小很多或大很多
  • 3- 设置 ReturnExpiredIfNotCleanedUp 可以避免不能触发
  • 4- BufferedReader 去 new 一个 FileReader 可以读取文件
  • 5- 激活 stateTTL:valueStateDescriptor.enableTimeToLive(ttlConfig)

3.4 Broadcast State
3.4.1 应用场景
  • 1-动态更新计算规则: 如事件流需要根据最新规则进行计算,可将规则作为广播状态广播到下游Task。
  • 2-实时增加额外字段: 如事件流需要实时增加用户基础信息,可将基础信息作为广播状态到下游Task。

3.4.2 注意事项
  • Broadcast State 是 Map 类型,即 K-V 类型

  • Broadcast State 只有在广播的一侧,即在重写方法:processBroadcastElement 方法中可修改,另一个方法只读

  • Broadcast State 在 checkpoint 时,每个 Task 都会 checkpoint(持久化)广播状态。

  • Broadcast State 在运行时保存在内存中,(flink 1.13)还不能保存在 Rocked State Backend


3.4.3 案例演示

例子:公司有10个广告位, 其广告的内容(描述和图片)会经常变动(广告到期,更换广告等)

package cn.itcast.day10.state;

/**
 * @author lql
 * @time 2024-03-05 13:54:54
 * @description TODO
 */

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * 广播状态流演示
 * 需求:公司有10个广告位, 其广告的内容(描述和图片)会经常变动(广告到期,更换广告等)
 * 实现:
 * 1)通过socket输入广告id(事件流)
 * 2)关联出来广告的信息打印出来,就是广告发生改变的时候,能够感知到(规则流)
 */
public class BroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        //todo 1)初始化flink流处理的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //todo 2) 设置checkpoint周期运转
        env.enableCheckpointing(5000L);

        //todo 3) 构建数据流 (我输入的是字符串数字,需要转化为整数类型)
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);
        SingleOutputStreamOperator<Integer> adIdDataStream = lines.map(new MapFunction<String, Integer>() {
            @Override
            public Integer map(String value) throws Exception {
                return Integer.parseInt(value);
            }
        });

        // todo 4) 构建规则流(广告流)
        DataStreamSource<Map<Integer, Tuple2<String, String>>> adSourceStream = env.addSource(new MySourceForBroadcastFunction());
        adSourceStream.print("最新的广告信息>>>");

        // todo 5) 将规则流(广告流)转化为广播流==> 这里和之前的广播是不一样的!
        // todo 5.1 定义规则流(广告流)的描述器
        MapStateDescriptor<Integer, Tuple2<String, String>> mapStateDescriptor = new MapStateDescriptor<Integer, Tuple2<String, String>>(
                "broadcaststate",
                TypeInformation.of(new TypeHint<Integer>() {}),
                TypeInformation.of(new TypeHint<Tuple2<String,String>>(){})
        );
        // todo 5.2 运用描述器将(广告流)转化为广播流
        BroadcastStream<Map<Integer, Tuple2<String, String>>> broadcastStream = adSourceStream.broadcast(mapStateDescriptor);

        // todo 6) 将数据流和规则流合并在一起
        BroadcastConnectedStream<Integer, Map<Integer, Tuple2<String, String>>> connectedStream = adIdDataStream.connect(broadcastStream);

        // todo 7) 对关联后的数据做拉宽操作
        SingleOutputStreamOperator<Tuple2<String, String>> result = connectedStream.process(new MyBroadcastProcessFunction());

        // todo 8) 打印结果数据
        result.printToErr("拉宽后的结果>>>");

        //todo 9)启动作业
        env.execute();
    }

    /**
     * 自定义规则数据,注意 返回类型是 Map<K,V>
     */
    private static class MySourceForBroadcastFunction implements SourceFunction<Map<Integer, Tuple2<String,String>>> {

        private final Random random = new Random();
        private final List<Tuple2<String, String>> ads = Arrays.asList(
                Tuple2.of("baidu", "搜索引擎"),
                Tuple2.of("google", "科技大牛"),
                Tuple2.of("aws", "全球领先的云平台"),
                Tuple2.of("aliyun", "全球领先的云平台"),
                Tuple2.of("腾讯", "氪金使我变强"),
                Tuple2.of("阿里巴巴", "电商龙头"),
                Tuple2.of("字节跳动", "靠算法出名"),
                Tuple2.of("美团", "黄色小公司"),
                Tuple2.of("饿了么", "蓝色小公司"),
                Tuple2.of("瑞幸咖啡", "就是好喝")
        );
        private boolean isRun = true;

        @Override
        public void run(SourceContext<Map<Integer, Tuple2<String, String>>> sourceContext) throws Exception {
            while (isRun){
                // 定义一个 HashMap,用来存储键值对
                HashMap<Integer, Tuple2<String,String>> map = new HashMap<>();
                int keyCounter = 0;
                for (int i = 0; i < ads.size(); i++) {
                    keyCounter++;
                    map.put(keyCounter,ads.get(random.nextInt(ads.size())));
                }
                sourceContext.collect(map);
                TimeUnit.SECONDS.sleep(5L);
            }
        }

        @Override
        public void cancel() {
            isRun = false;
        }
    }

    private static class MyBroadcastProcessFunction extends BroadcastProcessFunction<Integer,Map<Integer, Tuple2<String, String>>, Tuple2<String, String>> {

        //定义state的描述器
        MapStateDescriptor<Integer, Tuple2<String, String>> mapStateDescriptor = new MapStateDescriptor<Integer, Tuple2<String, String>>(
                "broadcaststate",
                TypeInformation.of(new TypeHint<Integer>() {}),
                TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})
        );

        /**
         * 这个方法只读,用来拉宽操作
         * @param integer
         * @param readOnlyContext
         * @param collector
         * @throws Exception
         */
        @Override
        public void processElement(Integer integer, ReadOnlyContext readOnlyContext, Collector<Tuple2<String, String>> collector) throws Exception {
            //只读操作,意味着只能读取数据,不能修改数据,根据广告id获取广告信息
            ReadOnlyBroadcastState<Integer, Tuple2<String, String>> broadcastState = readOnlyContext.getBroadcastState(mapStateDescriptor);
            //根据广告id获取广告信息
            Tuple2<String, String> tuple2 = broadcastState.get(integer);
            //判断广告信息是否关联成功
            if(tuple2 != null) {
                collector.collect(tuple2);
            }
        }

        /**
         * 可写的,用来更新state的数据
         * @param integerTuple2Map
         * @param context
         * @param collector
         * @throws Exception
         */
        @Override
        public void processBroadcastElement(Map<Integer, Tuple2<String, String>> integerTuple2Map, Context context, Collector<Tuple2<String, String>> collector) throws Exception {
            //先读取state数据
            BroadcastState<Integer, Tuple2<String, String>> broadcastState = context.getBroadcastState(mapStateDescriptor);
            // 删除历史状态数据
            broadcastState.clear();
            //将最新获取到的广告信息进行广播操作
            broadcastState.putAll(integerTuple2Map);
        }
    }
}

结果:

最新的广告信息>>>:7> {1=(aws,全球领先的云平台), 2=(aliyun,全球领先的云平台), 3=(阿里巴巴,电商龙头), 4=(aws,全球领先的云平台), 5=(瑞幸咖啡,就是好喝), 6=(瑞幸咖啡,就是好喝), 7=(美团,黄色小公司), 8=(aws,全球领先的云平台), 9=(腾讯,氪金使我变强), 10=(字节跳动,靠算法出名)}

最新的广告信息>>>:8> {1=(美团,黄色小公司), 2=(饿了么,蓝色小公司), 3=(aws,全球领先的云平台), 4=(baidu,搜索引擎), 5=(aws,全球领先的云平台), 6=(baidu,搜索引擎), 7=(美团,黄色小公司), 8=(字节跳动,靠算法出名), 9=(瑞幸咖啡,就是好喝), 10=(腾讯,氪金使我变强)}

======我在终端输入了 3,刚好对应上面一条信息的 3 位置========
拉宽后的结果>>>:4> (aws,全球领先的云平台)

最新的广告信息>>>:1> {1=(饿了么,蓝色小公司), 2=(aliyun,全球领先的云平台), 3=(google,科技大牛), 4=(瑞幸咖啡,就是好喝), 5=(美团,黄色小公司), 6=(baidu,搜索引擎), 7=(google,科技大牛), 8=(google,科技大牛), 9=(google,科技大牛), 10=(腾讯,氪金使我变强)}

最新的广告信息>>>:2> {1=(google,科技大牛), 2=(饿了么,蓝色小公司), 3=(字节跳动,靠算法出名), 4=(aliyun,全球领先的云平台), 5=(aws,全球领先的云平台), 6=(aws,全球领先的云平台), 7=(google,科技大牛), 8=(google,科技大牛), 9=(美团,黄色小公司), 10=(aliyun,全球领先的云平台)}

======我在终端输入了 1,刚好对应上面一条信息的 1 位置========
拉宽后的结果>>>:5> (google,科技大牛)

总结:

  • 1- 这里的规则流转化为广播流操作,和之前广播分区不一样
    • 数据广播到各个分区:数据.Brocast()
    • 数据流转化为广播流:数据.Brocast(描述器)
  • 2- BroadcastProcessFunction 方法中重写的 processElement 只读方法,用来拉宽操作
    • 获取数据+关联数据(判非空后收集)
  • 3- BroadcastProcessFunction 方法中重写的 processBroadcastElement 可修改方法,
    • 读取 state 数据:getBroadcastState()
    • 删除历史数据:clear()
    • 将新的实例添加到广播状态中:putAll()

3.4.4 BroadcastState 执行思路梳理

在这里插入图片描述


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/430590.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

IISExpress 跨域cookie的奇怪问题

测试环境 WIN10&#xff0c;IIS 10&#xff0c;IISExpress 10&#xff0c;Chrome 120&#xff0c;Microsoft Edge 114 网站A 端口7001 只有1个Default.aspx&#xff0c;无前端代码。逻辑很简单&#xff0c;SetCookie用来把客户端传过来的值写入到cookie中&#xff0c;GetCoo…

RK DVP NVP6158配置 学习

NVP6158简介 NVP6158C是一款4通道通用RX&#xff0c;提供高质量图像的芯片。它接受来自摄像机和其他视频信号的独立4通道通用输入来源。它将4通道通用1M至8M 7.5P视频格式数字化并解码为代表8位ITU-R BT.656/1120 4:2:2格式的数字分量视频&#xff0c;并将单独的BT.601格式与27…

基于ERNIR3.0文本分类的开发实践

参考&#xff1a; https://zhuanlan.zhihu.com/p/574666812?utm_id0 遇到的问题&#xff1a;如下 采用paddleNLP下文本分类实例进行分类训练后发现 生成的模型分类不准。打算自己开发脚本进行分类计算再进行服务化部署。

指针数组的理解

指针数组的概念&#xff1a;即用于存放指针变量的数组 代码如下&#xff1a;使用指针数组来模拟二维数组 int main() {//创建三个整型数组int arr1[] { 1,2,3,4,5 };int arr2[] { 2,3,4,5,6 };int arr3[] { 3,4,5,6,7 };int* p_arr[3] { arr1,arr2,arr3 };for (int i 0;…

WiFi|硬体:茶凳浅谈-高通802.11be WLAN AP Chipsets 参考设计与boardData之间的映射

前言: WiFi|硬体:茶凳浅谈-高通Wi-Fi 7立项前的选型 博文中提到一些选型的组合&#xff0c;比如: 主芯片的搭配IPQ9554 QCN9274 (2x2 2.4G, 2x2 5G) QCN9272 (2x2 6G) 主芯片的搭配IPQ9574 QCN9274 (2x2 2.4G, 2x2 5GL) QCN9274 (2x2 5GH, 2x2 6G) 红色标示出差异的部分…

电脑硬件变化报警|2024完整珍藏版

公司中常常会有一种泄密事情发生&#xff0c;是这样的&#xff1a; 使用移动硬盘、外部硬盘驱动器或其他外部存储设备&#xff0c;将文件复制到这些设备上&#xff0c;然后将设备连接到另一台电脑&#xff0c;即可将文件拷贝出去。 还有人这样说&#xff1a;“我人都在面前了…

CV论文--2024.3.4

1、Deep Networks Always Grok and Here is Why 中文标题&#xff1a;深度网络总是让人摸不着头脑&#xff0c;原因如下 简介&#xff1a;本文探讨了深度神经网络&#xff08;DNN&#xff09;中一种称为"延迟泛化"或"Grokking"的现象。在接近零的训练误差…

如何在服务器上建立国外私有云存储

随着数字化时代的到来&#xff0c;数据备份和存储成为了我们生活与工作中不可或缺的一部分。私有云存储&#xff0c;以其灵活性、安全性和便捷性&#xff0c;受到了越来越多用户的青睐。特别是对于需要在国外服务器上建立私有云存储的用户来说&#xff0c;这一需求更加迫切。下…

信钰证券|飞行汽车概念走势活跃,金盾股份“20cm”涨停

飞翔汽车概念5日盘中走势活泼&#xff0c;到发稿&#xff0c;金盾股份“20cm”涨停&#xff0c;万丰奥威涨超6%&#xff0c;光洋股份涨逾5%&#xff0c;商络电子、星源卓镁涨近4%。 金盾股份强势涨停&#xff0c;公司近来在出资者互动渠道表示&#xff0c;公司和清华大学联合研…

基于深度学习的人员指纹身份识别算法matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1 指纹图像预处理与特征提取 4.2 卷积神经网络架构 4.3 特征编码与匹配 4.4 损失函数与训练 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程…

一键抠图怎么把物品抠出来?一键完成!物品抠图不再是难事!

在我们的日常生活和工作中&#xff0c;抠图已经成为了一个常见的需求。无论是为了制作一张精美的海报&#xff0c;还是为了在一篇文章中突出展示某个物品&#xff0c;抠图都能帮助我们更好地实现目标。然而&#xff0c;对于许多人来说&#xff0c;使用专业的抠图软件如Photosho…

2024年5个高性价比2C4G云服务器推荐,2核4G服务器优惠价格表

租用2核4G服务器费用价格&#xff0c;2核4G云服务器多少钱一年&#xff1f;1个月费用多少&#xff1f;阿里云2核4G服务器30元3个月、轻量应用服务器2核4G4M带宽165元一年、企业用户2核4G5M带宽199元一年&#xff1b;腾讯云轻量2核4G服务器5M带宽165元一年、252元15个月、540元三…

5G智能制造食品工厂数字孪生可视化平台,推进食品行业数字化转型

5G智能制造食品工厂数字孪生可视化平台&#xff0c;推进食品行业数字化转型。随着科技的飞速发展&#xff0c;食品工业正迎来一场前所未有的数字化转型。在这场转型中&#xff0c;5G智能制造工厂数字孪生可视化平台发挥着至关重要的作用。它不仅提高了生产效率&#xff0c;降低…

深度学习算法的基本原理

深度学习是一种机器学习方法&#xff0c;其核心是通过构建深层神经网络来学习数据的表示和特征&#xff0c;以解决各种复杂的任务。以下是深度学习算法的基本原理&#xff0c;希望对大家有所帮助。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发公司&#xff0c;欢迎…

40个Python字符串实例

Python 字符串是 Python 编程语言中最常用的数据类型之一&#xff0c;它可以表示文本或一组字符。Python 中的字符串是不可变的序列&#xff0c;意味着一旦创建&#xff0c;其值就不能被修改。下面是一些关于 Python 字符串的介绍。 概述 创建字符串&#xff1a;可以使用单引…

MWC 2024丨美格智能CEO杜国彬出席中国联通创新成果发布会并发表主题演讲

2月26日&#xff0c;中国联通在MWC2024 巴塞罗那期间举办了以“算网为基&#xff0c;智领未来”为主题的创新成果发布会&#xff0c;集中展示最新的创新成果与最佳实践。 中国通信标准化协会理事长闻库、GSMA首席财务官Louise Easterbrook、中国联通副总经理梁宝俊、华为ICT销…

[C语言]——C语言常见概念(2)

目录 一.第⼀个C语言程序 二.main函数 三.print和库函数 1.print 2.库函数 四.关键字介绍 一.第⼀个C语言程序 #include <stdio.h> int main() {printf("hello C\n");return 0;//约定返回0&#xff0c;在c语言中&#xff0c;正常返回0&#xff0c;异常…

HI3516DV500 HI3516DRFCV500 HI3516DRBCV500 海思安防监控芯片 提供原厂开发包

总体介绍 Hi3516DV500是一颗面向视觉行业推出的高 清智能SoC。该芯片最高支持2路sensor输入&#xff0c;支持最高5M30fps的ISP图像处理能力&#xff0c;支持2F WDR、多级降噪、六轴防抖、多光谱融合等多种 传统图像增强和处理算法&#xff0c;支持通过AI算法对输 入图像进行实…

安装mysql this application requires visual studio 2019 x64报错

提示 this application requires visual studio 2019 x64 缺少依赖 安装依赖 选择对应版本 安装 依赖安装地址 成功进入安装界面

redis02 安装

官网下载 传送门https://redis.io/download/#redis-downloads 安装Redis mac m1安装 下载你需要版本的软件包放到指定的目录下进行解压 cd 到解压好的redis目录 运行下面的命令进行编译测试 sudo make test 中途可能会提示你安装make工具&#xff0c;按提示安装即可&…