Flink窗口(2)—— Window API

目录

窗口分配器

时间窗口

计数窗口

全局窗口

窗口函数

增量聚合函数

全窗口函数(full window functions)

增量聚合和全窗口函数的结合使用

Window API 主要由两部分构成:窗口分配器(Window Assigners)和窗口函数(Window Functions)

stream.keyBy(<key selector>)
 .window(<window assigner>) //指明窗口的类型
 .aggregate(<window function>) //定义窗口具体的处理逻辑

在window()方法中传入一个窗口分配器;

在aggregate()方法中传入一个窗口函数;

窗口分配器

指定窗口的类型,定义数据应该被“分配”到哪个窗口

方法:.window()

参数:WindowAssigner

返回值:WindowedStream

如果是非按键分区窗口,那么直接调用.windowAll()方法,同样传入一个WindowAssigner,返回的是 AllWindowedStream

时间窗口

滚动处理时间窗口

stream.keyBy(...)  
//1..of()方法需要传入一个 Time 类型的参数 size,表示滚动窗口的大小
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) //窗口大小
//2.通过设置偏移量offset 来调整起始点的时间戳
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))) //窗口大小,偏移量
.aggregate(...)

默认的窗口起始点时间戳是窗口大小的整倍数

如果我们定义 1 天的窗口,默认就从 0 点开始;如果定义 1 小时的窗口,默认就从整点开始

如果不想用默认值,就需要设置好偏移量

偏移量的作用:标准时间戳其实就是1970 年 1 月 1 日 0 时 0 分 0 秒 0 毫秒开始计算的一个毫秒数,而这个时间是以 UTC 时间,也就是 0 时区(伦敦时间)为标准的。我们所在的时区是东八区,也就是 UTC+8,跟 UTC 有 8小时的时差。我们定义 1 天滚动窗口时,如果用默认的起始点,那么得到就是伦敦时间每天 0点开启窗口,这时是北京时间早上 8 点。那怎样得到北京时间每天 0 点开启的滚动窗口呢?只要设置-8 小时的偏移量就可以了

滑动处理时间窗口

stream.keyBy(...)
//窗口大小,滑动步长(同样也可以设置偏移量)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

处理时间会话窗口

stream.keyBy(...)
//超时时间
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)

以上是静态设置了超时时间,也可以动态设置:

.window(ProcessingTimeSessionWindows.withDynamicGap(newSessionWindowTimeGapExtractor<Tuple2<String, Long>>() {
 @Override
 public long extract(Tuple2<String, Long> element) { 
// 提取 session gap 值返回, 单位毫秒
//提取了数据元素的第一个字段,用它的长度乘以 1000 作为会话超时的间隔
 return element.f0.length() * 1000;
 }
}

滚动事件时间窗口

stream.keyBy(...)
.window(TumblingEventTimeWindows.of(Time.seconds(5))) 
.aggregate(...)

滑动事件时间窗口

stream.keyBy(...)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.aggregate(...)

事件时间会话窗口

stream.keyBy(...)
.window(EventTimeSessionWindows.withGap(Time.seconds(10)))
.aggregate(...)

处理时间和事件时间的逻辑完全相同

计数窗口

滚动计数窗口:.countWindow(10) //窗口大小

滑动计数窗口:.countWindow(10,3) //窗口大小,滑动步长

每个窗口统计 10 个数据,每隔 3 个数据就统计输出一次结果

全局窗口

.window(GlobalWindows.create());

需要自定义触发器

 

窗口函数

WindowedStream——>DataStream

增量聚合函数

像 DataStream 的简单聚合一样,每来一条数据就立即进行计算,中间只要保持一个简单的聚合状态就可以

区别在于不立即输出结果,而是要等到窗口结束时间

归约函数(ReduceFunction):和简单聚合时使用的ReduceFunction完全一样


聚合函数(AggregateFunction):取消类型一致的限制,直接基于 WindowedStream 调 用.aggregate()方法,不需要经过map处理;这个方法需要传入一个AggregateFunction 的实现类作为参数,源码如下:

@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {

    /**
     * Creates a new accumulator, starting a new aggregate.
     *
     * <p>The new accumulator is typically meaningless unless a value is added via {@link
     * #add(Object, Object)}.
     *
     * <p>The accumulator is the state of a running aggregation. When a program has multiple
     * aggregates in progress (such as per key and window), the state (per key and window) is the
     * size of the accumulator.
     *
     * @return A new accumulator, corresponding to an empty aggregate.
     */
    ACC createAccumulator();

    /**
     * Adds the given input value to the given accumulator, returning the new accumulator value.
     *
     * <p>For efficiency, the input accumulator may be modified and returned.
     *
     * @param value The value to add
     * @param accumulator The accumulator to add the value to
     * @return The accumulator with the updated state
     */
    ACC add(IN value, ACC accumulator);

    /**
     * Gets the result of the aggregation from the accumulator.
     *
     * @param accumulator The accumulator of the aggregation
     * @return The final aggregation result.
     */
    OUT getResult(ACC accumulator);

    /**
     * Merges two accumulators, returning an accumulator with the merged state.
     *
     * <p>This function may reuse any of the given accumulators as the target for the merge and
     * return that. The assumption is that the given accumulators will not be used any more after
     * having been passed to this function.
     *
     * @param a An accumulator to merge
     * @param b Another accumulator to merge
     * @return The accumulator with the merged state
     */
    ACC merge(ACC a, ACC b);
}

IN:输入数据类型

ACC:累加器类型

OUT:输出数据类型

AggregateFunction 接口中有四个方法:

除了继承AggregateFunction,自定义聚合函数之外,Flink为我们提供了一系列预定义的简单聚合方法,如sum()/max()/maxBy()/min()/minBy(),可以直接基于WindowedStream调用

全窗口函数(full window functions)

全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算

典型的批处理方式,适用于一些基于全部数据才能进行的运算等等

窗口函数(WindowFunction)

stream
 .keyBy(<key selector>)
 .window(<window assigner>)
    //基于 WindowedStream 调用.apply()方法,传入一个 WindowFunction 的实现类
 .apply(new MyWindowFunction());

WindowFunction的实现类中可以获取到包含窗口所有数据的可迭代集合(Iterable),还可以拿到窗口本身的信息

功能可以被 ProcessWindowFunction(处理窗口函数,见下) 全覆盖


处理窗口函数(ProcessWindowFunction)

增强版的 WindowFunction

基于 WindowedStream 调用.process()方法,传入一个 ProcessWindowFunction 的实现类

ProcessWindowFunction的泛型:ProcessWindowFunction<IN,OUT,KEY,W>

分别是输入数据类型,输出数据类型,分区键的类型,Window类型(比如,是时间窗口,就是TimeWindow)

process()方法的定义:

示例代码如下,自定义窗口处理函数来处理数据:

public class UvCountByWindowExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));

        // 将数据全部发往同一分区,按窗口统计UV
        stream.keyBy(data -> true)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .process(new UvCountByWindow())
                .print();

        env.execute();
    }

     //自定义窗口处理函数
    public static class UvCountByWindow extends ProcessWindowFunction<Event, String, Boolean, TimeWindow>{
        @Override
        public void process(Boolean aBoolean, Context context, Iterable<Event> elements, Collector<String> out) throws Exception {
            HashSet<String> userSet = new HashSet<>();
            // 遍历所有数据,放到Set里去重
            for (Event event: elements){
                userSet.add(event.user);
            }
            // 结合窗口信息,包装输出内容
            Long start = context.window().getStart();
            Long end = context.window().getEnd();
            out.collect("窗口: " + new Timestamp(start) + " ~ " + new Timestamp(end)
                    + " 的独立访客数量是:" + userSet.size());
        }
    }


}

这里的Event是一个POJO类,ClickSource是自定义的数据源,其代码如下:
Event.java:

public class Event {
    public String user;
    public String url;
    public Long timestamp;

    public Event() {
    }

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{" +
                "user='" + user + '\'' +
                ", url='" + url + '\'' +
                ", timestamp=" + new Timestamp(timestamp) +
                '}';
    }
}

ClickSource.java: 

public class ClickSource implements SourceFunction<Event> {
    // 声明一个布尔变量,作为控制数据生成的标识位
    private Boolean running = true;
    @Override
    public void run(SourceContext<Event> ctx) throws Exception {
        Random random = new Random();    // 在指定的数据集中随机选取数据
        String[] users = {"Mary", "Alice", "Bob", "Cary"};
        String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2"};

        while (running) {
            ctx.collect(new Event(
                    users[random.nextInt(users.length)],
                    urls[random.nextInt(urls.length)],
                    Calendar.getInstance().getTimeInMillis()
            ));
            // 隔1秒生成一个点击事件,方便观测
            Thread.sleep(1000);
        }
    }
    @Override
    public void cancel() {
        running = false;
    }

}

增量聚合和全窗口函数的结合使用

增量聚合函数处理计算会更高效;而全窗口函数的优势在于提供了更多的信息

我们之前在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者 ProcessWindowFunction

处理机制:

基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/317416.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Memcache简介与运维

开源、高性能、高并发的分布式内存缓存系统。 作用 缓存关系型数据库的结果&#xff0c;减少数据库自身访问的次数。 常见内存缓存服务软件对比 memcache 纯内存 redis、memcachedb 可持久化存储&#xff0c;同时会使用磁盘存 …

Typora使用及Markdow学习笔记1

编程如画&#xff0c;我是panda&#xff01; 最近有在学习Markdown&#xff0c;所以这次分享一下我的Markdown学习笔记 目录 前言 一、标题 二、段落 1.换行 2.分割线 三、文字显示 1.字体 2.上下标 四、列表 1.无序列表 2.有序列表 3.任务列表 五、区块 六、代…

外包干了5个月,感觉技术退步明显......

先说一下自己的情况&#xff0c;大专生&#xff0c;18年通过校招进入武汉某软件公司&#xff0c;干了接近4年的功能测试&#xff0c;今年年初&#xff0c;感觉自己不能够在这样下去了&#xff0c;长时间呆在一个舒适的环境会让一个人堕落&#xff01; 而我已经在一个企业干了四…

深入理解Spring IOC

1. IOC 理论 IOC 全称控制反转&#xff0c;英文名为 Inversion of Control&#xff0c;它还有一个别名为 DI&#xff08;Dependency Injection&#xff09;,即依赖注入。 在我们刚接触Spring的时候&#xff0c;我们就听说了IOC&#xff0c;但是对于IOC的理解&#xff0c;貌似…

ubuntu 20.04下 Tesla P100加速卡使用

1.系统环境&#xff1a;系统ubuntu 20.04, python 3.8 2.查看cuDNN/CUDA与tensorflow的版本关系如下&#xff1a; Build from source | TensorFlow 从上图可以看出&#xff0c;python3.8 对应的tensorflow/cuDNN/CUDA版本。 3.安装tensorflow #pip3 install tensorflow 新版…

ZooKeeper初探:分布式世界的守护者

欢迎来到我的博客&#xff0c;代码的世界里&#xff0c;每一行都是一个故事 ZooKeeper初探&#xff1a;分布式世界的守护者 前言Zookeeper的概述分布式系统中的角色和作用&#xff1a; Zookeeper的数据模型Znode的概念和层次结构&#xff1a;Znode的类型和应用场景&#xff1a;…

如何给AI下达精准的指令,哪些提示词对于AI是有效的?

刚上手那会&#xff0c;我倾向于将 prompt 翻译为“指令”&#xff0c;但这并不精确。“指令”通常对应instructions&#xff0c;属于 prompt 中的纯指令部分&#xff0c;通常是一个动宾结构&#xff08;做什么&#xff09;。剩下的部分更多是描述&#xff08;describe&#xf…

【从零开始学习微服务 | 第一篇】什么是微服务

目录 前言&#xff1a; 架构风格&#xff1a; 单体架构&#xff1a; 分布式架构&#xff1a; 微服务&#xff1a; 总结&#xff1a; 前言&#xff1a; 在当今快速发展的软件开发领域&#xff0c;构建大型应用程序已经成为一项巨大的挑战。传统的单体应用架构往往难以满足…

Shiro框架:Shiro内置过滤器源码解析

目录 1. 常见项目中过滤器配置 2.Url访问控制配置解析为内置过滤器 2.1 DefaultFilterChainManager构造并注册内置过滤器 2.2 构造过滤器链 3. Shiro内置过滤器解析 3.1 内置过滤器概览 3.2 公共继承类解析 3.2.1 顶层Filter接口 3.2.2 AbstractFilter 3.2.3 Nameab…

Github上传代码/删除仓库/新建分支的操作流程记录

首先先安装git&#xff0c;安装完git后&#xff0c;看如下操作指令&#xff1a; 输入自己的用户名和邮箱&#xff08;为注册GITHUB账号时的用户名和邮箱&#xff09;&#xff1a; git config --global user.name "HJX-exoskeleton" git config --global user.email …

扫码看图怎么做轮播效果?多组图片用扫码查看的方法

图片通过二维码来做展示现在是很常见的一种方式&#xff0c;用这种方式可以用于多种图片格式。那么当我们需要将图片做成多个分组的轮播图样式展示时&#xff0c;有什么好的方法能够做成这个效果呢&#xff1f;下面就来教大家使用二维码生成器制作图片二维码的操作方法&#xf…

Halcon边缘滤波器edges_image 算子

Halcon边缘滤波器edges_image 算子 基于Sobel滤波器的边缘滤波方法是比较经典的边缘检测方法。除此之外&#xff0c;Halcon也提供了一些新式的边缘滤波器&#xff0c;如edges_image算子。它使用递归实现的滤波器&#xff08;如Deriche、Lanser和Shen&#xff09;检测边缘&…

xtu oj 1475 冰墩墩和冰壶

题目描述 冰壶是被誉为“冰面上的国际象棋”&#xff0c;其计分规则是各自投壶&#xff0c;最后在大本营内&#xff0c;你有几个壶离圆心比对方所有壶离圆心都近就得到几分。 比如红方有两个壶&#xff0c;分别在坐标(1,1),(−2,1)&#xff1b;黄方也有两个壶&#xff0c;分别…

python中的Quene使用方法,包含多线程和多进程

在Python中&#xff0c;队列&#xff08;Queue&#xff09;是一种抽象的数据类型&#xff0c;它遵循先进先出&#xff08;FIFO&#xff09;的原则。队列是一种特殊的线性表&#xff0c;只允许在表的前端&#xff08;front&#xff09;进行删除操作&#xff0c;而在表的后端&…

蓝桥杯省赛无忧 STL 课件16 set

01 set集合 修改set比较方法的常见手段&#xff0c;后面的multiset类似 #include<bits/stdc.h> using namespace std; int main() {set<int,greater<int>> myset;myset.insert(25);myset.insert(17);myset.insert(39);myset.insert(42);for(const auto&…

黑马python就业课

文章目录 初级中级高级初级课程分享 初级 中级 高级 初级课程分享 链接&#xff1a;https://pan.baidu.com/s/1aiJHaThezv_mSI1rnV3d7g 提取码&#xff1a;xdpc

小H靶场笔记:Empire-Breakout

Empire&#xff1a;Breakout January 11, 2024 11:54 AM Tags&#xff1a;brainfuck编码&#xff1b;tar解压变更目录权限&#xff1b;Webmin&#xff1b;Usermin Owner&#xff1a;只惠摸鱼 信息收集 使用arp-scan和namp扫描C段存活主机&#xff0c;探测靶机ip&#xff1a;1…

二极管限幅电路理论分析,工作原理+作用

一、限幅是什么意思&#xff1f; 限幅也就是&#xff0c;将电压限制在某个范围内&#xff0c;去除交流信号的一部分但不会对波形的剩余部分造成影响。通常来说&#xff0c;限幅电路主要是由二极管构成&#xff0c;波形的形状取决于电路的配置和设计。二、限幅电路工作原…

软件测试|Python数据可视化神器——pyecharts教程(九)

使用pyecharts绘制K线图进阶版 简介 K线图&#xff08;Kandlestick Chart&#xff09;&#xff0c;又称蜡烛图&#xff0c;是一种用于可视化金融市场价格走势和交易数据的图表类型。它是股票、外汇、期货等金融市场中最常用的技术分析工具之一&#xff0c;可以提供关于价格变…

简单的天天酷跑小游戏实现

初级函数实现人物,背景,小乌龟的移动 #include <graphics.h> #include <iostream> #include <Windows.h> #include "tools.h" #include <mmsystem.h> #include <conio.h> #include <time.h>//时间头文件 #include <cstdlib&g…