0基础学习PyFlink——用户自定义函数之UDF

大纲

  • 标量函数
    • 入参并非表中一行(Row)
    • 入参是表中一行(Row)
    • alias

PyFlink中关于用户定义方法有:

  • UDF:用户自定义函数。
  • UDTF:用户自定义表值函数。
  • UDAF:用户自定义聚合函数。
  • UDTAF:用户自定义表值聚合函数。

这些字母可以拆解如下:

  • UD表示User Defined(用户自定义);
  • F表示Function(方法);
  • T表示Table(表);
  • A表示Aggregate(聚合);
    在这里插入图片描述
    Aggregate(聚合)函数是指:以多行数据为输入,计算出一个新的值的函数。这块我们会在后续的章节介绍,本文我们主要介绍非聚合类型的用户自定义方法的简单使用。

标量函数

即我们常见的UDF。

def udf(f: Union[Callable, ScalarFunction, Type] = None,
        input_types: Union[List[DataType], DataType, str, List[str]] = None,
        result_type: Union[DataType, str] = None,
        deterministic: bool = None, name: str = None, func_type: str = "general",
        udf_type: str = None) -> Union[UserDefinedScalarFunctionWrapper, Callable]:

我们主要关注result_type和input_types,它们分别用于确定函数的输入和输出。
input_types可以是List[DataType], DataType, str, List[str]之一任何一种,这个要视使用者决定。UDTF也是这种类型,它们没啥区别。
result_type只能是DataType或str;而UDTF可以是List[DataType], DataType, str, List[str]任意之一。这也是UDF和UDTF最大的区别。
我们以一个例子来介绍它的用法。这个例子会将大写字符转换成小写字符,然后统计字符出现的次数。
在介绍例子之前,我们先构造Execute之前的准备环境

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunction

word_count_data = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "A", "G"]  
    
def word_count():
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_batch_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)
    
    row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])
    tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source)

    # define the sink schema
    sink_schema = Schema.new_builder() \
        .column("word", DataTypes.STRING().not_null()) \
        .column("count", DataTypes.BIGINT()) \
        .primary_key("word") \
        .build()
        
    # Create a sink descriptor
    sink_descriptor = TableDescriptor.for_connector('print')\
        .schema(sink_schema) \
        .build()
    
    t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)

这段代码从读取数据word_count_data,并构造出tab_source作为输入数据暂存的表。下面我们看下入参不同时,UDF怎么写

入参并非表中一行(Row)

    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=[DataTypes.STRING()])
    def colFunc(oneCol):
        return Row(oneCol.lower())

input_types我们设置成[DataTypes.STRING()],即该数组中只有一个参数,也表示修饰的方法只有一个参数,类型是String。如果觉得input_types写起来麻烦,这个参数可以不设置。
result_type我们设置为一个DataTypes.ROW([DataTypes.FIELD(“lower_word”, DataTypes.STRING())])。我们可以把它看成是一个新表的结构描述,即一行只有一个字段——lower_word,它的类型也是String。

    tab_lower=tab_source.map(colFunc(col('word')))

map方法中,我们会给UDF修饰的方法传入原始表tab_source每行中的word字段的值。然后构造出一个新的表tab_lower。这个新的表没有word字段,只有UDF中result_type定义的lower_word。

def map(self, func: Union[Expression, UserDefinedScalarFunctionWrapper]) -> 'Table':

后续只要使用这个新表,新字段即可。

    tab_lower.group_by(col('lower_word')) \
        .select(col('lower_word'), lit(1).count) \
        .execute_insert("WordsCountTableSink") \
        .wait()

完整代码

from pyflink.common import Configuration
from pyflink.table import (EnvironmentSettings, TableEnvironment, Schema)
from pyflink.table.types import DataTypes
from pyflink.table.table_descriptor import TableDescriptor
from pyflink.table.expressions import lit, col
from pyflink.common import Row
from pyflink.table.udf import udf,udtf,udaf,udtaf
import pandas as pd
from pyflink.table.udf import UserDefinedFunction

word_count_data = ["A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "A", "G"]  
    
def word_count():
    config = Configuration()
    # write all the data to one file
    config.set_string('parallelism.default', '1')
    env_settings = EnvironmentSettings \
        .new_instance() \
        .in_batch_mode() \
        .with_configuration(config) \
        .build()
    
    t_env = TableEnvironment.create(env_settings)
    
    row_type_tab_source = DataTypes.ROW([DataTypes.FIELD('word', DataTypes.STRING())])
    tab_source = t_env.from_elements(map(lambda i: Row(i), word_count_data), row_type_tab_source )

    # define the sink schema
    sink_schema = Schema.new_builder() \
        .column("word", DataTypes.STRING().not_null()) \
        .column("count", DataTypes.BIGINT()) \
        .primary_key("word") \
        .build()
        
    # Create a sink descriptor
    sink_descriptor = TableDescriptor.for_connector('print')\
        .schema(sink_schema) \
        .build()
    
    t_env.create_temporary_table("WordsCountTableSink", sink_descriptor)
    
    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=[DataTypes.STRING()])
    def colFunc(oneCol):
        return Row(oneCol.lower())
              
    tab_lower=tab_source.map(colFunc(col('word')))   
    tab_lower.group_by(col('lower_word')) \
        .select(col('lower_word'), lit(1).count) \
        .execute_insert("WordsCountTableSink") \
        .wait()

if __name__ == '__main__':
    word_count()

入参是表中一行(Row)

    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("lower_word", DataTypes.STRING())]), input_types=row_type_tab_source)
    def rowFunc(row):
        return Row(row[0].lower())

    tab_lower=tab_source.map(rowFunc) 
    tab_lower.group_by(col('lower_word')) \
        .select(col('lower_word'), lit(1).count) \
        .execute_insert("WordsCountTableSink") \
        .wait()

主要的区别是map方法直接传递udf修饰的方法,而不是直接其调用返回值。input_types是原始表的行结构——RowType,而不是一个参数数组。
map方法给rowFunc传递原始表tab_source的每行数据,然后构造出一个新表tab_lower。新表的字段也在udf的result_type中定义了,它是String类型的lower_word。后面我们对新表就要聚合统计这个新的字段,而不是老表中的字段。

alias

前面两个案例,在定义UDF时,我们严格设置了result_type和input_types。实际input_types可以不用设置,但是result_type必须设置。上面例子中,result_type我们都设置为RowType,即表行的结构。如果觉得这样写很麻烦,可以考虑使用alias来实现。

    @udf(result_type=DataTypes.STRING())
    def colFunc(oneCol):
        return oneCol.lower()
    
    tab_lower=tab_source.map(colFunc(col('word'))).alias('lower_word')
    tab_lower.group_by(col('lower_word')) \
        .select(col('lower_word'), lit(1).count) \
        .execute_insert("WordsCountTableSink") \
        .wait()
    @udf(result_type=DataTypes.STRING())
    def rowFunc(row):
        return row[0].lower()

    tab_lower=tab_source.map(rowFunc).alias('lower_word')
    tab_lower.group_by(col('lower_word')) \
        .select(col('lower_word'), lit(1).count) \
        .execute_insert("WordsCountTableSink") \
        .wait()

这样我们在定义udf时,只是指定了返回类型是个字符串,也不知道它在新表中叫啥名字(实际叫f0)。但是为了便于后续使用,我们使用alias给它取了一个别名lower_word。这样就可以让其参与后续的计算了。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/105419.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Go学习第十章——文件操作,Json和测试

Go文件操作,Json和测试 1 文件1.1 基本介绍1.2 读取的基本操作1.3 写入的基本操作1.4 使用案例(三个) 2 Go语言的Json使用2.1 序列化案例2.2 反序列化案例 3 单元测试3.1 先看个需求3.2 快速入门3.3 入门总结 1 文件 1.1 基本介绍 文件在程序中是以流的形式来操作…

使用Jenkins触发gitlab的webhook

满足条件: 首先手动构建可以完成构建 例如: 打开项目点击配置 在“Build Triggers”栏勾选,Build when a change is pushed to GitLab. GitLab webhook ;如下 复制URL链接,我的链接是:http://192.168.44…

HarmonyOS鸿蒙原生应用开发设计- 流转图标

HarmonyOS设计文档中,为大家提供了独特的流转图标,开发者可以根据需要直接引用。 开发者直接使用官方提供的流转图标内容,既可以符合HarmonyOS原生应用的开发上架运营规范,又可以防止使用别人的图标侵权意外情况等,减…

【每日一题】切割后面积最大的蛋糕

文章目录 Tag题目来源题目解读解题思路方法一:排序 其他语言python3 写在最后 Tag 【排序】【数组】【2023-10-27】 题目来源 1465. 切割后面积最大的蛋糕 题目解读 切割后面积最大的蛋糕。 解题思路 方法一:排序 本题较为简单,找出最大…

Android加载SO包

一、前言 这几天用Android整合开源的RTMP推拉流都没成功,好几年没玩Android了碰到好多坑,在Android中为了效率难免需要调用C语言编写生成的SO文件,比如图片渲染加速,视频编解码等插件,今天我们就先聊一下在Android中如…

51单片机实验:数码管动态显示00-99

1、实验要求 利用STC89C52RC单片机开发板实现:使用2位数码管循环显示00-99,每次间隔1s,并且当计数到20时,则蜂鸣器鸣响1次。 2、实验分析 程序实现分析: 1、定义数码管位选引脚(P2.4、P2.5、P2.6、…

ES6初步了解迭代器

迭代器是什么? 迭代器(iterator)是一种接口,为各种不同的数据结构提供统一的访问机制。任何数据结构只要部署 iterator 接口,就可以完成遍历操作 ES6创造了一种新的遍历方法for…of循环,iterator 接口主要供 for…of 使用 原生中具…

Android12之#pragma clang diagnostic ignored总结(一百六十八)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 人生格言: 人生…

pip 更换源

方案1 在C盘用户名录下新建pip文件夹,里面包含pip.ini文件 方案2 在C盘用户名目录的AppData的Roaming下新建pip文件夹,里面包含pip.ini文件。 内容为 [global] index-url https://pypi.tuna.tsinghua.edu.cn/simple

Git(四)底层命令:git对象、树对象、提交对象

目录 一、知识回顾1.1 Linux 基础命令1.2 .git 文件夹解析 二、git 对象(数据对象)2.1 hash-object 存储对象2.2 cat-file 查看对象 三、树对象3.1 ls-files 查看暂存区3.2 update-index 创建暂存区3.3 write-tree 生成树对象3.4 更新暂存区,…

ffmpeg的下载和编译(vs2022)

感谢大佬的二创,直接提供了sln编译 ffmpeg二创地址 创建如下目录 build存放代码(build最好改成source,因为作者这么建议,编译完才发现) msvc存放第三方依赖的头文件,这里固定叫msvc,因为大佬的sln里查找的路径是这个,不嫌麻烦也可以自己改 下载代码和编译器 下载源码…

[论文阅读]Point Density-Aware Voxels for LiDAR 3D Object Detection(PDV)

PDV Point Density-Aware Voxels for LiDAR 3D Object Detection 论文网址:PDV 论文代码:PDV 简读论文 摘要 LiDAR 已成为自动驾驶中主要的 3D 目标检测传感器之一。然而,激光雷达的发散点模式随着距离的增加而导致采样点云不均匀&#x…

层次式架构的设计理论与实践

层次式架构的设计理论与实践 层次式架构概述 层次式架构的定义和特性 定义 特性 层次式架构的一般组成(表现层、中间层、数据访问层和数据层) 表现层框架设计 设计模式 MVC MVP MVVM XML技术 UIP设计思想 表现层动态生成设计思想(基于XML界面管理技术) 中间层架构设计 业务…

Vue+ElementUI项目打包部署到Ubuntu服务器中

1、修改config/index.js中的assetsPublicPath: /,修改为assetsPublicPath: ./ assetsPublicPath: ./2、在build/utils.js中增加publicPath: ../../ publicPath: ../../3、打开终端,在根目录下执行npm run build进行打包,打包成功后会生成dist npm run…

cmake练习一

需求: 1、利用CGAL库Boost库,写一个关于CGAL的程序 2、使用cmake构建 1、创建目录结构 src中有一个main.cpp,放的是我们的主程序代码 2、安装CGAL和Boost库 略 3、编写cmakelist.txt cmake_minimum_required(VERSION 3.1.0) project(cg…

规则推理桌游

目录 Eleusis Express 1,规则 2,出牌规则示例 3,中文规则 Eleusis Express 原文:Eleusis Express 1,规则 简单来说就是需要一个主持人想一个出牌规则,其他人通过出牌试探过程推理出这个出牌规则。 …

甘特图组件DHTMLX Gantt用例 - 如何自定义任务、月标记和网格新外观

dhtmlxGantt是用于跨浏览器和跨平台应用程序的功能齐全的Gantt图表。可满足项目管理应用程序的所有需求,是最完善的甘特图图表库。 本文将为大家揭示DHTMLX Gantt自定义的典型用例,包括自定义任务、网格的新外观等,来展示其功能的强大性&…

电力物联网关智能通讯管理机-安科瑞黄安南

众所周知,网关应用于各种行业的终端设备的数据采集与数据分析,然后去实现设备的监测、控制、计算,为系统与设备之间建立通讯联系,达到双向的数据通讯。 网关可以实时监测并及时发现异常数据,同时自身根据用户规则进行…

[AutoSar NVM] 存储服务层(Service)详解

专栏 《深入浅出AutoSAR》。全文 3400 字, 依AutoSAR及公开知识辛苦整理,禁止转载。 接上一讲 [AutoSar NVM] 存储架构 "数据是新时代的石油" -- 克莱门特•M•杜森堡(Clement M. Dusenbury) 我们深入了解下存储服务层 …

国际腾讯云自主拼装直播 URL教程!!!

注意事项 创建转码模板 并与播放域名进行 绑定 后,转码配置后的直播流,需将播放地址的 StreamName 拼接为 StreamName_转码模板名称,更多详情请参见 播放配置。 前提条件 已注册腾讯云账号,并开通 腾讯云直播服务。 已在 域名…