FlinkSQL聚合函数(Aggregate Function)详解

使用场景: 聚合函数即 UDAF,常⽤于进多条数据,出⼀条数据的场景。

在这里插入图片描述

上图展示了⼀个 聚合函数的例⼦ 以及 聚合函数包含的重要⽅法

案例场景:

关于饮料的表,有三个字段,分别是 id、name、price,表⾥有 5 ⾏数据,找到所有饮料⾥最贵的饮料的价格,即执⾏⼀个 max() 聚合拿到结果,遍历所有 5 ⾏数据,最终结果就只有⼀个数值。

开发流程:

实现 AggregateFunction 接⼝,其中所有的⽅法必须是 public 的、⾮ static 的;

必须实现以下⽅法:

  • Acc聚合中间结果 createAccumulator() : 为当前 Key 初始化⼀个空的 accumulator,其存储了聚合的中间结果,⽐如在执⾏ max() 时会存储当前的 max 值;
  • accumulate(Acc accumulator, Input输⼊参数) : 每⼀⾏数据,调⽤ accumulate() ⽅法更新 accumulator,处理每⼀条输⼊数据,方法必须声明为 public 和⾮ static 的,accumulate ⽅法可以重载,⽅法的参数类型可以不同,并且⽀持变⻓参数。
  • Output输出参数 getValue(Acc accumulator) : 通过调⽤ getValue ⽅法来计算和返回最终的结果。

某些场景下必须实现:

  • retract(Acc accumulator, Input输⼊参数) : 在回撤流的场景下必须实现,在计算回撤数据时调⽤,如果没有实现会直接报错。
  • merge(Acc accumulator, Iterable it) : 在批式聚合以及流式聚合中的 Session、Hop 窗⼝聚合场景下必须要实现,此外,这个⽅法对于优化也有帮助,例如,打开了两阶段聚合优化,需要 AggregateFunction 实现 merge ⽅法,在数据 shuffle 前先进⾏⼀次聚合计算。
  • resetAccumulator() : 在批式聚合中是必须实现的。

关于⼊参、出参数据类型信息的⽅法:

默认情况下,⽤户的 Input 输⼊参数( accumulate(Acc accumulator, Input输⼊参数) 的⼊参 Input输⼊参数 )、accumulator( Acc聚合中间结果 createAccumulator() 的返回结果)、 Output输出参数数据类型( Output输出参数 getValue(Acc accumulator) 的 Output输出参数 )会被 Flink 使⽤反射获取到。

对于 accumulator 和 Output 输出参数类型,Flink SQL 的类型推导在遇到复杂类型时会推导出错误的结果(注意:Input输⼊参数 因为是上游算⼦传⼊的,类型信息是确认的,不会出现推导错误),⽐如⾮基本类型 POJO 的复杂类型。

同 ScalarFunction 和 TableFunction, AggregateFunction 提供了 AggregateFunction#getResultType() 和AggregateFunction#getAccumulatorType() 指定最终返回值类型和 accumulator 的类型,两个函数的返回值类型是TypeInformation。

  • getResultType() : 即 Output 输出参数 getValue(Acc accumulator) 的输出结果数据类型;
  • getAccumulatorType() : 即 Acc聚合中间结果 createAccumulator() 的返回结果数据类型。

案例: 加权平均值

  • 定义⼀个聚合函数来计算某⼀列的加权平均
  • 在 TableEnvironment 中注册函数
  • 在查询中使⽤函数

实现思路:

为了计算加权平均值,accumulator 需要存储加权总和以及数据的条数,定义了类 WeightedAvgAccumulator 作为 accumulator,Flink 的 checkpoint 机制会⾃动保存 accumulator,在失败时进⾏恢复,保证精确⼀次的语义。

WeightedAvg(聚合函数)的 accumulate ⽅法有三个输⼊参数,第⼀个是 WeightedAvgAccum accumulator,另外两个是⽤户⾃定义的输⼊:输⼊的值 ivalue 和 输⼊的权重 iweight,尽管 retract()、merge()、resetAccumulator() ⽅法在⼤多数聚合类型中都不是必须实现的,但在样例中提供了他们的实现,并且定义了 getResultType() 和 getAccumulatorType()。

代码案例:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;

import java.io.Serializable;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * 输入数据:
 * a,1,1
 * a,10,2
 *
 * 输出结果:
 * res1=>:1> +I[a, 1.0]
 * res2=>:1> +I[a, 1.0]
 * res3=>:1> +I[a, 1.0]
 *
 * res1=>:1> -U[a, 1.0]
 * res1=>:1> +U[a, 7.0]
 * res3=>:1> -U[a, 1.0]
 * res3=>:1> +U[a, 7.0]
 * res2=>:1> -U[a, 1.0]
 * res2=>:1> +U[a, 7.0]
 */
public class AggregateFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple3<String, Double, Double>> tpStream = source.map(new MapFunction<String, Tuple3<String, Double, Double>>() {
            @Override
            public Tuple3<String, Double, Double> map(String input) throws Exception {
                return new Tuple3<>(input.split(",")[0],
                        Double.parseDouble(input.split(",")[1]),
                        Double.parseDouble(input.split(",")[2]));
            }
        });

        Table table = tEnv.fromDataStream(tpStream, "field,iValue,iWeight");

        tEnv.createTemporaryView("SourceTable", table);

        Table res1 = tEnv.from("SourceTable")
                .groupBy($("field"))
                .select($("field"), call(WeightedAvg.class, $("iValue"), $("iWeight")));

        // 注册函数
        tEnv.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);

        // Table API 调⽤函数
        Table res2 = tEnv.from("SourceTable")
                .groupBy($("field"))
                .select($("field"), call("WeightedAvg", $("iValue"), $("iWeight")));

        // SQL API 调⽤函数
        Table res3 = tEnv.sqlQuery("SELECT field, WeightedAvg(`iValue`, iWeight) FROM SourceTable GROUP BY field");

        tEnv.toChangelogStream(res1).print("res1=>");
        tEnv.toChangelogStream(res2).print("res2=>");
        tEnv.toChangelogStream(res3).print("res3=>");

        env.execute();

    }

    // ⾃定义⼀个计算权重 avg 的 accmulator
    public static class WeightedAvgAccumulator implements Serializable {
        public Double sum = 0.0;
        public Double count = 0.0;
    }

    // 输⼊:Long iValue, Integer iWeight
    public static class WeightedAvg extends AggregateFunction<Double, WeightedAvgAccumulator> {
        // 创建⼀个 accumulator
        @Override
        public WeightedAvgAccumulator createAccumulator() {
            return new WeightedAvgAccumulator();
        }

        public void accumulate(WeightedAvgAccumulator acc, Double iValue, Double iWeight) {
            acc.sum += iValue * iWeight;
            acc.count += iWeight;
        }

        public void retract(WeightedAvgAccumulator acc, Double iValue, Double iWeight) {
            acc.sum -= iValue * iWeight;
            acc.count -= iWeight;
        }

        // 获取返回结果
        @Override
        public Double getValue(WeightedAvgAccumulator acc) {
            if (acc.count == 0) {
                return null;
            } else {
                return acc.sum / acc.count;
            }
        }

        // Session window 使⽤这个⽅法将⼏个单独窗⼝的结果合并
        public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it) {
            for (WeightedAvgAccumulator a : it) {
                acc.count += a.count;
                acc.sum += a.sum;
            }
        }

        public void resetAccumulator(WeightedAvgAccumulator acc) {
            acc.count = 0.0;
            acc.sum = 0.0;
        }
    }
}

测试结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/135103.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[C++随想录] map和set的封装

map和set的封装 1. 红黑树模版的改变1.1 RBTree类模板 头的改变1.2 封装迭代器类1.2.1 构造 && 拷贝构造1.2.2. 1.2.3. - -1.2.4. 其他运算符重载 1.3 RBTree类实现普通迭代器和const迭代器 2. set的底层逻辑3. map的底层逻辑4. 源码4.1 RBTree类4.2 set类4.3 map类 1.…

搭建Docker

一、概念 云服务器大家肯定不陌生了&#xff0c;相比较传统物理服务器来说他的价格&#xff0c;个性化的配置服务&#xff0c;节省了很多的运维成本&#xff0c;越来越多的企业以及个人开发者更加的青睐于云服务器。有了属于自己的服务器就可以部署搭建自己个人网站了&#xf…

js动态显示当前时间

目录 1、封装时间函数 2、在页面写一个div标签&#xff0c;用来存放时间 3、获取div标签&#xff0c;开启定时器&#xff0c;时间为1000ms 4、先调用时间函数&#xff0c;防止页面加载延迟&#xff0c;再在定时器里调用 完整代码 效果图 1、封装时间函数 function getTi…

asp.net 在线音乐网站系统VS开发sqlserver数据库web结构c#编程Microsoft Visual Studio

一、源码特点 asp.net 在线音乐网站系统是一套完善的web设计管理系统&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式开发。开发环境为vs2010&#xff0c;数据库为sqlserver2008&#xff0c;使用c#语言 开发 asp.net 在线音乐网站系统1 应用…

Least Square Method 最小二乘法(图文详解,必懂)

最小二乘法是一种求解线性回归模型的优化方法&#xff0c;其目标是最小化数据点和拟合直线之间的残差平方和。这意味着最小二乘法关注的是找到一个直线&#xff0c;使得所有数据点与该直线的偏差的平方和最小。在数学公式中&#xff0c;如果y是实际值&#xff0c;y是函数估计值…

计算机组成原理第四章(存储系统)(一)

一、存储器概述 1.分类&#xff1a; 存取方式&#xff1a;随机存储器&#xff08;RAM&#xff09;、顺序存储器&#xff08;SAM&#xff09;、直接存储器&#xff08;DAM&#xff09; 存储介质&#xff1a;磁性材料存储器、半导体存储器、光存储器 功能和存取速度&#xff1a; …

【阿里云】函数计算 X 通义千问快速部署

写在前面&#xff1a;博主是一只经过实战开发历练后投身培训事业的“小山猪”&#xff0c;昵称取自动画片《狮子王》中的“彭彭”&#xff0c;总是以乐观、积极的心态对待周边的事物。本人的技术路线从Java全栈工程师一路奔向大数据开发、数据挖掘领域&#xff0c;如今终有小成…

2023年施工升降机司机(建筑特殊工种)证模拟考试题库及施工升降机司机(建筑特殊工种)理论考试试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年施工升降机司机(建筑特殊工种)证模拟考试题库及施工升降机司机(建筑特殊工种)理论考试试题是由安全生产模拟考试一点通提供&#xff0c;施工升降机司机(建筑特殊工种)证模拟考试题库是根据施工升降机司机(建筑特…

论文笔记--Baichuan 2: Open Large-scale Language Models

论文笔记--Baichuan 2: Open Large-scale Language Models 1. 文章简介2. 文章概括3 文章重点技术3.1 预训练3.1.1 预训练数据3.1.2 模型架构 3.2 对齐3.2.1 SFT3.2.2 Reward Model(RM)3.2.3 PPO 3.3 安全性 4. 文章亮点5. 原文传送门 1. 文章简介 标题&#xff1a;Baichuan 2…

IP-guard Webserver view 远程命令执行漏洞【2023最新漏洞】

IP-guard Webserver view 远程命令执行漏洞【2023最新漏洞】 一、漏洞描述二、漏洞影响三、漏洞危害四、FOFA语句五、漏洞复现1、手动复现yaml pocburp发包 2、自动化复现小龙POC检测工具下载地址 免责声明&#xff1a;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传…

【小沐学写作】PPT、PDF文件添加水印(Python)

文章目录 1、简介2、ppt添加水印2.1 PowerPoint幻灯片母版2.2 iSlide插件&#xff08;收费&#xff09;2.2.1 iSlide简介2.2.2 iSlide定价2.2.3 iSlide水印 2.3 Python代码2.3.1 Aspose.Slides for Python&#xff08;收费&#xff09; 3、pdf添加水印3.1 Python代码3.1.1 PyPD…

头歌答案--数据持久化(非数据库)

目录 ​编辑 数据持久化&#xff08;非数据库&#xff09; 第1关&#xff1a;数据持久化&#xff08;非数据库&#xff09; 任务描述 多线程、多进程爬虫 第1关&#xff1a;多线程、多进程爬虫 任务描述 Scrapy爬虫基础 任务描述 MySQL数据库编程 第1关&#xff1a;…

【视觉SLAM十四讲学习笔记】第二讲——初识SLAM

专栏系列文章如下&#xff1a; 【视觉SLAM十四讲学习笔记】第一讲 一个机器人&#xff0c;如果想要探索某一块区域&#xff0c;它至少需要知道两件事&#xff1a; 我在什么地方——定位周围环境是什么样——建图 一方面需要明白自身的状态&#xff08;即位置&#xff09;&#…

八种架构设计模式优缺点

目录 1、软件架构 2、架构设计模式 2.1、单库单应用模式 2.2、内容分发模式 2.3、查询分离模式 2.4 微服务模式 2.5 多级缓存模式 1、软件架构 软件架构是指对软件系统整个结构和组成部分之间的关系进行抽象和定义的过程&#xff0c;旨在解决系统设计和实现过程中的复杂…

2023年【汽车驾驶员(高级)】找解析及汽车驾驶员(高级)复审考试

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 汽车驾驶员&#xff08;高级&#xff09;找解析是安全生产模拟考试一点通总题库中生成的一套汽车驾驶员&#xff08;高级&#xff09;复审考试&#xff0c;安全生产模拟考试一点通上汽车驾驶员&#xff08;高级&#…

作业类型7333RG没有为成本中心7333+7000501在年度2023

创建 作业类型KP26 更改成本中心为生产部。 查看作业类型KL02, 可以看到这个有效期已超期&#xff0c;重新创建有效期KL01 但是创建成本要素时报错&#xff0c; 用KA06创建成本要素 返回KL01更正成本要素 这个有效期要注意不要重叠&#xff0c;否则无法创建新的有效期

【博士每天一篇文献-模型】A mechanistic model of connector hubs, modularity and cognition

阅读时间&#xff1a;2023-11-10 1 介绍 年份&#xff1a;2018 作者&#xff1a;Maxwell A. Bertolero, B. T. Thomas Yeo 期刊&#xff1a; nature human behaviour 引用量&#xff1a;180 2 创新点 作者提出了一个机制模型&#xff0c;解释了连接中枢的功能以及其对认知表…

Linux内核有什么之内存管理子系统有什么第七回 —— 小内存分配(5)

接前一篇文章&#xff1a;Linux内核有什么之内存管理子系统有什么第六回 —— 小内存分配&#xff08;4&#xff09; 本文内容参考&#xff1a; linux进程虚拟地址空间 《趣谈Linux操作系统 核心原理篇&#xff1a;第四部分 内存管理—— 刘超》 4.6 深入理解 Linux 虚拟内存…

手把手带你创建一个自己的GPTs

大家好&#xff0c;我是五竹。 最近GPT又进行了大升级&#xff0c;这一下又甩了国内AI几条街&#xff0c;具体更新了哪些内容之前的一篇文章中其实已经说过了&#xff1a;ChatGPT 王炸升级&#xff01;更强版 GPT-4 上线&#xff01; 其中最重要的一点就是支持自定义GPT&…

【博士每天一篇文献-算法】A pseudo-inverse decomposition-based self-organizing modular echo

阅读时间&#xff1a;2023-11-6 1 介绍 年份&#xff1a;2022 作者&#xff1a;王雷&#xff0c;北京信息科技大学自动化学院 期刊&#xff1a; Applied Soft Computing 引用量&#xff1a;12 提出了一种基于伪逆分解的自组织模块化回声状态&#xff08;PDSM-ESN&#xff09…