flink重温笔记(八):Flink 高级 API 开发——flink 四大基石之 Window(涉及Time)

Flink学习笔记

前言:今天是学习 flink 的第八天啦!学习了 flink 高级 API 开发中四大基石之一: window(窗口)知识点,这一部分只要是解决数据窗口计算问题,其中时间窗口涉及时间,计数窗口,会话窗口,以及 windowFunction 的各类 API,前前后后花费理解的时间还是比较多的,查阅了很多官方文档,我一定要好好掌握!

Tips:二月底了,春天来临之际我要再度突破自己,加油!

文章目录

  • Flink学习笔记
    • 三、Flink 高级 API 开发
      • 1. Window
        • 1.1 为什么需要 Window
        • 1.2 窗口应用代码结构
        • 1.3 窗口的类型和概念
        • 1.4 三种时间语义
        • 1.5 窗口的使用
          • 1.5.1 滚动窗口
          • 1.5.2 滑动窗口
          • 1.5.3 会话窗口
        • 1.6 窗口的范围
        • 1.7 Window API
          • 1.7.1 Window API 调用方法
        • 1.8 Time Window 案例
          • 1.8.1 滚动窗口(无重叠数据)
          • 1.8.2 滑动窗口(有重叠数据)
        • 1.9 Count Window 案例
          • 1.9.1 滚动窗口(无重叠数据)
          • 1.9.2 滑动窗口(有重叠数据)
        • 1.10 Session Window 案例
        • 1.11 Window Function
          • 1.11.1 增量聚合函数
            • (1)reduce 和 aggregate 的区别
            • (2) ReduceFunction
            • (3) AggregateFunction
          • 1.11.2 全量聚合函数
            • (1) apply 和 process 的区别
            • (2) ProcessWindowFunction / ProcessAllWindowFunction
            • (3) 自定义聚合 apply

三、Flink 高级 API 开发

Flink 的四大基石:Checkpoint、State、Time、Window

在这里插入图片描述

1. Window

1.1 为什么需要 Window

例如:流数据处理中,在过去的1分钟内有多少用户点击了我们的网页。在这种情况下,我们必须定义一个窗口(window),用来收集最近1分钟内的数据,并对这个窗口内的数据进行计算。

1.2 窗口应用代码结构
  • Keyed Window
// Keyed Window
stream
        .keyBy(...)              <-  按照一个Key进行分组
        .window(...)            <-  将数据流中的元素分配到相应的窗口中
        [.trigger(...)]            <-  指定触发器Trigger(可选)
        [.evictor(...)]            <-  指定清除器Evictor(可选)
        .reduce/aggregate/process()      <-  窗口处理函数Window Function
  • Non-Keyed Window
// Non-Keyed Window
stream
        .windowAll(...)         <-  不分组,将数据流中的所有元素分配到相应的窗口中
        [.trigger(...)]            <-  指定触发器Trigger(可选)
        [.evictor(...)]            <-  指定清除器Evictor(可选)
        .reduce/aggregate/process()      <-  窗口处理函数Window Function

Tips:windowAll 不对数据流进行分组,所有数据将发送到下游算子单个实例上,下游并行度是 1。

1.3 窗口的类型和概念

类型:

  • CountWindow:按照指定的数据条数生成一个Window,与时间无关。
    • 滚动计数窗口,每隔N条数据,统计前N条数据
    • 滑动计数窗口,每隔N条数据,统计前M条数据
  • TimeWindow:按照时间生成Window。
    • 滚动时间窗口,每隔N时间,统计前N时间范围内的数据,窗口长度N,滑动距离N
    • 滑动时间窗口,每隔N时间,统计前M时间范围内的数据,窗口长度M,滑动距离N
    • 会话窗口,按照会话划定的窗口

概念:

  • 滚动窗口 — TumblingWindow:

    例子:假设有个红绿灯,提出个问题:*计算一下通过这个路口的汽车数量*

    细分:*按照窗口划分依据分为:滚动时间窗口、滚动计数窗口*

  • 滑动窗口 — SlidingWindow

    解释:每隔1分钟,统计前面2分钟内通过的车辆数

  • 滚动和滑动区别:

    滑动距离 > 窗口长度, 漏掉数据,如:每隔5分钟,统计前1分钟的数据(滑动距离5分钟,窗口长度1分钟,漏掉4分钟数据)

    滑动距离 < 窗口长度, 重复处理数据,如:每隔1分钟,统计前5分钟数据(滑动距离1分钟,窗口长度5分钟,重复处理4分钟数据)

    滑动距离 = 窗口长度, 不漏也不会重复,也就是滚动窗口

1.4 三种时间语义
  • EventTime [事件时间]

    事件发生的时间,例如:点击网站上的某个链接的时间

    使用Event Time的优势是结果的可预测性,缺点是缓存较大,增加了延迟,且调试和定位问题更复杂。

  • IngestionTime [摄入时间]

    某个 Flink 节点的 source operator接收到数据的时间,例如:某个source消费到kafka中的数据

    Ingestion Time 程序无法处理任何无序事件或延迟数据,Flink 会自动分配时间戳和自动生成水位线。

  • ProcessingTime [处理时间]

    某个 Flink 节点执行某个 operation的时间,例如:timeWindow接收到数据的时间

    它提供了最好的性能和最低的延迟,但是无法精准的体现出数据在产生的那个时刻的变化情况

在这里插入图片描述

1.5 窗口的使用
1.5.1 滚动窗口

使用方法:

  • 滚动窗口下窗口之间之间不重叠,且窗口长度是固定的
  • 可以用 TumblingEventTimeWindowsTumblingProcessingTimeWindows 创建一个基于 Event TimeProcessing Time 的滚动时间窗口。
  • 窗口的长度可以用 org.apache.flink.streaming.api.windowing.time.Time 中的 secondsminuteshours 和 days 来设置。

例子:窗口的起止时间是[0:00:00.000 - 0:59:59.999),如果设置了 Offset,那么窗口的起止时间将变为[0:15:00.000 - 1:14:59.999)

DataStream<T> input = ...

// 基于Event Time的滚动窗口
input
.keyBy(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<window function>(...)

// 基于Processing Time的滚动窗口
input
.keyBy(<KeySelector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<window function>(...)

// 在小时级滚动窗口上设置15分钟的Offset偏移
input
.keyBy(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
.<window function>(...)

注意:时间窗口使用的是****timeWindow*()也可以使用*window****()

1.5.2 滑动窗口

使用方法:

  • 滑动窗口以一个步长(Slide)不断向前滑动,窗口的长度固定
  • slide < size,重复处理数据,slide > size,漏掉数据

例子:系统时间基于格林威治标准时间(UTC-0),中国的当地时间可以设置offset为Time.hours(-8)。

val input: DataStream[T] = ...

// sliding event-time windows
input
.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<window function>(...)

// sliding processing-time windows
input
.keyBy(<...>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<window function>(...)

// sliding processing-time windows offset by -8 hours
input
.keyBy(<...>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<window function>(...)
1.5.3 会话窗口

使用方法:

  • 会话窗口根据 Session gap 切分不同的窗口,当一个窗口在大于 Session gap 的时间内没有接收到新数据时,窗口将关闭。

  • 窗口的长度是可变的,每个窗口的开始和结束时间并不是确定的

  • 可以设置定长的 Session gap,也可以使用 SessionWindowTimeGapExtractor 动态地确定Session gap的长度

例子:

  • 使用定长和可变的 Session gap 来建立会话窗口,
  • 其中 SessionWindowTimeGapExtractor[T] 的泛型 T 为数据流的类型,
  • 可以根据数据流中的元素来生成 Session gap 。
val input: DataStream[T] = ...

// event-time session windows with static gap
input
        .keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<window function>(...)

// event-time session windows with dynamic gap
input
        .keyBy(...)
.window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
override def extract(element: T): Long = {
    // determine and return session gap
}
}))
.<window function>(...)

// processing-time session windows with static gap
input
        .keyBy(...)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<window function>(...)


// processing-time session windows with dynamic gap
input
        .keyBy(...)
.window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[T] {
override def extract(element: T): Long = {
    // determine and return session gap
}
}))
.<window function>(...)
1.6 窗口的范围
  • 1-窗口的判断是按照 毫秒 为单位

  • 2-窗口的开始: start,窗口的结束: start + 窗口长度 -1 毫秒

  • 3-开始时间 和 结束时间两者结合 决定了数据是属于哪个窗口的,数据的时间要满足:

    大于等于开始时间

    小于等于结束时间

  • 4-结束时间决定了窗口何时关闭和触发计算,规则是:

    数据的时间 满足 大于等于 结束时间 - 1毫秒

例子:

  • 比如窗口长度是5秒, 从0开始,那么窗口结束是: 0 + 5000 -1 = 4999
1.7 Window API
1.7.1 Window API 调用方法
  • 1- window 方法:仅针对keyby后的流可以使用,对分流后的每个子流加窗口

  • 2-windowAll 方法:使用了 keyby 分流后的流或者未使用 keyby 分流后的流,均可使用

    作用:对数据进行加窗口操作,并且会忽略是否进行了keyby分流

    区别:

    • 使用keyby分流后的流如果调用windowAll, 相当于未分流的效果
    • 未使用keyby分流后的数据,只能调用windowAll方法,无法调用window方法

1.8 Time Window 案例
1.8.1 滚动窗口(无重叠数据)

例子:自定义数据源,滚动时间窗口,5秒

package cn.itcast.day08.window;

/**
 * @author lql
 * @time 2024-02-24 22:24:47
 * @description TODO
 */

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import javax.xml.crypto.Data;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * 滚动时间窗口
 * 案例:
 * 自定义一个Source, 每隔1秒产生一个的k,v  k是hadoop spark flink 其中某一个, v是随机数字
 * 对数据加窗口, 窗口1对未分流的数据统计数字总和
 * 窗口2对按key分组后的数据统计每个key对应的数字总和
 */
public class TumblingTimeWindowDemo {
    public static void main(String[] args) throws Exception {
        // TODO 1) 初始化 flink 流环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // TODO 2) 接入数据源
        DataStreamSource<Tuple2<String, Integer>> streamSource = env.addSource(new GenerateRandomNumSource());
        streamSource.printToErr("生产的数据>>>");
        // TODO 3) 对数据应用窗口操作
        //窗口1对未分流的数据统计数字总和
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumOfAll = streamSource.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1);
        //窗口2对按key分组后的数据统计每个key对应的数字总和
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumEashKey = streamSource.keyBy(t -> t.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(5))).sum(1);
        sumOfAll.print("未分流数据的统计>>>");
        sumEashKey.print("分流后数据的统计>>>");

        //todo 4)运行任务
        env.execute();
    }

    /**
     * 自定义source
     * 自定义一个Source, 每隔1秒产生一个的k,v  k是hadoop spark flink 其中某一个, v是随机数字
     */
    private static class GenerateRandomNumSource implements SourceFunction<Tuple2<String,Integer>> {
        private Boolean isRunning = true;
        private final Random random = new Random();
        private final List<String> keyList = Arrays.asList("hadoop","spark","flink");

        @Override
        public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
            while(isRunning){
                String key = keyList.get(random.nextInt(3));
                ctx.collect(Tuple2.of(key,random.nextInt(100)));
                TimeUnit.SECONDS.sleep(1);
            }
        }
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

结果:

生产的数据>>>:5> (spark,83)
生产的数据>>>:6> (spark,81)
生产的数据>>>:7> (flink,6)
生产的数据>>>:8> (hadoop,89)
生产的数据>>>:1> (spark,96)

分流后数据的统计>>>:7> (flink,6)
分流后数据的统计>>>:8> (hadoop,89)
分流后数据的统计>>>:1> (spark,260)
未分流数据的统计>>>:4> (spark,355)

生产的数据>>>:2> (hadoop,46)
生产的数据>>>:3> (flink,65)
生产的数据>>>:4> (hadoop,2)
生产的数据>>>:5> (hadoop,92)
生产的数据>>>:6> (hadoop,38)

分流后数据的统计>>>:7> (flink,65)
分流后数据的统计>>>:8> (hadoop,178)
未分流数据的统计>>>:5> (hadoop,243)

总结:

  • 分流的数据 key 和 value 是分明的
  • 未分流的数据 key 取第一个,value 是总和
  • Time.seconds() 是取值时间,TimeUnit.SECONDS 是为了 sleep 方法
1.8.2 滑动窗口(有重叠数据)

例子:自定义数据源,滚动窗口,窗口10秒,滑动5秒

package cn.itcast.day08.window;

/**
 * @author lql
 * @time 2024-02-24 23:24:08
 * @description TODO
 */

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * 滑动时间窗口案例
 * 自定义一个Source, 每隔1秒产生一个的k,v  k是hadoop spark flink 其中某一个, v是随机数字
 * 每隔5秒统计前10秒的数据, 分别统计
 *  1. 全量数字之和
 *  2. 分组后每个key对应的数字之和
 */
public class SlidingTimeWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //todo 2)接入数据源
        DataStreamSource<Tuple2<String, Integer>> streamSource = env.addSource(new GeneraterRandomNumSource());
        streamSource.printToErr("生成的数据>>>");
        //todo 3)对数据应用窗口操作
        //窗口1对未分流的数据统计数字总和
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumOfAll = streamSource.windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1);
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumEashKey = streamSource.keyBy(t -> t.f0).window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1);
        sumOfAll.print("未分流数据的统计>>>");
        sumEashKey.print("分流后数据的统计>>>");

        //todo 4)运行任务
        env.execute();

    }

    /**
     * 自定义source
     * 自定义一个Source, 每隔1秒产生一个的k,v  k是hadoop spark flink 其中某一个, v是随机数字
     */

    private static class GeneraterRandomNumSource implements SourceFunction<Tuple2<String, Integer>> {
        private Boolean isRunning = true;
        private final List<String> keyList = Arrays.asList("hadoop","spark","flink");
        private final Random random = new Random();
        @Override
        public void run(SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception {
            while (isRunning) {
                String key = keyList.get(random.nextInt(3));
                sourceContext.collect(Tuple2.of(key, random.nextInt(100)));
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

结果:

=========第一批5秒数据============
生成的数据>>>:8> (hadoop,36)
生成的数据>>>:1> (spark,59)
生成的数据>>>:2> (hadoop,29)
生成的数据>>>:3> (spark,26)
生成的数据>>>:4> (spark,1)

分流后数据的统计>>>:8> (hadoop,65)
分流后数据的统计>>>:1> (spark,86)
未分流数据的统计>>>:2> (hadoop,151)

=========第二批5秒数据============
生成的数据>>>:5> (hadoop,37)
生成的数据>>>:6> (hadoop,75)
生成的数据>>>:7> (spark,69)
生成的数据>>>:8> (spark,11)
生成的数据>>>:1> (hadoop,4)

分流后数据的统计>>>:8> (hadoop,181)
分流后数据的统计>>>:1> (spark,166)
未分流数据的统计>>>:3> (hadoop,347)

=========第三批5秒数据============
生成的数据>>>:2> (flink,24)
生成的数据>>>:3> (spark,55)
生成的数据>>>:4> (flink,42)
生成的数据>>>:5> (flink,44)
生成的数据>>>:6> (hadoop,44)

分流后数据的统计>>>:8> (hadoop,160)
分流后数据的统计>>>:1> (spark,135)
分流后数据的统计>>>:7> (flink,110)
未分流数据的统计>>>:4> (hadoop,405)

总结:

  • 第二次计算的是第一批5秒和第二批5秒的数据
  • 第三次计算的是第二批5秒和第三批5秒的数据
1.9 Count Window 案例
1.9.1 滚动窗口(无重叠数据)

例子:每隔5条统计数据, 分别统计,未分组(windowall)和分组(window)

package cn.itcast.day08.window;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author lql
 * @time 2024-02-25 18:41:37
 * @description TODO
 */
/**
 * 每隔5条统计数据, 分别统计
 * 1. 全量数字之和
 * 2. 分组后每个key对应的数字之和
 */
public class TumblingCountWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        DataStreamSource<Tuple2<String, Integer>> streamSource = env.addSource(new GenerateRandomNumEverySecond());
        streamSource.printToErr("生成的数据>>>");

        //窗口1对未分流的数据统计数字总和
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumOfAll = streamSource.countWindowAll(5).sum(1);

        //窗口2对分流的数据统计数字总和
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumEashKey = streamSource.keyBy(t -> t.f0).countWindow(5).sum(1);

        sumOfAll.print("未分流数据的统计>>>");
        sumEashKey.print("分流后数据的统计>>>");

        //todo 4)运行任务
        env.execute();
    }

    private static class GenerateRandomNumEverySecond implements SourceFunction<Tuple2<String,Integer>> {
        private boolean isRunning = true;
        private final Random random = new Random();
        private final List<String> keyList = Arrays.asList("hadoop","spark","flink");

        @Override
        public void run(SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception {
            while(isRunning) {
                String key = keyList.get(random.nextInt(3));
                sourceContext.collect(Tuple2.of(key,random.nextInt(100)));
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

结果:

生成的数据>>>:3> (spark,76)
生成的数据>>>:4> (hadoop,30)
生成的数据>>>:5> (flink,85)
生成的数据>>>:6> (spark,83)
生成的数据>>>:7> (flink,37)

未分流数据的统计>>>:6> (spark,311)

生成的数据>>>:8> (flink,26)
生成的数据>>>:1> (hadoop,70)
生成的数据>>>:2> (flink,19)
生成的数据>>>:3> (hadoop,49)
生成的数据>>>:4> (flink,15)

分流后数据的统计>>>:7> (flink,182)
未分流数据的统计>>>:7> (flink,179)

总结:

  • 未分流是将前面五条数据合并统计,key 是选择第一个,values 是5条数据的总和
  • 分流是将前面 key 相同的5条数据加起来,key 未凑到5个,就不触发计算
1.9.2 滑动窗口(有重叠数据)

例子:

package cn.itcast.day08.window;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * @author lql
 * @time 2024-02-25 22:18:03
 * @description TODO
 */
public class SlidingCountWindowDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // todo 1) source
        DataStreamSource<Tuple2<String, Integer>> streamSource = env.addSource(new GeneraterRandomNumSource());
        streamSource.printToErr("生成的数据>>>");
        // 第一个窗口 未分组
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumOfAll = streamSource.countWindowAll(10, 5).sum(1);

        // 第二个窗口 分组
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumEashKey = streamSource.keyBy(t -> t.f0).countWindow(10, 5).sum(1);

        sumOfAll.print("未分流数据的统计>>>");
        sumEashKey.print("分流后数据的统计>>>");

        //todo 4)运行任务
        env.execute();

    }

    private static class GeneraterRandomNumSource implements SourceFunction<Tuple2<String,Integer>> {
        private boolean isRunning = true;
        private final Random random = new Random();
        private final List<String> keysList = Arrays.asList("hadoop","spark","flink");

        @Override
        public void run(SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception {
            while(isRunning){
                String key = keysList.get(random.nextInt(3));
                sourceContext.collect(Tuple2.of(key,random.nextInt(100)));
                TimeUnit.SECONDS.sleep(1);
            }
        }
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
}

结果:

生成的数据>>>:8> (flink,23)
生成的数据>>>:1> (flink,54)
生成的数据>>>:2> (hadoop,85)
生成的数据>>>:3> (hadoop,88)
生成的数据>>>:4> (spark,35)

未分流数据的统计>>>:3> (flink,285)

生成的数据>>>:5> (spark,15)
生成的数据>>>:6> (hadoop,57)
生成的数据>>>:7> (hadoop,20)
生成的数据>>>:8> (spark,55)
生成的数据>>>:1> (hadoop,87)

分流后数据的统计>>>:8> (hadoop,337)
未分流数据的统计>>>:4> (flink,519)

生成的数据>>>:2> (spark,38)
生成的数据>>>:3> (hadoop,1)
生成的数据>>>:4> (spark,88)

分流后数据的统计>>>:1> (spark,231)

生成的数据>>>:5> (hadoop,16)
生成的数据>>>:6> (hadoop,28)

未分流数据的统计>>>:5> (spark,405)

总结:

  • 把它想象成长度为10的小框,每次移动五条数据,看看能框住多少
  • 未分流,就是简单框住多少个10条数据总和,除了第一次是框住5条数据
  • 分流,就是分类需要数量达到5才能计算
1.10 Session Window 案例

理解:会话窗口属于时间窗口,Session window的窗口大小,则是由数据本身决定

假设Session Window的时间gap如果是6秒,那么,上面的数据会被分成以下几个窗口

key,10:00:00
key,10:00:03
key,10:00:05
==========05 与 12相差大于6秒,需要分割=======================

key,10:00:12
key,10:00:15
==========15 与 24 相差大于6秒,需要分割======================

key,10:00:24
==========24 与 30 相差等于6秒,需要分割======================

key,10:00:30
==========30 与 42 相差大于6秒,需要分割======================

key,10:00:42

也就是说,窗口之间划分的条件是时间差小于gap

例子:定义一个会话时间窗口, 5秒gap

package cn.itcast.day08.window;

/**
 * @author lql
 * @time 2024-02-25 22:59:24
 * @description TODO
 */

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
 * 案例: 自定义一个Source, 每隔随机的秒(1~10)之间产生1条数据
 * 数据是key value, key: hadoop spark flink 其中一个, value: 是随机的数字
 * 需求1:定义一个会话时间窗口, 5秒gap, 统计全量数据之和
 * 需求2: 定义一个会话时间窗口, 5秒gap, 统计按照key分组后的每个组数据内的数字和
 */
public class TimeSessionWindowDemo {
    private static final SimpleDateFormat sdf = new SimpleDateFormat("mm:ss.SSS");

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // source
        DataStreamSource<Tuple2<String, Integer>> streamSource = env.addSource(new GeneraterRandomNumSource());
        streamSource.printToErr("生成的数据>>>");

        // 窗口 1,不分组
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumOfAll = streamSource.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).sum(1);

        // 窗口 2,分组
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumEashKey = streamSource.keyBy(t -> t.f0).window(ProcessingTimeSessionWindows.withGap(Time.seconds(5))).sum(1);
        sumOfAll.print("未分流数据的统计>>>");
        sumEashKey.print("分流后数据的统计>>>");

        //todo 4)运行任务
        env.execute();
    }

    private static class GeneraterRandomNumSource implements SourceFunction<Tuple2<String,Integer>> {
        private boolean isRunning = true;
        private final List<String> keysList = Arrays.asList("hadoop","spark","flink");
        private final Random random = new Random();
        @Override
        public void run(SourceContext<Tuple2<String, Integer>> sourceContext) throws Exception {
            while(isRunning) {
                String key = keysList.get(random.nextInt(3));
                sourceContext.collect(Tuple2.of(key,random.nextInt(100)));
                // 为了避免生成器连续两次只休眠5秒
                // 但是,这个循环的逻辑有点问题,因为它会不断地尝试重新生成随机数,直到生成的不是5为止,这可能会导致长时间的等待。
                long sleepTime = 5L;
                while (sleepTime == 5L){
                    sleepTime = random.nextInt(7);
                }
                System.out.println(sdf.format(new Date()) + ":sleep:"+sleepTime + "s");
                TimeUnit.SECONDS.sleep(sleepTime);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;

        }
    }
}

结果:

18:19.058:sleep:1s
生成的数据>>>:3> (spark,61)
18:20.063:sleep:2s
生成的数据>>>:4> (spark,97)
18:22.066:sleep:2s
生成的数据>>>:5> (spark,46)
18:24.070:sleep:1s
生成的数据>>>:6> (hadoop,55)
18:25.078:sleep:1s
生成的数据>>>:7> (flink,27)
18:26.085:sleep:4s
生成的数据>>>:8> (flink,9)

分流后数据的统计>>>:1> (spark,204)
分流后数据的统计>>>:8> (hadoop,55)

18:30.095:sleep:1s
生成的数据>>>:1> (flink,4)
18:31.099:sleep:6s
生成的数据>>>:2> (flink,61)

分流后数据的统计>>>:7> (flink,101)
未分流数据的统计>>>:8> (spark,360)

18:37.104:sleep:6s
生成的数据>>>:3> (hadoop,91)

分流后数据的统计>>>:8> (hadoop,91)
未分流数据的统计>>>:1> (hadoop,91)

总结:

  • 打印结果位于生成数量之间,是因为计算是基于数据的时间差触发的
  • 分流的数据是,两个key相同的数据时间差触发计算
  • 未分流的数据是,两个数据时间差触发计算
  • 这里的时间是ProcessingTime [处理时间]
1.11 Window Function

窗口函数,即数据划分窗口后可以调用的处理函数。

1.11.1 增量聚合函数

指窗口每进入一条数据就计算一次

实现方法:(常见的增量聚合函数如下):

  • reduce(reduceFunction)
  • aggregate(aggregateFunction)
  • sum()
  • min()
  • max()
(1)reduce 和 aggregate 的区别

  • reduce 接受两个相同类型的输入,生成一个同类型输出,所以泛型就一个
  • maxBy、minBy、sum 这3个底层都是由 reduce 实现的
  • aggregate 的输入值、中间结果值、输出值它们3个类型可以各不相同,泛型有<T, ACC, R>
(2) ReduceFunction

例子:Flink使用ReduceFunction来对窗口中的元素进行增量聚合

package cn.itcast.day08.WindowFunction;

/**
 * @author lql
 * @time 2024-02-26 18:40:13
 * @description TODO:测试 reduceFunction
 */

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

/**
 * ReduceFunction定义了如何把两个输入的元素进行合并来生成相同类型的输出元素的过程,Flink使用ReduceFunction来对窗口中的元素进行增量聚合
 * 需求:不分组,划分窗口
 * 然后调用reduce对窗口内的数据进行聚合
 */
public class CountWindowAllRedcueDemo {
    public static void main(String[] args) throws Exception {
        //todo 1)初始化flink流处理的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //todo 2)接受数据
        DataStreamSource<String> lines = env.socketTextStream("node1", 9999);

        //todo 3) 将数据转化为数字类型
        SingleOutputStreamOperator<Integer> nums = lines.map(Integer::parseInt);

        //todo 4)划分窗口
        //GlobalWindow有几个并行?一个并行,只有一个分区(窗口中只有一个subtask)
        AllWindowedStream<Integer, GlobalWindow> windowed = nums.countWindowAll(5);

        SingleOutputStreamOperator<Integer> result = windowed.reduce(new ReduceFunction<Integer>() {
            @Override
            public Integer reduce(Integer value1, Integer value2) throws Exception {
                return value1 + value2; //增量聚合,不是满足条件再计算的,因此该方式效率更高,更节省资源
            }
        });

        //todo 6)打印输出
        result.print();

        //todo 7)递交作业
        env.execute();
    }
}

结果:

终端依次输入1,2,3,……10

输出:
1> 15
2> 40

总结:

  • 增量聚合,运用 countWindowAll,每五个计算一次
  • GlobalWindow有几个并行?一个并行,只有一个分区(窗口中只有一个subtask)
  • 重写 reduce方法,增量就用 value1 + value2
  • 适用于直接来值,增量累加
(3) AggregateFunction

AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)

例子:测试AggFunction——求各个班级英语成绩平均分

package cn.itcast.day08.WindowFunction;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author lql
 * @time 2024-02-26 18:52:50
 * @description TODO
 */
public class TestAggFunctionOnWindow {
    public static void main(String[] args) throws Exception {
        //todo 1)初始化flink流处理的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //todo 2)接受数据
        DataStreamSource<Tuple3<String, String, Long>> inputSource = env.fromElements(ENGLISH);

        //todo 3)划分窗口
        //GlobalWindow有几个并行?一个并行,只有一个分区(窗口中只有一个subtask)
        SingleOutputStreamOperator<Double> result = inputSource.keyBy(t -> t.f0).countWindow(3).aggregate(new AvgAggFunction());

        //todo 4)打印输出
        result.print();

        //todo 5)递交作业
        env.execute();
    }

    public static final Tuple3[] ENGLISH = new Tuple3[] {
            Tuple3.of("class1", "张三", 100L),
            Tuple3.of("class1", "李四", 40L),
            Tuple3.of("class1", "王五", 60L),
            Tuple3.of("class2", "赵六", 20L),
            Tuple3.of("class2", "小七", 30L),
            Tuple3.of("class2", "小八", 50L),
    };

    //AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。
    //Tuple3<String, String, Long>:班级名称、学生名称、学生分数
    //Tuple2<Long, Long>:学生总分数、学生人数
    //Double:平均分数

    private static class AvgAggFunction implements AggregateFunction<Tuple3<String,String,Long>, Tuple2<Long,Long>,Double> {

        /**
         * 创建累加器保存中间状态(sum,count)
         * Long:总成绩
         * Long:学生个数
         * @return
         */
        @Override
        public Tuple2<Long, Long> createAccumulator() {
            return new Tuple2<>(0L,0L);
        }

        /**
         * 将元素追加到累加器并返回累加器
         * @param value 输入类型
         * @param acc 累加器acc类型
         * @return
         */
        @Override
        public Tuple2<Long, Long> add(Tuple3<String, String, Long> value, Tuple2<Long, Long> acc) {
            // acc(历史累加总成绩,学生个数)
            // value(班级,姓名,成绩)
            // 成绩累加的同时,同学人数加一
            return new Tuple2<>(acc.f0+value.f2,acc.f1+1);
        }

        /**
         * 从累加器提取数据
         * @param accumulator
         * @return
         */
        @Override
        public Double getResult(Tuple2<Long, Long> accumulator) {
            return (double) accumulator.f0 / accumulator.f1;
        }

        /**
         * 累加器合并
         * @param acc1
         * @param acc2
         * @return
         */
        @Override
        public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {
            return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);
        }
    }
}

结果:

1> 33.333333333333336
3> 66.66666666666667

总结:

  • 因为 aggregate 是一个聚合类别,这里求平均值,所以 new 一个 AvgAggFunction
  • AvgAggFunction 实现 AggregateFunction 接口
  • AggregateFunction 比 ReduceFunction 更加的通用,它有三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。
  • 基于增量累加原理,重写 4 个方法:
    • createAccumulator:创建累加器保存中间状态(sum,count),返回值是初始化值
    • add:将元素追加到累加器并返回累加器
    • getResult:从累加器中提取值
    • merge:将累加器合并
1.11.2 全量聚合函数
  • 指在窗口触发的时候才会对窗口内的所有数据进行一次计算

  • 等窗口的数据到齐,才开始进行聚合计算,可实现对窗口内的数据进行排序等需求

实现方法

  • apply(windowFunction)
  • process(processWindowFunction)
(1) apply 和 process 的区别

  • apply 和 process 都是处理全量计算,但工作中正常用 process。
  • process更加底层,更加强大,有 open/close 生命周期方法,又可获取RuntimeContext。
(2) ProcessWindowFunction / ProcessAllWindowFunction
  • 全量聚合: 窗口需要维护全部原始数据,窗口触发进行全量聚合

  • ProcessWindowFunction 可以结合 ReduceFunction, AggregateFunction, 或者 FoldFunction 来做增量计算(推荐用法)

例子:

package cn.itcast.day08.WindowFunction;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

/**
 * @author lql
 * @time 2024-02-27 15:52:54
 * @description TODO:演示processWindowFunction实现全量聚合
 */
public class TestProcessWinFunctionOnWindow {
    public static void main(String[] args) throws Exception {
        // Todo 1): 获取流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Todo 2): 获取数据源
        DataStreamSource<Tuple3<String,String,Long>> inputSource = env.fromElements(ENGLISH);

        // Todo 3): 计算各班平均分
        inputSource.keyBy(t->t.f0).countWindow(2).process(new MyProcessWindowFunction()).print();

        // Todo 4): 启动程序
        env.execute();

    }

    public static final Tuple3[] ENGLISH = new Tuple3[] {
            Tuple3.of("class1", "张三", 100L),
            Tuple3.of("class1", "李四", 40L),
            Tuple3.of("class1", "王五", 60L),
            Tuple3.of("class2", "赵六", 20L),
            Tuple3.of("class2", "小七", 30L),
            Tuple3.of("class2", "小八", 50L),
    };

    /**
     * Tuple3<String, String, Long>:传入值类型
     * Double:返回值类型
     * String:分组字段类型
     * GlobalWindow:countWindow需要使用GlobalWindow, window使用TimeWindow
     */


    private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String,String,Long>,Double,String, GlobalWindow> {
        //iterable:窗口内的所有元素的集合

        @Override
        public void process(String s, Context context, Iterable<Tuple3<String, String, Long>> iterable, Collector<Double> collector) throws Exception {
            long sum = 0;
            long count = 0;
            for (Tuple3<String, String, Long> in : iterable) {
                sum += in.f2;
                count++;
            }
            collector.collect((double) sum / count );
        }
    }
}

结果:

3> 70.0
1> 25.0

总结:

  • countwindow(2) 这里的计数窗口,就是求和的时候,只求两个数据之和
  • 继承 ProcessWindowFunction,重写 process 方法
  • iterable:窗口内的所有元素的集合
  • Tuple3.of(“class2”,“小八”,97L), 加上L,表示该数据是 long 类型的常量
(3) 自定义聚合 apply

例子:使用apply方法来实现单词统计

package cn.itcast.day08.WindowFunction;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

/**
 * @author lql
 * @time 2024-02-28 01:14:05
 * @description TODO:使用apply方法来实现单词统计
 */
public class WindowApplyDemo {
    public static void main(String[] args) throws Exception {
        //todo 1)初始化flink流处理的运行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //todo 2)接入数据源
        DataStreamSource<String> lines = env.socketTextStream("node1", 8888);

        //todo 3) 数据扁平化
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = lines.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        //todo 4) 数据分组
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowStream = wordAndOne.keyBy(t -> t.f0).window(TumblingProcessingTimeWindows.of(Time.seconds(10)));

        //todo 5) apply 自定义聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = windowStream.apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String, TimeWindow>() {
            @Override
            public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
                int sum = 0;
                String key = null;
                for (Tuple2<String, Integer> wordAndOne : input) {
                    sum += wordAndOne.f1;
                    key = wordAndOne.f0;
                }
                out.collect(Tuple2.of(key, sum));
            }
        });

        // todo 6) 打印结果
        result.print();
        env.execute();
    }
}

结果:

8> (hadoop,1)
1> (spark,1)
7> (flink,1)
1> (kafka,1)

总结:

  • apply 方法中 WindowFunction 需要指定输出类型,而不是单纯 Object

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/415711.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

046-WEB攻防-注入工具SQLMAPTamper编写指纹修改高权限操作目录架构

046-WEB攻防-注入工具&SQLMAP&Tamper编写&指纹修改&高权限操作&目录架构 #知识点&#xff1a; 1、SQLMAP-常规猜解&字典配置 2、SQLMAP-权限操作&文件命令 3、SQLMAP-Tamper&使用&开发 4、SQLMAP-调试指纹&风险等级 演示案例&#xf…

华为配置AP接入GPON网络配置示例

配置AP接入GPON网络配置示例 组网图形 图1 配置AP接入GPON网络示例 表1 版本信息 网元 设备选型 版本 OLT EA5800 V100R019C20 AC AC6805 V200R019C10 AP AirEngine 6760-X1 配套安装OptiXstar S800E GPON光模块 V200R019C10 Switch S6320-SI V200R019C10 ^^^ 组…

[python]随机选取的方式——random.choices()

关于随机选取的函数。 1. 列表随机选取 1.1. 随机等概率选取一个结果 首先我们来想象一下&#xff0c;现在有一个列表&#xff0c;要在其中随机选取一个数字&#xff0c;比如&#xff1a; a [1,2,3,4,5] 这里我们需要用到一种比较简单的随机选取方式&#xff0c;即random…

Vueuse:打造高效的 Vue.js 开发利器

Vueuse&#xff1a;打造高效的 Vue.js 开发利器 Vueuse 是一个功能强大的 Vue.js 生态系统工具库&#xff0c;它提供了一系列的可重用的 Vue 组件和函数&#xff0c;帮助开发者更轻松地构建复杂的应用程序。本文将介绍 Vueuse 的主要特点和用法&#xff0c;以及它在 Vue.js 开发…

VS Code(Visual Studio Code)本地(local)和远程(ssh)Docker Container 下的 Python 开发和调试

VS Code&#xff08;Visual Studio Code&#xff09;本地&#xff08;local&#xff09;和远程&#xff08;ssh&#xff09;Docker Container 下的 Python 开发和调试 1. 目的需求2. VS Code 简介3. 使用实践&#xff1a;一个简单的实例3.1 准备工作3.1.1 远程服务器3.1.2 本地…

揭示IP风险画像的作用与价值

在当今数字化时代&#xff0c;互联网的快速发展为企业和个人带来了巨大的机遇&#xff0c;同时也带来了各种安全风险和威胁。随着网络攻击手段的不断升级和演变&#xff0c;传统的安全防御手段已经无法满足对抗复杂多变的网络威胁的需求。IP风险画像作为一种新型的网络安全解决…

24计算机考研深大经验分享(计算机专业考研综合安排)

文章目录 背景科目选择高数选课一轮二轮冲刺阶段 线代一轮二轮 概率论计算机学科专业基础408数据结构计算机组成原理操作系统计算机网络总结 英语政治 末言 背景 首先贴一下初试成绩。这篇分享主要是给零基础的同学使用的&#xff0c;基础好的同学可以自行了解补充一下&#xf…

[设计模式Java实现附plantuml源码~行为型] 对象状态及其转换——状态模式

前言&#xff1a; 为什么之前写过Golang 版的设计模式&#xff0c;还在重新写Java 版&#xff1f; 答&#xff1a;因为对于我而言&#xff0c;当然也希望对正在学习的大伙有帮助。Java作为一门纯面向对象的语言&#xff0c;更适合用于学习设计模式。 为什么类图要附上uml 因为很…

智能边缘小站 CloudPond(低延迟、高带宽和更好的数据隐私保护)

智能边缘小站 CloudPond(低延迟、高带宽和更好的数据隐私保护) 边缘小站的主要功能是管理用户在线下部署的整机柜设施&#xff0c;一个边缘小站关联一个华为云指定的区域和一个用户指定的场地&#xff0c;相关的资源运行状况监控等。 边缘计算 迈入5G和AI时代&#xff0c;新…

unity 场景烘焙中植物叶片(单面网络)出现的白面

Unity版本 2021.3.3 平台 Windows 在场景烘焙中烘焙植物的模型的时候发现植物的叶面一面是合理的&#xff0c;背面是全白的&#xff0c;在材质球上勾选了双面烘焙&#xff0c;情况如下 这个问题可能是由于植物叶片的单面网格导致的。在场景烘焙中&#xff0c;单面网格只会在一…

区块链游戏解说:什么是 Arcade Champion

作者&#xff1a;lesleyfootprint.network 编译&#xff1a;cicifootprint.network 数据源&#xff1a;Arcade Champion Dashboard 什么是 Arcade Champion Arcade Champion 代表了移动游戏世界的重大革新。它将经典街机游戏的怀旧与创新元素结合在一起&#xff0c;包括 NF…

创建型设计模式 - 建造者设计模式 - JAVA

建造者设计模式 一. 简介二. 使用场景分析三. 代码案例3.1 创建ComputerBuilder 类3.2 修改子类3.3 修改工厂3.4 测试 四. 建造者模式案例 前言 这是我在这个网站整理的笔记,有错误的地方请指出&#xff0c;关注我&#xff0c;接下来还会持续更新。 作者&#xff1a;神的孩子都…

Unity中的UI系统之GUI

目录 概述工作原理和主要作用基础控件重要参数及文本和按钮多选框和单选框输入框和拖动条图片绘制和框 复合控件工具栏和选择网络滚动视图和分组窗口 自定义整体样式自定义皮肤样式 概述 什么是UI系统 UI是User Interface&#xff08;用户界面&#xff09;的简称&#xff0c;用…

Bert基础(五)--解码器(下)

1、 多头注意力层 下图展示了Transformer模型中的编码器和解码器。我们可以看到&#xff0c;每个解码器中的多头注意力层都有两个输入&#xff1a;一个来自带掩码的多头注意力层&#xff0c;另一个是编码器输出的特征值。 让我们用R来表示编码器输出的特征值&#xff0c;用M来…

Unity安装与简单设置

安装网址&#xff1a;https://unity.cn 设置语言&#xff1a; 设置安装位置&#xff1a;否则C盘就会爆了 获取一个个人的资格证&#xff1a; 开始安装&#xff1a; 安装完毕。 添加模块&#xff1a;例如简体中文 新建项目&#xff1a; 布局2*3、单栏布局、 设置…

el-table实现转置表格

vue版本&#xff1a;vue2.6.10 elementui版本&#xff1a;2.15.14 实现效果&#xff1a;el-table实现行列互换 代码&#xff1a; <template><div class"app-container"><span>原始数据</span><el-table:data"datas"border>…

循环结构:for循环,while循环,do-while,死循环

文章目录 for循环for案例&#xff1a;累加for循环在开发中的常见应用场景 whilewhile循环案例&#xff1a; for和while的区别&#xff1a;do-while三种循环的区别小结死循环 快捷键 ctrlaltt for循环 看循环执行多少次&#xff0c;就看有效数字有几个 快捷键 fori 示例代码&am…

Netty01NIO

NIO基础 NIO &#xff1a;non-blocking io 非阻塞 IO 笔记 www.zgtsky.top 网课&#xff1a;黑马Netty 三大组件 Channel & Buffer channel 有一点类似于 stream&#xff0c;它就是读写数据的双向通道&#xff0c;可以从 channel 将数据读入 buffer&#xff0c;也可以…

工作微信统一管理(还带监管功能)

1.会话页面(可统一管理多个微信号、聚合聊天、手动搜索添加好友、通过验证请求、查看好友的朋友圈等) 2.聊天历史(可查看 所有聊天记录&#xff0c;包括手机.上撤回、删除的消息) 3.群发助手(可以一 -次群发多个好友和群&#xff0c;还可以选择定时发送&#xff0c;目前还在内测…

docker (十二)-私有仓库

docker registry 我们可以使用docker push将自己的image推送到docker hub中进行共享&#xff0c;但是在实际工作中&#xff0c;很多公司的代码不能上传到公开的仓库中&#xff0c;因此我们可以创建自己的镜像仓库。 docker 官网提供了一个docker registry的私有仓库项目&#…