大数据之Flink(三)

9.3、转换算子
9.3.1、基本转换算子
9.3.1.1、映射map

一一映射

package transform;

import bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Title: MapDemo
 * @Author lizhe
 * @Package transform
 * @Date 2024/5/31 19:55
 * @description:
 */
public class MapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );
        SingleOutputStreamOperator<Object> map = sensorDataStreamSource.map((v) -> {
            return v.getId();
        });
        map.print();
        env.execute();
    }
}

9.3.1.2、过滤

转换操作,对数据流进行过滤,通过布尔条件表达式设置过滤条件,true正常输出,false被过滤掉

package transform;

import bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @Title: MapDemo
 * @Author lizhe
 * @Package transform
 * @Date 2024/5/31 19:55
 * @description:
 * s1数据:一进一出
 * s2数据:一进二出
 * s3数据:一进零出
 */
public class FilterDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s3", 3L, 3)
        );
        SingleOutputStreamOperator<WaterSensor> filter = sensorDataStreamSource.filter((v) -> {
            return "s1".equals(v.getId());
        });
        filter.print();
        env.execute();
    }
}

9.3.1.3、扁平映射flatMap

将数据流中的整体拆分成个体使用。消费一个元素可产生多个元素。(一进多出)flatMap为flatten和map的结合,即按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理。

package transform;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @Title: MapDemo
 * @Author lizhe
 * @Package transform
 * @Date 2024/5/31 19:55
 * @description:
 */
public class FlatMapDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                new WaterSensor("s1", 1L, 11),
                new WaterSensor("s2", 2L, 22),
                new WaterSensor("s3", 3L, 3)
        );
        SingleOutputStreamOperator<String> flatmap = sensorDataStreamSource.flatMap(new FlatMapFunction<WaterSensor, String>() {
            @Override
            public void flatMap(WaterSensor value, Collector<String> out) throws Exception {
                if ("s1".equals(value.getId())) {
                    out.collect(String.valueOf(value.getVc()));
                } else if ("s2".equals(value.getId())) {
                    out.collect(String.valueOf(value.getTs()));
                    out.collect(String.valueOf(value.getVc()));
                }
            }
        });
        flatmap.print();
        env.execute();
    }
}

map使用的是return来控制一进一出,flatMap使用Collector,可调用多次采集器实现一进多出

9.3.1.4、聚合算子Aggregation

计算结果不仅依赖当前数据,还与之前的数据有关

  1. 按键分区keyby

    DataStream没有直接聚合的API。在flink中要聚合先进行可以不用keyby分区。keyby通过指定key将一条流划分成不同的分区,分区就是并行处理的子任务。

    package aggreagte;
    
    import bean.WaterSensor;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    /**
     * @Title: MapDemo
     * @Author lizhe
     * @Package transform
     * @Date 2024/5/31 19:55
     * @description:
     * s1数据:一进一出
     * s2数据:一进二出
     * s3数据:一进零出
     */
    public class KeybyDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(2);
            DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                    new WaterSensor("s1", 1L, 11),
                    new WaterSensor("s1",11L,11),
                    new WaterSensor("s2", 2L, 22),
                    new WaterSensor("s3", 3L, 3)
            );
            /*
            * 按照id分组
            * 返回一个键控流KeyedStream,keyBy不是算子
            * keyby分组与分区的关系:
            * 1)keyby对数据进行分组,保证相同key的数据在同一个分区
            * 2)分区:一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)
            * */
                 KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(new KeySelector<WaterSensor, String>() {
                @Override
                public String getKey(WaterSensor value) throws Exception {
                    return value.getId();
                }
            });
            keyBy.print();
            env.execute();
        }
    }
    
    
  2. 简单聚合

    按键分区后可以进行聚合操作,基本的API包括:sum、min、max、minBy、maxBy。

    sum

    package aggreagte;
    
    import bean.WaterSensor;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    
    public class SimpleAggDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                    new WaterSensor("s1", 1L, 11),
                    new WaterSensor("s1", 11L, 11),
                    new WaterSensor("s2", 2L, 22),
                    new WaterSensor("s3", 3L, 3)
            );
            /*
             * 按照id分组
             * 返回一个键控流KeyedStream,keyBy不是算子
             * keyby分组与分区的关系:
             * 1)keyby对数据进行分组,保证相同key的数据在同一个分区
             * 2)分区:一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)
             * */
            KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(new KeySelector<WaterSensor, String>() {
                @Override
                public String getKey(WaterSensor value) throws Exception {
                    return value.getId();
                }
            });
            //传位置索引适用于tuple类型,不适合pojo类型
    //        SingleOutputStreamOperator<WaterSensor> result = keyBy.sum(2);
            SingleOutputStreamOperator<WaterSensor> sum = keyBy.sum("vc");
            sum.print();
    
    //        SingleOutputStreamOperator<WaterSensor> max = keyBy.max("vc");
    //        max.print();
    
    //        SingleOutputStreamOperator<WaterSensor> maxBy = keyBy.maxBy("vc");
    //        maxBy.print();
            env.execute();
        }
    }
    
    

    max

    package aggreagte;
    
    import bean.WaterSensor;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    
    public class SimpleAggDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                    new WaterSensor("s1", 1L, 1),
                    new WaterSensor("s1", 11L, 11),
                    new WaterSensor("s2", 2L, 22),
                    new WaterSensor("s3", 3L, 3)
            );
            /*
             * 按照id分组
             * 返回一个键控流KeyedStream,keyBy不是算子
             * keyby分组与分区的关系:
             * 1)keyby对数据进行分组,保证相同key的数据在同一个分区
             * 2)分区:一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)
             * */
            KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(new KeySelector<WaterSensor, String>() {
                @Override
                public String getKey(WaterSensor value) throws Exception {
                    return value.getId();
                }
            });
            //传位置索引适用于tuple类型,不适合pojo类型
    //        SingleOutputStreamOperator<WaterSensor> result = keyBy.sum(2);
    //        SingleOutputStreamOperator<WaterSensor> sum = keyBy.sum("vc");
    //        sum.print();
    
            SingleOutputStreamOperator<WaterSensor> max = keyBy.max("vc");
            max.print();
    
    //        SingleOutputStreamOperator<WaterSensor> maxBy = keyBy.maxBy("vc");
    //        maxBy.print();
            env.execute();
        }
    }
    
    

    输出结果:

    WaterSensor{id='s1', ts=1, vc=1}
    WaterSensor{id='s1', ts=1, vc=11}
    WaterSensor{id='s2', ts=2, vc=22}
    WaterSensor{id='s3', ts=3, vc=3}
    

    maxby

    package aggreagte;
    
    import bean.WaterSensor;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    
    public class SimpleAggDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                    new WaterSensor("s1", 1L, 1),
                    new WaterSensor("s1", 11L, 11),
                    new WaterSensor("s2", 2L, 22),
                    new WaterSensor("s3", 3L, 3)
            );
            /*
             * 按照id分组
             * 返回一个键控流KeyedStream,keyBy不是算子
             * keyby分组与分区的关系:
             * 1)keyby对数据进行分组,保证相同key的数据在同一个分区
             * 2)分区:一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)
             * */
            KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(new KeySelector<WaterSensor, String>() {
                @Override
                public String getKey(WaterSensor value) throws Exception {
                    return value.getId();
                }
            });
            //传位置索引适用于tuple类型,不适合pojo类型
    //        SingleOutputStreamOperator<WaterSensor> result = keyBy.sum(2);
    //        SingleOutputStreamOperator<WaterSensor> sum = keyBy.sum("vc");
    //        sum.print();
    
    //        SingleOutputStreamOperator<WaterSensor> max = keyBy.max("vc");
    //        max.print();
    
            SingleOutputStreamOperator<WaterSensor> maxBy = keyBy.maxBy("vc");
            maxBy.print();
            env.execute();
        }
    }
    
    

    输出结果

    WaterSensor{id='s1', ts=1, vc=1}
    WaterSensor{id='s1', ts=11, vc=11}
    WaterSensor{id='s2', ts=2, vc=22}
    WaterSensor{id='s3', ts=3, vc=3}
    

    max与maxby对比(min与minby同理):

    max只会取比较字段的最大值,非比较字段保留第一次的值

    maxby会取比较字段最大值这个对象

  3. 规约函数Reduce

    reduce:两两聚合,每个key第一条数据直接存起来并输出,聚合的结果作为下一次的第一条数据

    package aggreagte;
    
    import bean.WaterSensor;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    
    public class ReduceDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStreamSource<WaterSensor> sensorDataStreamSource = env.fromElements(
                    new WaterSensor("s1", 1L, 1),
                    new WaterSensor("s1", 11L, 11),
                    new WaterSensor("s1", 22L, 22),
                    new WaterSensor("s2", 2L, 22),
                    new WaterSensor("s3", 3L, 3)
            );
            /*
             * 按照id分组
             * 返回一个键控流KeyedStream,keyBy不是算子
             * keyby分组与分区的关系:
             * 1)keyby对数据进行分组,保证相同key的数据在同一个分区
             * 2)分区:一个子任务可以理解为一个分区,一个分区(子任务)中可以存在多个分组(key)
             * */
            KeyedStream<WaterSensor, String> keyBy = sensorDataStreamSource.keyBy(new KeySelector<WaterSensor, String>() {
                @Override
                public String getKey(WaterSensor value) throws Exception {
                    return value.getId();
                }
            });
            SingleOutputStreamOperator<WaterSensor> reduce = keyBy.reduce(new ReduceFunction<WaterSensor>() {
                @Override
                public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                    System.out.println("value1="+value1);
                    System.out.println("value2="+value2);
                    return new WaterSensor(value1.id, value2.ts, value1.vc + value2.vc);
                }
            });
            reduce.print();
            env.execute();
        }
    }
    
9.3.1.5、自定义函数及分区

自定义函数分为:函数类、匿名函数、富函数类

物理分区即数据进入到多个线程中的哪个线程。常见分区策略:随机分配、轮询分配、重缩放、广播。

轮询(rebalance):一般一个source对应一个kafka的partition,如果partition数据源不均匀(数据倾斜),可通过轮询分配进行负载均衡。

缩放(rescale):实现轮询,局部组队,比rebalance高效。

广播(broadcast):下发到下游所有子任务

9.3.1.6、分流

将一条数据拆分成完全独立的两条或多条流。基于一个DataStream通过筛选条件将符合条件的数据放到对应的流里。
在这里插入图片描述
只要针对同一条流进行多次独立调用filter()方法进行筛选就可以得到拆分之后的流,但是效率较低,所有数据都要过滤多次

package split;

import bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class SplitByFilterDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.socketTextStream("192.168.132.101", 7777);
        dataStreamSource.filter(value -> Integer.parseInt(value)%2==0).print("偶数流");
        dataStreamSource.filter(value -> Integer.parseInt(value) % 2 == 1).print("奇数流");
        env.execute();
    }
}

使用侧输出流实现分流,可实现数据筛选、告警等

  1. 使用process算子
  2. 定义OutputTag对象
  3. 调用ctx.output
  4. 通过主流获取侧输出流
package split;

import bean.WaterSensor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.lang.reflect.Type;


public class SideOutputDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //如果是s1放到侧输出流s1中
        OutputTag<WaterSensor> s1 = new OutputTag<>("s1", Types.POJO(WaterSensor.class));
        //如果是s2放到侧输出流s2中
        OutputTag<WaterSensor> s2 = new OutputTag<>("s2", Types.POJO(WaterSensor.class));
        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );

                } );
        SingleOutputStreamOperator<WaterSensor> process = dataStreamSource.process(new ProcessFunction<WaterSensor, WaterSensor>() {
            @Override
            public void processElement(WaterSensor value, Context ctx, Collector<WaterSensor> out) throws Exception {
                String id = value.getId();
                if (id.equals("s1")) {


                    ctx.output(s1, value);
                } else if (id.equals("s2")) {

                    ctx.output(s2, value);
                } else {
                    //其他放到主流中
                    out.collect(value);
                }
            }
        });
        //打印主输出流
        process.print("主输出流");
        //打印侧输出流
        process.getSideOutput(s1).print("s1侧输出流");
        process.getSideOutput(s2).print("s2侧输出流");
        env.execute();
    }
}

9.3.1.7、合流

1、联合union

最简单的合流操作就是将多条流合到一起,要求流中的数据类型必须相同,合并后新流包括所有流的元素,数据类型不变,一次可以合并多条流。
在这里插入图片描述

package combineDemo;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


public class UnionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<Integer> source2 = env.fromElements(11, 22, 33, 44, 55);
        DataStreamSource<String> source3 = env.fromElements("111", "222","333","444","555");
        DataStream<Integer> union = source1.union(source2).union(source3.map(value -> Integer.parseInt(value)));
        union.print();
        env.execute();
    }
}

2、连接connect
为合并不同数据类型的数据flink提供connect合流操作。connect连接后得到的是ConnectedStream,形式上统一但内部内部各自数据形式不变,彼此之间相互独立。如要得到新的DataStream要使用“同处理”(co-process),如map、flatmap等,各自处理各自的。

connect一次只能连接两条流。

package combineDemo;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;


public class ConnectDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> source1 = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<String> source2 = env.fromElements("111", "222","333","444","555");
        ConnectedStreams<Integer, String> connect = source1.connect(source2);
        SingleOutputStreamOperator<String> map = connect.map(new CoMapFunction<Integer, String, String>() {
            @Override
            public String map1(Integer value) throws Exception {
                return value.toString();
            }

            @Override
            public String map2(String value) throws Exception {
                return value;
            }
        });
        map.print();
        env.execute();
    }
}

ConnectedStreams可以直接调用keyBy()进行按键分区得到的还是一个ConnectedStreams,通过keyBy()将两条流中key相同的数据放到了一起,然后针对来源的流再各自处理。(类似inner join)

package combineDemo;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConnectKeybyDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<Tuple2<Integer, String>> source1 = env.fromElements(
                Tuple2.of(1, "a1"),
                Tuple2.of(1, "a2"),
                Tuple2.of(2, "b"),
                Tuple2.of(3, "c")
        );
        DataStreamSource<Tuple3<Integer, String,Integer>> source2 = env.fromElements(
                Tuple3.of(1, "aa1",1),
                Tuple3.of(1, "aa2",2),
                Tuple3.of(2, "bb",1),
                Tuple3.of(3, "cc",1)
        );
        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> connect = source1.connect(source2);
        //多并行度下,要根据关联条件进行keyby,才能保证key相同的数据在一个子任务(线程)里,这样才能匹配上
        ConnectedStreams<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>> keyBy = connect.keyBy(s1 -> s1.f0, s2 -> s2.f0);
        SingleOutputStreamOperator<String> process = keyBy.process(new CoProcessFunction<Tuple2<Integer, String>, Tuple3<Integer, String, Integer>, String>() {
            //每条流定一个hashmap用来存储数据
            Map<Integer, List<Tuple2<Integer, String>>> s1Cache = new HashMap<>();
            Map<Integer, List<Tuple3<Integer, String, Integer>>> s2Cache = new HashMap<>();

            @Override
            public void processElement1(Tuple2<Integer, String> value, Context ctx, Collector<String> out) throws Exception {
                Integer id = value.f0;
                //先把s1数据存到map中
                if(!s1Cache.containsKey(id)){
                    ArrayList<Tuple2<Integer, String>> s1Value = new ArrayList<>();
                    s1Value.add(value);
                    s1Cache.put(id, s1Value);
                }else {
                    s1Cache.get(id).add(value);
                }
                if (s2Cache.containsKey(id)){
                    for (Tuple3<Integer, String, Integer> s2Element : s2Cache.get(id)) {
                        out.collect("s1"+value+"---------"+"s2"+s2Element);
                    }
                }
            }

            @Override
            public void processElement2(Tuple3<Integer, String, Integer> value, Context ctx, Collector<String> out) throws Exception {
                Integer id = value.f0;
                //先把s1数据存到map中
                if(!s2Cache.containsKey(id)){
                    ArrayList<Tuple3<Integer, String, Integer>> s2Value = new ArrayList<>();
                    s2Value.add(value);
                    s2Cache.put(id, s2Value);
                }else {
                    s2Cache.get(id).add(value);
                }
                if (s1Cache.containsKey(id)){
                    for (Tuple2<Integer, String> s1Element : s1Cache.get(id)) {
                        out.collect("s1"+s1Element+"---------"+"s2"+value);
                    }
                }
            }
        });
        process.print();
        env.execute();
    }
}

9.4、输出算子sink

将计算结果写到外部存储
在这里插入图片描述
输出到外部系统参考官网。

9.4.1、输出到文件FileSink
package sink;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;
import java.time.ZoneId;
import java.util.concurrent.TimeUnit;


public class SinkFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //必须开启,否则文件一直是.inprogress
        env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
        DataStreamSource<String> source = env.fromElements("111", "222","333","444","555");
        FileSink<String> sink = FileSink
                //官网示例
                .forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toSeconds(5))
//                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1024L)
                                .build())
                .build();

//        FileSink<String> fileSink = FileSink
//                //输出行式存储文件
//                .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>())
//                //输出文件配置
//                .withOutputFileConfig(
//                        OutputFileConfig.builder()
//                                .withPartPrefix("test")
//                                .withPartSuffix(".log")
//                                .build()
//                )
//                //文件分桶
//                .withBucketAssigner(new DateTimeBucketAssigner<>("yy-MM-dd", ZoneId.systemDefault()))
//                //文件滚动策略
//                .withRollingPolicy(DefaultRollingPolicy.builder()
//                        .withRolloverInterval(5L * 1000L)
//                        .withMaxPartSize(1L * 1024L)
//                        .build()
//                ).build();
        source.sinkTo(sink);


        env.execute();

    }
}

9.4.2、输出到kafka

参考官网:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/kafka/

package sink;

import bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.table.KafkaSinkSemantic;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.util.Properties;


public class SinkKafkaDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();;

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.132.100:9092,192.168.132.101:9092,192.168.132.102:9092");
        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } );
        KafkaSerializationSchema<WaterSensor> serializationSchema = new KafkaSerializationSchema<WaterSensor>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(WaterSensor s,Long time  ) {
                return new ProducerRecord<>(
                        "test", // target topic
                        s.toString().getBytes(StandardCharsets.UTF_8)); // record contents
            }
        };
        dataStreamSource.addSink(new FlinkKafkaProducer<WaterSensor>(
                "test",serializationSchema,properties,FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        ));
        env.execute();

    }
}

![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/c12239189c82417f8d17f9f8312dcf97.png)

##### 9.4.3、输出到MySQL

参考官网:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/jdbc/

```java
package sink;

import bean.WaterSensor;
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.nio.charset.StandardCharsets;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;


public class SinkMysqlDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();;
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } );
        dataStreamSource.print();
        SinkFunction<WaterSensor> waterSensorSinkFunction = JdbcSink.sink(
                "insert into ws (id,ts,vc)  values (?, ?, ?)",                       // mandatory
                new JdbcStatementBuilder<WaterSensor>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                        preparedStatement.setString(1, waterSensor.id);
                        preparedStatement.setLong(2, waterSensor.ts);
                        preparedStatement.setInt(3, waterSensor.vc);

                    }
                },                  // mandatory
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),                  // optional
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/testflink?" +
                                "autoReconnect=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai")
//                        .withDriverName("org.Mysql.Driver")
                        .withUsername("root")
                        .withPassword("123")
                        .withConnectionCheckTimeoutSeconds(60)
                        .build()                  // mandatory
        );
        dataStreamSource.addSink(waterSensorSinkFunction);
        env.execute();

    }
}

10、时间和窗口

窗口一般是划定的一段时间范围,即时间窗。窗口本事是截取有界数据的一种方式,对这个范围内的数据进行处理。

10.1、窗口分类
  1. 按照驱动类型分:时间窗口(定点发车)、计数窗口(人齐发车)
  2. 按照窗口分配数据的规则分:滚动窗口、滑动窗口、会话窗口、全局窗口
10.2、窗口API概述

按键分区和非按键分区

1、按键分区

按键分区后数据流被key分成多条逻辑流KeyedStream,窗口计算会在多个并行子任务上同时执行。相同key的数据会在一个子任务中,相当于每个key都定义了一组窗口各自独立进行统计计算。

2、非按键分区

原始流dataStreamSource不会分成多条逻辑流,窗口逻辑只能在一个任务上执行,相当于并行度为1。

10.3、窗口分配器

Window Assigners 是构建窗口算子的第一步,用来定义数据被分配到哪个窗口,即指定窗口的类型。一般使用.window()方法,传入Window Assigners参数,返回WindowedStream。非按键分区使用.windowAll(),返回AllWindowedStream.

基于时间:

  • 按键分区滚动窗口,窗口长度2秒

    keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(2)));
    
  • 按键分区滑动时间窗口,窗口长度10s,滑动步长2s

    keyedStream.window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(2)));
    
  • 按键分区会话窗口,窗口长度2s

    keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(2)));
    

基于计数:

  • 按键分区滚动窗口,窗口长度为5个元素

    keyedStream.countWindow(5);
    
  • 按键分区滑动窗口,窗口长度5个元素,滑动步长2个元素

    keyedStream.countWindow(5,2);
    
10.4、窗口函数

窗口分配器只收集数据,窗口函数Window Function进行计算操作。

各种流的相互关系
在这里插入图片描述

  • 增量聚合:来一条算一条,窗口触发时输出计算结果
  • 全窗口函数:数据来了不计算先存上,等窗口触发时计算并输出结果
10.4.1、增量聚合函数

每来一个数据就聚合一次

1、归约函数ReduceFunction

相同key的第一条数据来的时候不会调用reduce方法,来一条数据就算一条,窗口触发输出计算结果

package window;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;



public class WindowAPIDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();;
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } );
        KeyedStream<WaterSensor, String> keyedStream = dataStreamSource.keyBy(value -> value.getId());
        WindowedStream<WaterSensor, String, TimeWindow> windowStream = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<WaterSensor> reduce = windowStream.reduce(new ReduceFunction<WaterSensor>() {
            @Override
            public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
                return new WaterSensor(value1.id, value2.ts, value1.vc + value2.vc);
            }
        });
        reduce.print();
        env.execute();

    }
}

2、聚合函数Aggregate Function

ReduceFunction能解决大多归约聚合问题,但聚合状态类型、输出结果类型和输入数据类型必须一样。Aggregate Function更加灵活,有三种类型:输入IN、累加器ACC、输出OUT。输入IN是输入流中元素的数据类型;累加器类型ACC是聚合中间状态类型;输出OUT是最终计算结果类型。

  • 第一条数据来创建窗口和累加器
  • 增量聚合:来一条算一条(调用一次add方法)
  • 窗口输出调用一次getresult方法
  • 输入、输出、中间累加器的类型可以不一样
package window;

import bean.WaterSensor;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;


public class WindowAggregateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();;
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } );
        KeyedStream<WaterSensor, String> keyedStream = dataStreamSource.keyBy(value -> value.getId());
        WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> aggregate = window.aggregate(new AggregateFunction<WaterSensor, Integer, String>() {
            @Override
            public Integer createAccumulator() {
                System.out.println("初始化累加器");
                return 0;
            }

            @Override
            public Integer add(WaterSensor value, Integer accumulator) {
                System.out.println("调用add");
                return value.vc + accumulator;
            }

            @Override
            public String getResult(Integer accumulator) {
                System.out.println("输出结果");
                return accumulator.toString();
            }

            @Override
            public Integer merge(Integer a, Integer b) {
                //只有会话窗口才用
                return null;
            }
        });

        aggregate.print();
        env.execute();

    }
}

10.4.2、全窗口函数

1、窗口函数

.apply(),但是该方法能提供的上下文信息比较少,已经被ProcessWindowFunction全覆盖

window.apply(new WindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void apply(String key, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out) throws Exception {

            }
        });

2、处理窗口函数

ProcessWindowFunction除了能拿到窗口数据外还能获取上下文对象。上下文包括窗口信息、当前的时间和状态信息(处理时间、事件时间水位线)

窗口触发时才调用一次,统一计算窗口内的所有数据

package window;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;


public class WindowProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();;
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } );
        KeyedStream<WaterSensor, String> keyedStream = dataStreamSource.keyBy(value -> value.getId());
//        dataStreamSource.windowAll();
        WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> process = window.process(new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
            @Override
            public void process(String key, Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
                long start = context.window().getStart();
                long end = context.window().getEnd();
                String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
                String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
                long l = elements.spliterator().estimateSize();
                out.collect("key=" + key + "的窗口[" + winStart + "," + winEnd + "]包含" + l + "条数据" + elements.toString());
            }
        });
        process.print();
        env.execute();

    }
}

10.4.3、增量聚合与全窗口函数接合使用

增量聚合Aggregate+全窗口的ProcessWindow

  1. 增量聚合函数处理数据:来一条算一条
  2. 窗口触发时,增量聚合结果(只有一条数据)传给全窗口函数
  3. 经过全窗口函数的处理后输出

从而实现了两者的优点(reduce函数也能传全窗口函数)

  1. 增量聚合:来一条算一条只存储中间计算结果,占用空间少
  2. 全窗口函数:可以通过上下文实现灵活的功能
package window;

import bean.WaterSensor;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;


public class WindowAggregateAndProcessDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =  StreamExecutionEnvironment.getExecutionEnvironment();;
        env.setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> dataStreamSource = env.socketTextStream("192.168.132.101", 7777)
                .map(value ->{
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0],Long.parseLong(datas[1]),Integer.parseInt(datas[2]) );
                } );
        KeyedStream<WaterSensor, String> keyedStream = dataStreamSource.keyBy(value -> value.getId());
        WindowedStream<WaterSensor, String, TimeWindow> window = keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
        SingleOutputStreamOperator<String> outputStreamOperator = window.aggregate(new MyAgg(), new MyProcess());
        outputStreamOperator.print();
        env.execute();

    }
    public static class MyAgg implements  AggregateFunction<WaterSensor, Integer, String>{

        @Override
        public Integer createAccumulator() {
            System.out.println("初始化累加器");
            return 0;
        }

        @Override
        public Integer add(WaterSensor value, Integer accumulator) {
            System.out.println("调用add");
            return value.vc + accumulator;
        }

        @Override
        public String getResult(Integer accumulator) {
            System.out.println("输出结果");
            return accumulator.toString();
        }

        @Override
        public Integer merge(Integer a, Integer b) {
            //只有会话窗口才用
            return null;
        }
    }
    public static class MyProcess extends ProcessWindowFunction<String, String, String, TimeWindow> {

        @Override
        public void process(String key, Context context, Iterable<String> elements, Collector<String> out) throws Exception {
            long start = context.window().getStart();
            long end = context.window().getEnd();
            String winStart = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
            String winEnd = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
            long l = elements.spliterator().estimateSize();
            out.collect("key=" + key + "的窗口[" + winStart + "," + winEnd + "]包含" + l + "条数据" + elements.toString());
        }
    }
}

10.5、小结

触发器、移除器:现成的几个窗口都有默认的实现,一般不需要定义

以时间滚动窗口为例:

  • 窗口什么时候触发输出:时间进展>=窗口的最大时间戳(end-1ms)

  • 窗口是怎么划分的:start=取窗口长度的整数倍,向下取整,end=start+窗口长度,窗口左闭右开[start,end)

  • 窗口生命周期:

    创建:属于本窗口的第一条数据来的时候现new的,放入一个singleton单例的集合中;

    销毁(关窗):时间进展>=窗口的最大时间戳(end-1ms)+允许迟到时间(默认为0)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/875178.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

分类预测|基于改进的灰狼IGWO优化支持向量机SVM的数据分类预测matlab程序 改进策略:Cat混沌与高斯变异

分类预测|基于改进的灰狼IGWO优化支持向量机SVM的数据分类预测matlab程序 改进策略&#xff1a;Cat混沌与高斯变异 文章目录 一、基本原理原理流程1. **定义目标函数**2. **初始化GWO**3. **评估适应度**4. **更新狼的位置**5. **更新狼的等级**6. **重复迭代**7. **选择最佳解…

春招审核新策略:Spring Boot系统实现

3系统分析 3.1可行性分析 通过对本大学生入学审核系统实行的目的初步调查和分析&#xff0c;提出可行性方案并对其一一进行论证。我们在这里主要从技术可行性、经济可行性、操作可行性等方面进行分析。 3.1.1技术可行性 本大学生入学审核系统采用Spring Boot框架&#xff0c;JA…

综合案例-数据可视化-柱状图

一、基础柱状图 我们绘制一个关于三种水果销售额的柱状图&#xff0c;X轴数据为三种水果的名称&#xff0c;用列表[苹果,香蕉,橘子]添加进去&#xff0c;Y轴数据为三种水果的销售额&#xff0c;用列表[50,70,60]添加进去。 步骤&#xff1a; 导包构建柱状图对象添加X轴数据生…

Android 12系统源码_窗口管理(八)WindowConfiguration的作用

前言 在Android系统中WindowConfiguration这个类用于管理与窗口相关的设置&#xff0c;该类存储了当前窗口的显示区域、屏幕的旋转方向、窗口模式等参数&#xff0c;应用程序通过该类提供的信息可以更好的适配不同的屏幕布局和窗口环境&#xff0c;以提高用户体验。 一、类定…

喜报 | 知从科技荣获 “AutoSec 安全之星 - 优秀汽车软件供应链安全方案奖”

近日&#xff0c;「AutoSec 2024第八届中国汽车网络安全周暨第五届智能汽车数据安全展」在上海盛大举行。本届大会由谈思实验室和谈思汽车主办、上海市车联网协会联合主办&#xff0c;以汽车“网络数据安全、软件安全、功能安全”为主题&#xff0c;设置了“31X”模式&#xff…

关于 PC打开“我的电脑”后有一些快捷如腾讯视频、百度网盘、夸克网盘、迅雷等各种捷方式在磁盘驱动器上面统一删除 的解决方法

若该文为原创文章&#xff0c;转载请注明原文出处 本文章博客地址&#xff1a;https://hpzwl.blog.csdn.net/article/details/142029325 长沙红胖子Qt&#xff08;长沙创微智科&#xff09;博文大全&#xff1a;开发技术集合&#xff08;包含Qt实用技术、树莓派、三维、OpenCV…

numpy(基于Numpy外文文档的学习)

学习目标&#xff1a; Understand the difference between one-, two- and n-dimensional arrays in NumPy; Understand how to apply some linear algebra operations to n-dimensional arrays without using for-loops;&#xff08;调用一些简单的方法&#xff09; Underst…

外包干了三年,快要废了。。。

先简单说一下自己的情况&#xff0c;普通本科&#xff0c;在外包干了3年多的功能测试&#xff0c;这几年因为大环境不好&#xff0c;我整个人心惊胆战的&#xff0c;怕自己卷铺盖走人了&#xff0c;我感觉自己不能够在这样蹉跎下去了&#xff0c;长时间呆在一个舒适的环境真的会…

Docker 部署 Redis (图文并茂超详细)

部署 Redis ( Docker ) [Step 1] : 拉取 Redis 镜像, 推荐使用 7 的 Redis 版本 docker pull redis:7.0.12[Step 2] : 创建 Redis 相关目录 ➡️ 启动 Redis 容器 ➡️ 拷贝文件 ➡️ 授权文件夹 ➡️ 删除容器 # 创建 Redis 相关目录 mkdir -p /data/redis/{conf,data,log…

写的一致性问题之失效模式

文章目录 1、先删除redis缓存&#xff0c;再写入mysql&#xff1a;1.1、高并发情况下分析出现的问题 2、先写入mysql&#xff0c;再删除redis缓存 失效模式存在的问题&#xff1a;在事务提交之前可能会有其他读操作重新把旧数据放入redis缓存中 1、先删除redis缓存&#xff0c;…

深入解析全连接层:PyTorch 中的 nn.Linear、nn.Parameter 及矩阵运算

文章目录 数学概念&#xff08;全连接层&#xff0c;线性层&#xff09;nn.Linear()nn.Parameter()Q1. 为什么 self.weight 的权重矩阵 shape 使用 ( out_features , in_features ) (\text{out\_features}, \text{in\_features}) (out_features,in_features)而不是 ( in_featur…

Bev pool 加速(2):自定义c++扩展

文章目录 1. c++扩展2. 案例2.1 案例12. 1.1 代码实现(1) c++ 文件(2) setup.py编写(3) python 代码编写2.2 案例22.2.1 模型搭建2.2.2 c++ 扩展实现(1)c++ 扩展代码(2)setup.py编写(3)python 调用c++扩展在bevfusion论文中,将bev_pooling定义为view transform中的效率瓶…

PROTOTYPICAL II - The Practice of FPGA Prototyping for SoC Design

The Art of the “Start” The semiconductor industry revolves around the “start.” Chip design starts lead to more EDA tool purchases, more wafer starts, and eventually to more product shipments. Product roadmaps develop to extend shipments by integrating…

FloodFill算法

文章目录 1. 图像渲染&#xff08;733&#xff09;2. 岛屿数量&#xff08;200&#xff09;3. 岛屿的最大面积&#xff08;695&#xff09;4. 被围绕的区域&#xff08;130&#xff09; 1. 图像渲染&#xff08;733&#xff09; 题目描述&#xff1a; 算法原理&#xff1a; …

DAY13信息打点-Web 应用源码泄漏开源闭源指纹识别GITSVNDS备份

#知识点 0、Web架构资产-平台指纹识别 1、开源-CMS指纹识别源码获取方式 2、闭源-习惯&配置&特性等获取方式 3、闭源-托管资产平台资源搜索监控 演示案例&#xff1a; ➢后端-开源-指纹识别-源码下载 ➢后端-闭源-配置不当-源码泄漏 ➢后端-方向-资源码云-源码泄漏 …

1、https的全过程

目录 一、概述二、SSL过程如何获取会话秘钥1、首先认识几个概念&#xff1a;2、没有CA机构的SSL过程&#xff1a;3、没有CA机构下的安全问题4、有CA机构下的SSL过程 一、概述 https是非对称加密和对称加密的过程&#xff0c;首先建立https链接需要经过两轮握手&#xff1a; T…

算法提高模板强连通分量tarjan算法

AC代码&#xff1a; #include<bits/stdc.h>using namespace std;typedef long long ll; const int MOD 998244353; const int N 2e5 10;//强联通分量模板 //tarjan算法 vector<int>e[N]; int n, m, cnt; int dfn[N], low[N], ins[N], idx; int bel[N];//记录每…

Redis高可用,Redis性能管理

文章目录 一&#xff0c;Redis高可用&#xff0c;Redis性能管理二&#xff0c;Redis持久化1.RDB持久化1.1触发条件&#xff08;1&#xff09;手动触发&#xff08;2&#xff09;自动触发 1.2 Redis 的 RDB 持久化配置1.3 RDB执行流程(1) 判断是否有其他持久化操作在执行(2) 父进…

Chainlit集成Langchain并使用通义千问实现文生图网页应用

前言 本文教程如何使用通义千问的大模型服务平台的接口&#xff0c;实现图片生成的网页应用&#xff0c;主要用到的技术服务有&#xff0c;chainlit 、 langchain、 flux。合利用了大模型的工具选择调用能力。实现聊天对话生成图片的网页应用。 阿里云 大模型服务平台百炼 API…

R语言统计分析——功效分析3(相关、线性模型)

参考资料&#xff1a;R语言实战【第2版】 1、相关性 pwr.r.test()函数可以对相关性分析进行功效分析。格式如下&#xff1a; pwr.r.test(n, r, sig.level, power, alternative) 其中&#xff0c;n是观测数目&#xff0c;r是效应值&#xff08;通过线性相关系数衡量&#xff0…