解密Flink的状态管理:探索流处理框架的数据保留之道,释放流处理的无限潜能!

水善利万物而不争,处众人之所恶,故几于道💦

文章目录

    • 一、什么是状态
    • 二、应用场景
    • 三、Flink中状态的分类
    • 四、算子状态
      • 1. 列表状态(List State)
      • 2. 广播状态(Broadcast State)
    • 五、键控状态
      • 1. ValueState
      • 2. ListState
      • 3. ReducingState
      • 4. AggregatingState<IN, OUT>
        • 1)类实现累加器 - 示例代码
        • 2)元组实现累加器 - 示例代码
      • 5. MapState<UK, UV>

一、什么是状态

  在流式计算中,有些计算的中间结果需要进行保存,为下一个计算提供参考,比如,有一个数据流,我需要实时的计算这个流中的总消费金额,那么就需要一个变量来存储截止目前的总消费金额,当下一条数据来的时候我就直接在以前总消费金额的基础上,加上这条数据的消费金额就可以了。那么这个例子中的那个存储总消费金额的变量(或者说是累加变量)就叫状态。

二、应用场景

  在流式处理中,状态的应用场景非常广泛。

去重

  如果我们需要对数据流中的数据进行去重统计时,我们可以利用状态管理。通过状态来记录数据是否流过应用,当新数据流入时,根据状态来判断去重。

检测

  检测输入流中的数据是否符合某个特定的模式。这里的模式不是指数据的格式,而是指数据之间的关系是否符合某个需求模型。比如,根据一个网站访问记录流中的数据,判断用户是否连续登录,然后给予相应的奖励。

聚合

  对某个特定时间内的数据进行聚合统计分析。比如统计每小时的 PV 量。

三、Flink中状态的分类

  Flink中包括两种基本的状态Managed StateRaw State,分别是管理状态和原始状态

在这里插入图片描述

  原始状态基本用不到,因为官方提供的管理状态已经够我们使用了


  管理状态又分为两类,分别是算子状态(Operator State)和键控状态(Keyed State)

  1. 算子状态可用于所有的算子,但是常用于source算子和sink算子;他是一个算子的子任务对应一个状态,也就是一个并行度里面一个状态;它通过实现CheckpointedFunction接口创建;它支持的数据结构有ListState,UnionListStste 和 BroadCastState。

  2. 键控状态只能用于KeyedStream上的算子;这个是一个key对应一个状态,他只和key有关;创建的时候重写RichFunction,通过里面的getRuntimeContext().get…State()获取状态对象;键控状态支持的数据结构有 ValueState,ListState,MapState,ReduceState,AggregatingState

在这里插入图片描述

四、算子状态

1. 列表状态(List State)

  将状态表示为一组数据的列表。向状态中添加元素add()、更新状态中的所有元素update(),取出状态中的所有元素get(),它会返回一个迭代器。


例:将输入的单词存入到状态中,当程序重启的时候,可以把状态中存的单词恢复。

示例代码:

public class Flink01_State_Operator_List {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        // 启用checkpoint 周期是2000毫秒,也就是2秒,每隔2s将状态保存一下
        env.enableCheckpointing(2000);

        env
                .socketTextStream("hadoop101",9999)
                .map(new MyMapFunction())
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 算子状态,不能用内部类了,因为要实现两个接口,算子状态要实现CheckpointedFunction接口
    private static class MyMapFunction implements MapFunction<String,String>, CheckpointedFunction {

        List<String> words = new ArrayList();
        private ListState<String> wordsState;

        @Override
        public String map(String line) throws Exception {
	        //抛个异常他就会自动重启,输入x就让他抛异常
            if (line.contains("x")) {
                throw new RuntimeException("手动抛出异常...");  
            }
            String[] data = line.split(" ");
            words.addAll(Arrays.asList(data));
            return words.toString();
        }

        // 保存状态:周期性的执行
        // 每个并行度都会周期性的执行
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // 这个方法是把数据存入到算子状态(状态列表)
//            System.out.println("MyMapFunction.snapshotState");
            //            wordsState.clear();       清空状态
//            wordsState.addAll(words);   向状态中写数据
            // 上面两个方法能用下面这一个方法代替
            wordsState.update(words);
        }

        // 程序启动的时候每个并行度执行一次
        // 这个方法可以把状态中的数据恢复到Java的集合中
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // 从状态中恢复数据
            //System.out.println("MyMapFunction.initializeState");
            System.out.println("程序重启,从状态中恢复数据...");
            // 获取列表状态
            wordsState = context.getOperatorStateStore().getListState(new ListStateDescriptor<String>("wordsState", String.class));
            // 从列表中获取数据
            // 将状态中的数据遍历出来,在添加到集合中,也就是恢复数据
//            Iterable<String> it = wordsState.get();
            for (String word : wordsState.get()) {
                words.add(word);
            }
        }
    }
}

输入数据:
在这里插入图片描述

运行结果:
在这里插入图片描述

2. 广播状态(Broadcast State)

  广播状态一般是两个流用,一个数据流,一个广播流,用广播流中的数据控制数据流中数据的处理逻辑。向状态里面写数据用put(),从状态里面拿数据用get()


例:通过广播流输入1,2,3…控制数据流中的数据使用不同的处理逻辑

示例代码:

public class Flink03_State_Operator_BroadCast {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        //获取一个数据流
        DataStreamSource<String> dataStream = env.socketTextStream("hadoop101", 8888);

        // 获取一个配置流
        DataStreamSource<String> configStream = env.socketTextStream("hadoop101", 9999);

        // 1. 把配置流做成一个广播流   需要一个map状态描述器   一个key的类型,一个value
        MapStateDescriptor<String, String> bcStateDesc = new MapStateDescriptor<>("bcState", String.class, String.class);
        BroadcastStream<String> bcStream = configStream.broadcast(bcStateDesc);

        // 2. 让数据流去connect广播流
        BroadcastConnectedStream<String, String> coStream = dataStream.connect(bcStream);

        // 泛型分别表示,数据流类型,广播流类型,输出类型
        coStream.process(new BroadcastProcessFunction<String, String, String>() {
            // 4. 处理数据流中的数据:从广播状态中取配置
            @Override
            public void processElement(String value,
                                       ReadOnlyContext ctx,
                                       Collector<String> out) throws Exception {
                System.out.println("Flink03_State_Operator_BroadCast.processElement");
                ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(bcStateDesc);
                String conf = broadcastState.get("aSwitch");

                if("1".equals(conf)){
                    out.collect(value +" 使用 1 号逻辑...");
                }else if ("2".equals(conf)){
                    out.collect(value +" 使用 2 号逻辑...");
                }else if ("3".equals(conf)){
                    out.collect(value +" 使用 3 号逻辑...");
                }else {
                    out.collect(value +" 使用 default 号逻辑...");
                }
            }
            // 3. 把广播流中的数据放入到广播状态
            @Override
            public void processBroadcastElement(String value, // 广播流中的数据
                                                Context ctx,  // 上下文
                                                Collector<String> out) throws Exception {
                System.out.println("Flink03_State_Operator_BroadCast.processBroadcastElement");
                // 获取广播状态,把配置信息写入到状态中
                BroadcastState<String, String> broadcastState = ctx.getBroadcastState(bcStateDesc);
                broadcastState.put("aSwitch",value);
            }
        })
                .print();


        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

输入数据:

在这里插入图片描述

运行结果:

在这里插入图片描述

五、键控状态

1. ValueState

  保存单个值. 每个key有一个状态值. 向状态中保存数据使用 update(T)方法, 获取状态中的数据使用value()方法。


例:检测传感器的水位值,如果连续的两个水位值超过10,就输出报警。

示例代码:

public class Flink04_State_Key_Value {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .socketTextStream("hadoop101", 9999) // socket只能是1
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 键控状态必须在keyBy后使用
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {

                    // 状态
                    private ValueState<Integer> lastVcState;

                    // 每个并行度执行一次   初始化的时候执行一次
                    @Override
                    public void open(Configuration parameters) throws Exception {
//                        System.out.println("Flink04_State_Key_Value.open");
                        // 因为他已经把状态封装在运行时上下文了,所以直接获取就行了
                        lastVcState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastVcState", Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value,
                                               Context ctx,
                                               Collector<String> out) throws Exception {
                        // 获取状态里面的值  通过 .value() 方法
                        Integer lastVc = lastVcState.value();

                        System.out.println(lastVc+ "" +
                                "  " + value.getVc());

                        if (lastVc != null) {
                            if (value.getVc() >10 && lastVc > 10) {
                                out.collect(ctx.getCurrentKey()+" 连续两次超过10,发出红色预警...");
                            }
                        }

                        // 更新状态的值   只能保存一个值,所以用update更新
                        lastVcState.update(value.getVc());
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

输入数据:

在这里插入图片描述

运行结果:

在这里插入图片描述

2. ListState

  保存元素列表。添加一个元素用add(T),添加多个元素用addAll(List<T>),获取元素用get()他会返回一个迭代器,可遍历出每个元素,覆盖所有元素用update(List<T>)


例:针对每个传感器输出最高的3个水位值

示例代码:

public class Flink05_State_Key_List {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port",1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .socketTextStream("hadoop101", 9999) // socket只能是1
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 键控状态必须在keyBy后使用
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {

                    private ListState<Integer> top3VcState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        top3VcState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("top3VcState", Integer.class));
                    }

                    @Override
                    public void processElement(WaterSensor value,
                                               Context ctx,
                                               Collector<String> out) throws Exception {
                        // 因为用的是list状态,可以存多个值,所以每来一个数据要先存进状态
                        top3VcState.add(value.getVc());
                        // 获取状态里面的元素
                        Iterable<Integer> iterable = top3VcState.get();

                        List<Integer> list = AnqclnUtil.toList(iterable);

                        list.sort(new Comparator<Integer>() {
                            @Override
                            public int compare(Integer o1, Integer o2) {
                                return o2.compareTo(o1);
                            }
                        });

                        // 因为要取的是前三,所以第四个元素进来的时候就不要了
                        if (list.size() ==4){
                            list.remove(list.size()-1);
                        }

                        top3VcState.update(list);

                        out.collect(ctx.getCurrentKey()+" 最高的三个水位值:"+list);
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

输入数据:
在这里插入图片描述

运行结果:
在这里插入图片描述

3. ReducingState

  存储单个值,表示把所有元素的聚合结果添加到状态中,当向状态中添加元素的时候,他会使用指定的ReduceFunction进行聚合。添加元素是add(T),取出元素是get()


例:计算每个传感器的水位和

示例代码:

public class Flink06_State_Key_Reduce {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .socketTextStream("hadoop101", 9999) // socket只能是1
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 键控状态必须在keyBy后使用
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {


                    private ReducingState<WaterSensor> vcSumState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        vcSumState = getRuntimeContext().getReducingState(
                                new ReducingStateDescriptor<WaterSensor>(
                                        "vcSumState",
                                        new ReduceFunction<WaterSensor>() {
                                            @Override
                                            public WaterSensor reduce(WaterSensor value1,
                                                                      WaterSensor value2) throws Exception {
                                                value1.setVc(value1.getVc() + value2.getVc());
                                                return value1;
                                            }
                                        },
                                        WaterSensor.class));
                    }

                    @Override
                    public void processElement(WaterSensor value,
                                               Context ctx,
                                               Collector<String> out) throws Exception {
                        // 将传过来的每个元素加入到状态里面去,然后就行了,他会自己聚合,因为在上面创建状态的时候就已经写了聚合的逻辑
                        vcSumState.add(value);
                        out.collect(ctx.getCurrentKey()+" 的水位和为:"+vcSumState.get().getVc());
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

输入数据:

在这里插入图片描述
运行结果:

在这里插入图片描述

4. AggregatingState<IN, OUT>

  存储单个值。 与ReducingState类似, 都是进行聚合。 不同的是,AggregatingState的聚合的结果和输入的元素类型可以不一样。存数据用add(),取数据用get()


例:计算每个传感器的平均水位

1)类实现累加器 - 示例代码

public class Flink07_State_Key_Aggregate {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .socketTextStream("hadoop101", 9999) // socket只能是1
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 键控状态必须在keyBy后使用
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {


                    private AggregatingState<WaterSensor, Double> avgVcState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        avgVcState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<WaterSensor, Avg, Double>(
                                "avgVcState",
                                new AggregateFunction<WaterSensor, Avg, Double>() {
                                    @Override
                                    public Avg createAccumulator() {
                                        return new Avg();
                                    }

                                    @Override
                                    public Avg add(WaterSensor value, Avg acc) {
                                        acc.sum += value.getVc();
                                        acc.count++;
                                        return acc;
                                    }

                                    @Override
                                    public Double getResult(Avg acc) {
                                        return acc.avg();
                                    }

                                    @Override
                                    public Avg merge(Avg a, Avg b) {
                                        return null;
                                    }
                                },
                                Avg.class
                        ));
                    }

                    @Override
                    public void processElement(WaterSensor value,
                                               Context ctx,
                                               Collector<String> out) throws Exception {
                        avgVcState.add(value);

                        out.collect(ctx.getCurrentKey()+" 的平均水位:"+avgVcState.get());
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static class Avg {
        public Integer sum = 0;
        public Long count = 0L;

        public Double avg(){
            return sum *1.0 / count;
        }
    }
}

输入数据:

在这里插入图片描述

运行结果:

在这里插入图片描述

2)元组实现累加器 - 示例代码

public class Flink08_State_Key_Aggregate_Tuple2 {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .socketTextStream("hadoop101", 9999) // socket只能是1
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 键控状态必须在keyBy后使用
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {

                    private AggregatingState<WaterSensor, Double> avgVcState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        avgVcState = getRuntimeContext().getAggregatingState(new AggregatingStateDescriptor<WaterSensor, Tuple2<Integer, Long>, Double>(
                                "avgVcState",
                                new AggregateFunction<WaterSensor, Tuple2<Integer, Long>, Double>() {
                                    @Override
                                    public Tuple2<Integer, Long> createAccumulator() {
                                        return new Tuple2<Integer, Long>(0,0L);
                                    }

                                    @Override
                                    public Tuple2<Integer, Long> add(WaterSensor value, Tuple2<Integer, Long> acc) {
                                        acc.f0 += value.getVc();
                                        acc.f1++;
                                        return acc;
                                    }

                                    @Override
                                    public Double getResult(Tuple2<Integer, Long> acc) {
                                        return acc.f0 * 1.0 / acc.f1;
                                    }

                                    @Override
                                    public Tuple2<Integer, Long> merge(Tuple2<Integer, Long> a, Tuple2<Integer, Long> b) {
                                        return null;
                                    }
                                },
//                                TypeInformation.of(new TypeHint<Tuple2<Integer, Long>>() {})
                                // 类型还可以这样声明,简单
                                Types.TUPLE(Types.INT,Types.LONG)
                        ));
                    }

                    @Override
                    public void processElement(WaterSensor value,
                                               Context ctx,
                                               Collector<String> out) throws Exception {
                        avgVcState.add(value);
                        out.collect(ctx.getCurrentKey()+" 的平均水位:"+avgVcState.get());
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

输入数据:

在这里插入图片描述

运行结果:

在这里插入图片描述

5. MapState<UK, UV>

  存储键值对列表。

  添加键值对: put(UK, UV)putAll(Map<UK, UV>)

  根据key获取值: get(UK)

  获取所有: entries()keys()values()

  检测是否为空: isEmpty()


例:去重: 去掉重复的水位值. 思路: 把水位值作为MapState的key来实现去重, value随意

示例代码:

public class Flink09_State_Key_Map {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 1000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env
                .socketTextStream("hadoop101", 9999) // socket只能是1
                .map(line -> {
                    String[] data = line.split(",");
                    return new WaterSensor(
                            data[0],
                            Long.valueOf(data[1]),
                            Integer.valueOf(data[2])
                    );
                })
                .keyBy(WaterSensor::getId)
                // 键控状态必须在keyBy后使用
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {

                    private MapState<Integer, Object> vcMapState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        vcMapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Object>(
                                "vcMapState",
                                TypeInformation.of(new TypeHint<Integer>() {}),
                                TypeInformation.of(new TypeHint<Object>() {})
                        ));
                    }

                    @Override
                    public void processElement(WaterSensor value,
                                               Context ctx,
                                               Collector<String> out) throws Exception {

                        vcMapState.put(value.getVc(),new Object());

                        Iterable<Integer> keys = vcMapState.keys();


                        out.collect(ctx.getCurrentKey() + " 的所有不同水位: " + AnqclnUtil.toList(keys));
                    }
                })
                .print();

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

输入数据:

在这里插入图片描述

运行结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/72964.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

平替 Docker - 玩转容器新利器 Podman Desktop (视频)

《OpenShift 4.x HOL教程汇总》 在 podman-desktop 1.2.1 podman 4.4 环境中验证。 文章目录 什么是 podman 和 podman-desktop安装 podman 和 podman-desktop 基本环境Image、Container 和 Pod 的基本操作拉取 Image运行 Container 将 Pod 部署到 Kubernetes安装 Kind 扩展插…

在 IntelliJ IDEA 中使用 Docker 开发指南

目录 一、IDEA安装Docker插件 二、IDEA连接Docker 1、Docker for Windows 连接 2、SSH 连接 3、Connection successful 连接成功 三、查看Docker面板 四、使用插件生成镜像 一、IDEA安装Docker插件 打开 IntelliJ IDEA&#xff0c;点击菜单栏中的 "File" -&g…

面试热题(最大子数组和)

给你一个整数数组 nums &#xff0c;请你找出一个具有最大和的连续子数组&#xff08;子数组最少包含一个元素&#xff09;&#xff0c;返回其最大和。 子数组 是数组中的一个连续部分。 输入&#xff1a;nums [-2,1,-3,4,-1,2,1,-5,4] 输出&#xff1a;6 解释&#xff1a;连续…

一台阿里云服务器怎么部署多个网站?以CentOS系统为例

本文阿里云百科介绍如何在CentOS 7系统的ECS实例上使用Nginx搭建多个Web站点。本教程适用于熟悉Linux操作系统&#xff0c;希望合理利用资源、统一管理站点以提高运维效率的用户。比如&#xff0c;您可以在一台云服务器上配置多个不同分类的博客平台或者搭建多个Web站点实现复杂…

二叉搜索树K和KV结构模拟

一 什么是二叉搜索树 这个的结构特性非常重要&#xff0c;是后面函数实现的结构基础&#xff0c;二叉搜索树的特性是每个根节点都比自己的左树任一节点大&#xff0c;比自己的右树任一节点小。 例如这个图&#xff0c; 41是根节点&#xff0c;要比左树大&#xff0c;比右树小&…

yo!这里是STL::list类简单模拟实现

目录 前言 重要接口实现 框架 默认成员函数 迭代器&#xff08;重点&#xff09; 1.引言 2.list迭代器类实现 3.list类中调用实现 增删查改 后记 前言 我们知道&#xff0c;stl中的vector对应数据结构中的顺序表&#xff0c;string类对应字符串&#xff0c;而今天要…

[C++ 网络协议] 套接字和地址族、数据序列

目录 1. 套接字 1.1 在Linux平台下构建套接字 1.1.1 用于接听的套接字(服务器端套接字) 1.1.2 用于发送请求的套接字(客户端套接字) 1.2 在Windows平台下构建套接字 1.2.1 Winsock的初始化 1.2.2 用于接听的套接字(服务器端套接字) 1.2.3 用于发送请求的套接字(客户端套…

Flink多流处理之coGroup(协同分组)

这篇文章主要介绍协同分组coGroup的使用,先讲解API代码模板,后面会结图解介绍coGroup是如何将流中数据进行分组的. 1 API介绍 数据源# 左流数据 ➜ ~ nc -lk 6666 101,Tom 102,小明 103,小黑 104,张强 105,Ken 106,GG小日子 107,小花 108,赵宣艺 109,明亮# 右流数据 ➜ ~ n…

【C与C++的相互调用方法】

C与C的相互调用方法 C与C为什么相互调用的方式不同C中调用CC中调用C致谢 C与C为什么相互调用的方式不同 C 和 C 之间的相互调用方式存在区别&#xff0c;主要是由于 C 和 C 语言本身的设计和特性不同。 函数调用和参数传递方式不同&#xff1a;C 和 C 在函数调用和参数传递方面…

docker — 容器网络

一、概述 Docker容器每次重启后容器ip是会发生变化的。 这也意味着如果容器间使用ip地址来进行通信的话&#xff0c;一旦有容器重启&#xff0c;重启的容器将不再能被访问到。 而Docker 网络就能够解决这个问题。 Docker 网络主要有以下两个作用&#xff1a; 容器间的互联…

docker部署springboot

基础知识 什么是docker 官网&#xff1a; Docker Docs: How to build, share, and run applications | Docker Documentation Docker 是一个基于go语言开发的开源的应用容器引擎&#xff0c;让开发者可以打包他们的应用以及依赖包到一个可移植的容器中&#xff0c;然后发布到…

97. Interleaving String 72. Edit Distance 121. 122. 123

​​​​​​97. Interleaving String 72. Edit Distance 一个bottomup&#xff08;棋盘从右下角外围逼近[0,0]&#xff09;如果横轴是string1的index i&#xff0c;纵轴string2的index j&#xff0c;那么&#xff0c;很奇妙的是i和j一起&#xff08;从右下角的格子看&#xf…

11.Eclipse 注释模板的说明及设置

1.在eclipse中点击Window——>java——>Code Style——>CodeTemplates——>Comments 2.常用Variable 3. 我的注释模板 ①Files 文件 /** * Title: ${file_name}* Description: ${todo}* author Jeremy* date ${currentDate:date(yyyy-MM-dd hh:mm:ss)} */ ②Typ…

Kotlin入门:变量和函数——02

目录 一、Kotlin 基本数据类型 ​编辑 二、变量 val 关键字&#xff1a; var 关键字: 类型推断: 可空类型: 三、函数 基本函数语法&#xff1a; 单表达式函数&#xff1a; 默认参数值&#xff1a; 命名参数&#xff1a; 一、Kotlin 基本数据类型 Kotlin 的基本数…

树结构--介绍--二叉树遍历的递归实现

目录 树 树的学术名词 树的种类 二叉树的遍历 算法实现 遍历命名 二叉树的中序遍历 二叉树的后序遍历 二叉树的后序遍历迭代算法 二叉树的前序遍历 二叉树的前序遍历迭代算法 树 树是一种非线性的数据结构&#xff0c;它是由n(n≥0)个有限节点组成一个具有层次关系…

中电金信:ChatGPT一夜爆火,知识图谱何以应战?

随着ChatGPT的爆火出圈 人工智能再次迎来发展小高潮 那么作为此前搜索领域的主流技术 知识图谱前路又将如何呢&#xff1f; 事实上&#xff0c;ChatGPT也并非“万能”&#xff0c;作为黑箱模型&#xff0c;ChatGPT很难验证生成的知识是否准确。并且ChatGPT是通过概率模型执行推…

Django入门

Day1 django环境安装 创建虚拟环境 # step1 创建虚拟环境 python3 -m venv datawhale_django # step2 mac进入虚拟环境 source ./datawhale_django/bin/activate # step3 退出虚拟环境 deactivate安装包 pip3 install django ​pip3 install djangorestframework​​ pip3 …

Jenkins自动化打包脚本

一、背景 jenkins可以设置定时任务打包&#xff0c;也已手动点按钮打包&#xff0c;还可以通过执行http请求打包&#xff0c;今天我们就通过shell脚本&#xff0c;通过curl命令进行jenkins打包。 二、步骤 2.1 在jenkins上构建项目 设置触发器 2.2 通过shell脚本触发远程构…

电商财务新时代:轻松自动对账,财务效率倍增

电商领域频繁的多平台财务对账常常令企业头痛不已。然而&#xff0c;随着轻易云数据集成平台的崭新解决方案&#xff0c;财务对账的痛点迎刃而解。本文通过引人入胜的实例&#xff0c;深入探讨电商财务对账的现状&#xff0c;突出轻易云数据集成平台在自动对账中的强大作用&…

感受RFID服装门店系统的魅力

嘿&#xff0c;亲爱的时尚追随者们&#xff01;今天小编要给你们带来一股时尚新风潮&#xff0c;让你们感受一下什么叫做“RFID服装门店系统”&#xff0c;这个超酷的东西&#xff01; 别着急&#xff0c;先别翻白眼&#xff0c;小编来解释一下RFID是什么玩意儿。它是射频识别…