Flink的处理函数

之前的流处理API,无论是基本的转换、聚合,还是更为复杂的窗口操作,其实都是基于DataStream进行转换的,所以可以统称为DataStream API。

在Flink更底层,我们可以不定义任何具体的算子(比如map,filter,或者window),而只是提炼出一个统一的“处理”(process)操作——它是所有转换算子的一个概括性的表达,可以自定义处理逻辑,所以这一层接口就被叫作“处理函数”(process function

一.基本处理函数(ProcessFunction

1.1 处理函数的功能和使用

转换算子一般针对某种具体操作来定义的,能拿到的信息有限。而使用底层的处理函数,则可以使用处理函数提供的“定时服务”(TimerServer) 来获取到当前当前水位线、事件等更为详细的信息,及注册“定时事件”。而且处理函数继承了AbstractRichFunction抽象类,所以拥有富函数类的所有特性,同样可以访问状态(state)和其他运行时信息。此外,处理函数还可以直接将数据输出到侧输出流(side output)中。所以,处理函数是最为灵活的处理方法,可以实现各种自定义的业务逻辑。

1.2 ProcessFunction解析

ProcessFunction的使用就基于 DataStream 调用 .process() 方法传入一个ProcessFunction作为参数,用来定义处理逻辑。

从源码可以看到,ProcessFunction 继承了 AbstractRichFunction(抽象富函数类),两个泛类型参数代表输入类型与输出类型。里面单独定于了两个非常重要的方法:一个是必须要实现的抽象方法.processElement();另一个是非抽象方法.onTimer()。

  • processElement():具体的数据处理逻辑,且对于流中的每个元素都会调用一次。三个参数分别为 value当前数据、ctx上下文、out采集器。
  • onTimer():当注册的定时器被触发,会执行该方法。三个参数分别为 timestamp时间戳、ctx上下文、out采集器。Flink中,只有“按键分区流”KeyedStream才支持设置定时器的操作。

通过几个参数的分析不难发现,ProcessFunction可以轻松实现flatMap、map、filter这样的基本转换功能;而通过富函数提供的获取上下文方法.getRuntimeContext(),也可以自定义状态(state)进行处理,这也就能实现聚合操作的功能了。

1.3 处理函数的分类 

DataStream在调用一些转换方法之后,有可能生成新的流类型;例如调用 .KeyBy() 后得到的是KeyedStream,然后调用 .window() 后得到的 WindowedStream。但是对于不同的流类型,都可以直接调用 .process() 方法进行自定义处理,此时传入的参数就叫做处理函数。当然,它们尽管本质相同,都是可以访问状态和时间信息的底层API,可彼此之间也会有所差异。

Flink提供了8个不同的处理函数:

(1) ProcessFunction

最基本的处理函数,基于DataStream直接调用.process()时作为参数传入。 

(2) KeyedProcessFunction

对流按键分区后的处理函数,基于KeyedStream调用.process()时作为参数传入。要想使用定时器,比如基于KeyedStream。

(3) ProcessWindowFunction

开窗之后的处理函数,也是全窗口函数的代表。基于WindowedStream调用.process()时作为参数传入。

(4) ProcessAllWindowFunction

同样是开窗之后的处理函数,基于AllWindowedStream调用.process()时作为参数传入。

(5) CoProcessFunction

合并(connect)两条流之后的处理函数,基于ConnectedStreams调用.process()时作为参数传入。

(6) ProcessJoinFunction

间隔连接(interval join)两条流之后的处理函数,基于IntervalJoined调用.process()时作为参数传入。

(7) BroadcastProcessFunction

广播连接流处理函数,基于BroadcastConnectedStream调用.process()时作为参数传入。这里的“广播连接流”BroadcastConnectedStream,是一个未keyBy的普通DataStream与一个广播流(BroadcastStream)做连接(conncet)之后的产物。

(8) KeyedBroadcastProcessFunction

按键分区的广播连接流处理函数,同样是基于BroadcastConnectedStream调用.process()时作为参数传入。与BroadcastProcessFunction不同的是,这时的广播连接流,是一个KeyedStream与广播流(BroadcastStream)做连接之后的产物。

二.按键分区处理函数(KeyedProcessFunction

上面提到,只有在KeyedStream中才支持使用TimerService设置定时器的操作。所以我们一般是先对流做 KeyBy 分区操作后,再去调用 .process() 定义具体的操作逻辑逻辑;一般传入 KeyedProcessFunction。

2.1 定时器(Timer)和定时服务(TimerService)

在.onTimer()方法中可以实现定时处理的逻辑,当之前注册的定时器被触发,则会调用该方法。注册定时器的功能,是通过上下文中提供的“定时服务”来实现的。

通过KeyedProcessFunction提供的上下文可以获取以下等内容:

ds
    .keyBy( t -> t.getId())
    .process(new KeyedProcessFunction<String, WaterSensor, Object>() {

        /**
         * 来一条数据调用一次
         * @param value 当前输入的数据
         * @param ctx 上下文信息
         * @param out 采集器
         * @throws Exception
         */
        @Override
        public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, Object>.Context ctx, Collector<Object> out) throws Exception {

            // 获取定时服务
            TimerService timerService = ctx.timerService();

            // 注册定时器:以事件时间为基准
            timerService.registerEventTimeTimer(long time);

            // 注册定时器:以处理时间为基准
            timerService.registerProcessingTimeTimer(long time);

            // 当前的处理时间:即系统时间
            timerService.currentProcessingTime();

            // 删除触发时间为time的处事件时间定时器
            timerService.deleteEventTimeTimer(long time);

            // 删除触发时间为time的处理时间定时器
            timerService.deleteEventTimeTimer(long time);

            // 获取当前水位线 ***获取的上一条数据的水位线
            timerService.currentWatermark();

        }    

        /**
         * 时间进展到定时器注册的时间则会调用该方法
         * @param timestamp 当前时间戳
         * @param ctx 上下文信息
         * @param out 采集器
         * @throws Exception
         */
        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<String, WaterSensor, Object>.OnTimerContext ctx, Collector<Object> out) throws Exception {

        }
    });

TimerService会以键(key)和时间戳为标准,对定时器进行去重;也就是说对于每个key和时间戳,最多只有一个定时器,如果注册了多次,onTimer()方法也将只被调用一次。

2.2 KeyedProcessFunction案例 

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setParallelism(1);

    SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("xxx.xxx.xxx.xxx", 1234)
                    .map(new MyMapFunctionImpl());

    // ***定义 WaterMark 策略
    WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
            .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s
            .withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L);
    // ***指定 watermark策略
    SingleOutputStreamOperator<WaterSensor> sensorWithWaterMark = sensorDS
            .assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);

    sensorWithWaterMark
            .keyBy( t -> t.getId())
            .process(new KeyedProcessFunction<String, WaterSensor, Object>() {
            /**
             * 来一条数据调用一次
             * @param value 当前输入的数据
             * @param ctx 上下文信息
             * @param out 采集器
             * @throws Exception
             */
            @Override
            public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, Object>.Context ctx, Collector<Object> out) throws Exception {
                // 获取定时服务
                TimerService timerService = ctx.timerService();
                // 当前 Key
                String currentKey = ctx.getCurrentKey();
                // 数据中的事件时间
                Long timestamp = ctx.timestamp();
                // 注册以事件时间为基准的5s定时器
                timerService.registerEventTimeTimer(5000);
                System.out.println("当前 Key 为=" + currentKey + "当前时间为=" + timestamp + "注册了一个5s的定时器");

            }

            /**
             * 时间进展到定时器注册的时间则会调用该方法
             * @param timestamp 当前时间戳
             * @param ctx 上下文信息
             * @param out 采集器
             * @throws Exception
             */
            @Override
            public void onTimer(long timestamp, KeyedProcessFunction<String, WaterSensor, Object>.OnTimerContext ctx, Collector<Object> out) throws Exception {
                // 当前 Key
                String currentKey = ctx.getCurrentKey();
                System.out.println("当前 Key 为=" + currentKey + "现在时间为" + timestamp + "定时器触发");
            }
        });

    env.execute();

}

 输入:

[root@VM-55-24-centos ~]# nc -lk 1234
S1,1,1
S1,4,4
S2,3,3
S2,5,5
S3,8,8
S3,9,9

输出:

当前 Key 为=S1当前时间为=1000注册了一个5s的定时器
当前 Key 为=S1当前时间为=4000注册了一个5s的定时器
当前 Key 为=S2当前时间为=3000注册了一个5s的定时器
当前 Key 为=S2当前时间为=5000注册了一个5s的定时器
当前 Key 为=S3当前时间为=8000注册了一个5s的定时器
当前 Key 为=S3当前时间为=9000注册了一个5s的定时器 // 触发定时器 9000ms-3000ms-1ms = 5999
当前 Key 为=S1现在时间为5000定时器触发
当前 Key 为=S3现在时间为5000定时器触发
当前 Key 为=S2现在时间为5000定时器触发

2.3 KeyedProcessFunction中的当前Watermark

在实现的 processElement() 中获取当前水位线

sensorWithWaterMark
        .keyBy( t -> t.getId())
        .process(new KeyedProcessFunction<String, WaterSensor, Object>() {
        /**
         * 来一条数据调用一次
         * @param value 当前输入的数据
         * @param ctx 上下文信息
         * @param out 采集器
         * @throws Exception
         */
        @Override
        public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, Object>.Context ctx, Collector<Object> out) throws Exception {
            
            // 获取定时服务
            TimerService timerServiceService = ctx.timerService();

            // 获取当前水位线
            long currentWatermark = timerServiceService.currentWatermark();

            System.out.println("当前数据为=" + value + "当前水位线为=" + currentWatermark );
        }
    });

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,3,3
s1,6,6
s1,8,8

输出:

当前数据为=WaterSensor{id='s1', ts=1, vc=1}当前水位线为=-9223372036854775808
当前数据为=WaterSensor{id='s1', ts=3, vc=3}当前水位线为=-2001
当前数据为=WaterSensor{id='s1', ts=6, vc=6}当前水位线为=-1
当前数据为=WaterSensor{id='s1', ts=8, vc=8}当前水位线为=2999

可以看到,在process中的当前水位线其实是 上一条数据的事件时间 - 水位线延迟时间 - 1ms。

2.4 KeyedProcessFunction 小结

  1. 定时服务(TimerServer)只有 KeyedStream(键控流) 才能使用
  2. 事件时间定时器,是通过 Watermark 触发的
  3. Watermark = 当前最大事件时间 - Watermark 延迟时间 - 1ms
  4. 在 process 中获取到的 Watermark 其实是上一条数据的 Watermark

其他处理函数类似。

三. 应用案例——Top N

案例需求:实时统计一段时间内的出现次数最多的水位。例如,统计最近10秒钟内出现次数最多的两个水位,并且每5秒钟更新一次。我们知道,这可以用一个滑动窗口来实现。于是就需要开滑动窗口收集传感器的数据,按照不同的水位进行统计,而后汇总排序并最终输出前两名。这其实就是著名的“Top N”问题。 

3.1 使用ProcessAllWindowFunction

思路:直接开窗,使用全窗口函数处理窗口内所有的数据,使用HashMap存储,再对map进行统计排序输出。统计十秒内数据,五秒输出一次,其实就是滑动窗口大小为10,滑动步长为5。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setParallelism(1);

    SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("xxx.xxx.xxx.xxx", 1234)
                    .map(new MyMapFunctionImpl());

    WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
            .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s
            .withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L);

    SingleOutputStreamOperator<WaterSensor> sensorWithWaterMark = sensorDS
            .assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);

    sensorWithWaterMark
            // 滑动窗口,窗口大小10s,滑动步长5s
            .windowAll(SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(5)))
            .process(new MyProcessAllWindowFunction())
            .print();
    env.execute();
}

    /**
     * 自定义全窗口处理函数
     *      全窗口函数:窗口触发时调用一次
     */
    public static class MyProcessAllWindowFunction extends ProcessAllWindowFunction<WaterSensor , String , TimeWindow>{
        @Override
        public void process(ProcessAllWindowFunction<WaterSensor, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
            // 定义一个hashmap用来存,key=vc,value=count值
            Map<Integer, Integer> vcCountMap = new HashMap<>();
            for (WaterSensor element : elements) {
                Integer vc = element.getVc();
                if (vcCountMap.containsKey(vc)) {
                    vcCountMap.put(vc, vcCountMap.get(vc) + 1);
                } else {
                    vcCountMap.put(vc, 1);
                }
            }

            List<Map.Entry<Integer, Integer>> list = new ArrayList<>(vcCountMap.entrySet());

            // 使用Collections.sort()按value降序排序
            Collections.sort(list, (o1, o2) ->  o2.getValue() - o1.getValue());
            System.out.println(list);
            StringBuffer result = new StringBuffer();
            for (int i = 0; i < Math.min(2 , list.size()); i++) {
                result.append("Top " + (i+1) + ": vc = " + list.get(i).getKey() + ",出现次数 = " + list.get(i).getValue());
                result.append("\n");
            }
            out.collect(result.toString());
        }
    }

}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s2,3,1
s1,5,2
s3,6,2
s1,6,1
s2,7,3
s3,8,1
s1,8,3
s2,9,2
s1,10,1
s3,11,2
s1,13,2

输出:

// 注释:[ 0 , 5 ] 的窗口
Top 1: vc = 1,出现次数 = 2 

// 注释:[ 0 , 10 ] 的窗口
Top 1: vc = 1,出现次数 = 4
Top 2: vc = 2,出现次数 = 3

3.2 使用KeyedProcessFunction

上面的方法使用全窗口将所有的数据都放在一个分区内,强行将并行度设置成了1,这是Flink不推荐的做法。

则可以使用KeyedProcessFunction进行优化:

1.对统计字段(vc)进行 KeyBy 分区

2.进行增量聚合,统计vc出现的次数,封装数据(vc,count,窗口标记(窗口结束数据))

3.对标记(窗口)进行分组,对数据进行排序、取TopN

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setParallelism(1);

    SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("xxx.xxx.xxx.xxx", 1234)
                    .map(new MyMapFunctionImpl())
                    .assignTimestampsAndWatermarks(WatermarkStrategy
                                            .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s
                                            .withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L)
                    );

    // 对 vc 进行分组,统计窗口内vc出现的次数 将每条数据封装成 vc,count,窗口结束时间
    SingleOutputStreamOperator<Tuple3<Integer, Integer, Long>> windowAgg  = sensorDS
            .keyBy(sensor -> sensor.getVc())
            .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
            .aggregate(
                    new VcCountAgg(),
                    new WindowResult()
            );

    // 按照打的标记(windowEndTime)进行分组,保证同一个窗口时间范围的结果一起 , 分组后排序,取 TopN
    windowAgg.keyBy(v -> v.f2)
                    .process(new TopN(2))
                    .print();
    env.execute();
}
/**
 * 增量聚合:累计同分组出现的次数
 */
public static class VcCountAgg implements AggregateFunction<WaterSensor,Integer,Integer> {

    @Override
    public Integer createAccumulator() {
        return 0;
    }

    @Override
    public Integer add(WaterSensor waterSensor, Integer integer) {
        return integer+1;
    }

    @Override
    public Integer getResult(Integer integer) {
        return integer;
    }

    @Override
    public Integer merge(Integer integer, Integer acc1) {
        return null;
    }
}


/**
 * 全窗口函数,窗口内数据全部到达才会执行一次
 * 泛型如下:
 * 第一个:输入类型 = 增量函数的输出  count值,Integer
 * 第二个:输出类型 = Tuple3(vc,count,windowEndTime) ,带上 窗口结束时间 的标签
 * 第三个:key类型 , vc,Integer
 * 第四个:窗口类型
 */
public static class WindowResult extends ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow> {

    @Override
    public void process(Integer vc, ProcessWindowFunction<Integer, Tuple3<Integer, Integer, Long>, Integer, TimeWindow>.Context context, Iterable<Integer> elements, Collector<Tuple3<Integer, Integer, Long>> out) throws Exception {
        // 获取迭代器的数据(只有一条数据)
        Integer count = elements.iterator().next();
        // 获取窗口结束时间 作为标记
        long endTime = context.window().getEnd();
        // 将 vc、vc对应的count 连带窗口标记 返回
        out.collect(Tuple3.of(vc , count , endTime));
    }
}
/**
 *  处理组内的每一条数据 一条数据触发一次
 */
public static class TopN extends KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String> {

    // 存不同窗口的 统计结果,key=windowEnd,value=list数据
    private Map<Long, List<Tuple3<Integer, Integer, Long>>> dataListMap;

    // 要取的Top数量
    private int threshold;

    public TopN(int threshold) {
        this.threshold = threshold;
        dataListMap = new HashMap<>();
    }


    @Override
    public void processElement(Tuple3<Integer, Integer, Long> value, KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String>.Context ctx, Collector<String> out) throws Exception {
        // 进入这个方法,只是一条数据,要排序,得到齐才行 ===》 存起来,不同窗口分开存
        Long windowEnd = value.f2;
        // 将对应的窗口的数据放入map中对应key的list中
        if(dataListMap.containsKey(windowEnd)){
            // 该 vc 存在,则直接添加到数组中
            dataListMap.get(windowEnd).add(value);
        }else{
            // 不包含vc,是该vc的第一条
            List<Tuple3<Integer, Integer, Long>> dataList = new ArrayList<>();
            dataList.add(value);
            dataListMap.put(windowEnd , dataList);
        }

        // 注册一个定时器 窗口触发时触发定时器(windowEndTime + 1ms) 输出计算结果
        // 同一个窗口范围,应该同时输出,只不过是一条一条调用processElement方法,只需要延迟1ms即可
        ctx.timerService().registerProcessingTimeTimer(windowEnd + 1);
    }

    // 注册一个定时器 窗口触发时进行排序,取TopN,输出结果
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<Long, Tuple3<Integer, Integer, Long>, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
        super.onTimer(timestamp, ctx, out);
        // 定时器触发,同一个窗口范围的计算结果攒齐了,开始 排序、取TopN
        Long windowEnd = ctx.getCurrentKey();
        // 1. 排序
        List<Tuple3<Integer, Integer, Long>> dataList = dataListMap.get(windowEnd);
        dataList.sort((o1, o2) -> o2.f1 - o1.f1);

        // 2. 取TopN
        StringBuilder outStr = new StringBuilder();

        outStr.append("================================\n");
        // 遍历 排序后的 List,取出前 threshold 个, 考虑可能List不够2个的情况  ==》 List中元素的个数 和 2 取最小值
        for (int i = 0; i < Math.min(threshold, dataList.size()); i++) {
            Tuple3<Integer, Integer, Long> vcCount = dataList.get(i);
            outStr.append("Top" + (i + 1) + "\n");
            outStr.append("vc=" + vcCount.f0 + "\n");
            outStr.append("count=" + vcCount.f1 + "\n");
            outStr.append("窗口结束时间=" + vcCount.f2 + "\n");
            outStr.append("================================\n");
        }

        out.collect(outStr.toString());

    }
}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,1
s1,2,1
s2,3,3
s1,4,3
s1,5,1
s2,8,1
s1,9,3
s3,11,3
s1,12,3
s1,13,3

输出:

================================
Top1
vc=1
count=2
窗口结束时间=5000
================================
Top2
vc=3
count=2
窗口结束时间=5000
================================

================================
Top1
vc=1
count=4
窗口结束时间=10000
================================
Top2
vc=3
count=3
窗口结束时间=10000
================================

3.3 侧输出流(Side Output

侧输出流可以认为是“主流”上分叉出的“支流”,所以可以由一条流产生出多条流,而且这些流中的数据类型还可以不一样。利用这个功能可以很容易地实现“分流”操作。

具体实现上,可以在处理函数中的上下文中调用 .output() 方法即可。

案例:对每个传感器,水位超过10则输出告警信息。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setParallelism(1);

    SingleOutputStreamOperator<WaterSensor> sensorDS = env
                    .socketTextStream("xxx.xxx.xxx.xxx", 1234)
                    .map(new MyMapFunctionImpl())
                    .assignTimestampsAndWatermarks(WatermarkStrategy
                                            .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3)) // 设置最大等待时间为3s
                                            .withTimestampAssigner((SerializableTimestampAssigner<WaterSensor>) (waterSensor, l) -> waterSensor.getTs() * 1000L)
                    );

    // 定义输出流标签
    OutputTag<String> warnTag = new OutputTag<String>("warn-tag", Types.STRING);
    
    SingleOutputStreamOperator<WaterSensor> process = sensorDS
            .keyBy(WaterSensor::getId)
            .process(new KeyedProcessFunction<String, WaterSensor, WaterSensor>() {

                @Override
                public void processElement(WaterSensor value, KeyedProcessFunction<String, WaterSensor, WaterSensor>.Context ctx, Collector<WaterSensor> out) throws Exception {
                    if (value.getVc() >= 10) {
                        // 使用侧输出流告警
                        ctx.output(warnTag, "当前水位线:" + value.getVc() + ",触发阈值10!");
                    }
                    out.collect(value);
                }
            });

    // 输出主流
    process.print("主流");

    // 输出侧流
    process.getSideOutput(warnTag).printToErr("侧流-Warn");

    env.execute();
}

输入:

[root@VM-55-24-centos ~]# nc -lk 1234
s1,1,3
s2,5,7
s1,9,10
s1,5,9
s5,11,11

输出:
 

主流> WaterSensor{id='s1', ts=1, vc=3}
主流> WaterSensor{id='s2', ts=5, vc=7}
侧流-Warn> 当前水位线:10,触发阈值10!
主流> WaterSensor{id='s1', ts=9, vc=10}
主流> WaterSensor{id='s1', ts=5, vc=9}
侧流-Warn> 当前水位线:11,触发阈值10!
主流> WaterSensor{id='s5', ts=11, vc=11}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/249594.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

文章解读与仿真程序复现思路——电网技术EI\CSCD\北大核心《考虑碳交易机制的含风电电力系统日前优化调度》

解读标题 "考虑碳交易机制的含风电电力系统日前优化调度" 意味着在该研究或应用中&#xff0c;对含有风电的电力系统进行日前优化调度&#xff0c;并考虑了碳交易机制。 具体解读如下&#xff1a; "含风电电力系统"&#xff1a;指涉及到风能作为电力系统的…

前端设计模式之旅:命令模式

引言 使用命令模式&#xff0c;我们可以将执行特定任务的对象与调用该方法的对象解耦。 核心思想 命令模式的核心思想是将请求封装成一个对象&#xff0c;从而使请求的发起者和请求的执行者解耦。 请求的发起者只需要知道如何创建命令对象并将其传递给请求者&#xff0c;而不需…

Python基础入门第四节,第五节课笔记

第四节 第一个条件语句 if 条件: 条件成立执行的代码1 条件成立执行的代码2 ...... else: 条件不成立执行的代码1 条件不成立执行的代码2 …… 代码如下: 身高 float(input("请输入您的身高(米):")) if 身高 >1.3:print(f您的身高是{身高},已经超过1.3米,您需…

python+appium自动化常见操作

1、点击、输入操作 #点击 driver.find_element(id,com.lemon.lemonban:id/navigation_my).click() #输入 driver.find_element(id,com.lemon.lemonban:id/et_password).send_keys(abc)2、隐形等待 driver.implicitly_wait(10)3、显性等待 #显性等待 locator (xpath,xpath) wai…

数学公式推导中 “:=“和“:=“的区别

A:B 将A定义为&#xff08;记为&#xff0c;令为&#xff09;B A:B 将B定义为&#xff08;记为&#xff0c;令为&#xff09;A

fckeditor编辑器在Chrome浏览器下编辑时多出空格解决方法

查看专栏目录 Network 灰鸽宝典专栏主要关注服务器的配置&#xff0c;前后端开发环境的配置&#xff0c;编辑器的配置&#xff0c;网络服务的配置&#xff0c;网络命令的应用与配置&#xff0c;windows常见问题的解决等。 文章目录 结尾语网络的梦想 dedecms网站后台采用fckedi…

『OPEN3D』1.5.1 动手实现点云暴力最近邻

本专栏地址: https://blog.csdn.net/qq_41366026/category_12186023.html?spm=1001.2014.3001.5482https://blog.csdn.net/qq_41366026/category_12186023.html?spm=1001.2014.3001.5482 1、暴力最近邻法 暴力最近邻法 (Brute-force Nearest Neighbour Search,BF 搜索) 是…

《人工智能导论》知识思维导图梳理【第6章节】

文章目录 第六章 知识图谱1 知识图谱概述2 知识图谱相关概念3 知识图谱的逻辑结构4 知识图谱的数据存储5 知识图谱的构建过程6 例题 markdown内容的分享 第六章 知识图谱 1 知识图谱概述 2 知识图谱相关概念 3 知识图谱的逻辑结构 4 知识图谱的数据存储 5 知识图谱的构建过程 6…

论文阅读——Mask DINO(cvpr2023)

DINO是检测&#xff0c;Mask DINO是检测分割。 几个模型对比&#xff1a; 传统的检测分割中&#xff0c;检测头和分割头是平行的&#xff0c;Mask DINO使用二分图匹配bipartite matching提高匹配结果的准确性。 box对大的类别不计算损失&#xff0c;因为太大了&#xff0c;会…

Gitee:远程仓库步骤

第一步&#xff1a;新建仓库 第二步&#xff1a;初始化本地仓库&#xff0c;git init 创建分支 git branch 新分支名 第三步&#xff1a;git add . &#xff1a;添加到暂存区 第四步&#xff1a;git config –global user.email关联邮箱&#xff0c;user.name用户名 第…

UE5 Landscaping MapBox 学习笔记

1. Landscaping MapBox 操作录屏 https://www.bilibili.com/video/BV113411U7T9/?spm_id_from333.337.search-card.all.click&vd_source707ec8983cc32e6e065d5496a7f79ee6 安装Landscaping与LandscapingMapbox两个插件 打开Landscaping窗口&#xff0c;这里应该要在Proj…

【CDP】CDP 集群通过Knox 访问Yarn Web UI,无法跳转到Flink Web UI 问题解决

一、前言 记录下在CDP 环境中&#xff0c;通过Knox 访问Yarn Web UI&#xff0c;无法跳转到Flink Web UI 的BUG 解决方法。 二、问题复现 登录 Knox Web UI 找到任一 Flink 任务 点击 ApplicationMaster 跳转 Flink WEB UI 出问题 内容空白&#xff0c;无法正常跳转到…

python 小程序学生选课系统源码

开发工具&#xff1a; PyCharm&#xff0c;mysql5.7&#xff0c;微信开发者工具 技术说明&#xff1a; python django html 小程序 功能介绍&#xff1a; 学生&#xff1a; 登录&#xff0c;选课&#xff08;查看课程及选择&#xff09;&#xff0c;我的成绩&#xff0c;…

Unity 使用AddTorque方法给刚体施加力矩详解

给刚体施加力&#xff0c;除了使用AddForce方法&#xff0c;我们还可以使用AddTorque方法。该方法是通过施加力矩给刚体以力。AddTorque方法从形式上跟AddForce差不多&#xff0c;它也有4个重载方法&#xff1a; 1、AddTorque(Vector3 torque)&#xff1b;使用Vector3类型参数…

kakfa实战指引-实时海量流式数据处理

前言 我们最终决定从头开始构建一些东西。我们的想法是&#xff0c;与其专注于保存成堆的数据&#xff0c;如关系数据库、键值存储、搜索索引或缓存&#xff0c;不如专注于将数据视为不断发展和不断增长的流&#xff0c;并围绕这个想法构建一个数据系统——实际上是一个数据架…

基于YOLOv8深度学习的高精度车辆行人检测与计数系统【python源码+Pyqt5界面+数据集+训练代码】目标检测、深度学习实战

《博主简介》 小伙伴们好&#xff0c;我是阿旭。专注于人工智能、AIGC、python、计算机视觉相关分享研究。 ✌更多学习资源&#xff0c;可关注公-仲-hao:【阿旭算法与机器学习】&#xff0c;共同学习交流~ &#x1f44d;感谢小伙伴们点赞、关注&#xff01; 《------往期经典推…

zync spi flash 频率配置

spi flash的频率配置 代码流程及最终的频率值。 驱动目录 基于4.14.55 内核&#xff0c; \drivers\spi\spi-dw-fmsh.c (控制器) \drivers\spi\spi-dw.c \drivers\mtd\devices\m25p80.c &#xff08;设备&#xff09; \drivers\spi\spi.c spi dts配置说明 spi0: spie000100…

3、Kafka 线上集群部署方案怎么做?

文章目录 1、操作系统的选择1.1、I/O 模型的使用1.2、数据网络传输效率1.3、社区支持度 2、磁盘的选择3、磁盘容量的规划3.1、举例思考本问题&#xff1a;3.2、计算一下&#xff1a;3.3、规划磁盘容量时你需要考虑下面这几个元素&#xff1a; 4、带宽规划4.1、计算 总结 1、操作…

提供一个数据库的表,然后,分页显示表中所有信息,一页10个,此表130条信息。最后再以饼图显示男 女 未知 人数的情况。

运行之后显示的效果&#xff1a; 如果是新项目&#xff0c;建立项目后&#xff0c;把mysql驱动放到指定的目录下即&#xff1a; WebContent\WEB-INF-lib 我用的驱动是 mysql-connector-j-8.0.33.jar 展示页 listpage.jsp <%page import"java.util.Map.Entry"%&g…

break用法

break他是用于从循环语句中跳出一层循环体的&#xff0c;提前结束循环 但是值得注意的点事break只能用在循环语句和switch当中 那么我们上代码进行具体的理解&#xff1a; 如果圆的面积大于100就会终止循环&#xff0c;那么如何体现出他只能终结一个循环呢&#xff0c;请看下…