Flink SQL自定义表值函数(Table Function)

使用场景: 表值函数即 UDTF,⽤于进⼀条数据,出多条数据的场景。

开发流程:

  • 实现 org.apache.flink.table.functions.TableFunction 接⼝
  • 实现⼀个或者多个⾃定义的 eval 函数,名称必须叫做 eval,eval ⽅法签名必须是 public 的
  • eval ⽅法的⼊参是直接体现在 eval 函数签名中,出参是体现在 TableFunction 类的泛型参数 T 中

注意:

eval 是没有返回值的,和标量函数不同,Flink TableFunction 接⼝提供了 collect(T) 来发送输出的数据,如果体现在函数签名上,就成了标量函数,使⽤ collect(T) 能体现出 进⼀条数据 出多条数据。

在 SQL 中是⽤ SQL 中的 LATERAL TABLE() 配合 JOIN 、 LEFT JOIN xxx ON TRUE 使⽤。

开发案例:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.*;

/**
 * 输入数据:
 * nc -lk 8888
 * a,bb,cc
 * 
 * 输出结果:
 * 
 * res1=>:5> +I[a,bb,cc, a, 1]
 * res1=>:7> +I[a,bb,cc, cc, 2]
 * res1=>:6> +I[a,bb,cc, bb, 2]
 * res8=>:4> +I[a,bb,cc, a, 1]
 * res8=>:5> +I[a,bb,cc, bb, 2]
 * res8=>:6> +I[a,bb,cc, cc, 2]
 * res4=>:3> +I[a,bb,cc, cc, 2]
 * res4=>:1> +I[a,bb,cc, a, 1]
 * res4=>:2> +I[a,bb,cc, bb, 2]
 * res7=>:8> +I[a,bb,cc, bb, 2]
 * res7=>:1> +I[a,bb,cc, cc, 2]
 * res7=>:7> +I[a,bb,cc, a, 1]
 * res2=>:2> +I[a,bb,cc, cc, 2]
 * res2=>:8> +I[a,bb,cc, a, 1]
 * res2=>:1> +I[a,bb,cc, bb, 2]
 * res6=>:1> +I[a,bb,cc, cc, 2]
 * res6=>:7> +I[a,bb,cc, a, 1]
 * res6=>:8> +I[a,bb,cc, bb, 2]
 * res3=>:6> +I[a,bb,cc, bb, 2]
 * res3=>:7> +I[a,bb,cc, cc, 2]
 * res3=>:5> +I[a,bb,cc, a, 1]
 * res5=>:7> +I[a,bb,cc, bb, 2]
 * res5=>:8> +I[a,bb,cc, cc, 2]
 * res5=>:6> +I[a,bb,cc, a, 1]
 */
public class TableFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        DataStreamSource<String> source = env.socketTextStream("localhost", 8888);

        Table table = tEnv.fromDataStream(source, "field");

        tEnv.createTemporaryView("SourceTable", table);

        // 在 Table API ⾥可以直接调⽤ UDF
        Table res1 = tEnv.from("SourceTable")
                .joinLateral(call(SplitFunction.class, $("field")))
                .select($("field"), $("word"), $("length"));

        Table res2 = tEnv
                .from("SourceTable")
                .leftOuterJoinLateral(call(SplitFunction.class, $("field")))
                .select($("field"), $("word"), $("length"));


        // 在 Table API ⾥重命名 UDF 的结果字段
        Table res3 = tEnv.from("SourceTable")
                .leftOuterJoinLateral(call(SplitFunction.class, $("field")))
                .as("myField", "newWord", "newLength")
                .select($("myField"), $("newWord"), $("newLength"));

        // 注册函数
        tEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);

        // 在 Table API ⾥调⽤注册好的 UDF
        Table res4 = tEnv
                .from("SourceTable")
                .joinLateral(call("SplitFunction", $("field")))
                .select($("field"), $("word"), $("length"));


        Table res5 = tEnv
                .from("SourceTable")
                .leftOuterJoinLateral(call("SplitFunction", $("field")))
                .select($("field"), $("word"), $("length"));

        // 在 SQL ⾥调⽤注册好的 UDF
        Table res6 = tEnv.sqlQuery(
                "SELECT field, word, length " +
                        "FROM SourceTable, LATERAL TABLE(SplitFunction(field))");

        Table res7 = tEnv.sqlQuery(
                "SELECT field, word, length " +
                        "FROM SourceTable " +
                        "LEFT JOIN LATERAL TABLE(SplitFunction(field)) ON TRUE");


        // 在 SQL ⾥重命名 UDF 字段
        Table res8 = tEnv.sqlQuery(
                "SELECT field, newWord, newLength " +
                        "FROM SourceTable " +
                        "LEFT JOIN LATERAL TABLE(SplitFunction(field)) AS T(newWord, newLength) ON TRUE");

        tEnv.toDataStream(res1).print("res1=>");
        tEnv.toDataStream(res2).print("res2=>");
        tEnv.toDataStream(res3).print("res3=>");
        tEnv.toDataStream(res4).print("res4=>");
        tEnv.toDataStream(res5).print("res5=>");
        tEnv.toDataStream(res6).print("res6=>");
        tEnv.toDataStream(res7).print("res7=>");
        tEnv.toDataStream(res8).print("res8=>");

        env.execute();
    }

    @FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>"))
    public static class SplitFunction extends TableFunction<Row> {
        public void eval(String str) {
            for (String s : str.split(",")) {
                // 输出结果
                collect(Row.of(s, s.length()));
            }
        }
    }
}

注意: 如果使⽤ Scala 实现函数,不要使⽤ Scala 中 object 实现 UDF,Scala object 是单例的,可能会导致并发问题。

测试结果:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/130332.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Unity 场景优化策略

Unity 场景优化策略 GPU instancing 使用GPU Instancing可以将多个网格相同、材质相同、材质属性可以不同的物体合并为一个批次&#xff0c;从而减少Draw Calls的次数。这可以提高性能和渲染效率。 GPU instancing可用于绘制在场景中多次出现的几何体&#xff0c;例如树木或…

MIT6.5830 Lab1-GoDB实验记录(六)

MIT6.5830 Lab1-GoDB实验记录&#xff08;六&#xff09; – WhiteNights Site 标签&#xff1a;Golang 赛博坐牢之旅第一章第六节&#xff1a;接着上一节&#xff0c;补全heap_page剩下的函数。 开始坐牢 删除tuple 这个看起来…难度还没那么高&#xff0c;写一下试试吧。那…

HTTP和HTTPS详解

一)什么是HTTP协议 1)HTTP协议是倾向于相遇业务层次上面的一种协议&#xff0c;传输层协议主要考虑的是端对端之间的一个传输过程&#xff0c;TCP重点进行关注的是可靠传输&#xff1b;咱们的HTTP/1&#xff0c;HTTP/2是基于TCP的&#xff0c;但是咱们的HTTP/3是基于UDP的&…

QWidget背景图片在Qt Designer 中能显示但运行时不显示的解决方法

目录 1. 现象 2. 解决方法 3. 附录 1. 现象 今天想在QWidget中贴一张png图片作为背景图&#xff0c;在Qt Designer 能显示&#xff0c;但运行时&#xff0c;死活不显示背景图片。样式表设置如下&#xff1a; QWidget {border-image:url(:/untitled2/image/operpanel.png); }…

Linux 多线程控制详解

目录 多线程编临界资源访问 互斥锁 API 简述 初始化互斥量 互斥量加锁/解锁 互斥量加锁(非阻塞方式) 互斥量销毁 程序示例 多线程编执行顺序控制 信号量 API 简述 初始化信号量 信号量 P/V 操作 信号量申请(非阻塞方式) 信号量销毁 程序示例 条件变量 创建和销毁…

Mybatis-plus 内部提供的 ServiceImpl<M extends BaseMapper<T>, T> 学习总结

作用 当集成Mybatis-Plus 后&#xff0c;我们的大部分数据库操作都可以通过 XxxxxMapper &#xff0c;同时 Mybatis-plus 在Mapper 提供基本操作方法的同时&#xff0c;也提供类基础的 serviceImpl 来帮助我们完成一些常见的基本操作。 使用 一般情况下&#xff0c;我们首先…

Ipa Guard使用手册

使用手册 开始使用ipa guard代码混淆界面介绍文件混淆-界面介绍安装和登录Ipa Guard 相关教程 下载安装Ipa Guardipaguard注册和登录 下载安装Ipa Guard 可以前往ipaguard工具官网下载&#xff0c;工具是免费下载&#xff0c;免费体验使用的。下载地址是https://www.ipagua…

MS2358:96KHz、24bit 音频 ADC

MS2358 是带有采样速率 8kHz-96kHz 的立体声音频模数 转换器&#xff0c;适合于面向消费者的专业音频系统。 MS2358 通过使用增强型双位 Δ - ∑ 技术来实现其高精度 的特点。 MS2358 支持单端的模拟输入&#xff0c;所以不需要外部器 件&#xff0c;非常适合用于像 …

ChatGPT 宕机?OpenAI 将中断归咎于 DDoS 攻击

您的 ChatGPT 已关闭吗&#xff1f;您是否遇到 ChatGPT 问题&#xff0c;例如连接问题或遇到“长响应时出现网络错误”&#xff1f;– ChatGPT 遭受了一系列 DDoS 攻击&#xff0c;显然是由匿名苏丹组织策划的。 OpenAI 的 ChatGPT 是一款流行的人工智能聊天机器人&#xff0c;…

标本传送设备物联网应用案例|蓝蜂物联网一体化方案

标本传送设备物联网应用案例 标本传输系统被大量应用到现代医院场景中&#xff0c;系统各个设备的运行情况直接影响到整个医院系统的正常稳定&#xff0c;所以对于标本传输系统的实时监控和及时运维是维持医院稳定和规避风险的重中之重。 针对标本传输系统应用过程中的数据统…

ios安全加固 ios 加固方案

​ 目录 一、iOS加固保护原理 1.字符串混淆 2.类名、方法名混淆 3.程序结构混淆加密 4.反调试、反注入等一些主动保护策略 二 代码混淆步骤 1. 选择要混淆保护的ipa文件 2. 选择要混淆的类名称 3. 选择要混淆保护的函数&#xff0c;方法 4. 配置签名证书 5. 混淆和测…

centos7安装Nexus(Maven私服)与配置使用教程

之前有位大佬问我&#xff0c;他说有个第三方的Jar包&#xff0c;在idea导出库中使用&#xff0c;现在要部署上线测试&#xff0c;要如何导进去打包。 我说&#xff0c;不用那么麻烦&#xff0c;搞个Nexus私服&#xff0c;将Jar上传上去&#xff0c;然后配置Maven的setting文件…

现货黄金靠谱吗

现货黄金是一种相当不错的黄金投资方式&#xff0c;它通过紧密跟踪伦敦市场上黄金现货的价格走势&#xff0c;为投资者提供了捕捉差价的机会&#xff0c;而且在交易平台的帮助下&#xff0c;投资者可以获得很高的资金杠杆&#xff0c;因此即使只是抓住了一点点的行情波动&#…

01_ddim_inversion_CN

DDIM反转 设置 # !pip install -q transformers diffusers accelerateimport torch import requests import torch.nn as nn import torch.nn.functional as F from PIL import Image from io import BytesIO from tqdm.auto import tqdm from matplotlib import pyplot as p…

链表练习题

作者前言 &#x1f382; ✨✨✨✨✨✨&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f367;&#x1f382; ​&#x1f382; 作者介绍&#xff1a; &#x1f382;&#x1f382; &#x1f382; &#x1f389;&#x1f389;&#x1f389…

时间序列预测实战(十一)用SCINet实现滚动预测功能(附代码+数据集+原理介绍)

论文地址->SCINet官方论文地址 官方代码地址-> 官方代码下载地址 个人整理的代码地址->免费分享给大家创作不易请大家给文章点点赞 一、本文介绍 这篇文章给大家带来的是关于SCINet实现时间序列滚动预测功能的讲解&#xff0c;SCINet是样本卷积交换网络的缩写(Sam…

devops完整搭建教程(gitlab、jenkins、harbor、docker)

devops完整搭建教程&#xff08;gitlab、jenkins、harbor、docker&#xff09; 文章目录 devops完整搭建教程&#xff08;gitlab、jenkins、harbor、docker&#xff09;1.简介&#xff1a;2.工作流程&#xff1a;3.优缺点4.环境说明5.部署前准备工作5.1.所有主机永久关闭防火墙…

HTTPS的工作流程

. HTTPS是什么&#xff1f; https是应用层中的一个协议&#xff0c;是在http协议的基础上引入的一个加密层。 为什么需要HTTPS 由于http协议内容都是按照文本的方式明文传输的&#xff0c;这就导致传输过程中会出现一些被篡改的情况。运营商劫持事件最开始百度&#xff0c;…

比较PID控制和神经网络控制在机器人臂上的应用

机器人臂是自动化领域中常见的机器人形式&#xff0c;其精确控制对于实现复杂任务具有重要意义。在机器人臂的控制中&#xff0c;PID控制和神经网络控制是两种常用的控制方法。本文将比较PID控制和神经网络控制在机器人臂控制方面的应用&#xff0c;包括控制原理、优缺点以及在…

【广州华锐互动】太空探索VR模拟仿真教学系统

随着科技的不断发展&#xff0c;人类对宇宙的探索欲望愈发强烈。火星作为距离地球最近的行星之一&#xff0c;自然成为了人类关注的焦点。近年来&#xff0c;火星探测取得了一系列重要成果&#xff0c;为人类了解火星提供了宝贵的信息。然而&#xff0c;实地考察火星仍然面临着…