Flink Window中典型的增量聚合(ReduceFunction / AggregateFunction)

一、什么是增量聚合函数

在Flink Window中定义了窗口分配器,我们只是知道了数据属于哪个窗口,可以将数据收集起来了;至于收集起来到底要做什么,其实还完全没有头绪,这也就是窗口函数所需要做的事情。所以在窗口分配器之后,我们还要再接上一个定义窗口如何进行计算的操作,这就是所谓的“窗口函数”(window functions)。
窗口可以将数据收集起来,最基本的处理操作当然就是基于窗口内的数据进行聚合。
我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。
典型的增量聚合函数有两个:ReduceFunction 和 AggregateFunction。
在这里插入图片描述

二、ReduceFunction

源码解析

@FunctionalInterface
@Public
public interface ReduceFunction<T> extends Function, Serializable {
    T reduce(T var1, T var2) throws Exception;
}

实际案例
在Flink中,使用socket模拟实时的数据流DataStream,通过定义一个滚动窗口,窗口的大小为10s,按照id分区,使用reduce聚合函数实现value的累加统计

package com.flink.DataStream.WindowFunctions;

import com.flink.POJOs.WaterSensor;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class FlinkWindowReduceFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setParallelism(1);
        DataStreamSource<String> streamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);
        // 注意这里为什么返回的是KeyedStream(建控流/分区流),而不是DataStream
        KeyedStream<WaterSensor, String> keyedStream = streamSource
                // 使用map函数将输入的string转为一个WaterSensor类
                .map(new MapFunction<String, WaterSensor>() {
                    @Override
                    public WaterSensor map(String s) throws Exception {
                        // 这里写的很详细,如何把string转为的WaterSensor类
                        String[] strings = s.split(",");
                        String id = strings[0];
                        Long ts = Long.valueOf(strings[1]);
                        Integer vc = Integer.valueOf(strings[2]);
                        WaterSensor waterSensor = new WaterSensor();
                        waterSensor.setId(id);
                        waterSensor.setTs(ts);
                        waterSensor.setVc(vc);
                        return waterSensor;
                        //return new WaterSensor(strings[0],Long.valueOf(strings[1]),Integer.valueOf(strings[2])
                    }
                })
                // 按照id做keyBy分区(提问:KeyBy是如何实现分区的?)
                .keyBy(new KeySelector<WaterSensor, String>() {
                    // 也可以直接使用lamda表达式更简单
                    @Override
                    public String getKey(WaterSensor waterSensor) throws Exception {
                        // getId()方法就是return的waterSensor.id
                        return waterSensor.getId();
                    }
                });
        /**
         * 窗口操作主要有两个部分:窗口分配器(Window Assigners)和窗口函数(WindowFunctions)
         * .window()方法需要传入一个窗口分配器,它指明了窗口的类型
         * */
        SingleOutputStreamOperator<WaterSensor> outputStreamOperator = keyedStream
                // 设置滚动窗口的大小(10秒)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                // 使用匿名函数实现增量聚合函数ReduceFunction
                .reduce(new ReduceFunction<WaterSensor>() {
                    @Override
                    public WaterSensor reduce(WaterSensor waterSensor1, WaterSensor waterSensor2) throws Exception {
                        System.out.println("调用reduce方法,之前的结果:" + waterSensor1 + ",现在来的数据:" + waterSensor2);
                        return new WaterSensor(waterSensor1.getId(), System.currentTimeMillis(), waterSensor1.getVc() + waterSensor2.getVc());
                    }
                });
        outputStreamOperator.print();
        streamExecutionEnvironment.execute();
    }
}

启动Flink程序,启动nc,模拟输入

nc -lk 8888
# 00-10秒输入
a,11111,1
# 11-20秒输入
a,11111,2
a,22222,3
# 21-30秒输入
a,11111,4

查看控制台打印结果

WaterSensor{id='a', ts=11111, vc=1}
调用reduce方法,之前的结果:WaterSensor{id='a', ts=11111, vc=2},现在来的数据:WaterSensor{id='a', ts=22222, vc=3}
WaterSensor{id='a', ts=1702022598011, vc=5}
WaterSensor{id='a', ts=11111, vc=4}

在这里插入图片描述

三、AggregateFunction

虽然ReduceFunction 可以解决大多数归约聚合的问题,但是我们通过上述案例可以发现:这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。
Flink Window API 中的 aggregate 就突破了这个限制,可以定义更加灵活的窗口聚合操作。这个方法需要传入一个 AggregateFunction 的实现类作为参数。AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了。

@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
    ACC createAccumulator();

    ACC add(IN var1, ACC var2);

    OUT getResult(ACC var1);

    ACC merge(ACC var1, ACC var2);
}

接口中有四个方法:
1.createAccumulator()
创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次。
2.add()
将输入的元素添加到累加器中。
3.getResult()
从累加器中提取聚合的输出结果。
4.merge()
合并两个累加器,并将合并后的状态作为一个累加器返回。
所以可以看到,AggregateFunction 的工作原理是:首先调用 createAccumulator()为任务初始化一个状态(累加器);而后每来一个数据就调用一次 add()方法,对数据进行聚合,得到的结果保存在状态中;等到了窗口需要输出时,再调用 getResult()方法得到计算结果。很明显,与 ReduceFunction 相同,AggregateFunction 也是增量式的聚合;而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/238315.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

VR转接线方案/VR Link串流数据线方案/VR眼镜PD快充方案

虚拟现实技术(英文名称&#xff1a;Virtual Reality&#xff0c;缩写为VR)&#xff0c;又称虚拟实境或灵境技术&#xff0c;是20世纪发展起来的一项全新的实用技术。虚拟现实技术囊括计算机、电子信息、仿真技术&#xff0c;其基本实现方式是以计算机技术为主&#xff0c;利用并…

数据库系列之简要对比下GaussDB和OpenGauss数据库

GaussDB作为一款企业级的数据库产品&#xff0c;和开源数据库OpenGauss之间又是什么样的关系&#xff0c;刚开始接触的时候是一头雾水&#xff0c;因此本文简要对比下二者的区别&#xff0c;以加深了解。 1、GaussDB和OpenGauss数据库简要对比 GaussDB是华为基于PostgreSQL数据…

表格中上传文件的表单验证

<template><!-- 新增记录 --><div class"newRecord"><div class"danger-detail"><div class"detail-right"><el-form :model"ruleForm" :rules"rules" ref"ruleForm" label-wid…

数据库系统原理与实践 笔记 #12

文章目录 数据库系统原理与实践 笔记 #12事务管理和并发控制与恢复(续)并发控制SQL-92中的并发级别基于锁的协议基于锁的协议的隐患锁的授予封锁协议两阶段封锁协议多粒度粒度层次的例子意向锁类型相容性矩阵多粒度封锁模式基于时间戳的协议基于时间戳协议的正确性基于有效性检…

解决:TypeError: write() argument must be str, not Tag

解决&#xff1a;TypeError: write() argument must be str, not Tag 文章目录 解决&#xff1a;TypeError: write() argument must be str, not Tag背景报错问题报错翻译报错位置代码报错原因解决方法今天的分享就到此结束了 背景 在使用之前的代码时&#xff0c;报错&#xf…

如何轻松解决企业报修系统问题?有什么比较实用的工单管理系统?

许多企业报修方面的问题就是兼职师傅多&#xff0c;难以管理&#xff0c;导致整个报修流程都比较滞后。关于这个问题&#xff0c;我个人的建议是引入工单管理系统&#xff0c;依靠线上平台进行统一的管理。 引入工单管理系统的好处主要有&#xff1a;   1、沟通报修更加高效 …

频率、概率

频率 在相同的条件下进行试验&#xff0c;假设试验进行了次&#xff0c;其中随机事件A发生了次&#xff0c;那么就称为随机事件A发生的频率。 概率 假设随机试验E的样本空间是S&#xff0c;对于其中每个随机事件&#xff0c;都对应了一个实数&#xff0c;把这个实数称为随机…

Qt/C++音视频开发59-使用mdk-sdk组件/原qtav作者力作/性能凶残/超级跨平台

一、前言 最近一个月一直在研究mdk-sdk音视频组件&#xff0c;这个组件是原qtav作者的最新力作&#xff0c;提供了各种各样的示例demo&#xff0c;不仅限于支持C&#xff0c;其他各种比如java/flutter/web/android等全部支持&#xff0c;性能上也是杠杠的&#xff0c;目前大概…

力扣每日一题day34[110. 平衡二叉树]

给定一个二叉树&#xff0c;判断它是否是高度平衡的二叉树。 本题中&#xff0c;一棵高度平衡二叉树定义为&#xff1a; 一个二叉树每个节点 的左右两个子树的高度差的绝对值不超过 1 。 示例 1&#xff1a; 输入&#xff1a;root [3,9,20,null,null,15,7] 输出&#xff1a;t…

Linux学习第46天:Linux音频驱动试验:能不能?不行也得行。

Linux版本号4.1.15 芯片I.MX6ULL 大叔学Linux 品人间百味 思文短情长 CAN 是目前应用非常广泛的现场总线之一&#xff0c;主要应用于汽车电子和工业领域&#xff0c;尤其是汽车 领域&#xff0c;汽车上大量的传感器与模块都是通过 C…

【SpringBoot篇】详解基于Redis实现短信登录的操作

文章目录 &#x1f970;前言&#x1f6f8;StringRedisTemplate&#x1f339;使用StringRedisTemplate⭐常用的方法 &#x1f6f8;为什么我们要使用Redis代替Session进行登录操作&#x1f386;具体使用✨编写拦截器✨配置拦截器&#x1f33a;基于Redis实现发送手机验证码操作&am…

DNSLog漏洞探测(三)之XSS漏洞实战

DNSLog漏洞探测(三)之XSS漏洞实战 通过前面的学习&#xff0c;我们已经明白了什么是DNSLog平台&#xff0c;那么DNSLog平台到底能为我们做些什么呢&#xff1f; DNSLog的平台实际使用很长见的一种情况就是针对漏洞无回显的情况&#xff0c;我们通过让受害者的服务器主动发起对…

Error: Failed to resolve vue/compiler-sfc——vite项目启动报错——npm run serve

运行项目时&#xff0c;报错如下&#xff1a; Error: Failed to resolve vue/compiler-sfc 根据报错信息的提示&#xff1a;vue的版本必须大于3.2.25&#xff0c;经过查看package.json文件&#xff0c;可以看到vue的版本为3.2.36&#xff0c;是满足条件的。 因此考虑缓存问题&…

Python部分基础知识入门学习,十分钟快速上手

文章目录 一、基础语法二、变量类型三、运算符四、条件语句关于Python技术储备一、Python所有方向的学习路线二、Python基础学习视频三、精品Python学习书籍四、Python工具包项目源码合集①Python工具包②Python实战案例③Python小游戏源码五、面试资料六、Python兼职渠道 一、…

并发编程的基本概念

进程与线程 进程 程序由指令和数据组成&#xff0c;但这些指令要运行&#xff0c;数据要读写&#xff0c;就必须将指令加载至 CPU&#xff0c;数据加载至内存。在指令运行过程中还需要用到磁盘、网络等设备。进程就是用来加载指令、管理内存、管理 IO 的当一个程序被运行&…

【计算机网络】UDP报文详解

目录 一. UDP协议概述 二. UDP报文格式 1. 首部 2. 校验和 三. UDP的缓冲区 结束语 一. UDP协议概述 UDP——用户数据报协议&#xff0c;是传输层的一个重要协议 基于UDP的应用层协议有&#xff1a;DNS&#xff0c;TFTP&#xff0c;SNMP&#xff0c;NTP 协议全称默认端…

pt34-python-Celery

Celery异步网络框架 定义 Celery 是一个简单、灵活且可靠的&#xff0c;处理大量消息的分布式系统&#xff0c;它是一个专注于实时处理的任务队列&#xff0c;同时也支持任务调度。中文官网&#xff1a;http://docs.jinkan.org/docs/celery/ 在线安装 sudo pip3 install -U…

语音验证码有哪些优势

稳定高效 链接各国运营商语音通道&#xff0c;不受时间地域限制&#xff0c;到达率较高&#xff0c;验证效果稳定高效&#xff0c;大大提高了客户回填率和转化率&#xff0c;减少客户流失。 海量并发 智能运维系统&#xff0c;实时监控&#xff0c;自动切换&#xff0c;无需…

【二分查找】【滑动窗口】LeeCode2528:最大化城市的最小电量

作者推荐 【动态规划】【广度优先】LeetCode2258:逃离火灾 本文涉及的基础知识点 二分查找算法合集 滑动窗口 题目 给你一个下标从 0 开始长度为 n 的整数数组 stations &#xff0c;其中 stations[i] 表示第 i 座城市的供电站数目。 每个供电站可以在一定 范围 内给所有城…

PR分屏模板|Premiere动态多画面多屏特效视频模板剪辑素材

这一个很棒的分屏效果PR幻灯片模板视频素材&#xff01;为您的视频制作多屏幕动画&#xff01; 非常易于定制、更改颜色。 仅支持Premier Pro 2024及最新版本。 高清分辨率&#xff1a;19201080/30fps。 持续时间–00:37。 21媒体占位符&#xff08;照片或视频&#xff09;。 包…