0基础学习PyFlink——用户自定义函数之UDTAF

大纲

  • UDTAF
  • TableAggregateFunction的实现
    • 累加器
      • 定义
      • 创建
      • 累加
    • 返回
      • 类型
      • 计算
  • 完整代码

在前面几篇文章中,我们分别介绍了UDF、UDTF和UDAF这三种用户自定义函数。本节我们将介绍最后一种函数:UDTAF——用户自定义表值聚合函数。
在这里插入图片描述

UDTAF

UDTAF函数即具备了UDTF的特点,也具备UDAF的特点。即它可以像《0基础学习PyFlink——用户自定义函数之UDTF》介绍的UDTF那样可以返回任意数量的行作为输,又可以像《0基础学习PyFlink——用户自定义函数之UDAF》介绍的UDAF那样通过聚合的数据(多组)计算出一个值
举一个例子:我们拿到一个学生成绩表,每行包括:

  • 学生姓名
  • 英语成绩
  • 数学成绩
  • 年级

现在我们需要把这张表调整为:

  • 学生姓名
  • 成绩
  • 科目
  • 科目年级平均成绩
  • 年级
    在这里插入图片描述
    将一行中的“英语成绩”和“数学成绩”,拆成“成绩”和“科目”,相当于把一行数据拆解成多行,如上图左侧“张三”只有一行,而右侧有两行“张三”信息。这种拆解操作就需要T类型的用户自定义函数,比如UDTF和UDTAF。
    而我们需要计算一个年级一科的平均成绩,比如1年级英语的平均成绩,则需要按年级聚合之后再做计算。这个就需要A类型的用户自定义函数,比如UDAF和UDTAF。
    同时要满足上述两种技术方案的就是UDTAF。我们先看下主体代码,它和《0基础学习PyFlink——用户自定义函数之UDAF》中的很像。但是有两个重要区别:
  • 要设置成in_streaming_mode模式,否则会报错
  • udtaf要修饰一个对象,而非一个方法;
def calc():
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_streaming_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)
    
    row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('english', DataTypes.FLOAT()), DataTypes.FIELD('math', DataTypes.FLOAT()), DataTypes.FIELD('grade', DataTypes.STRING())])
    students_score = [
        ("张三", 80.0, 60.0, "1"),
        ("李四", 75.0, 95.0, "1"),
        ("王五", 90.0, 90.0, "2"),
        ("赵六", 85.0, 70.0, "2"),
        ("孙七", 60.0, 0.0, "3"),
    ]
    tab_source = t_env.from_elements(students_score, row_type_tab_source)
    
    split_class = udtaf(SplitClass())
    tab_source.group_by(col('grade')) \
        .flat_aggregate(split_class) \
        .select(col('*')) \
        .execute().print()

TableAggregateFunction的实现

用于计算的类要继承于TableAggregateFunction,即UDTAF中的TAF。

class SplitClass(TableAggregateFunction):
    _class_keys = ["english", "math"]

我们需要通过get_result_type告诉框架,UDTAF函数返回的是什么类型的数据。一般我们都是构造一个行类型——ROW,然后定义其每个字段的值和类型:

  • name:string类型,用户姓名;
  • score:float类型,考分;
  • avg score:float类型,科目年级平均分数;
  • class:sting类型,科目名称;

累加器

accumulator(累加器)是用于参与计算的中间数据。比如这个案例中,我们会向让accumulator保存拆解后的数据(即一行拆解成多行后的数据),然后再计算各年级每科的平均成绩。

定义

    def get_accumulator_type(self):
        return DataTypes.ARRAY(DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())])) 

因为只是为了保存展开的数据,于是我们只用定义均值计算之前的字段:

  • name:string类型,姓名;
  • score:float类型,分数;
  • class:string类型,科目名称;

创建

刚开始时,我们让其是一个空数组,对应上定义中的ARRAY类型。

    def create_accumulator(self):
        return []

累加

我们对科目进行遍历,进行行的拆分。即将(“张三”, 80.0, 60.0, “1”)拆解成(“张三”, 80.0, “english”)和(“张三”, 60.0, “math”)这样的两组数据。

    def accumulate(self, accumulator, row):
        for i in self._class_keys:
            accumulator.append(Row(row["name"], row[i], i))

返回

类型

    def get_result_type(self):
        return DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("avg score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())])

可以看到result_type(返回类型)和accumulator_type(累加器类型)是不一样的(也可以一样,主要看怎么计算规则)。前者比后者多了“学科年级平均分”(avg score),这就更加接近我们希望获得的最终结果。
这些字段和我们目标字段只差一个grade(年级)。因为原始表中有grade,且我们会通过grade聚类,所以最终我们可以获得这个信息,而不用在这儿定义。
需要注意的是,虽然表值类型函数返回的是一组数据(若干Row),但是这儿只是返回Row的具体定义,而不是ARRAY[Row]。

计算

    def emit_value(self, accumulator):
        rows = []
        for i in self._class_keys: 
            total = 0.0
            student_count = 0
            for y in accumulator:
                # y[2] y[]"class"]
                if i == y[2]:
                    # y[1] y["score"]
                    total = total + y[1]
                    student_count = student_count + 1
            avg_score = total / student_count
            for y in accumulator:
                if i == y[2]:
                    rows.append(Row(y[0], y[1], avg_score, y[2]))
        for x in rows:   
            yield x

这个函数会在最后执行,它会通过累加器中的数据计算“学科年级平均分”,然后构造和“返回类型”一直的Row到rows数组中。最后通过yeild关键字返回一个生成器,我们可以将其看成还是一组Row,即拆解后的结果。

最后我们看下结果

+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| op |                          grade |                           name |                          score |                      avg score |                          class |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
| +I |                              1 |                           张三 |                           80.0 |                           77.5 |                        english |
| +I |                              1 |                           李四 |                           75.0 |                           77.5 |                        english |
| +I |                              1 |                           张三 |                           60.0 |                           77.5 |                           math |
| +I |                              1 |                           李四 |                           95.0 |                           77.5 |                           math |
| +I |                              2 |                           王五 |                           90.0 |                           87.5 |                        english |
| +I |                              2 |                           赵六 |                           85.0 |                           87.5 |                        english |
| +I |                              2 |                           王五 |                           90.0 |                           80.0 |                           math |
| +I |                              2 |                           赵六 |                           70.0 |                           80.0 |                           math |
| +I |                              3 |                           孙七 |                           60.0 |                           60.0 |                        english |
| +I |                              3 |                           孙七 |                            0.0 |                            0.0 |                           math |
+----+--------------------------------+--------------------------------+--------------------------------+--------------------------------+--------------------------------+
10 rows in set

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf,TableAggregateFunction
import pandas as pd
from pyflink.table.udf import UserDefinedFunction
from typing import List

class SplitClass(TableAggregateFunction):
    _class_keys = ["english", "math"]

    def emit_value(self, accumulator):
        rows = []
        for i in self._class_keys: 
            total = 0.0
            student_count = 0
            for y in accumulator:
                if i == y[2]:
                    total = total + y[1]
                    student_count = student_count + 1
            avg_score = total / student_count
            for y in accumulator:
                if i == y[2]:
                    rows.append(Row(y[0], y[1], avg_score, y[2]))
        return rows

    def create_accumulator(self):
        return []

    def accumulate(self, accumulator, row):
        for i in self._class_keys:
            accumulator.append(Row(row["name"], row[i], i))

    def get_accumulator_type(self):
        return DataTypes.ARRAY(DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())]))  

    def get_result_type(self):
        return DataTypes.ROW([DataTypes.FIELD("name", DataTypes.STRING()), DataTypes.FIELD("score", DataTypes.FLOAT()), DataTypes.FIELD("avg score", DataTypes.FLOAT()), DataTypes.FIELD("class", DataTypes.STRING())])

    
def calc():
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_streaming_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)
    
    row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('name', DataTypes.STRING()), DataTypes.FIELD('english', DataTypes.FLOAT()), DataTypes.FIELD('math', DataTypes.FLOAT()), DataTypes.FIELD('grade', DataTypes.STRING())])
    students_score = [
        ("张三", 80.0, 60.0, "1"),
        ("李四", 75.0, 95.0, "1"),
        ("王五", 90.0, 90.0, "2"),
        ("赵六", 85.0, 70.0, "2"),
        ("孙七", 60.0, 0.0, "3"),
    ]
    tab_source = t_env.from_elements(students_score, row_type_tab_source)
    
    split_class = udtaf(SplitClass())
    tab_source.group_by(col('grade')) \
        .flat_aggregate(split_class) \
        .select(col('*')) \
        .execute().print()
    
if __name__ == '__main__':
    calc()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/111157.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ATECLOUD如何进行电源模块各项性能指标的测试?

ATECLOUD平台进行电源模块各项性能指标的测试是通过以下步骤实现的: 连接测试设备:将测试设备与云计算服务器连接,实现数据采集和远程控制。测试设备包括示波器、电子负载、电源、万用表等,这些设备通过纳米BOX连接到云测试平台上…

【Java 进阶篇】深入理解 Java Response:从基础到高级

HTTP响应(Response)是Web开发中的一个关键概念,它是服务器向客户端(通常是浏览器)返回数据的方式。理解如何在Java中处理和构建HTTP响应是开发Web应用程序的重要一部分。本文将从基础知识到高级技巧,详细介…

【Linux】虚拟机部署与发布J2EE项目(Linux版本)

🎉🎉欢迎来到我的CSDN主页!🎉🎉 🏅我是Java方文山,一个在CSDN分享笔记的博主。📚📚 🌟推荐给大家我的专栏《微信小程序开发实战》。🎯&#x1f3a…

Linux 命令|服务器相关

1. 在公共 linux 上创建 python 虚拟环境 【精选】在公共Linux服务器上创建自己的python虚拟环境_服务器创建自己的环境-CSDN博客 2. 查看现存的状态,看有没有程序在跑 nvidia-smi命令详解-CSDN博客 3. 上传本地文件到服务器 在本地 Mac 计算机的终端中&#x…

谷歌财报解读:基本盘守成有余,云业务进取不足?

科技巨头的AI之战持续上演,而财报季是一窥AI成色的重要窗口。 谷歌和微软这对在多个领域均正面对决的科技巨头,又在同一日发布了财报,而这次相比上季度,战局似乎迎来了反转。 上季度,谷歌不仅成功抵御了Bing联手ChatG…

HTML基本概念:

HTML简介: 超文本标记语言(英语:HyperText Markup Language,简称:HTML)是一种用于创建网页的标准标记语言。 1)、HTML 是用来描述网页的一种语言。 2)、HTML 不是一种编程语言&am…

STM32:串口轮询模式、中断模式、DMA模式和接收不定长数据

一.串口轮询模式底层机制: 在STM32每个串口的内部都有两个寄存器:发送数据寄存器(TDR)/发送移位寄存器,当我们调用HAL_UART_Transmit 把数据发送出去时,CPU会将数据依次将数据发送到数据寄存器中,移位寄存器中的数据会根据我们设置…

应用于智慧矿山的皮带跑偏视频分析AI算法

一、引言 随着科技的发展,人工智能技术已经在各个领域得到广泛应用。而在智慧矿山领域,皮带跑偏视频分析是其中一个重要的应用方向。本文将详细介绍皮带跑偏视频分析AI算法的原理,以期为智慧矿山的发展提供有益的参考。 二、算法原理 1. 视…

Mac上具好用的屏幕录像工具(Omi录屏专家)Screen Recorder By Omi Mac 下载安装详细教程

Omi 录屏专家 是 Mac 上的一款出色的录音工具,它让您能够在Mac电脑上轻松录制和保存高质量音频。这款应用拥有简单直观的操作界面,无论我们水平如何,都可以轻松捕捉录制卓越的音质和录像视频。 该版本的 Omi 安装后可以直接支持最高 4K 60帧…

Django 尝试SSE报错 AssertionError: Hop-by-hop headers not allowed 的分析

情况描述 近期计划测试一下django对日志打印的支持,一般都是用websocket的方式,想测试一下SSE (Server-sent events)的服务端推送,发现过程中存在报错: Traceback (most recent call last):File "D:\Software\Anaconda3\li…

正点原子嵌入式linux驱动开发——Linux 音频驱动

音频是最常用到的功能,音频也是linux和安卓的重点应用场合。STM32MP1带有SAI接口,正点原子的STM32MP1开发板通过此接口外接了一个CS42L51音频DAC芯片,本章就来学习一下如何使能CS42L51驱动,并且CS42L51通过芯片来完成音乐播放与录…

3.5 队列的表示和操作的实现

思维导图: 3.5.1 队列类型 3.5.1 队列的类型定义 1. 简介 队列是一种特殊的线性表,它的特性是只能在表的一端进行插入操作,而在另一端进行删除操作。通常将允许插入操作的一端称为队尾,允许删除操作的一端称为队头。 2. 抽象…

秒级启动的集成测试框架

本文介绍了一种秒级启动的集成测试框架,使用该框架可以方便的修改和完善测试用例,使得测试用例成为测试过程的产物。 背景 传统的单元测试,测试的范围往往非常有限,常常覆盖的是一些工具类、静态方法或者较为底层纯粹的类实现&…

Shadingsphere proxy 启动报错 Windows

Exception in thread "main" java.lang.NoClassDefFoundError 本来打算在本地电脑测试一下proxy的功能,使用的二进制安装包,没想到怎么都启动不起来,一直报找不到某个类的错误。我还以为是自身的配置有问题,等我copy了…

【MySQL索引与优化篇】索引优化与查询优化

索引优化与查询优化 文章目录 索引优化与查询优化1. 概述2. 索引失效案例3. 关联查询优化3.1 Join语句原理3.2 Simple Nested-Loop Join(简单嵌套循环连接)3.3 Index Nested-Loop Join(索引嵌套循环连接)3.4 Block Nested-Loop Jo…

Pytorch L1,L2正则化

L1正则化和L2正则化是常用的正则化技术,用于在机器学习模型中控制过拟合。它们的主要区别在于正则化项的形式和对模型参数的影响。 L1正则化(Lasso正则化): 正则化项形式:L1正则化使用模型参数的绝对值之和作为正则化…

电源控制系统架构(PCSA)之电源管理软件

下图显示了电源管理软件栈的简化表示。该图说明了OS电源管理框架、具有直接从SCP请求操作功能的组件以及它们与SCP固件之间的关系。 一个重要的方面是,所有硬件电源管理操作都是由SCP代表这些请求者执行的。 这种OS电源管理(OSPM)的简化表示可以分为两部分&#xff…

什么是神经网络,它的原理是啥?(2)

参考:https://www.youtube.com/watch?vmlk0rddP3L4&listPLuhqtP7jdD8CftMk831qdE8BlIteSaNzD 视频3:什么是激活函数?为什么我们需要激活函数?它的类型有哪些? 为什么需要激活函数?如果没有激活函数&…

TDengine 受邀参加 CNCC 2023,大会现场展位前“人山人海”!

10 月 26 日-28 日,2023 年度中国计算机大会(CNCC 2023)在沈阳新世界博览馆成功举办,本届大会以“发展数字基础设施,支撑数字中国建设”作为会议主题,参会规模头一次达到上万人。本届 CNCC 组织了 19 个特邀…

MWeb Pro for Mac:博客生成编辑器,助力你的创作之旅

在当今数字化时代,博客已经成为了许多人记录生活、分享知识和表达观点的重要渠道。而要打造一个专业、美观且易于管理的博客,选择一款强大的博客生成编辑器至关重要。今天,我向大家推荐一款备受好评的Mac软件——MWeb Pro。 MWeb Pro是一款专…