Flink高手之路5-Table API SQL

文章目录

  • Flink 中的Table API & SQL
    • 一、Table API & SQL 介绍
      • 1. 为什么要Table API和SQL
      • 2. Table API & SQL的特点
      • 3. Table API& SQL发展历程
        • 3.1 架构升级
        • 3.2 查询处理器的选择
        • 3.3 了解-Blink planner和Flink Planner具体区别如下:
        • 3.4 了解-Blink planner和Flink Planner具体区别如下:
      • 4. 注意事项
    • 二.、API
    • 三、案例
      • 1. 案例一
        • 1)导入pom依赖
        • 2)新建包和类
        • 3)代码实现
        • 4)运行,查看结果
        • 5)表的合并操作
        • 6)合并结果

Flink 中的Table API & SQL

一、Table API & SQL 介绍

1. 为什么要Table API和SQL

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/overview/

image-20230420163500993

Flink的Table模块包括 Table API 和 SQL:

Table API 是一种类SQL的API,通过Table API,用户可以像操作表一样操作数据,非常直观和方便

SQL作为一种声明式语言,有着标准的语法和规范,用户可以不用关心底层实现即可进行数据的处理,非常易于上手

Flink Table API 和 SQL 的实现上有80%左右的代码是公用的。作为一个流批统一的计算引擎,Flink 的 Runtime 层是统一的。

2. Table API & SQL的特点

Flink之所以选择将 Table API & SQL 作为未来的核心 API,是因为其具有一些非常重要的特点:
在这里插入图片描述

  • 声明式:属于设定式语言,用户只要表达清楚需求即可,不需要了解底层执行;
  • 高性能:可优化,内置多种查询优化器,这些查询优化器可为 SQL 翻译出最优执行计划;
  • 简单易学:易于理解,不同行业和领域的人都懂,学习成本较低;
  • 标准稳定:语义遵循SQL标准,非常稳定,在数据库 30 多年的历史中,SQL 本身变化较少;
  • 流批统一:可以做到API层面上流与批的统一,相同的SQL逻辑,既可流模式运行,也可批模式运行,Flink底层Runtime本身就是一个流与批统一的引擎

3. Table API& SQL发展历程

3.1 架构升级

自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。随着版本的不断更新,API 也出现了很多不兼容的地方。

在 Flink 1.9 中,Table 模块迎来了核心架构的升级,引入了阿里巴巴Blink团队贡献的诸多功能

在这里插入图片描述

在Flink 1.9 之前,Flink API 层 一直分为DataStream API 和 DataSet API,Table API & SQL 位于 DataStream API 和 DataSet API 之上。可以看处流处理和批处理有各自独立的api (流处理DataStream,批处理DataSet)。而且有不同的执行计划解析过程,codegen过程也完全不一样,完全没有流批一体的概念,面向用户不太友好。

在Flink1.9之后新的架构中,有两个查询处理器:Flink Query Processor,也称作Old Planner和Blink Query Processor,也称作Blink Planner。为了兼容老版本Table及SQL模块,插件化实现了Planner,Flink原有的Flink Planner不变,后期版本会被移除。新增加了Blink Planner,新的代码及特性会在Blink planner模块上实现。批或者流都是通过解析为Stream Transformation来实现的,不像Flink Planner,批是基于Dataset,流是基于DataStream。

3.2 查询处理器的选择

查询处理器是 Planner 的具体实现,通过parser、optimizer、codegen(代码生成技术)等流程将 Table API & SQL作业转换成 Flink Runtime 可识别的 Transformation DAG,最终由 Flink Runtime 进行作业的调度和执行。

Flink Query Processor查询处理器针对流计算和批处理作业有不同的分支处理,流计算作业底层的 API 是 DataStream API, 批处理作业底层的 API 是 DataSet API。

Blink Query Processor查询处理器则实现流批作业接口的统一,底层的 API 都是Transformation,这就意味着我们和Dataset完全没有关系了。

Flink1.11之后Blink Query Processor查询处理器已经是默认的了:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/

在这里插入图片描述

3.3 了解-Blink planner和Flink Planner具体区别如下:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html

在这里插入图片描述

3.4 了解-Blink planner和Flink Planner具体区别如下:

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/common.html

在这里插入图片描述

4. 注意事项

https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/common/

All Flink Scala APIs are deprecated and will be removed in a future Flink version. You can still build your application in Scala, but you should move to the Java version of either the DataStream and/or Table API.
所有Flink Scala api都已弃用,并将在未来的Flink版本中删除。您仍然可以在Scala中构建应用程序,但是应该使用Java版本的DataStream和/或Table API。

二.、API

  • 创建表
  • 查询表
  • 输出表

三、案例

1. 案例一

将DataStream注册为Table和view并进行SQL统计

1)导入pom依赖

<dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.12.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.30</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>1.12.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>1.12.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.12.7</version>
        </dependency>
    </dependencies>

注意本地scala版本:2.12为scala版本。

2)新建包和类

在这里插入图片描述

3)代码实现

package cn.edu.hgu.bigdata20.flink.table_demo;

import cn.edu.hgu.bigdata20.flink.mysql.Student;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

/**
 * description:flink的TableAPI和SQL演示
 * author 王
 * date 2023/04/20
 */
public class FlinkTableDemo {
    public static void main(String[] args) throws Exception {
        // 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2.table env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 3.source
        DataStream<Student> StudentA = env.fromCollection(Arrays.asList(
                new Student(101, "张三", "男", 20, "14567457623", "唐山"),
                new Student(102, "李四", "女", 18, "17467457623", "张家口"),
                new Student(103, "王五", "男", 19, "18367457623", "承德")
        ));
        DataStream<Student> StudentB = env.fromCollection(Arrays.asList(
                new Student(201, "赵六", "女", 21, "18867457623", "秦皇岛"),
                new Student(202, "钱七", "男", 19, "17567457623", "邢台"),
                new Student(203, "孙八", "女", 18, "16767457623", "廊坊")
        ));

        // 3.注册表
        // 3.1 转换一个流为一个表
        Table tableA = tEnv.fromDataStream(StudentA, $("id"), $("sname"), $("sex"), $("age"), $("phone"), $("address"));
        // 3.2 注册一个流为表
        tEnv.createTemporaryView("StudentB", StudentB, $("id"), $("sname"), $("sex"), $("age"), $("phone"), $("address"));
        // 4.transformation
        Table tableB = tEnv.sqlQuery("select * from StudentB where age >= 19 ");//SQL

        // 5.sink
        // 把表转换为流
        DataStream<Student> resultA = tEnv.toAppendStream(tableA, Student.class);
        DataStream<Student> resultB = tEnv.toAppendStream(tableB, Student.class)
        // tableA.printSchema();
        // tableB.printSchema();
        // resultA.print();
         resultB.print();
        // 6.execute
        env.execute();
    }
}

4)运行,查看结果

在这里插入图片描述

5)表的合并操作

package cn.edu.hgu.bigdata20.flink.table;

import cn.edu.hgu.bigdata20.flink.mysql.Student;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

/**
 * description:flink的TableAPI和SQL演示
 * author 王
 * date 2023/04/20
 */
public class FlinkTableDemo {
    public static void main(String[] args) throws Exception {
        // 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2.table env
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // 3.source
        DataStream<Student> StudentA = env.fromCollection(Arrays.asList(
                new Student(101, "张三", "男", 20, "14567457623", "唐山"),
                new Student(102, "李四", "女", 18, "17467457623", "张家口"),
                new Student(103, "王五", "男", 19, "18367457623", "承德")
        ));
        DataStream<Student> StudentB = env.fromCollection(Arrays.asList(
                new Student(201, "赵六", "女", 21, "18867457623", "秦皇岛"),
                new Student(202, "钱七", "男", 19, "17567457623", "邢台"),
                new Student(203, "孙八", "女", 18, "16767457623", "廊坊")
        ));

        // 3.注册表
        // 3.1 转换一个流为一个表
        Table tableA = tEnv.fromDataStream(StudentA, $("id"), $("sname"), $("sex"), $("age"), $("phone"), $("address"));
        // 3.2 注册一个流为表
        tEnv.createTemporaryView("StudentB", StudentB, $("id"), $("sname"), $("sex"), $("age"), $("phone"), $("address"));
        // 4.transformation
        Table tableB = tEnv.sqlQuery("select * from StudentB where age >= 19 ");//SQL
        // 合并
        Table unionTable = tEnv.sqlQuery("select * from " + tableA + " where age > 19 " +
                "union all " +
                "select * from StudentB where age >= 19");

        // 5.sink
        // 把表转换为流
        DataStream<Student> resultA = tEnv.toAppendStream(tableA, Student.class);
        DataStream<Student> resultB = tEnv.toAppendStream(tableB, Student.class);
        DataStream<Student> resultUnion = tEnv.toAppendStream(unionTable, Student.class);
        // tableA.printSchema();
        // tableB.printSchema();
        // resultA.print();
        // resultB.print();
        resultUnion.print();
        // 6.execute
        env.execute();
    }
}

出现问题:Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered “<”,可能是符号有问题。

6)合并结果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/13733.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

基于GPS/北斗卫星技术的无盲区车辆调度系统

基于GPS/北斗卫星技术的无盲区车辆调度系统 现代车辆调度系统是一种集全球卫星定位技术&#xff08;GPS&#xff09;、地理信息技术&#xff08;GIS&#xff09;和现代通信技术于一体的高科技项目。它将移动目标的动态位置&#xff08;经度与纬度&#xff09;、时间和状态等信息…

uni-app入门到实战

&#x1f37f;*★,*:.☆(&#xffe3;▽&#xffe3;)/$:*.★* &#x1f37f; &#x1f35f;欢迎来到前端初见的博文&#xff0c;本文主要讲解uni-app入门到实战&#x1f35f; &#x1f468;‍&#x1f527; 个人主页 : 前端初见 &#x1f95e;喜欢的朋友可以关注一下&#xff…

javassist 字节码处理库

目录 一、快速入门 1.1 创建class文件1.2 ClassPool的相关方法1.3 CtClass的相关方法1.4 CtMethod的相关方法1.5 调用生成的类对象 1.5.1 通过反射调用1.5.2 通过接口调用1.6 修改现有的类对象二、将类冻结三、类搜索路径四、$开头的特殊字符五、ProxyFactory的使用 我们知道J…

Linux I/O复用函数的使用情况和select接口的介绍

I/O 复用使得程序能同时监听多个文件描述符&#xff0c;这对于提高程序的性能至关重要。通常&#xff0c; 网络程序在下列情况下需要使用 I/O 复用技术&#xff1a; 1.TCP服务器同时要处理监听套接字和连接套接字 2.服务器同时要处理TCP请求和UDP请求。 3.程序同时要处理多个套…

直播预告 | 时序数据处理的云端利器:TDengine Cloud 详解与演示

当下&#xff0c;我们正处在一个万物互联的时代&#xff0c;大数据、云原生、AI、5G 等数字技术极大地方便了人们的生活&#xff0c;但智能物联网产生的海量数据却成为众多企业在数据处理上的巨大痛点。从本质来看&#xff0c;这些数据大多是产生自各种设备和传感器的时序数据&…

Spring种存取Bean的5种注解

存取Bean的五种注解 存储Bean对象两种方式1.添加一行bean2.使用注解的方式(5大注解)Controller(控制器存储)Service(服务存储)Repository(仓库存储)Component(组件存储)Configuration(配置存储)方法注解 Bean 获取Bean对象(三种)1.属性注入2.setter注入3.构造方法注入三种注入的…

springboot-分页功能

1.分页功能的作用 分页功能作为各类网站和系统不可或缺的部分&#xff08;例如百度搜索结果的分页等&#xff09; &#xff0c;当一个页面数据量大的时候分页作用就体现出来的&#xff0c;其作用有以下5个。 &#xff08;1&#xff09;减少系统资源的消耗 &#xff08;2&#…

Vue 3组件传值 、组件通信

本文采用<script setup />的写法&#xff0c;比options API更自由。那么我们就来说说以下七种组件通信方式&#xff1a; props emit v-model refs provide/inject eventBus vuex/pinia 举个例子 本文将使用下面的演示&#xff0c;如下图所示&#xff1a; 上图中…

mybatis粗心使用导致内存溢出

现象 服务响应变慢&#xff0c;线程日志也出现Java heap space内存溢出的错误&#xff0c;这个服务属于基础业务服务&#xff0c;出现问题要尽快的排查 分析 因为设置了gc日志和jmap启动相关参数 所以我们进行分析&#xff0c;这里模拟线上环境将堆大小参数调整到了128m&am…

【Linux】权限管理

文章目录 &#x1f4d6; 前言1. 什么是权限2. 权限管理2.1 Linux的用户分类&#xff1a;2.2 Liunx文件的分类&#xff1a;2.3 文件的访问权限2.4 文件访问权限的相关设置方法&#xff1a;chmod对文件权限的修改chown / chgrp 2.5 以八进制修改文件权限&#xff1a;2.6 默认权限…

Springsecurity课程笔记06-13章基于数据库的方法授权

动力节点Springsecurity视频课程 6 密码处理 6.1 为什么要加密&#xff1f; csdn 密码泄露事件 泄露事件经过&#xff1a;https://www.williamlong.info/archives/2933.html 泄露数据分析&#xff1a;https://blog.csdn.net/crazyhacking/article/details/10443849 6.2加密…

IJKPLAYER源码分析-常用API

前言 本文简要介绍IJKPLAYER的几个常用API&#xff0c;以API使用的角度&#xff0c;来审视其内部运作原理。这里以iOS端直播API调用切入。 调用流程 init 创建播放器实例后&#xff0c;会先调用init方法进行初始化&#xff1a; - (IJKFFMediaPlayer *)init {self [super ini…

计算机网络复习题+答案

文章目录 导文题目一、单项选择题二、填空题三、判断改错题,判断下列命题正误,正确的在其题干后的括号内打“√”,错误的打“”,并改正。四、名词解释五、简答题六、应用题导文 计算机网络复习题 题目 一、单项选择题 在应用层协议中,主要用于IP地址自动配置的协议是: (…

文案自动修改软件-文案自动改写的免费软件下载

文章生成器ai写作机器人 随着人工智能技术的飞速发展&#xff0c;越来越多的新型产品被推向市场。其中&#xff0c;文章生成器AI写作机器人是一个备受关注的新兴行业。它使用机器学习和自然语言处理等技术&#xff0c;为用户自动生成高质量的文章和内容&#xff0c;帮助用户在…

Python——第2章 数据类型、运算符与内置函数

目录 1 赋值语句 2 数据类型 2.1 常用内置数据类型 2.1.1 整数、实数、复数 2.1.2 列表、元组、字典、集合 2.1.3 字符串 2.2 运算符与表达式 2.2.1 算术运算符 2.2.2 关系运算符 2.2.3 成员测试运算符 2.2.4 集合运算符 2.2.5 逻辑运算符 2.3 常用内置…

本地搭建属于自己的ChatGPT:基于PyTorch+ChatGLM-6b+Streamlit+QDrant+DuckDuckGo

本地部署chatglm及缓解时效性问题的思路&#xff1a; 模型使用chatglm-6b 4bit&#xff0c;推理使用hugging face&#xff0c;前端应用使用streamlit或者gradio。 微调对显存要求较高&#xff0c;还没试验。可以结合LoRA进行微调。 缓解时效性问题&#xff1a;通过本地数据库…

Mybatis高级映射及延迟加载

准备数据库表&#xff1a;一个班级对应多个学生。班级表&#xff1a;t_clazz&#xff1b;学生表&#xff1a;t_student 创建pojo&#xff1a;Student、Clazz // Student public class Student {private Integer sid;private String sname;//...... }// Clazz public class Cla…

Flutter PC桌面端 控制应用尺寸是否允许放大缩小

一、需求 桌面端中&#xff0c;登录、注册、找回密码页面不允许用户手动放大缩小&#xff0c;主页面允许 二、插件 window_manager 使用教程请参照这篇博客&#xff1a;Flutter桌面端开发——window_manager插件的使用 题外话&#xff1a; 之前使用的是bitsdojo_window插件…

[golang gin框架] 25.Gin 商城项目-配置清除缓存以及前台列表页面数据渲染公共数据

配置清除缓存 当进入前台首页时,会缓存对应的商品相关数据,这时,如果后台修改了商品的相关数据,缓存中的对应数据并没有随之发生改变,这时就需要需改对应的缓存数据,这里有两种方法: 方法一 在管理后台操作直接清除缓存中的所有数据,当再次访问前台首页时,就会先从数据库中获取…

记frp内网穿透配置

这两天由于想给客户看一下我们的系统&#xff0c;于是想到用内网穿透&#xff0c;但是怎么办呢&#xff0c;没有用过呀&#xff0c;于是各处找资料&#xff0c;但是搞完以后已经不记得参考了那些文档了&#xff0c;对不起各位大神&#xff0c;就只能写出过程和要被自己蠢死的错…