【SQL篇】一、Flink动态表与流的关系以及DDL语法

文章目录

  • 1、启动SQL客户端
  • 2、SQL客户端常用配置
  • 3、动态表和持续查询
  • 4、将流转为动态表
  • 5、用SQL持续查询
  • 6、动态表转为流
  • 7、时间属性
  • 8、DDL-数据库相关
  • 9、DDL-表相关

在这里插入图片描述

1、启动SQL客户端

启动Flink(基于yarn-session模式为例):

/opt/module/flink-1.17.0/bin/yarn-session.sh -d

再启动sql的客户端:

/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session

简单看下:

show databases;

2、SQL客户端常用配置

设置结果的显示模式,默认table,还可以设置为tableau、changelog

SET sql-client.execution.result-mode=changelog;

设置执行环境,默认streaming,也可以设置batch

SET execution.runtime-mode=streaming;

设置默认并行度:

SET parallelism.default=1;

设置状态TTL:

SET table.exec.state.ttl=1000;

通过SQL文件初始化,可以发现,exit退出客户端时,刚创建的库表都被清空了,这个SQL初始化文件就是在启动客户端时你想执行的SQL语句

# 创建SQL文件

vim conf/sql-client-init.sql

SET sql-client.execution.result-mode=tableau;
CREATE DATABASE mydatabase;

# 启动时,-i指定SQL文件

/opt/module/flink-1.17.0/bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql

3、动态表和持续查询

和MySQL等关系型表不同的是,无限流下,会有源源不断的数据过来进入表中,即动态表,来一条数据,往表中插入一条数据。对应的,想获取最新结果就要写条SQL去不间断的查询,即持续查询(每次数据到来都会触发查询操作),持续查询的结果也是一个动态表。

关系型表流处理的动态表
处理的数据对象字段元组的有界集合字段元组的无限序列
查询时对数据的访问可以访问到完整的数据输入无法访问到所有数据,必须“持续”等待流式输入
查询终止条件生成固定大小的结果集后终止永不停止, 根据持续收到的数据不断更新查询结果

在这里插入图片描述

如图,持续查询的流程为:

  • 流(stream)被转换为动态表(dynamic table)
  • 对动态表进行持续查询(continuous query),生成新的动态表
  • 生成的动态表被转换成流

如此,就通过执行SQL实现了对数据流的处理。

4、将流转为动态表

把流看作一张表,来一条数据,insert一次,比如有个记录用户点击事件的无限流:

在这里插入图片描述

5、用SQL持续查询

代码中定义一条查询SQL:

Table urlCountTable = tableEnv.sqlQuery("SELECT user, COUNT(url) as cnt FROM EventTable GROUP BY user");

此时,结果表(动态),可能是简单的insert,如Bob这条数据,也可能是对旧数据的更新update,如Alice,这就是更新查询。 此时,结果表转DataStream调用toChangelogStream()方法。
在这里插入图片描述

修改查询SQL,使用TUMBLE加一个开窗,每个窗口触发时,输出结果,此时对结果表就只有insert追加数据,没有update,即追加查询。

在这里插入图片描述

6、动态表转为流

仅追加(Append-only)流

动态表仅仅通过insert来修改,转为流时,对应一个仅追加的流,流中的每条数据,就是动态表的每行数据。

撤回(Retract)流

流中有添加消息add和撤回消息retract两种,对应表中:

  • insert就是add消息
  • delete就是retract
  • update就是被改行的retract+新结果的add

在这里插入图片描述

更新插入(Upsert)流

流中有更新插入消息upsert和删除消息delete两种,对应表中:

  • update和insert是upsert消息
  • delete为delete消息

在这里插入图片描述

最后,注意,在代码里将动态表转换为DataStream时,只支持仅追加(append-only)和撤回(retract)流两种。

7、时间属性

在表中加个时间字段,数据类型为TimeStamp,分为事件时间和处理时间。事件时间通过watermark语句来定义:

CREATE TABLE EventTable(
  user STRING,
  url STRING,
  ts TIMESTAMP(3),
  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
) WITH (
  ...
);

# TIMESTAMP后的3为精确度

以上,ts字段为事件时间属性,且基于ts设置5s的水位线延迟,注意,延迟秒数5必须加单引号。时间戳类型需要转为秒或者毫秒时,可:

# ...
ts BIGINT,
time_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
# ...
3即精确到毫秒

定义处理时间属性用procTime函数:

CREATE TABLE EventTable(
  user STRING,
  url STRING,
  ts AS PROCTIME()
) WITH (
  ...
);

8、DDL-数据库相关

建库:

CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
  [COMMENT database_comment]
  WITH (key1=val1, key2=val2, ...)

查询所有库:

SHOW DATABASES

查当前库:

SHOW CURRENT DATABASE

修改库的某些属性:

ALTER DATABASE [catalog_name.]db_name SET (key1=val1, key2=val2, ...)

删库:

DROP DATABASE [IF EXISTS] [catalog_name.]db_name [ (RESTRICT | CASCADE) ]

注意,

  • RESTRICT:删除非空数据库会触发异常。默认启用
  • CASCADE:删除非空数据库也会删除所有相关的表和函数,慎用

切换当前库:

USE database_name;

9、DDL-表相关

建表:

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  # 字段
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    # 定义watermark
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )   
  # 注释  
  [COMMENT table_comment]
  # 分区
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  # Flink特色
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] | AS select_query ]

关于表中的字段:physical_column就是常规列。metadata_column是元数据列,可访问到数据源本身的一些元数据,必须加METADATA关键字标识,如:读取数据写入Kafka时,Kafka引擎给数据打上的时间戳标记:

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `name` STRING,
  `record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp'  # !!!
) WITH (
  'connector' = 'kafka'
  ...
);

自定义的列名称和 Connector 中定义 metadata 字段的名称一样时,后面的FROM省略:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`timestamp` TIMESTAMP_LTZ(3) METADATA  # !!!
) WITH (
'connector' = 'kafka'
...
);

自定义列的数据类型和 Connector 中定义的 metadata 字段的数据类型不一致时,会自动强转,因此这两个类型必须可以强转:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
-- 将时间戳强转为 BIGINT
`timestamp` BIGINT METADATA
) WITH (
'connector' = 'kafka'
...
);

默认metadata_column列可读可写,加VIRTUAL表示只读:

CREATE TABLE MyTable (
  `timestamp` BIGINT METADATA, 
  `offset` BIGINT METADATA VIRTUAL,  # !!!!
  `user_id` BIGINT,
  `name` STRING,
) WITH (
  'connector' = 'kafka'
  ...
);

computed_column即计算列,把几列的计算结果做为新列,这在关系型SQL中一般在查询语句中完成,而不存成一个新列。

CREATE TABLE MyTable (
  `user_id` BIGINT,
  `price` DOUBLE,
  `quantity` DOUBLE,
  `cost` AS price * quanitity   # !!!
) WITH (
  'connector' = 'kafka'
  ...
);

主键的定义,只支持 not enforced:

CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
PARYMARY KEY(user_id) not enforced   # !!!
) WITH (
'connector' = 'kafka'
...
);

with子句,用于指定这个表相关的外部系统的相关配置,如Kafka:

CREATE TABLE KafkaTable (
`user_id` BIGINT,
`name` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)

like子句,即在现有表的基础上,创建另一种表:

CREATE TABLE Orders (
    `user` BIGINT,
    product STRING,
    order_time TIMESTAMP(3)
) WITH ( 
    'connector' = 'kafka',
    'scan.startup.mode' = 'earliest-offset'
);

CREATE TABLE Orders_with_watermark (
    -- Add watermark definition
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND 
) WITH (
    -- Overwrite the startup-mode
    'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;  # !!!!

举例:新表中的value字段加偏引号是因为value和关键字冲突了

CREATE TABLE test(
    id INT, 
    ts BIGINT, 
    vc INT
) WITH (
'connector' = 'print'
);

CREATE TABLE test1 (
    `value` STRING
)
LIKE test;

create-table-as-select,即CTAS语句,通过查询结果创建表:

CREATE TABLE my_ctas_table
WITH (
    'connector' = 'kafka',
    ...
)
AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;

# 注意此时不能自己来定义列

查所有表:

SHOW TABLES [ ( FROM | IN ) [catalog_name.]database_name ] [ [NOT] LIKE <sql_like_pattern> ]

查某张表信息:

{ DESCRIBE | DESC } [catalog_name.][db_name.]table_name

修改表名:

ALTER TABLE [catalog_name.][db_name.]table_name RENAME TO new_table_name

修改表属性:

ALTER TABLE [catalog_name.][db_name.]table_name SET (key1=val1, key2=val2, ...)

删表:

DROP [TEMPORARY] TABLE [IF EXISTS] [catalog_name.][db_name.]table_name

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/118109.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

全自动批量AI改写文章发布软件【软件脚本+技术教程】

项目原理&#xff1a; 利用AI工具将爆款文章改写发布到平台上流量变现,通过播放量赚取收益 软件功能&#xff1a; 1.可以根据你选的文章领域&#xff0c;识别你在网站上抓取的文章链接进来自动洗稿生成过原创的文章&#xff0c;自动配图 2.同时还可以将管理的账号导入进脚本软…

oracle体系结构

oracle数据库主要有三种文件: control,log,dbf文件。 oracle实例是oracle内存和进程的统称。一个数据库一般对应一个实例。 多个实例对用一个数据库就是oracle的rac技术。 这样比如有100个用户连接数据库&#xff0c;可以让50个用户连接实例1&#xff0c; 另外50个用户连接用…

Filter 和 Listener

Filter 表示过滤器。是JavaWeb三大组件&#xff08;Servlet、Filter、Listener&#xff09;之一。 过滤器可以把对资源的请求 拦截 下来。浏览器可以访问服务器上所有的资源&#xff0c;而在访问到这些资源之前可以使用过滤器拦截下来&#xff0c;也就是说在访问资源之前会先经…

Android性能优化--Perfetto用SQL性能分析

Android性能优化–Perfetto用SQL性能分析 文章目录 Android性能优化--Perfetto用SQL性能分析介绍Perfetto SQL 基础使用 Perfetto SQL 进行性能分析总结 本文首发地址 https://blog.csdn.net/CSqingchen/article/details/134167741 最新更新地址 https://gitee.com/chenjim/che…

什么是本地存储的有效期?

前言 本地存储是一种在Web开发中常用的客户端存储数据的方式&#xff0c;它可以让网页应用程序在用户的浏览器中存储和检索数据&#xff0c;而无需依赖服务器来保存信息。本地存储的有效期是指数据存储在用户的设备上可以被访问和保留的时间段。在本地存储中&#xff0c;有两种…

嵌入式发展历史

MPU、MCU、SoC、Application Processors 在一个电子系统中&#xff0c;处理器占据最重要的位置&#xff0c;被称为中央处理器单元&#xff08;CPU&#xff1a;Central Processing Unit&#xff09;。它从IO设备读取数据&#xff0c;处理&#xff0c;然后输出。 CPU的发展历史…

【RabbitMQ】RabbitMQ 消息的可靠性 —— 生产者和消费者消息的确认,消息的持久化以及消费失败的重试机制

文章目录 前言&#xff1a;消息的可靠性问题一、生产者消息的确认1.1 生产者确认机制1.2 实现生产者消息的确认1.3 验证生产者消息的确认 二、消息的持久化2.1 演示消息的丢失2.2 声明持久化的交换机和队列2.3 发送持久化的消息 三、消费者消息的确认3.1 配置消费者消息确认3.2…

系统设计中的缓存技术:完整指南

Image.png 缓存是软件工程中用于提高系统性能和用户体验的基本技术。它通过临时存储频繁访问的数据在缓存中&#xff0c;缓存比数据的原始来源更容易访问。 作为一名软件工程师&#xff0c;了解缓存以及它在不同类型的系统中的工作方式是至关重要的。在本文中&#xff0c;我们将…

基于AOSP源码Android-10.0.0_r41分支编译,framework开发,修改系统默认字体大小

文章目录 基于AOSP源码Android-10.0.0_r41分支编译&#xff0c;framework开发&#xff0c;修改系统默认字体大小 基于AOSP源码Android-10.0.0_r41分支编译&#xff0c;framework开发&#xff0c;修改系统默认字体大小 主要修改一个地方就行 代码源码路径 frameworks/base/co…

旅游业为什么要选择VR全景,VR全景在景区旅游上有哪些应用

引言&#xff1a; VR全景技术的引入为旅游业带来了一场变革。这项先进技术不仅提供了前所未有的互动体验&#xff0c;还为景区旅游文化注入了新的生机。 一&#xff0e;VR全景技术&#xff1a;革新旅游体验 1.什么是VR全景技术&#xff1f; VR全景技术是一种虚拟现实技术&am…

【C++初阶(四)aoto关键字与基于范围的for循环】

本专栏内容为&#xff1a;C学习专栏&#xff0c;分为初阶和进阶两部分。 通过本专栏的深入学习&#xff0c;你可以了解并掌握C。 &#x1f493;博主csdn个人主页&#xff1a;小小unicorn ⏩专栏分类&#xff1a;C &#x1f69a;代码仓库&#xff1a;小小unicorn的代码仓库&…

python实现MC协议(SLMP 3E帧)的TCP服务端(篇二)

python实现MC协议&#xff08;SLMP 3E帧&#xff09;的TCP服务端是一件稍微麻烦点的事情。它不像modbusTCP那样&#xff0c;可以使用现成的pymodbus模块去实现。但是&#xff0c;我们可以根据协议帧进行组包&#xff0c;自己去实现帧的格式&#xff0c;而这一切可以基于socket模…

Kotlin 进阶函数式编程技巧

Kotlin 进阶函数式编程技巧 Kotlin 简介 软件开发环境不断变化&#xff0c;要求开发人员不仅适应&#xff0c;更要进化。Kotlin 以其简洁的语法和强大的功能迅速成为许多人进化过程中的信赖伙伴。虽然 Kotlin 的初始吸引力可能是它的简洁语法和与 Java 的互操作性&#xff0c…

每天一点python——day61

#第61天 #字符串的驻留机制字符串&#xff1a;python中基本数据类型&#xff0c;是一个不可变的序列【目前我们学了两个&#xff1a;元组、字符串】 可以使用单引号&#xff0c;双引号&#xff0c;三引号来定义#定义字符串 apython#用单引号&#xff0c;双引号&#xff0c;三引…

Redis原理到常用语法基础图文讲解

在初期&#xff0c;已经讲述了Redis安装问题。现在正式进入Redis的入门阶段 系统架构的演进 传统单机架构 一台机器运行应用程序、数据库服务器 现在大部分公司的产品都是这种单机架构。因为现在计算机硬件发展速度很快&#xff0c;哪怕只有一台主机&#xff0c;性能也很高…

智慧城市排水系统,管网水位监测仪怎么监测

地下排水管网应用于城市的多个环境之中&#xff0c;比如排放雨水&#xff0c;污水或者是地表水等&#xff0c;总之是在维护城市的安全运行&#xff0c;并且保护城市地下生命线处于正常状态。但是一旦排水系统面对各种极端天气&#xff0c;便有可能会突发安全事故&#xff0c;导…

如何再kali中下载iwebsec靶场

这个靶场有三种搭建方法&#xff1a; 第一种是在线靶场&#xff1a;http://www.iwebsec.com:81/ 第二种是虚拟机版本的&#xff0c;直接下载到本地搭建 官网地址下载&#xff1a;http://www.iwebsec.com/ 而第三种就是利用docker搭建这个靶场&#xff0c;我这里是用kali进行…

部署kubevirt教程

前提条件 已安装&#xff1a;kubernetes集群、kubectl、docker apt install -y qemu-kvm libvirt virt-install bridge-utils 【所有节点全部安装】 virt-host-validate qemu部署kubevirt 下载kubevirt-cr.yaml和kubevirt-operator.yaml 先执行&#xff1a; Kubectl apply …

网络安全(黑客)-0基础小白自学

1.网络安全是什么 网络安全可以基于攻击和防御视角来分类&#xff0c;我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术&#xff0c;而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 2.网络安全市场 一、是市场需求量高&#xff1b; 二、则是发展相对成熟…

Go Gin中间件

Gin是一个用Go语言编写的Web框架&#xff0c;它提供了一种简单的方式来创建HTTP路由和处理HTTP请求。中间件是Gin框架中的一个重要概念&#xff0c;它可以用来处理HTTP请求和响应&#xff0c;或者在处理请求之前和之后执行一些操作。 以下是关于Gin中间件开发的一些基本信息&am…