Flink SQL 表值聚合函数(Table Aggregate Function)详解

使用场景: 表值聚合函数即 UDTAF,这个函数⽬前只能在 Table API 中使⽤,不能在 SQL API 中使⽤。

函数功能:

在 SQL 表达式中,如果想对数据先分组再进⾏聚合取值:

select max(xxx) from source_table group by key1, key2

上⾯ SQL 的 max 语义产出只有⼀条最终结果,如果想取聚合结果最⼤的 n 条数据,并且 n 条数据,每⼀条都要输出⼀次结果数据,上⾯的 SQL 就没有办法实现了。

所以 UDTAF 为了处理这种场景,可以⾃定义 怎么取 , 取多少条 最终的聚合结果,UDTAF 和 UDAF 是类似的。

在这里插入图片描述

案例场景: 有⼀个饮料表有 3 列,分别是 id、name 和 price,⼀共有 5 ⾏,需要找到价格最⾼的两个饮料,类似于 top2,表值聚合函数,需要遍历所有 5 ⾏数据,输出结果为 2 ⾏数据的⼀个表。

开发流程:

实现 TableAggregateFunction 接⼝,其中所有的⽅法必须是 public 的、⾮ static 的

必须实现以下⽅法:

Acc聚合中间结果 createAccumulator() : 为当前 Key 初始化⼀个空的 accumulator,存储了聚合的中间结果,⽐如在执⾏ max() 时会存储每⼀条中间结果的 max 值;

accumulate(Acc accumulator, Input输⼊参数) : 每⼀⾏数据,都会调⽤ accumulate() ⽅法更新 accumulator,⽅法对每⼀条输⼊数据执⾏,⽐如执⾏ max() 时,遍历每⼀条数据执⾏;这个⽅法必须声明为 public 和⾮ static 的,accumulate ⽅法可以重载,每个⽅法的参数类型可以不同,⽀持变⻓参数。

emitValue(Acc accumulator, Collector collector) 或者 emitUpdateWithRetract(Acc accumulator, RetractableCollector collector) :

当所有的数据处理完之后,调⽤ emit ⽅法来计算和输出最终结果,可以⾃定义输出多少条以及怎样输出结果。

对于 emitValue 以及 emitUpdateWithRetract 区别,以 TopN 举例,emitValue 每次都会发送所有的最⼤的 n 个值,⽽这在流式任务中会有性能问题,为提升性能,可以实现 emitUpdateWithRetract ⽅法,这个⽅法在 retract 模式下会增量输出结果,⽐如只在有数据更新时,做到撤回⽼数据,再发送新数据,⽽不需要每次都发出全量的最新数据。

如果同时定义了 emitUpdateWithRetract、emitValue ⽅法,那 emitUpdateWithRetract 会优先于 emitValue ⽅法被使⽤,因为引擎会认为 emitUpdateWithRetract 会更加⾼效,它的输出是增量的。

某些场景下必须实现:

  • retract(Acc accumulator, Input输⼊参数) : 回撤流的场景必须实现,在计算回撤数据时调⽤,如果没有实现则会直接报错。
  • merge(Acc accumulator, Iterable it) : 在批式聚合以及流式聚合中的 Session、Hop 窗⼝聚合场景必须实现,这个⽅法对优化也有帮助,例如,打开了两阶段聚合优化,需要 AggregateFunction 实现 merge ⽅法,从⽽在第⼀阶段先进⾏数据聚合。
  • resetAccumulator() : 在批式聚合中是必须实现的。

关于⼊参、出参数据类型:

默认情况下,⽤户的 Input输⼊参数( accumulate(Acc accumulator, Input输⼊参数) 的⼊参 Input输⼊参数 )、accumulator( Acc聚 合中间结果 createAccumulator() 的返回结果)、 Output输出参数 数据类型( emitValue(Acc acc,Collector<Output输出参数> out) 的 Output输出参数 )会被 Flink 反射获取,但对于accumulator 和 Output输出参数类型来说,Flink SQL 的类型推导在遇到复杂类型的时候可能会推导出错误的结果(注意: Input输⼊参数 因为是上游算⼦传⼊的,所以类型信息是确认的,不会出现推导错误的情况),⽐如那些⾮基本类型 POJO 的复杂类型,所以跟 ScalarFunction 和 TableFunction ⼀样, AggregateFunction 提供了TableAggregateFunction#getResultType() 和 TableAggregateFunction#getAccumulatorType() 来分别指定最终返回值类型和accumulator 的类型,两个函数的返回值类型都是 TypeInformation。

  • getResultType() : 即 emitValue(Acc acc, Collector<Output输出参数> out) 的输出结果数据类型;
  • getAccumulatorType() : 即 Acc聚合中间结果 createAccumulator() 的返回结果数据类型;

案例场景: Top2

定义⼀个 TableAggregateFunction 来计算给定列的最⼤的 2 个值

在 TableEnvironment 中注册函数

在 Table API 查询中使⽤函数(当前只在 Table API 中⽀持 TableAggregateFunction)

实现思路:

计算最⼤的 2 个值,accumulator 需要保存当前的最⼤的 2 个值,定义了类 Top2Accum 作为 accumulator,Flink 的 checkpoint 机制会⾃动保存 accumulator,在失败时进⾏恢复,来保证精确⼀次的语义。

Top2 表值聚合函数(TableAggregateFunction)的 accumulate() ⽅法有两个输⼊,第⼀个是 Top2Accum accumulator,另⼀个是⽤户定义的输⼊:输⼊的值 v,尽管 merge() ⽅法在⼤多数聚合类型中不是必须的,但在样例中提供了它的实现。并且定义了 getResultType() 和 getAccumulatorType() ⽅法。

代码案例:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

/**
 * 输入数据:
 * a,1
 * a,2
 * a,3
 * 
 * 输出结果:
 * res=>:1> +I[a, 1, 1]
 * res=>:1> -D[a, 1, 1]
 * res=>:1> +I[a, 2, 1]
 * res=>:1> +I[a, 1, 2]
 * res=>:1> -D[a, 2, 1]
 * res=>:1> -D[a, 1, 2]
 * res=>:1> +I[a, 3, 1]
 * res=>:1> +I[a, 2, 2]
 */
public class TableAggregateFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

        SingleOutputStreamOperator<Tuple2<String,Integer>> tpStream = source.map(new MapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String,Integer> map(String input) throws Exception {
                return new Tuple2<>(input.split(",")[0],Integer.parseInt(input.split(",")[1]));
            }
        });

        tEnv.registerFunction("top2", new Top2());

        Table table = tEnv.fromDataStream(tpStream, "key,value");

        tEnv.createTemporaryView("SourceTable", table);

        // 使⽤函数
        Table res = tEnv.from("SourceTable")
                .groupBy("key")
                .flatAggregate("top2(value) as (v, rank)")
                .select("key, v, rank");

        tEnv.toChangelogStream(res).print("res=>");
        env.execute();
    }

    /**
     * Accumulator for Top2.
     */
    public static class Top2Accum {
        public Integer first;
        public Integer second;
    }

    public static class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {
        @Override
        public Top2Accum createAccumulator() {
            Top2Accum acc = new Top2Accum();
            acc.first = Integer.MIN_VALUE;
            acc.second = Integer.MIN_VALUE;
            return acc;
        }

        public void accumulate(Top2Accum acc, Integer v) {
            if (v > acc.first) {
                acc.second = acc.first;
                acc.first = v;
            } else if (v > acc.second) {
                acc.second = v;
            }
        }

        public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
            for (Top2Accum otherAcc : iterable) {
                accumulate(acc, otherAcc.first);
                accumulate(acc, otherAcc.second);
            }
        }

        public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
            // emit the value and rank
            if (acc.first != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.first, 1));
            }
            if (acc.second != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(acc.second, 2));
            }
        }
    }
}

测试结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/135055.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

华为ensp搭建小型园区网络规划

文章目录 前言一、拓扑图二、数据规划三、设备配置四.配置命令1.配置接入层交换机ACC11.1 设备命名&#xff0c;创建VLAN1.2 配置eth-trunk 11.3 配置用户端 2.配置核心层交换机CORE2.1设备命名2.2配置Eth-Trunk2.3 vlan配置ip2.4 上行接口配置 3.DHCP配置3.1 CORE: 4.配置路由…

计算机毕业设计:疲劳驾驶检测识别系统 python深度学习 YOLOv5 (包含文档+源码+部署教程)

[毕业设计]2023-2024年最新最全计算机专业毕设选题推荐汇总 1、项目介绍 基于YOLOv5的疲劳驾驶检测系统使用深度学习技术检测常见驾驶图片、视频和实时视频中的疲劳行为&#xff0c;识别其闭眼、打哈欠等结果并记录和保存&#xff0c;以防止交通事故发生。本文详细介绍疲劳驾…

ROC 曲线详解

前言 ROC 曲线是一种坐标图式的分析工具&#xff0c;是由二战中的电子和雷达工程师发明的&#xff0c;发明之初是用来侦测敌军飞机、船舰&#xff0c;后来被应用于医学、生物学、犯罪心理学。 如今&#xff0c;ROC 曲线已经被广泛应用于机器学习领域的模型评估&#xff0c;说…

模板初阶 C++

目录 泛型编程 函数模板 概念 格式 原理 函数模板的实例化 类模板 格式 类模板的实例化 泛型编程 当我们要实现一个交换函数&#xff0c;我们可以利用函数重载实现&#xff0c;但是有几个不好的地方 1.函数重载仅仅是类型不同&#xff0c;代码复用率较低&#xff0c;只…

pyorch Hub 系列#4:PGAN — GAN 模型

一、主题描述 2014 年生成对抗网络的诞生及其对任意数据分布进行有效建模的能力席卷了计算机视觉界。两人范例的简单性和推理时令人惊讶的快速样本生成是使 GAN 成为现实世界中实际应用的理想选择的两个主要因素。 然而&#xff0c;在它们出现后的很长一段时间内&#xff0c;GA…

知识蒸馏概述及开源项目推荐

文章目录 1.介绍2.知识2.1 基于响应的知识&#xff08;response-based&#xff09;2.2 基于特征的知识(feature-based)2.3 基于关系的知识(relation-based) 3.蒸馏机制3.1 离线蒸馏3.2 在线蒸馏3.3 自蒸馏 4.教师-学生架构5.蒸馏算法5.1 对抗性蒸馏&#xff08;Adversarial Dis…

Linux基础开发工具之调试器gdb

文章目录 1.编译成的可调试的debug版本1.1gcc test.c -o testdebug -g1.2readelf -S testdebug | grep -i debug 2.调试指令2.0quit退出2.1list/l/l 数字: 显示代码2.2run/r运行2.3断点相关1. break num/b num: 设置2. info b: 查看3. d index: 删除4. n: F10逐过程5. p 变量名…

Python文件、文件夹操作汇总

目录 一、概览 二、文件操作 2.1 文件的打开、关闭 2.2 文件级操作 2.3 文件内容的操作 三、文件夹操作 四、常用技巧 五、常见使用场景 5.1 查找指定类型文件 5.2 查找指定名称的文件 5.3 查找指定名称的文件夹 5.4 指定路径查找包含指定内容的文件 一、概览 ​在…

CSS注入的四种实现方式

目录 CSS注入窃取标签属性数据 简单的一个实验&#xff1a; 解决hidden 方法1&#xff1a;jsnode.js实现 侧信道攻击 方法2&#xff1a;对比波兰研究院的方案 使用兄弟选择器 方法3&#xff1a;jswebsocket实现CSS注入 实验实现&#xff1a; 方法4&#xff1a;window…

【云备份|| 日志 day6】文件业务处理模块

云备份day6 业务处理 业务处理 云备份项目中 &#xff0c;业务处理模块是针对客户端的业务请求进行处理&#xff0c;并最终给与响应。而整个过程中包含以下要实现的功能&#xff1a; 借助网络通信模块httplib库搭建http服务器与客户端进行网络通信针对收到的请求进行对应的业…

算法导论笔记4:散列数 hash

一 了解一些散列的基本概念&#xff0c;仅从文字角度&#xff0c;整理了最基础的定义。 发现一本书&#xff0c;《算法图解》&#xff0c;微信读书APP可读&#xff0c;有图&#xff0c;并且是科普性质的读物&#xff0c;用的比喻很生活化&#xff0c;可以与《算法导论》合并起…

Xshell远程登录 Linux小键盘数字输入变成字母解决办法

Xshell的设置问题&#xff0c;依次查看&#xff1a;文件-->属性-->终端-->VT模式-->初始数字键盘模式更改为&#xff1a;设置普通&#xff08;s&#xff09;

vue-常用指令

​&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;Vue篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来vue篇专栏内容-常用指令 目录 常用指令 1、v-cloak 2、数据绑定指令 3、v-once 4、v-bind&#xff08;重点&a…

在线制作仿真病历证明软件,易语言实现病例报告生成器,取画板快照+标签+编辑框

闲着无聊用易语言开发了一个病例生成器&#xff0c;当然我加了水印的&#xff0c;这个图片你就算截图你也用不了&#xff0c;模板是从百度图库搜的&#xff0c;很多&#xff0c;我就随便找了一个&#xff0c;然后实现逻辑就是加了一个画板&#xff0c;然后载入了素材图&#xf…

2023-11-12 LeetCode每日一题(Range 模块)

2023-03-29每日一题 一、题目编号 715. Range 模块二、题目链接 点击跳转到题目位置 三、题目描述 Range模块是跟踪数字范围的模块。设计一个数据结构来跟踪表示为 半开区间 的范围并查询它们。 半开区间 [left, right) 表示所有 left < x < right 的实数 x 。 实…

采用示波器显示扭矩传感器模拟信号

扭矩传感器输出的信号波形通常是模拟电压信号&#xff0c;可以通过示波器等仪器进行分析。扭矩传感器的输出信号波形通常有两种类型&#xff1a;正弦波和方波。 应变片传感器扭矩测量采用应变电测技术。在弹性轴上粘贴应变计组成测量电桥&#xff0c;当弹性轴受扭矩产生微小变…

【CASS精品教程】cass3d 11.0加载超大影像、三维模型、点云数据

CAD2016+CASS11.0(内置3d)下载与安装: 【CASS精品教程】CAD2016+CASS11.0安装教程(附CASS11.0安装包下载)https://geostorm.blog.csdn.net/article/details/132392530 一、cass11.0 3d支持的数据 cass11.0中的3d模块增加了多种数据的支持,主要有: 1. 三维模型 点击…

Linux学习教程(第二章 Linux系统安装)3

第二章 Linux系统安装 十一、Linux远程管理协议&#xff08;RFB、RDP、Telnet和SSH&#xff09; 提到远程管理&#xff0c;通常指的是远程管理服务器&#xff0c;而非个人计算机。个人计算机可以随时拿来用&#xff0c;服务器通常放置在机房中&#xff0c;用户无法直接接触到…

画面精美传奇手游幽冥传奇【幽冥灭龙传奇】win服务端+双端+GM授权后台+详细教程

搭建资源下载地址&#xff1a;画面精美传奇手游幽冥传奇幽冥灭龙传奇win服务端双端GM授权后台详细教程-海盗空间

Xilinx FPGA平台DDR3设计详解(一):DDR SDRAM系统框架

DDR SDRAM&#xff08;双倍速率同步动态随机存储器&#xff09;是一种内存技术&#xff0c;它可以在时钟信号的上升沿和下降沿都传输数据&#xff0c;从而提高数据传输的速率。DDR SDRAM已经发展了多代&#xff0c;包括DDR、DDR2、DDR3、DDR4和DDR5&#xff0c;每一代都有不同的…