Flink SQL -- 命令行的使用

1、启动Flink SQL
首先启动Flink的集群,选择独立集群模式或者是session的模式。此处选择是时session的模式:

yarn-session.sh -d 

在启动Flink SQL的client:
sql-client.sh
2、kafka SQL 连接器
在使用kafka作为数据源的时候需要上传jar包到flnik的lib下:

/usr/local/soft/flink-1.15.2/lib

可以去官网找对应的版本下载上传。

 

1、创建表:

再流上定义表
再flink中创建表相当于创建一个视图(视图中不存数据,只有查询视图时才会去原表中读取数据)


CREATE TABLE students (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING    
) WITH (
  'connector' = 'kafka',
  'topic' = 'student',
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'csv'
)


2、查询数据(连续查询):

select clazz,count(1) as c from students group by clazz;


3、客户端为维护和可视化结果提供了三种的模式:

        1、表格模式(默认使用的模式),(table mode),在内存中实体化结果,并将结果用规则的分页表格可视化展示出来

SET 'sql-client.execution.result-mode' = 'table';

        2、变更日志模式,(changelog mode),不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。

SET 'sql-client.execution.result-mode' = 'changelog';

        3、Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type):

SET 'sql-client.execution.result-mode' = 'tableau';

4、 Flink SQL流批一体:
        1、流处理:

                a、流处理即可以处理有界流也可以处理无界流

                b、流处理的输出的结果是连续的结果

                c、流处理的底层是持续流的模型,上游的Task和下游的Task同时启动等待数据的到达

SET 'execution.runtime-mode' = 'streaming'; 
        2、批处理:

                a、批处理只能用于处理有界流

                b、输出的是最终的结果

                c、批处理的底层是MapReduce模型,会先执行上游的Task,在执行下游的Task 

SET 'execution.runtime-mode' = 'batch';
Flink做批处理,读取一个文件:

-- 创建一个有界流的表
CREATE TABLE students_hdfs (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
  'connector' = 'filesystem',           -- 必选:指定连接器类型
  'path' = 'hdfs://master:9000/data/spark/stu/students.txt',  -- 必选:指定路径
  'format' = 'csv'                     -- 必选:文件系统连接器指定 format
);


select clazz,count(1) as c from 
students_hdfs
group by clazz
5、Flink SQL的连接器:
        1、kafka SQL 连接器

对于一些参数需要从官网进行了解。

                1、kafka source 

-- 创建kafka 表
CREATE TABLE students_kafka (
    `offset` BIGINT METADATA VIRTUAL, -- 偏移量
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --数据进入kafka的时间,可以当作事件时间使用
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- 数据的topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  'properties.group.id' = 'testGroup', -- 消费者组
  'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
  'format' = 'csv' -- 读取数据的格式
);

                2、kafka sink 

-- 创建kafka 表
CREATE TABLE students_kafka_sink (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students_sink', -- 数据的topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  'properties.group.id' = 'testGroup', -- 消费者组
  'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
  'format' = 'csv' -- 读取数据的格式
);

-- 将查询结果保存到kafka中
insert into students_kafka_sink
select * from students_hdfs;

kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic students_sink

        3、将更新的流写入到kafka中 

因为在Kafka是一个消息队列,是不会去重的。所以只需要将读取数据的格式改成canal-json。当数据被读取回来还是原来的流模式。

CREATE TABLE clazz_num_kafka (
    clazz STRING,
    num BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'clazz_num', -- 数据的topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  'properties.group.id' = 'testGroup', -- 消费者组
  'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
  'format' = 'canal-json' -- 读取数据的格式
);

-- 将更新的数据写入kafka需要使用canal-json格式,数据中会带上操作类型
{"data":[{"clazz":"文科一班","num":71}],"type":"INSERT"}
{"data":[{"clazz":"理科三班","num":67}],"type":"DELETE"}


insert into clazz_num_kafka
select clazz,count(1) as num from 
students
group by clazz;


kafka-console-consumer.sh --bootstrap-server  master:9092,node1:9092,node2:9092 --from-beginning --topic clazz_num
        2、 hdfs SQL 连接器

                1、hdfs source

                        Flink读取文件可以使用有界流的方式,也可以是无界流方式。

-- 有界流
CREATE TABLE students_hdfs_batch (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
  'connector' = 'filesystem',           -- 必选:指定连接器类型
  'path' = 'hdfs://master:9000/data/student',  -- 必选:指定路径
  'format' = 'csv'                     -- 必选:文件系统连接器指定 format
);

select * from students_hdfs_batch;

-- 无界流
-- 基于hdfs做流处理,读取数据是以文件为单位,延迟比kafka大
CREATE TABLE students_hdfs_stream (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
    'connector' = 'filesystem',           -- 必选:指定连接器类型
    'path' = 'hdfs://master:9000/data/student',  -- 必选:指定路径
    'format' = 'csv' ,                    -- 必选:文件系统连接器指定 format
    'source.monitor-interval' = '5000' -- 每隔一段时间扫描目录,生成一个无界流
);


select * from students_hdfs_stream;

                2、hdfs sink

-- 1、批处理模式(使用方式和底层原理和hive类似)
SET 'execution.runtime-mode' = 'batch';

-- 创建表
CREATE TABLE clazz_num_hdfs (
    clazz STRING,
    num BIGINT
)WITH (
  'connector' = 'filesystem',           -- 必选:指定连接器类型
  'path' = 'hdfs://master:9000/data/clazz_num',  -- 必选:指定路径
  'format' = 'csv'                     -- 必选:文件系统连接器指定 format
);
-- 将查询结果保存到表中
insert into clazz_num_hdfs
select clazz,count(1) as num
from students_hdfs_batch
group by clazz;


-- 2、流处理模式
SET 'execution.runtime-mode' = 'streaming'; 

-- 创建表,如果查询数据返回的十更新更改的流需要使用canal-json格式
CREATE TABLE clazz_num_hdfs_canal_json (
    clazz STRING,
    num BIGINT
)WITH (
  'connector' = 'filesystem',           -- 必选:指定连接器类型
  'path' = 'hdfs://master:9000/data/clazz_num_canal_json',  -- 必选:指定路径
  'format' = 'canal-json'                     -- 必选:文件系统连接器指定 format
);

insert into clazz_num_hdfs_canal_json
select clazz,count(1) as num
from students_hdfs_stream
group by clazz;
3、MySQL SQL 连接器

        1、整合:

# 1、上传依赖包到flink 的lib目录下/usr/local/soft/flink-1.15.2/lib
flink-connector-jdbc-1.15.2.jar
mysql-connector-java-5.1.49.jar

# 2、需要重启flink集群
yarn application -kill [appid]
yarn-session.sh -d

# 3、重新进入sql命令行
sql-client.sh

         2、mysql   source 

-- 有界流
-- flink中表的字段类型和字段名需要和mysql保持一致
CREATE TABLE students_jdbc (
    id BIGINT,
    name STRING,
    age BIGINT,
    gender STRING,
    clazz STRING,
    PRIMARY KEY (id) NOT ENFORCED -- 主键
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'students',
    'username' ='root',
    'password' ='123456'
);

select * from students_jdbc;

        3、mysql sink 

-- 创建kafka 表
CREATE TABLE students_kafka (
    `offset` BIGINT METADATA VIRTUAL, -- 偏移量
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --数据进入kafka的时间,可以当作事件时间使用
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- 数据的topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  'properties.group.id' = 'testGroup', -- 消费者组
  'scan.startup.mode' = 'earliest-offset', -- 读取数据的位置earliest-offset latest-offset
  'format' = 'csv' -- 读取数据的格式
);

-- 创建mysql sink表
CREATE TABLE clazz_num_mysql (
    clazz STRING,
    num BIGINT,
    PRIMARY KEY (clazz) NOT ENFORCED -- 主键
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://master:3306/student',
    'table-name' = 'clazz_num',
    'username' ='root',
    'password' ='123456'
);

--- 再mysql创建接收表
CREATE TABLE clazz_num (
    clazz varchar(10),
    num BIGINT,
    PRIMARY KEY (clazz) -- 主键
) ;

-- 将sql查询结果实时写入mysql
-- 将更新更改的流写入mysql,flink会自动按照主键更新数据
insert into clazz_num_mysql
select 
clazz,
count(1) as num from 
students_kafka
group by clazz;

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic students 插入一条数据
        4、DataGen:用于生成随机数据,一般用在高性能测试上
-- 创建包(只能用于source表)
CREATE TABLE students_datagen (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
    'connector' = 'datagen',
    'rows-per-second'='5', -- 每秒随机生成的数据量
    'fields.age.min'='1',
    'fields.age.max'='100',
    'fields.sid.length'='10',
    'fields.name.length'='2',
    'fields.sex.length'='1',
    'fields.clazz.length'='4'
);

        5、print:用于高性能测试 只能用于sink表
CREATE TABLE print_table (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
     'connector' = 'print'
);

insert into print_table
select * from students_datagen;



结果需要在提交的任务中查看。
        6、BlackHole :是用于高性能测试使用,在后面可以用于Flink的反压的测试。
CREATE TABLE blackhole_table (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'blackhole'
);

insert into blackhole_table
select * from students_datagen;
6、SQL 语法
        1、Hints:

               用于提示执行,在Flink中可以动态的修改表中的属性,在Spark中可以用于广播。在修改动态表中属性后,不需要在重新建表,就可以读取修改后的需求。

CREATE TABLE students_kafka (
    `offset` BIGINT METADATA VIRTUAL, -- 偏移量
    `event_time` TIMESTAMP(3) METADATA FROM 'timestamp', --数据进入kafka的时间,可以当作事件时间使用
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'students', -- 数据的topic
  'properties.bootstrap.servers' = 'master:9092,node1:9092,node2:9092', -- broker 列表
  'properties.group.id' = 'testGroup', -- 消费者组
  'scan.startup.mode' = 'latest-offset', -- 读取数据的位置earliest-offset latest-offset
  'format' = 'csv' -- 读取数据的格式
);

-- 动态修改表属性,可以在查询数据时修改读取kafka数据的位置,不需要重新创建表
select * from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;


-- 有界流
CREATE TABLE students_hdfs (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
    'connector' = 'filesystem',           -- 必选:指定连接器类型
    'path' = 'hdfs://master:9000/data/student',  -- 必选:指定路径
    'format' = 'csv'                     -- 必选:文件系统连接器指定 format
);

-- 可以在查询hdfs时,不需要再重新的创建表就可以动态改成无界流
select * from students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000' )  */;
         2、WITH:

                当一段SQL语句在被多次使用的时候,就将通过with给这个SQL起一个别名,类似于封装起来,就是为这个SQL创建一个临时的视图(并不是真正的视图),方便下次使用。

CREATE TABLE students_hdfs (
    sid STRING,
    name STRING,
    age INT,
    sex STRING,
    clazz STRING
)WITH (
    'connector' = 'filesystem',           -- 必选:指定连接器类型
    'path' = 'hdfs://master:9000/data/student',  -- 必选:指定路径
    'format' = 'csv'                     -- 必选:文件系统连接器指定 format
);

-- 可以在查询hdfs时,不需要再重新的创建表就可以动态改成无界流
select * from students_hdfs /*+ OPTIONS('source.monitor-interval' = '5000' )  */;



-- tmp别名代表的时子查询的sql,可以在后面的sql中多次使用
with tmp as (
    select * from students_hdfs 
    /*+ OPTIONS('source.monitor-interval' = '5000' )  */
    where clazz='文科一班'
)
select * from tmp
union all
select * from tmp;
        3、DISTINCT:

在flink 的流处理中,使用distinct,flink需要将之前的数据保存在状态中,如果数据一直增加,状态会越来越大 状态越来越大,checkpoint时间会增加,最终会导致flink任务出问题

select 
count(distinct sid) 
from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */;

select 
    count(sid)  
from (
    select 
    distinct *
    from students_kafka /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */
);

注意事项:

       1、 当Flink Client客户端退出来以后,里面创建的动态表就不存在了。这些表结构是元数据,是存储在内存中的。

        2、当在进行where过滤的时候,字符串会出现三种情况:空的字符串、空格字符串、null的字符串,三者是有区别的:

        这三者是不同的概念,在进行where过滤的时候过滤的条件是不同的。

1、过滤空的字符串:
 where s!= ‘空字符串’

2、过滤空格字符串:
 where s!= ‘空格’

3、过滤null字符串:

where s!= null
Flink SQL中常见的函数:

from_unixtime: 
   
    以字符串格式 string 返回数字参数 numberic 的表示形式(默认为 ‘yyyy-MM-dd HH:mm:ss’

to_timestamp:  
    
    将格式为 string2(默认为:‘yyyy-MM-dd HH:mm:ss’)的字符串 string1 转换为 timestamp




本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/129336.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python+requests接口自动化测试

原来的web页面功能测试转变成接口测试,之前大多都是手工进行,利用postman和jmeter进行的接口测试,后来,组内有人讲原先web自动化的测试框架移驾成接口的自动化框架,使用的是java语言,但对于一个学java&…

Linux学习之进程三

目录 进程控制 fork函数 什么是写时拷贝 进程终止 mian函数的返回值 退出码 错误码 exit() 进程等待 1.什么是进程等待? 2.为什么要进行进程等待? 3.如何进程进程等待? wait,waitpid: waitpid 进程替换 …

Lua更多语法与使用

文章目录 目的错误处理元表和元方法垃圾回收协程模块面向对象总结 目的 在前一篇文章: 《Lua入门使用与基础语法》 中介绍了一些基础的内容。这里将继续介绍Lua一些更多的内容。 同样的本文参考自官方手册: https://www.lua.org/manual/ 错误处理 下…

node插件MongoDB(四)—— 库mongoose 操作文档使用(新增、删除、更新、查看文档)(二)

文章目录 前言(1)问题:安装的mongoose 库版本不应该过高导致的问题(2)重新安装低版本 一、插入文档1. 代码2. node终端效果3. 使用mongo.exe查询数据库的内容 二、删除文档1. 删除一条2. 批量删除3. 代码 三、修改文档…

Go基础知识全面总结

文章目录 go基本数据类型bool类型数值型字符字符串 数据类型的转换运算符和表达式1. 算数运算符2.关系运算符3. 逻辑运算符4. 位运算符5. 赋值运算符6. 其他运算符运算符优先级转义符 go基本数据类型 bool类型 布尔型的值只可以是常量 true 或者 false。⼀个简单的例⼦&#…

MIPSsim模拟器 使用说明

(一) 启动模拟器 双击MIPSsim.exe,即可启动该模拟器。模拟器启动时,自动将自己初始化为默认状态。所设置的默认值为: u所有通用寄存器和浮点寄存器为全0; u内存清零; u流水寄存器为全0&#xff…

C++结构体定义 创建 赋值 结构体数组 结构体指针 结构体嵌套结构体

结构体是什么&#xff1f; struct是自定义数据类型&#xff0c;是一些类型集合组成的一个类型。结构体的定义方式 #include<iostream> using namespace std;struct Student {string name;int age;int score; };创建结构体变量并赋值 方式一&#xff0c;先创建结构体变…

基于springboot+vue开发的教师工作量管理系

教师工作量管理系 springboot31 源码合集&#xff1a;www.yuque.com/mick-hanyi/javaweb 源码下载&#xff1a;博主私 摘要 随着信息技术在管理上越来越深入而广泛的应用&#xff0c;管理信息系统的实施在技术上已逐步成熟。本文介绍了教师工作量管理系统的开发全过程。通过…

人工智能技术的高速发展,普通人如何借助AI实现弯道超车?

人工智能技术的高速发展&#xff0c;普通人如何借助AI实现弯道超车&#xff1f; 随着互联网信息传播的爆炸&#xff0c;人类科技文明的快速发展“人工智能”成为新的话题&#xff0c;科技的进步也让普通人觉得自己与社会脱节&#xff0c;找工作越来越难&#xff0c;创业越来越难…

Python使用Numba装饰器进行加速

Python使用Numba装饰器进行加速 前言前提条件相关介绍实验环境Numba装饰器进行加速未加速的代码输出结果 numba.jit加速的代码输出结果 前言 由于本人水平有限&#xff0c;难免出现错漏&#xff0c;敬请批评改正。更多精彩内容&#xff0c;可点击进入Python日常小操作专栏、Ope…

Aspose.OCR for .NET 2023Crack

Aspose.OCR for .NET 2023Crack 为.NET在图片上播放OCR使所有用户和程序员都可以从特定的图像片段中提取文本和相关的细节&#xff0c;如字体、设计以及书写位置。这一特定属性为OCR的性能及其在扫描遵循排列的记录时的功能提供了动力。OCR的库使用一条线甚至几条线来处理这些特…

什么是证书管理

在自带设备和物联网文化的推动下&#xff0c;数字化使连接到互联网的设备数量空前加速。在企业网络环境中&#xff0c;每个在线运行的设备都需要一个数字证书来证明其合法性和安全运行。这些数字证书&#xff08;通常称为 X.509 证书&#xff09;要么来自称为证书颁发机构 &…

长虹智能电视使用123

1、开机 在接通电源的情况下&#xff0c;长虹智能电视开机有两种方式。 方式1&#xff1a; 按电视右下角开机按钮 方式2&#xff1a; 按电视遥控器开机按钮 长虹智能电视开机后会进入其操作系统&#xff08;安卓&#xff09;。 屏幕左右双箭头图表&#xff0c;手指点击会…

力扣876:链表的中间结点

力扣876&#xff1a;链表的中间结点 题目描述&#xff1a; 给你单链表的头结点 head &#xff0c;请你找出并返回链表的中间结点。 如果有两个中间结点&#xff0c;则返回第二个中间结点。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5] 输出&#xff1a;[3,4,5]…

无线优化之RRM模板

一、简介 RRM即,Radio Resource Management,射频资源管理 WLAN技术是以射频信号(如2.4G/5G的无线电磁波)作为传输介质,无线电磁波在传输过程中因周围环境导致无线信号衰减,从而影响无线用户上网的服务质量。 RRM模板主要用于保持最优的频射资源状态,自动检查周围无线…

二十四、城市建成区提取结果制图——建成区出图

一、前言 其实制图这一系列文章主要是为了照顾初学者,因为很多初学者并不是特别熟悉GIS平台一些操作,可能对于初步的制图有一定了解,但是对于一些稍微看起来高级并且复杂一点的图如何制作?例如下面这种多景的制作,其实吧万变不离其宗,这种仅仅只是拼接多幅数据框在一起,…

关于css 推荐几个超好看渐变色!

1.多彩糖果渐变 background: linear-gradient(135deg, #ff00cc, #ffcc00, #00ffcc, #ff0066);这个渐变色使用了多个鲜艳的颜色&#xff0c;从紫红色 (#ff00cc) 渐变到橙色 (#ffcc00)&#xff0c;然后到青色 (#00ffcc)&#xff0c;最后到鲜艳的粉红色 (#ff0066)。它给人一种快乐…

为啥$p(w|D)=p(y|X,w)$?

为啥 p ( w ∣ D ) p ( y ∣ X , w ) p(w|D)p(y|X,w) p(w∣D)p(y∣X,w)&#xff1f; p ( w ∣ X , y ) p ( w ∣ D ) p(w|X,y)p(w|D) p(w∣X,y)p(w∣D), p ( w ∣ D ) p ( D , w ) / p ( D ) p(w|D)p(D,w)/p(D) p(w∣D)p(D,w)/p(D)为啥 p ( D ∣ w ) p ( y ∣ X , w ) p(D|…

kubernetes istio

目录 一、部署 二、部署示例应用 三、部署遥测组件 四、流量管理 五、熔断 官网&#xff1a;https://istio.io/latest/zh/about/service-mesh/ 一、部署 提前准备好文件 tar zxf 15t10-1.19.3-linux-amd64.tar.gz cd 15t10-1.19.3/ export PATH$PWD/bin:$PATHistioctl install …

强化学习 - DQN及进化过程(Double DQN,Dueling DQN)

1.DQN 1.1概念 DQN相对于Q-Learning进行了三处改进&#xff1a; 1.引入神经网络&#xff1a;如下图所示希望能从状态S中提取Q(s,a) 2.经验回放机制&#xff1a;连续动作空间采样时&#xff0c;前后数据具有强关联性&#xff0c;而神经网络训练时要求数据之间具有独立同分布特性…