Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
41、Flink之Hive 方言介绍及详细示例
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的
文章目录
- Flink 系列文章
- 一、函数分类
- 1、分类标准及类别
- 2、函数引用
- 1)、精确函数引用
- 2)、模糊函数引用
- 3、函数解析顺序
- 1)、精确函数引用
- 2)、模糊函数引用
- 二、系统(内置)函数
- 1、标量函数
- 1)、比较函数
- 2)、逻辑函数
- 3)、算术函数
- 4)、字符串函数
- 5)、时间函数
- 6)、条件函数
- 7)、类型转换函数
- 8)、集合函数
- 9)、JSON Functions
- 1、IS JSON
- 2、JSON_EXISTS
- 3、JSON_STRING
- 4、JSON_VALUE
- 5、JSON_QUERY
- 6、JSON_OBJECT
- 7、JSON_ARRAY
- 8、JSON_ARRAYAGG
- 10、JSON_OBJECTAGG
- 10)、值构建函数
- 11)、值获取函数
- 12)、分组函数
- 13)、哈希函数
- 2、聚合函数
- 3、时间间隔单位和时间点单位标识符
- 4、列函数
本文介绍了flink的函数分类、内置函数的说明及示例,特别是针对json function函数每个均以可运行的示例进行说明。
本文依赖flink集群能正常使用。
本文分为2个部分,即函数分类以及内置函数。
本文的示例均在Flink 1.17版本中运行。
一、函数分类
Flink 允许用户在 Table API 和 SQL 中使用函数进行数据的转换。
Flink 中的函数有两个划分标准。
1、分类标准及类别
一个划分标准是:系统(内置)函数和 Catalog 函数。系统函数没有名称空间,只能通过其名称来进行引用。 Catalog 函数属于 Catalog 和数据库,因此它们拥有 Catalog 和数据库命名空间。 用户可以通过全/部分限定名(catalog.db.func 或 db.func)或者函数名 来对 Catalog 函数进行引用。
另一个划分标准是:临时函数和持久化函数。 临时函数始终由用户创建,它容易改变并且仅在会话的生命周期内有效。 持久化函数不是由系统提供,就是存储在 Catalog 中,它在会话的整个生命周期内都有效。
这两个划分标准给 Flink 用户提供了 4 种函数:
- 临时性系统函数
- 系统函数
- 临时性 Catalog 函数
- Catalog 函数
系统函数始终优先于 Catalog 函数解析,临时函数始终优先于持久化函数解析, 函数解析优先级如下所述。
2、函数引用
用户在 Flink 中可以通过精确、模糊两种引用方式引用函数。
1)、精确函数引用
精确函数引用允许用户跨 Catalog,跨数据库调用 Catalog 函数。
例如:select mycatalog.mydb.myfunc(x) from mytable 和 select mydb.myfunc(x) from mytable。
仅 Flink 1.10 以上版本支持。
2)、模糊函数引用
在模糊函数引用中,用户只需在 SQL 查询中指定函数名,例如: select myfunc(x) from mytable。
3、函数解析顺序
当函数名相同,函数类型不同时,函数解析顺序才有意义。
例如:当有三个都名为 “myfunc” 的临时性 Catalog 函数,Catalog 函数,和系统函数时, 如果没有命名冲突,三个函数将会被解析为一个函数。
1)、精确函数引用
由于系统函数没有命名空间,Flink 中的精确函数引用必须 指向临时性 Catalog 函数或 Catalog 函数。
解析顺序如下:
- 临时性 catalog 函数
- Catalog 函数
2)、模糊函数引用
解析顺序如下:
- 临时性系统函数
- 系统函数
- 临时性 Catalog 函数, 在会话的当前 Catalog 和当前数据库中
- Catalog 函数, 在会话的当前 Catalog 和当前数据库中
二、系统(内置)函数
Flink Table API & SQL 为用户提供了一组内置的数据转换函数。
1、标量函数
标量函数将零、一个或多个值作为输入并返回单个值作为结果。
1)、比较函数
2)、逻辑函数
3)、算术函数
4)、字符串函数
5)、时间函数
6)、条件函数
7)、类型转换函数
8)、集合函数
9)、JSON Functions
JSON 函数使用 SQL 标准的 ISO/IEC TR 19075-6 中所述的 JSON 路径表达式(JSON path expressions )。它们的语法受到 ECMAScript 的启发并采用了 ECMAScript 的许多功能,但既不是它的子集也不是它的超集。
路径表达式有两种风格,宽松和严格( lax and strict.)。省略时,它默认为严格模式。
严格模式旨在从架构角度检查数据,每当数据不符合路径表达式时,就会引发错误。但是,像 JSON_VALUE 这样的函数允许在遇到错误时定义回退行为。
宽松模式更宽容,并将错误转换为空序列。
特殊字符 $ 表示 JSON 路径中的根节点。路径可以访问属性 ( . a )、数组元素( .a)、数组元素 ( .a)、数组元素(.a[0].b) 或分支数组中的所有元素 ($.a[*].b)。
已知限制:
截至Flink 1.17版本并非正确支持宽松模式的所有功能。这是一个上游错误 (CALCITE-4717)。不保证非标准行为。
1、IS JSON
确定给定字符串是否为有效的 JSON。
指定可选的类型参数会限制允许哪种类型的 JSON 对象。如果字符串是有效的 JSON,但不是该类型,则返回 false。默认值为 VALUE。
- SQL语法
IS JSON [ { VALUE | SCALAR | ARRAY | OBJECT } ]
- table api语法
STRING.isJson([JsonType type])
- 示例
-- TRUE
Flink SQL> select '1' IS JSON;
+----+--------+
| op | EXPR$0 |
+----+--------+
| +I | TRUE |
+----+--------+
Flink SQL> select '[]' IS JSON;
+----+--------+
| op | EXPR$0 |
+----+--------+
| +I | TRUE |
+----+--------+
-- The following statements return TRUE.
SELECT '1' IS JSON;
SELECT '[]' IS JSON;
SELECT '{}' IS JSON;
SELECT '"abc"' IS JSON;
SELECT '1' IS JSON SCALAR;
SELECT '{}' IS JSON OBJECT;
-- The following statements return FALSE.
SELECT 'abc' IS JSON;
SELECT '1' IS JSON ARRAY;
SELECT '1' IS JSON OBJECT;
SELECT '{}' IS JSON SCALAR;
SELECT '{}' IS JSON ARRAY;
# 以下示例一样,不再赘述
'1' IS JSON
'[]' IS JSON
'{}' IS JSON
-- TRUE
'"abc"' IS JSON
-- FALSE
'abc' IS JSON
NULL IS JSON
-- TRUE
'1' IS JSON SCALAR
-- FALSE
'1' IS JSON ARRAY
-- FALSE
'1' IS JSON OBJECT
-- FALSE
'{}' IS JSON SCALAR
-- FALSE
'{}' IS JSON ARRAY
-- TRUE
'{}' IS JSON OBJECT
2、JSON_EXISTS
确定 JSON 字符串是否满足给定的路径搜索条件。
如果省略错误行为,则假定 FALSE ON ERROR 为默认值。
- SQL语法
JSON_EXISTS(jsonValue, path [ { TRUE | FALSE | UNKNOWN | ERROR } ON ERROR ])
- table api语法
STRING.jsonExists(STRING path [, JsonExistsOnError onError])
- 示例
Flink SQL> SELECT JSON_EXISTS('{"a": true}', 'strict $.b' FALSE ON ERROR);
+----+--------+
| op | EXPR$0 |
+----+--------+
| +I | FALSE |
+----+--------+
-- The following statements return TRUE.
SELECT JSON_EXISTS('{"a": true}', '$.a');
SELECT JSON_EXISTS('{"a": [{ "b": 1 }]}', '$.a[0].b');
SELECT JSON_EXISTS('{"a": true}', 'strict $.b' TRUE ON ERROR);
-- The following statements return FALSE.
SELECT JSON_EXISTS('{"a": true}', '$.b');
SELECT JSON_EXISTS('{"a": true}', 'strict $.b' FALSE ON ERROR);
-- TRUE
SELECT JSON_EXISTS('{"a": true}', '$.a');
-- FALSE
SELECT JSON_EXISTS('{"a": true}', '$.b');
-- TRUE
SELECT JSON_EXISTS('{"a": [{ "b": 1 }]}',
'$.a[0].b');
-- TRUE
SELECT JSON_EXISTS('{"a": true}',
'strict $.b' TRUE ON ERROR);
-- FALSE
SELECT JSON_EXISTS('{"a": true}',
'strict $.b' FALSE ON ERROR);
3、JSON_STRING
将值序列化为 JSON。
此函数返回包含序列化值的 JSON 字符串。如果值为 NULL,则该函数返回 NULL。
- SQL语法
JSON_STRING(value)
- table api语法
jsonString(value)
- 示例
Flink SQL> SELECT JSON_STRING(1);
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | 1 |
+----+--------------------------------+
-- returns NULL
SELECT JSON_STRING(CAST(NULL AS INT));
-- returns '1'
SELECT JSON_STRING(1);
-- returns 'true'
SELECT JSON_STRING(TRUE);
-- returns '"Hello, World!"'
JSON_STRING('Hello, World!');
-- returns '[1,2]'
JSON_STRING(ARRAY[1, 2])
-- NULL
JSON_STRING(CAST(NULL AS INT))
-- '1'
JSON_STRING(1)
-- 'true'
JSON_STRING(TRUE)
-- '"Hello, World!"'
JSON_STRING('Hello, World!')
-- '[1,2]'
JSON_STRING(ARRAY[1, 2])
4、JSON_VALUE
从 JSON 字符串中提取标量。
此方法在 JSON 字符串中搜索给定的路径表达式,如果该路径的值为标量,则返回该值。不能返回非标量值。
默认情况下,该值以 STRING 形式返回。使用 returningType 可以选择不同的类型,并支持以下类型:
- VARCHAR / STRING
- BOOLEAN
- INTEGER
- DOUBLE
对于空路径表达式或错误,可以将行为定义为返回 null、引发错误或返回定义的默认值。
省略时,默认值分别为 NULL ON EMPTY 或 NULL ON ERROR。
默认值可以是文本或表达式。如果默认值本身引发错误,则它将下降到 ON EMPTY 的错误行为,并引发 ON ERROR 的错误。
对于包含空格等特殊字符的路径,可以使用 [‘property’] 或 [“property”] 选择父对象中的指定属性。
请务必在属性名称两边加上单引号或双引号。
在 SQL 中使用 JSON_VALUE 时,路径是一个字符参数,该参数已经是单引号,因此您必须对属性名称周围的单引号进行转义,
例如 JSON_VALUE(‘{“a b”: “true”}’, ‘$.[’‘a b’‘]’)。
- SQL语法
JSON_VALUE(jsonValue, path [RETURNING <dataType>] [ { NULL | ERROR | DEFAULT <defaultExpr> } ON EMPTY ] [ { NULL | ERROR | DEFAULT <defaultExpr> } ON ERROR ])
- table api语法
STRING.jsonValue(STRING path [, returnType, onEmpty, defaultOnEmpty, onError, defaultOnError])
- 示例
Flink SQL> SELECT JSON_VALUE('{"a": true}', '$.a');
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | true |
+----+--------------------------------+
Flink SQL> SELECT JSON_VALUE('{"contains blank": "right"}', 'strict $.[''contains blank'']' NULL ON EMPTY DEFAULT 'wrong' ON ERROR);
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | right |
+----+--------------------------------+
-- returns "true"
SELECT JSON_VALUE('{"a": true}', '$.a');
-- returns TRUE
SELECT JSON_VALUE('{"a": true}', '$.a' RETURNING BOOLEAN);
-- returns "false"
SELECT JSON_VALUE('{"a": true}', 'lax $.b' DEFAULT FALSE ON EMPTY);
-- returns "false"
SELECT JSON_VALUE('{"a": true}', 'strict $.b' DEFAULT FALSE ON ERROR);
-- returns 0.998D
SELECT JSON_VALUE('{"a.b": [0.998,0.996]}','$.["a.b"][0]' RETURNING DOUBLE);
-- returns "right"
SELECT JSON_VALUE('{"contains blank": "right"}', 'strict $.[''contains blank'']' NULL ON EMPTY DEFAULT 'wrong' ON ERROR);
5、JSON_QUERY
目前不支持 RETURNING 子句。
wrappingBehavior 确定是否应将提取的值包装到数组中,以及是无条件地包装,还是仅在值本身还不是数组时才这样做。
onEmpty 和 onError 分别确定路径表达式为空或引发错误时的行为。
默认情况下,在这两种情况下都返回 null。其他选择是使用空数组、空对象或引发错误。
- SQL语法
JSON_QUERY(jsonValue, path [ { WITHOUT | WITH CONDITIONAL | WITH UNCONDITIONAL } [ ARRAY ] WRAPPER ] [ { NULL | EMPTY ARRAY | EMPTY OBJECT | ERROR } ON EMPTY ] [ { NULL | EMPTY ARRAY | EMPTY OBJECT | ERROR } ON ERROR ])
- table api语法
STRING.jsonQuery(path [, JsonQueryWrapper [, JsonQueryOnEmptyOrError, JsonQueryOnEmptyOrError ] ])
- 示例
Flink SQL> SELECT JSON_QUERY('{ "a": { "b": 1 } }', '$.a');
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | {"b":1} |
+----+--------------------------------+
Flink SQL> SELECT JSON_QUERY('{}', 'lax $.invalid' EMPTY OBJECT ON EMPTY);
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | {} |
+----+--------------------------------+
-- returns '{ "b": 1 }'
SELECT JSON_QUERY('{ "a": { "b": 1 } }', '$.a');
-- returns '[1, 2]'
SELECT JSON_QUERY('[1, 2]', '$');
-- returns NULL
SELECT JSON_QUERY(CAST(NULL AS STRING), '$');
-- returns '["c1","c2"]'
SELECT JSON_QUERY('{"a":[{"c":"c1"},{"c":"c2"}]}', 'lax $.a[*].c');
-- Wrap the result into an array.
-- returns '[{}]'
SELECT JSON_QUERY('{}', '$' WITH CONDITIONAL ARRAY WRAPPER);
-- returns '[1, 2]'
SELECT JSON_QUERY('[1, 2]', '$' WITH CONDITIONAL ARRAY WRAPPER);
-- returns '[[1, 2]]'
SELECT JSON_QUERY('[1, 2]', '$' WITH UNCONDITIONAL ARRAY WRAPPER);
-- Scalars must be wrapped to be returned.
-- returns NULL
SELECT JSON_QUERY(1, '$');
-- returns '[1]'
SELECT JSON_QUERY(1, '$' WITH CONDITIONAL ARRAY WRAPPER);
-- Behavior if the path expression is empty.
-- returns '{}'
SELECT JSON_QUERY('{}', 'lax $.invalid' EMPTY OBJECT ON EMPTY);
-- Behavior if the path expression has an error.
-- returns '[]'
SELECT JSON_QUERY('{}', 'strict $.invalid' EMPTY ARRAY ON ERROR);
-- '{ "b": 1 }'
JSON_QUERY('{ "a": { "b": 1 } }', '$.a')
-- '[1, 2]'
JSON_QUERY('[1, 2]', '$')
-- NULL
JSON_QUERY(CAST(NULL AS STRING), '$')
-- '["c1","c2"]'
JSON_QUERY('{"a":[{"c":"c1"},{"c":"c2"}]}',
'lax $.a[*].c')
-- Wrap result into an array
-- '[{}]'
JSON_QUERY('{}', '$' WITH CONDITIONAL ARRAY WRAPPER)
-- '[1, 2]'
JSON_QUERY('[1, 2]', '$' WITH CONDITIONAL ARRAY WRAPPER)
-- '[[1, 2]]'
JSON_QUERY('[1, 2]', '$' WITH UNCONDITIONAL ARRAY WRAPPER)
-- Scalars must be wrapped to be returned
-- NULL
JSON_QUERY(1, '$')
-- '[1]'
JSON_QUERY(1, '$' WITH CONDITIONAL ARRAY WRAPPER)
-- Behavior if path expression is empty / there is an error
-- '{}'
JSON_QUERY('{}', 'lax $.invalid' EMPTY OBJECT ON EMPTY)
-- '[]'
JSON_QUERY('{}', 'strict $.invalid' EMPTY ARRAY ON ERROR)
6、JSON_OBJECT
从键值对列表生成 JSON 对象字符串。
请注意,键必须是非 NULL 字符串文本,而值可以是任意表达式。
此函数返回一个 JSON 字符串。ON NULL 行为定义如何处理 NULL 值。如果省略,则默认假定 NULL ON NULL。
从另一个 JSON 构造函数调用(JSON_OBJECT、JSON_ARRAY)创建的值是直接插入的,而不是作为字符串插入的。这允许构建嵌套的 JSON 结构。
- SQL语法
JSON_OBJECT([[KEY] key VALUE value]* [ { NULL | ABSENT } ON NULL ])
- table api语法
jsonObject(JsonOnNull, keyValues...)
- 示例
Flink SQL> SELECT JSON_OBJECT(
> KEY 'K1'
> VALUE JSON_OBJECT(
> KEY 'K2'
> VALUE 'V'
> )
> );
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | {"K1":{"K2":"V"}} |
+----+--------------------------------+
Flink SQL> SELECT JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) ABSENT ON NULL);
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | {} |
+----+--------------------------------+
-- returns '{}'
SELECT JSON_OBJECT();
-- returns '{"K1":"V1","K2":"V2"}'
SELECT JSON_OBJECT('K1' VALUE 'V1', 'K2' VALUE 'V2');
-- Use an expression as a value.
SELECT JSON_OBJECT('orderNo' VALUE orders.orderId);
-- ON NULL
-- '{"K1":null}'
SELECT JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) NULL ON NULL);
-- ON NULL
-- '{}'
SELECT JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) ABSENT ON NULL);
-- returns '{"K1":{"K2":"V"}}'
SELECT JSON_OBJECT(
KEY 'K1'
VALUE JSON_OBJECT(
KEY 'K2'
VALUE 'V'
)
);
-- '{}'
JSON_OBJECT()
-- '{"K1":"V1","K2":"V2"}'
JSON_OBJECT('K1' VALUE 'V1', 'K2' VALUE 'V2')
-- Expressions as values
JSON_OBJECT('orderNo' VALUE orders.orderId)
-- ON NULL
JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) NULL ON NULL) -- '{"K1":null}'
JSON_OBJECT(KEY 'K1' VALUE CAST(NULL AS STRING) ABSENT ON NULL) -- '{}'
-- '{"K1":{"K2":"V"}}'
JSON_OBJECT(
KEY 'K1'
VALUE JSON_OBJECT(
KEY 'K2'
VALUE 'V'
)
)
7、JSON_ARRAY
从值列表生成 JSON 数组字符串。
此函数返回一个 JSON 字符串。这些值可以是任意表达式。ON NULL 行为定义如何处理 NULL 值。如果省略,则默认假定 ABSENT ON NULL。
从另一个 JSON 构造函数调用(JSON_OBJECT、JSON_ARRAY)创建的元素是直接插入的,而不是作为字符串插入的。这允许构建嵌套的 JSON 结构。
- SQL语法
JSON_ARRAY([value]* [ { NULL | ABSENT } ON NULL ])
- table api语法
jsonArray(JsonOnNull, values...)
- 示例
Flink SQL>
> SELECT JSON_ARRAY(1, '2');
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | [1,"2"] |
+----+--------------------------------+
Received a total of 1 row
Flink SQL> SELECT JSON_ARRAY(CAST(NULL AS STRING) ABSENT ON NULL);
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | [] |
+----+--------------------------------+
-- returns '[]'
SELECT JSON_ARRAY();
-- returns '[1,"2"]'
SELECT JSON_ARRAY(1, '2');
-- Use an expression as a value.
SELECT JSON_ARRAY(orders.orderId);
-- ON NULL
-- returns '[null]'
SELECT JSON_ARRAY(CAST(NULL AS STRING) NULL ON NULL);
-- ON NULL
-- returns '[]'
SELECT JSON_ARRAY(CAST(NULL AS STRING) ABSENT ON NULL);
-- returns '[[1]]'
SELECT JSON_ARRAY(JSON_ARRAY(1));
-- '[]'
JSON_ARRAY()
-- '[1,"2"]'
JSON_ARRAY(1, '2')
-- Expressions as values
JSON_ARRAY(orders.orderId)
-- ON NULL
JSON_ARRAY(CAST(NULL AS STRING) NULL ON NULL) -- '[null]'
JSON_ARRAY(CAST(NULL AS STRING) ABSENT ON NULL) -- '[]'
-- '[[1]]'
JSON_ARRAY(JSON_ARRAY(1))
8、JSON_ARRAYAGG
将明细聚合到 JSON 数组字符串中。
JSON_ARRAYAGG 函数通过将指定的项聚合到数组中来创建 JSON 对象字符串。
item 表达式可以是任意的,包括其他 JSON 函数。
如果值为 NULL,则 ON NULL 行为定义要执行的操作。如果省略,则 ABSENT ON NULL 为默认值。
OVER 窗口、无限会话窗口或 HOP 窗口不支持JSON_ARRAYAGG函数。
- SQL语法
JSON_ARRAYAGG(items [ { NULL | ABSENT } ON NULL ])
- table api语法
在这里插入代码片
- 示例
Flink SQL> CREATE TABLE source_table (
> userId INT,
> age INT,
> balance DOUBLE,
> userName STRING,
> t_insert_time AS localtimestamp,
> WATERMARK FOR t_insert_time AS t_insert_time
> ) WITH (
> 'connector' = 'datagen',
> 'rows-per-second'='5',
> 'fields.userId.kind'='sequence',
> 'fields.userId.start'='1',
> 'fields.userId.end'='10',
>
> 'fields.balance.kind'='random',
> 'fields.balance.min'='1',
> 'fields.balance.max'='100',
>
> 'fields.age.min'='1',
> 'fields.age.max'='1000',
>
> 'fields.userName.length'='10'
> );
[INFO] Execute statement succeed.
Flink SQL> select * from source_table;
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
| op | userId | age | balance | userName | t_insert_time |
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
| +I | 1 | 555 | 90.45012880441223 | 7e2b6c7beb | 2023-11-06 17:29:05.273 |
| +I | 2 | 209 | 32.07201650494765 | f652baac94 | 2023-11-06 17:29:05.274 |
| +I | 3 | 278 | 24.299962537076734 | 11b4353416 | 2023-11-06 17:29:05.274 |
| +I | 4 | 433 | 58.634356546049574 | 21d5d09603 | 2023-11-06 17:29:05.274 |
| +I | 5 | 55 | 16.20617629075601 | d626f31213 | 2023-11-06 17:29:05.274 |
| +I | 6 | 442 | 98.87803427244727 | 0305c21dc5 | 2023-11-06 17:29:06.267 |
| +I | 7 | 19 | 96.11095443982174 | ea873b2df2 | 2023-11-06 17:29:06.268 |
| +I | 8 | 806 | 36.5775262369553 | f8df556b22 | 2023-11-06 17:29:06.268 |
| +I | 9 | 919 | 69.47517602162831 | 85074390f3 | 2023-11-06 17:29:06.268 |
| +I | 10 | 46 | 47.519467818569815 | 662990446f | 2023-11-06 17:29:06.268 |
+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
Received a total of 10 rows
Flink SQL> SELECT
> JSON_ARRAYAGG(userName)
> FROM source_table;
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | ["ee2e4edb32"] |
| -U | ["ee2e4edb32"] |
| +U | ["ee2e4edb32","66e13f3f77"] |
| -U | ["ee2e4edb32","66e13f3f77"] |
| +U | ["ee2e4edb32","66e13f3f77",... |
| -U | ["ee2e4edb32","66e13f3f77",... |
| +U | ["ee2e4edb32","66e13f3f77",... |
| -U | ["ee2e4edb32","66e13f3f77",... |
| +U | ["ee2e4edb32","66e13f3f77",... |
| -U | ["ee2e4edb32","66e13f3f77",... |
| +U | ["ee2e4edb32","66e13f3f77",... |
| -U | ["ee2e4edb32","66e13f3f77",... |
| +U | ["ee2e4edb32","66e13f3f77",... |
| -U | ["ee2e4edb32","66e13f3f77",... |
| +U | ["ee2e4edb32","66e13f3f77",... |
| -U | ["ee2e4edb32","66e13f3f77",... |
| +U | ["ee2e4edb32","66e13f3f77",... |
| -U | ["ee2e4edb32","66e13f3f77",... |
| +U | ["ee2e4edb32","66e13f3f77",... |
+----+--------------------------------+
Received a total of 19 rows
Flink SQL> SELECT
> JSON_ARRAYAGG(userId)
> FROM source_table;
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | [1] |
| -U | [1] |
| +U | [1,2] |
| -U | [1,2] |
| +U | [1,2,3] |
| -U | [1,2,3] |
| +U | [1,2,3,4] |
| -U | [1,2,3,4] |
| +U | [1,2,3,4,5] |
| -U | [1,2,3,4,5] |
| +U | [1,2,3,4,5,6] |
| -U | [1,2,3,4,5,6] |
| +U | [1,2,3,4,5,6,7] |
| -U | [1,2,3,4,5,6,7] |
| +U | [1,2,3,4,5,6,7,8] |
| -U | [1,2,3,4,5,6,7,8] |
| +U | [1,2,3,4,5,6,7,8,9] |
| -U | [1,2,3,4,5,6,7,8,9] |
| +U | [1,2,3,4,5,6,7,8,9,10] |
+----+--------------------------------+
Received a total of 19 rows
10、JSON_OBJECTAGG
将key-value表达式聚合到 JSON 字符串中。
JSON_OBJECTAGG 函数通过将key-value表达式聚合到单个 JSON 对象中来创建 JSON 对象字符串。
key表达式必须返回不可为 null 的字符串。value表达式可以是任意的,包括其他 JSON 函数。
密钥必须是唯一的。如果一个key多次出现,则会引发错误。
如果value为 NULL,则 ON NULL 行为定义要执行的操作。如果省略,则 NULL ON NULL 为默认值。
OVER 窗口中不支持 JSON_OBJECTAGG 函数。
- SQL语法
JSON_OBJECTAGG([KEY] key VALUE value [ { NULL | ABSENT } ON NULL ])
- table api语法
在这里插入代码片
- 示例
Flink SQL> select
> JSON_OBJECTAGG(userName VALUE 'f652baac94' )
> FROM source_table;
+----+--------------------------------+
| op | EXPR$0 |
+----+--------------------------------+
| +I | {"0c3ceeca6f":"f652baac94"} |
| -U | {"0c3ceeca6f":"f652baac94"} |
| +U | {"0c3ceeca6f":"f652baac94",... |
| -U | {"0c3ceeca6f":"f652baac94",... |
| +U | {"0c3ceeca6f":"f652baac94",... |
| -U | {"0c3ceeca6f":"f652baac94",... |
| +U | {"0c3ceeca6f":"f652baac94",... |
| -U | {"0c3ceeca6f":"f652baac94",... |
| +U | {"0c3ceeca6f":"f652baac94",... |
| -U | {"0c3ceeca6f":"f652baac94",... |
| +U | {"0c3ceeca6f":"f652baac94",... |
| -U | {"0c3ceeca6f":"f652baac94",... |
| +U | {"0c3ceeca6f":"f652baac94",... |
| -U | {"0c3ceeca6f":"f652baac94",... |
| +U | {"0c3ceeca6f":"f652baac94",... |
| -U | {"0c3ceeca6f":"f652baac94",... |
| +U | {"0c3ceeca6f":"f652baac94",... |
| -U | {"0c3ceeca6f":"f652baac94",... |
| +U | {"0c3ceeca6f":"f652baac94",... |
+----+--------------------------------+
10)、值构建函数
11)、值获取函数
12)、分组函数
13)、哈希函数
2、聚合函数
聚合函数将所有的行作为输入,并返回单个聚合值作为结果。
3、时间间隔单位和时间点单位标识符
下表列出了时间间隔单位和时间点单位标识符。
对于 Table API,请使用 _ 代替空格(例如 DAY_TO_HOUR)
4、列函数
列函数用于选择或丢弃表的列。
列函数仅在 Table API 中使用。
详细语法如下:
//列函数:
withColumns(columnExprs)
withoutColumns(columnExprs)
//多列表达式:
columnExpr [, columnExpr]*
//单列表达式:
columnRef | columnIndex to columnIndex | columnName to columnName
//列引用:
columnName(The field name that exists in the table) | columnIndex(a positive integer starting from 1)
列函数的用法如下表所示(假设我们有一个包含 5 列的表:(a: Int, b: Long, c: String, d:String, e: String)):
列函数可用于所有需要列字段的地方,例如 select、groupBy、orderBy、UDFs 等函数,例如:
table
.groupBy(withColumns(range(1, 3)))
.select(withColumns(range("a", "b")), myUDAgg(myUDF(withColumns(range(5, 20)))));
以上,介绍了flink的函数分类、内置函数的说明及示例,特别是针对json function函数每个均以可运行的示例进行说明。