flinksql 流表转换, 自定义udf/udtf,SQL 内置函数及自定义函数

flinksql 流表转换, 自定义udf/udtf

  • 1、标量函数
  • 2、表函数
  • 3、聚合函数
  • 4、表聚合函数

表变流
在这里插入图片描述
在这里插入图片描述

1、在大多数情况下,用户定义的函数必须先注册,然后才能在查询中使用。不需要专门为 Scala 的 Table API 注册函数。

2、函数通过调用 registerFunction()方法在 TableEnvironment 中注册。当用户定义的函数 被注册时,它被插入到 TableEnvironment 的函数目录中,
这样 Table API 或 SQL 解析器就可 以识别并正确地解释它

函数总结,函数总分为四大类
在这里插入图片描述

1、标量函数

用户定义的标量函数,可以将 0、1 或多个标量值,映射到新的标量值。
为了定义标量函数,必须在 org.apache.flink.table.functions 中扩展基类 Scalar Function, 并实现(一个或多个)求值(evaluation,eval)方法。标量函数的行为由求值方法决定, 求值方法必须公开声明并命名为 eval(直接 def 声明,没有 override)。求值方法的参数类型 和返回类型,确定了标量函数的参数和返回类型。

package table.tableUdf

import com.yangwj.api.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{EnvironmentSettings, Table, Tumble}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

/**
 * @author yangwj
 * @date 2021/1/15 23:40
 * @version 1.0
 */
object ScalarFunctionTest {
  def main(args: Array[String]): Unit = {
    //1、创建表执行环境、就得使用流式环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env,settings)

    //2、连接外部系统,读取数据,注册表
    //2.1读取文件
    val inputFile:String = "G:\\Java\\Flink\\guigu\\flink\\src\\main\\resources\\sensor.txt"
    val inputStream: DataStream[String] = env.readTextFile(inputFile)
    val dataStream: DataStream[SensorReading] = inputStream.map(data => {
      val arr: Array[String] = data.split(",")
      SensorReading(arr(0), arr(1).toLong, arr(2).toDouble)
    }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
      override def extractTimestamp(t: SensorReading): Long = t.timestamp * 1000L
    })
    //tp.proctime 处理时间,注意,使用表达式,一定要引用隐式转换,否则无法使用
    val sensorTable: Table = tableEnv.fromDataStream(dataStream, 'id,  'temperature, 'timestamp.rowtime as 'ts)

    //调用自定义hash函数,对id进行hash运算
    //1、table api
    //首先new一个UDF的实例
    val hashCode = new HashCode(23)
    val apiResult: Table = sensorTable
      .select('id, 'ts, hashCode('id))


    //2、sql调用
    //需要在环境注册UDF
    tableEnv.createTemporaryView("sensor",sensorTable)
    tableEnv.registerFunction("hashCode",hashCode)
    val udfResult: Table = tableEnv.sqlQuery(
      """
        |select id,ts,hashCode(id)
        |from sensor
      """.stripMargin)

    apiResult.toAppendStream[Row].print("apiResult")
    udfResult.toAppendStream[Row].print("udfResult")
    env.execute("udf test")
  }
}

//自定义标量函数
class  HashCode(factor:Int) extends ScalarFunction{
    //必须叫 eval
    def  eval(s:String): Int ={
          s.hashCode * factor - 10000
    }
}

2、表函数

  • 1、与用户定义的标量函数类似,用户定义的表函数,可以将 0、1 或多个标量值作为输入 参数;与标量函数不同的是,它可以返回任意数量的行作为输出,而不是单个值。
    、为了定义一个表函数,必须扩展 org.apache.flink.table.functions 中的基类 TableFunction 并实现(一个或多个)求值方法。表函数的行为由其求值方法决定,求值方法必须是 public 的,并命名为 eval。  求值方法的参数类型,决定表函数的所有有效参数。
  • 2、返回表的类型由 TableFunction 的泛型类型确定。求值方法使用 protected collect(T)方 法发出输出行。
  • 3、在 Table API 中,Table 函数需要与.joinLateral 或.leftOuterJoinLateral 一起使用。
  • 4、joinLateral 算子,会将外部表中的每一行,与表函数(TableFunction,算子的参数是它 的表达式)计算得到的所有行连接起来。
  • 5、而 leftOuterJoinLateral 算子,则是左外连接,它同样会将外部表中的每一行与表函数计 算生成的所有行连接起来;并且,对于表函数返回的是空表的外部行,也要保留下来。
  • 6、在 SQL 中,则需要使用 Lateral Table(),或者带有 ON TRUE 条件的左连接
package guigu.table.udf

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

/**
 * @program: demo
 * @description: 表函数:一行对应多行(表)数据输出
 * @author: yang
 * @create: 2021-01-16 16:07
 */
object tableFunc {
  def main(args: Array[String]): Unit = {
    //1、基于流执行环境创建table执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    //2、读取文件,注册表视图
    tableEnv.connect(new FileSystem().path("E:\\java\\demo\\src\\main\\resources\\file\\data5.csv"))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("ts",DataTypes.BIGINT())
        .field("temperature",DataTypes.DOUBLE()))
      .createTemporaryTable("sensorTable")

    //3、table api
    val split = new Split("_")     // new一个UDF实例
    val sensorTable: Table = tableEnv.from("sensorTable")
    val resutTable = sensorTable.joinLateral(split('id) as ('word,'length))
      .select('id,'ts,'word,'length)

    //4、sql 实现
    tableEnv.registerFunction("split",split)
    val sqlResult: Table = tableEnv.sqlQuery(
      """
        |select id ,ts ,word ,length
        |from sensorTable,
        |lateral table ( split(id) ) as splitid(word,length) # splitid 为 split和字段的id的组合
      """.stripMargin)

    resutTable.toAppendStream[(Row)].print("api")

    sqlResult.toAppendStream[(Row)].print("sql")

    env.execute("table function")

  }
}

//输出类型(String,Int)
class Split(separator:String) extends TableFunction[(String,Int)]{
  def eval(str:String): Unit ={
    str.split(separator).foreach(
      word => collect((word,word.length))
    )
  }
}

3、聚合函数

  • 1、用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的 数据,聚合成一个标量值。用户定义的聚合函数,是通过继承 AggregateFunction 抽象类实 现的

  • 2、AggregateFunction 的工作原理如下:
    首先,它需要一个累加器,用来保存聚合中间结果的数据结构(状态)。可以通过 调用 AggregateFunction 的 createAccumulator()方法创建空累加器。
    随后,对每个输入行调用函数的 accumulate()方法来更新累加器。
    处理完所有行后,将调用函数的 getValue()方法来计算并返回最终结果。

  • 3、AggregationFunction 要求必须实现的方法:createAccumulator() 、accumulate()、 getValue()

  • 4、除了上述方法之外,还有一些可选择实现的方法。其中一些方法,可以让系统执行查询 更有效率,而另一些方法,对于某些场景是必需的。例如,如果聚合函数应用在会话窗口
    (session group window)的上下文中,则 merge()方法是必需。 retract() merge() resetAccumulator()

package guigu.table.udf

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.table.functions.AggregateFunction
import org.apache.flink.types.Row

/**
 * @program: demo
 * @description: 聚合函数:多行数据聚合输出一行数据
 * @author: yang
 * @create: 2021-01-16 16:41
 */
object aggFunc {

  def main(args: Array[String]): Unit = {

    //1、基于流执行环境创建table执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    //2、读取文件,注册表视图
    tableEnv.connect(new FileSystem().path("E:\\java\\demo\\src\\main\\resources\\file\\data5.csv"))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("ts",DataTypes.BIGINT())
        .field("temperature",DataTypes.DOUBLE()))
      .createTemporaryTable("sensorTable")

    val sensorTable: Table = tableEnv.from("sensorTable")
    //table api
    val aggTemp = new AggTemp()
    val apiResult: Table = sensorTable.groupBy('id).aggregate(aggTemp('temperature) as 'vagTemp).select('id, 'vagTemp)

    //sql 实现
    tableEnv.registerFunction("avgTemp",aggTemp)
    val sqlResult: Table = tableEnv.sqlQuery(
      """
        |select id,avgTemp(temperature)
        |from sensorTable
        |group by id
      """.stripMargin)

    apiResult.toRetractStream[Row].print("apiResult")
    sqlResult.toRetractStream[Row].print("sqlResult")

    env.execute("agg Func")

  }

}

//定义一个类,专门用于聚合的状态
class AvgTempAcc{
  var sum :Double = 0.0
  var count:Int = 0

}

//自定义一个聚合函数,求每个传感器的平均温度值,保存状态(tempSum,tempCount)
class AggTemp extends AggregateFunction[Double,AvgTempAcc]{

  //处理计算函数
  def accumulate(accumulator:AvgTempAcc,temp:Double): Unit ={
      accumulator.sum += temp
      accumulator.count += 1
  }

  //计算函数
  override def getValue(accumulator: AvgTempAcc): Double = accumulator.sum / accumulator.count

  //初始化函数
  override def createAccumulator(): AvgTempAcc = new AvgTempAcc
}

4、表聚合函数

  • 1、用户定义的表聚合函数(User-Defined Table Aggregate Functions,UDTAGGs),可以把一 个表中数据,聚合为具有多行和多列的结果表。
    这跟 AggregateFunction 非常类似,只是之 前聚合结果是一个标量值,现在变成了一张表。用户定义的表聚合函数,是通过继承 TableAggregateFunction 抽象类来实现的。

  • 2、TableAggregateFunction 的工作原理如下:
    首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。 通过调用 TableAggregateFunction的createAccumulator()方法可以创建空累加器。
    随后,对每个输入行调用函数的 accumulate()方法来更新累加器。
    处理完所有行后,将调用函数的 emitValue()方法来计算并返回最终结果。

  • 3、AggregationFunction 要求必须实现的方法: createAccumulator() 、accumulate()
    除了上述方法之外,还有一些可选择实现的方法:retract()、 merge() 、resetAccumulator()、 emitValue()、emitUpdateWithRetract()

package guigu.table.udf

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, FlatAggregateTable, Table}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}
import org.apache.flink.table.functions.TableAggregateFunction
import org.apache.flink.types.Row
import org.apache.flink.util.Collector

/**
 * @program: demo
 * @description: 多行数据聚合输出多行数据
 * @author: yang
 * @create: 2021-01-16 18:48
 */
object tableAggFunc {
  def main(args: Array[String]): Unit = {
    //1、基于流执行环境创建table执行环境
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

    //2、读取文件,注册表视图
    tableEnv.connect(new FileSystem().path("E:\\java\\demo\\src\\main\\resources\\file\\data5.csv"))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("ts",DataTypes.BIGINT())
        .field("temperature",DataTypes.DOUBLE()))
      .createTemporaryTable("sensorTable")

    val sensorTable: Table = tableEnv.from("sensorTable")
    //table api
    val top2Temp = new Top2Temp()

    val resultTable: Table = sensorTable.groupBy('id).flatAggregate(top2Temp('temperature) as('temp, 'rank))
        .select('id,'temp,'rank)

    resultTable.toRetractStream[Row].print("flat agg")

    env.execute(" table agg func")
  }
}

//定义一个类,表示表聚合函数的状态
class  Top2TempAcc{
    var highestTemp:Double = Double.MinValue
    var secondHighestTemp:Double = Double.MinValue

}

//自定义表聚合函数,提取所有温度值中最高的两个温度,输出(temp,rank)
class Top2Temp extends TableAggregateFunction[(Double,Int),Top2TempAcc]{
  //初始化函数
  override def createAccumulator(): Top2TempAcc = new Top2TempAcc()

  //实现计算聚合结果的函数accumulate
  //注意:方法名称必须叫accumulate
  def accumulate(acc:Top2TempAcc,temp:Double): Unit ={
    //判断当前温度值是否比状态值大
    if(temp > acc.highestTemp){
      //如果比最高温度还高,排在第一,原来的顺到第二位
      acc.secondHighestTemp = acc.highestTemp
      acc.highestTemp = temp
    }else if(temp > acc.secondHighestTemp){
      //如果在最高和第二高之间,那么直接替换第二高温度
      acc.secondHighestTemp = temp
    }
  }

  //实现一个输出结果的方法,最终处理完表中所有的数据时调用
  //注意:方法名称必须叫emitValue
  def emitValue(acc:Top2TempAcc,out:Collector[(Double,Int)]): Unit ={
    out.collect((acc.highestTemp,1))
    out.collect((acc.secondHighestTemp,2))
  }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/404557.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

yolov9目标检测报错AttributeError: ‘list‘ object has no attribute ‘device‘

最近微智启软件工作室在运行yolov9目标检测的detect.py测试代码时,报错: File “G:\down\yolov9-main\yolov9-main\detect.py”, line 102, in run pred non_max_suppression(pred, conf_thres, iou_thres, classes, agnostic_nms, max_detmax_det) Fil…

Python urllib、requests、HTMLParser

HTTP协议 HTTP 协议:一般指HTTP(超文本传输)协议。 HTTP是为Web浏览器和Web服务器之间的通信而设计的,基于TCP/IP通信协议嘞传递数据。 HTTP消息结构 客户端请求消息 客户端发送一个HTTP请求到服务器的请求消息包括以下格式 请求行(request line)请求…

排序算法之——归并排序

归并排序 1. 基本思想2. 数据的分解3. 数据的合并4.归并排序的实现4.1 递归实现4.1.1 一个易错点4.1.2 运行结果 4.2 非递归实现4.2.1 图示思路4.2.2 代码实现4.2.3 一个易错点4.2.4 修改后的代码4.2.5 运行结果 6. 时间复杂度7. 空间复杂度8. 稳定性9. 动图演示 1. 基本思想 …

h-table(表格列表组件的全封装)

文章目录 概要h-table的封装过程查询组件封装 h-highForm结果页右侧工具栏封装RightToolbar结果页列表组件h-table结果页vue页面使用js文件有需要的请私信博主,还请麻烦给个关注,博主不定期更新组件封装,或许能够有所帮助!&#x…

如何用GPT进行成像光谱遥感数据处理?

第一:遥感科学 从摄影侦察到卫星图像 遥感的基本原理 遥感的典型应用 第二:ChatGPT ChatGPT可以做什么? ChatGPT演示使用 ChatGPT的未来 第三:prompt 提示词 Prompt技巧(大几岁) 最好的原则和策…

互动游戏团队如何将性能体验优化做到TOP级别

一、背景 随着互动游戏业务 DAU 量级增加,性能和体验重要性也越发重要,好的性能和体验不仅可以增加用户使用体感,也可以增加用户对于互动游戏的使用粘性。 对现状分析,主要存在首屏渲染速度慢、打开页面存在白屏、页面加载过多资…

app测试必掌握的核心测试:UI、功能测试!

一、UI测试 UI即User Interface (用户界面)的简称。UI 设计则是指对软件的人机交互、操作逻辑、界面美观的整体设计。好的UI设计不仅是让软件变得有个性有品味,还要让软件的操作变得舒适、简单、自由、充分体现软件的定位和特点。手机APP从启动界面开始, 到运行过程,直至退出,…

聊聊mysql的七种日志

进入正题前,可以先简单介绍一下,MySQL的逻辑架构, MySQL的逻辑架构大致可以分为三层: 第一层:处理客户端连接、授权认证,安全校验等。第二层:服务器 server 层,负责对SQL解释、分析、优化、执行操作引擎等。第三层:存储引擎,负责MySQL中数据的存储和提取。我们要知道…

云图极速版限时免费活动

产品介绍 云图极速版是针对拥有攻击面管理需求的用户打造的 SaaS 应用,致力于协助用户发现并管理互联网资产攻击面。 实战数据 (2023.11.6 - 2024.2.23) 云图极速版上线 3 个月以来,接入用户 3,563 家,扫描主体 19,961 个,累计发…

OpenCV笔记4:级联分类器实现嘴部检测

OpenCV 嘴部检测 """ 嘴部区域检测 1. 静态图像检测嘴部区域创建分类器加载特征文件检测图像绘制嘴部区域显示 2. 切换为摄像头 """ import cv2 import numpy as npclass FaceDetect:def __init__(self):# 级联分类器# 创建级联分类器&#xf…

云原生之容器管理工具Portainer

1. 简介 前面文章我们讲Docker、Docker Compose和Docker Swarm都是在Linux系统上手工命令行去操作,在第一次安装的时候可以命令行,以后运维和CICD流程操作中,如果还要命令行去各个节点操作,操作就麻烦了,工作效…

Seata 入门知识

目录 概述 工作流程 工作模式 AT模式 TCC模式 概述 Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。 AT模式是阿里首推…

Linux系统运维:离线安装sar-性能监视和分析工具

目 录 一、前言 二、系统环境 三、安装sar (一)准备工作 1、下载 sar 工具的安装包: 2、将安装包传输到 CentOS 服务器 (二)安装工作 1、解压 2、配置安装 3、编译 4、安装 (三&#xff0…

C# Onnx 使用onnxruntime部署实时视频帧插值

目录 介绍 效果 模型信息 项目 代码 下载 C# Onnx 使用onnxruntime部署实时视频帧插值 介绍 github地址:https://github.com/google-research/frame-interpolation FILM: Frame Interpolation for Large Motion, In ECCV 2022. The official Tensorflow 2…

【Flink集群RPC通讯机制(四)】集群组件(tm、jm与rm)之间的RPC通信

文章目录 1. 集群内部通讯方法概述2. TaskManager向ResourceManager注册RPC服务3. JobMaster向ResourceManager申请Slot计算资源 现在我们已经知道Flink中RPC通信框架的底层设计与实现,接下来通过具体的实例了解集群运行时中组件如何基于RPC通信框架构建相互之间的调…

大数据 - Spark系列《十一》- Spark累加器详解

Spark系列文章: 大数据 - Spark系列《一》- 从Hadoop到Spark:大数据计算引擎的演进-CSDN博客 大数据 - Spark系列《二》- 关于Spark在Idea中的一些常用配置-CSDN博客 大数据 - Spark系列《三》- 加载各种数据源创建RDD-CSDN博客 大数据 - Spark系列《…

2024/02/23

使用消息队列完成两个进程间相互通信 A.c #include<myhead.h> struct msgbuf {long mtype;char mtext[1024]; }; //定义表示正文内容大小的宏 #define MSGSIZE sizeof(struct msgbuf)-sizeof(long)int main(int argc, const char *argv[]) {//创建一个key值key_t key;ke…

知乎66条高赞回答,句句醍醐灌顶!

-01- 穷人是小心翼翼地大方&#xff0c; 有钱人是大大方方地小气。 ——论如何判断一个人是真有钱还是装有钱 -02- 枕头要常晒&#xff0c; 因为里面装满了心酸的泪和发霉的梦。 ——一切终将随风而逝 -03- 人活得累&#xff0c;一是太认真&#xff0c;二是太想要。 …

第3部分 原理篇2去中心化数字身份标识符(DID)(3)

3.2.2.4. DID文档 (DID Document) 本聪老师&#xff1a;DID标识符和DID URL还都只是ID&#xff0c;必须为它附加一个基本属性才可以证明是该主体独有的。这个就是我们下面介绍的DID文档。 本聪老师&#xff1a;每个DID标识符都唯一对应一个DID文档&#xff0c;也可以说&#x…

计算机功能简介:EC, NVMe, SCSI/ISCSI与块存储接口 RBD,NUMA

一 EC是指Embedded Controller 主要应用于移动计算机系统和嵌入式计算机系统中&#xff0c;为此类计算机提供系统管理功能。EC的主要功能是控制计算机主板上电时序、管理电池充电和放电&#xff0c;提供键盘矩阵接口、智能风扇接口、串口、GPIO、PS/2等常规IO功能&#xff0c;…