31、Flink的SQL Gateway介绍及示例

Flink 系列文章

1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接

13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(2)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(3)
19、Flink 的Table API 和 SQL 中的自定义函数及示例(4)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
21、Flink 的table API与DataStream API 集成(1)- 介绍及入门示例、集成说明
21、Flink 的table API与DataStream API 集成(2)- 批处理模式和inser-only流处理
21、Flink 的table API与DataStream API 集成(3)- changelog流处理、管道示例、类型转换和老版本转换示例
21、Flink 的table API与DataStream API 集成(完整版)
22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
25、Flink 的table api与sql之函数(自定义函数示例)
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
31、Flink的SQL Gateway介绍及示例
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
33、Flink 的Table API 和 SQL 中的时区
35、Flink 的 Formats 之CSV 和 JSON Format
36、Flink 的 Formats 之Parquet 和 Orc Format
41、Flink之Hive 方言介绍及详细示例
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的


文章目录

  • Flink 系列文章
  • 一、概述
    • 1、启动
    • 2、查询示例
      • 1)、Step 1: Open a session
      • 2)、Step 2: Execute a query
      • 3)、Step 3: Fetch results
    • 3、配置
      • 1)、启动配置
      • 2)、运行配置
    • 4、支持的Endpoints
  • 二、REST Endpoint
    • 1、SQL处理概述
      • 1)、Open Session
      • 2)、Submit SQL
      • 3)、Fetch Results
    • 2、Endpoint Options
    • 3、REST API
      • 1)、/api_versions
      • 2)、/info
      • 3)、/sessions
      • 4)、/sessions/:session_handle
        • 1、Delete 请求方式-关闭session
        • 2、Get请求方式-获取session配置信息
      • 5)、/sessions/:session_handle/complete-statement
      • 6)、/sessions/:session_handle/configure-session
      • 7)、/sessions/:session_handle/heartbeat
      • 8)、/sessions/:session_handle/operations/:operation_handle/cancel
      • 9)、/sessions/:session_handle/operations/:operation_handle/close
      • 10)、/sessions/:session_handle/operations/:operation_handle/result/:token
      • 11)、/sessions/:session_handle/operations/:operation_handle/status
      • 12)、/sessions/:session_handle/statements
    • 4、Data Type Mapping
  • 三、HiveServer2 Endpoint


本文介绍了Flink gateway的启动配置、支持的api以及简单的hiveserver2 endpoint部分,特别是api均以示例进行说明。
本文依赖flink集群能正常使用。
本文分为3个部分,即gateway启动、rest api使用和hiveserver2 endpoint。
本文的示例是在Flink 1.17版本中运行。

一、概述

SQL Gateway 是一种使远程多个客户端能够并发执行SQL的服务。它提供了一种提交Flink Job、查找元数据和在线分析数据的简单方法。

SQL Gateway 由可插入endpoints 和SqlGatewayService组成。SqlGatewayService是一个由endpoints 重用以处理请求的处理器。endpoints 是允许用户进行连接的入口点。根据endpoints 的类型,用户可以使用不同的utils进行连接。

在这里插入图片描述

1、启动

SQL Gateway 捆绑在常规的Flink发行版中,因此可以开箱即用。它只需要一个正在运行的Flink集群,在那里可以执行表程序。有关设置Flink集群的更多信息,请参阅集群和部署部分。如果您只是想试用SQL客户端,还可以使用以下命令启动一个带有一个工作进程的本地集群:

$ ./bin/start-cluster.sh
$ ./bin/sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=localhost

[root@minio_api_1049 bin]# hostname
minio_api_1049
[root@minio_api_1049 bin]# ./sql-gateway.sh start -Dsql-gateway.endpoint.rest.address=192.168.10.49
Starting sql-gateway daemon on host minio_api_1049.
[root@minio_api_1049 bin]# jps
29396 SqlClient
6180 SqlGateway
28424 TaskManagerRunner
28077 StandaloneSessionClusterEntrypoint
6430 Jps
[root@minio_api_1049 bin]# curl http://192.168.10.49:8083/v1/info
{"productName":"Apache Flink","version":"1.17.1"}
[root@minio_api_1049 bin]# 

或者在浏览器访问
在这里插入图片描述

2、查询示例

以下三个步骤是紧密结合在一起的,也就是从第一步开始做。

1)、Step 1: Open a session

SQL网关使用返回结果中的sessionHandle来唯一标识每个活动用户。

curl --request POST http://localhost:8083/v1/sessions
{"sessionHandle":"..."}
  • 示例
[root@minio_api_1049 bin]# curl --request POST http://192.168.10.49:8083/v1/sessions
{"sessionHandle":"52c748a3-60be-4131-9c5e-65e872beb3ac"}

2)、Step 2: Execute a query

SQL网关使用返回结果中的operationHandle来唯一标识提交的SQL。

curl --request POST http://localhost:8083/v1/sessions/${sessionHandle}/statements/ --data '{"statement": "SELECT 1"}'
{"operationHandle":"..."}
  • 示例
[root@minio_api_1049 bin]# curl --request POST http://192.168.10.49:8083/v1/sessions/52c748a3-60be-4131-9c5e-65e872beb3ac/statements/ --data '{"statement": "SELECT 1"}'
{"operationHandle":"5b85b27c-30df-4deb-9f46-7054777f0242"}

3)、Step 3: Fetch results

使用上面的sessionHandle和operationHandle,可以获取相应的结果。

$ curl --request GET http://localhost:8083/v1/sessions/${sessionHandle}/operations/${operationHandle}/result/0
  • 示例
[root@minio_api_1049 bin]# curl --request GET http://192.168.10.49:8083/v1/sessions/52c748a3-60be-4131-9c5e-65e872beb3ac/operations/5b85b27c-30df-4deb-9f46-7054777f0242/result/0
{
	"resultType": "PAYLOAD",
	"isQueryResult": true,
	"jobID": "583fd3a557efc5edc5783de1e1491245",
	"resultKind": "SUCCESS_WITH_CONTENT",
	"results": {
		"columns": [{
			"name": "EXPR$0",
			"logicalType": {
				"type": "INTEGER",
				"nullable": false
			},
			"comment": null
		}],
		"rowFormat": "JSON",
		"data": [{
			"kind": "INSERT",
			"fields": [1]
		}]
	},
	"nextResultUri": "/v1/sessions/52c748a3-60be-4131-9c5e-65e872beb3ac/operations/5b85b27c-30df-4deb-9f46-7054777f0242/result/1"
}

结果中的nextResultUri用于在不为null的情况下获取下一批结果。

curl --request GET ${nextResultUri}
  • 示例
[root@minio_api_1049 bin]# curl --request GET http://192.168.10.49:8083/v1/sessions/52c748a3-60be-4131-9c5e-65e872beb3ac/operations/5b85b27c-30df-4deb-9f46-7054777f0242/result/1
{
	"resultType": "EOS",
	"isQueryResult": true,
	"jobID": "583fd3a557efc5edc5783de1e1491245",
	"resultKind": "SUCCESS_WITH_CONTENT",
	"results": {
		"columns": [{
			"name": "EXPR$0",
			"logicalType": {
				"type": "INTEGER",
				"nullable": false
			},
			"comment": null
		}],
		"rowFormat": "JSON",
		"data": []
	}
}

3、配置

1)、启动配置

截至Flink 1.17版本,SQL网关脚本具有以下可选命令。

 ./bin/sql-gateway.sh --help

Usage: sql-gateway.sh [start|start-foreground|stop|stop-all] [args]
  commands:
    start               - Run a SQL Gateway as a daemon
    start-foreground    - Run a SQL Gateway as a console application
    stop                - Stop the SQL Gateway daemon
    stop-all            - Stop all the SQL Gateway daemons
    -h | --help         - Show this help message
    

对于“start”或“start foreground”命令,可以在CLI中配置SQL网关。

./bin/sql-gateway.sh start --help

Start the Flink SQL Gateway as a daemon to submit Flink SQL.

  Syntax: start [OPTIONS]
     -D <property=value>   Use value for given property
     -h,--help             Show the help message with descriptions of all
                           options.

2)、运行配置

可以在下面启动SQL网关时配置SQL网关,也可以配置任何有效的Flink配置条目:

./sql-gateway -Dkey=value

在这里插入图片描述

4、支持的Endpoints

Flink本机支持REST Endpoints 和HiveServer2 Endpoints。默认情况下,SQL网关与REST端点绑定在一起。使用灵活的体系结构,用户可以通过调用

./bin/sql-gateway.sh start -Dsql-gateway.endpoint.type=hiveserver2

或者conf/flink-conf.yaml配置

sql-gateway.endpoint.type: hiveserver2

客户端的优先级高于配置文件的优先级,如果配置项一致的情况下。

二、REST Endpoint

REST端点允许用户使用REST API连接到SQL网关。

1、SQL处理概述

参考上文中的查询示例

1)、Open Session

当客户端连接到SQL网关时,SQL网关会创建一个会话作为上下文,以在客户端和SQL网关之间的交互过程中存储用户指定的信息。创建会话后,SQL网关服务器将返回一个名为SessionHandle的标识符,用于以后的交互。

2)、Submit SQL

会话注册后,客户端可以向SQL网关服务器提交SQL。提交SQL时,SQL将被转换为Operation,并返回名为OperationHandle的标识符以便稍后获取结果。操作有其生命周期,客户端可以取消操作的执行或关闭操作以释放操作使用的资源。

3)、Fetch Results

使用OperationHandle,客户端可以从Operation中获取结果。如果操作准备就绪,SQL网关将返回一批具有相应模式的数据和用于获取下一批数据的URI。当提取完所有结果后,SQL网关将在响应中用值EOS填充resultType,并且下一批数据的URI为null。
在这里插入图片描述

2、Endpoint Options

在这里插入图片描述

3、REST API

该功能目前处于实验性的。

  • Open API v1 specification 用户可以提交sql并执行Allow users to submit statements to the gateway and execute.
  • Open API v2 specification 支持sql client连接gateway Supports SQL Client to connect to the gateway

默认是V2版本。V2版本的yaml文件。

以下示例API V2版本的使用。

1)、/api_versions

  • API描述
/api_versions
请求方式: GEThttp响应码: 200 OK
获取Rest Endpoint的当前可用版本。客户端可以选择一个返回版本作为稍后通信的协议。
  • 请求报文
{}
  • 响应报文
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:util:GetApiVersionResponseBody",
  "properties" : {
    "versions" : {
      "type" : "array",
      "items" : {
        "type" : "string"
      }
    }
  }
}
  • 示例
[root@minio_api_1049 conf]# curl --request GET http://192.168.10.49:8083/v2/api_versions
{"versions":["V1","V2"]}
[root@minio_api_1049 conf]# curl --request GET http://192.168.10.49:8083/v1/api_versions
{"versions":["V1","V2"]}

2)、/info

  • 描述
/info
http请求方式: GEThttp响应码: 200 OK
获取集群的元数据
  • 请求报文
{}
  • 响应报文
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:util:GetInfoResponseBody",
  "properties" : {
    "productName" : {
      "type" : "string"
    },
    "version" : {
      "type" : "string"
    }
  }
}
  • 示例
[root@minio_api_1049 conf]# curl --request GET http://192.168.10.49:8083/v2/info
{"productName":"Apache Flink","version":"1.17.1"}
[root@minio_api_1049 conf]# curl --request GET http://192.168.10.49:8083/v1/info
{"productName":"Apache Flink","version":"1.17.1"}

3)、/sessions

  • 描述
/sessions
http请求方式: POSThttp响应码: 200 OK
打开具有特定属性的新会话。可以为当前会话指定特定属性,该属性将覆盖网关的默认属性。
  • 请求报文
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:OpenSessionRequestBody",
  "properties" : {
    "properties" : {
      "type" : "object",
      "additionalProperties" : {
        "type" : "string"
      }
    },
    "sessionName" : {
      "type" : "string"
    }
  }
}
  • 响应报文
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:OpenSessionResponseBody",
  "properties" : {
    "sessionHandle" : {
      "type" : "string"
    }
  }
}
  • 示例
[root@minio_api_1049 conf]# curl --request POST http://192.168.10.49:8083/v1/sessions
{"sessionHandle":"2daa0882-3c17-46bc-b7d8-a23ca48d41e2"}
[root@minio_api_1049 conf]# curl --request POST http://192.168.10.49:8083/v2/sessions
{"sessionHandle":"93597fe1-2574-406f-bce7-6cafb8dee434"}

4)、/sessions/:session_handle

1、Delete 请求方式-关闭session
  • 描述
/sessions/:session_handle
http请求方式: DELETEhttp响应码: 200 OK
关闭指定的session。
路径参数
  • session_handle - 上文示例中获取的session,作为url中的一个路径参数。
  • 请求报文
{}
  • 响应报文
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:CloseSessionResponseBody",
  "properties" : {
    "status" : {
      "type" : "string"
    }
  }
}
  • 示例
[root@minio_api_1049 conf]# curl --request DELETE http://192.168.10.49:8083/v1/sessions/2daa0882-3c17-46bc-b7d8-a23ca48d41e2
{"status":"CLOSED"}
2、Get请求方式-获取session配置信息
  • 描述
/sessions/:session_handle
http请求方式: GEThttp响应码: 200 OK
获取指定session的配置信息
路径参数
  • session_handle - 上文示例中获取的session,作为url中的一个路径参数。
  • 请求报文
{}
  • 响应报文
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:GetSessionConfigResponseBody",
  "properties" : {
    "properties" : {
      "type" : "object",
      "additionalProperties" : {
        "type" : "string"
      }
    }
  }
}
  • 示例
[root@minio_api_1049 conf]# curl --request GET http://192.168.10.49:8083/v2/sessions/93597fe1-2574-406f-bce7-6cafb8dee434
{
	"properties": {
		"state.checkpoints.num-retained": "20",
		"jobmanager.execution.failover-strategy": "region",
		"jobmanager.rpc.address": "localhost",
		"jobmanager.bind-host": "0.0.0.0",
		"execution.savepoint.ignore-unclaimed-state": "false",
		"table.resources.download-dir": "/tmp/sql-gateway-93597fe1-2574-406f-bce7-6cafb8dee434",
		"taskmanager.host": "localhost",
		"parallelism.default": "1",
		"taskmanager.numberOfTaskSlots": "2",
		"pipeline.classpaths": "",
		"taskmanager.memory.process.size": "4096m",
		"execution.checkpointing.mode": "EXACTLY_ONCE",
		"taskmanager.bind-host": "0.0.0.0",
		"execution.savepoint-restore-mode": "NO_CLAIM",
		"sql-gateway.endpoint.rest.address": "192.168.10.49",
		"web.cancel.enable": "true",
		"execution.target": "remote",
		"jobmanager.memory.process.size": "2048m",
		"jobmanager.rpc.port": "6123",
		"rest.bind-address": "0.0.0.0",
		"execution.checkpointing.interval": "5000",
		"execution.attached": "true",
		"execution.checkpointing.externalized-checkpoint-retention": "RETAIN_ON_CANCELLATION",
		"execution.shutdown-on-attached-exit": "false",
		"pipeline.jars": "file:/usr/local/bigdata/flink-1.17.1/opt/flink-python-1.17.1.jar",
		"web.submit.enable": "true",
		"rest.address": "192.168.10.49",
		"state.backend": "filesystem"
	}
}

5)、/sessions/:session_handle/complete-statement

  • 描述
/sessions/:session_handle/complete-statement
http请求方式: GEThttp响应码: 200 OK
在给定位置获取给定语句的完成提示。
路径参数
  • session_handle -上文示例中获取的session,作为url中的一个路径参数。
  • 请求报文
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:CompleteStatementRequestBody",
  "properties" : {
    "position" : {
      "type" : "integer"
    },
    "statement" : {
      "type" : "string"
    }
  }
}
  • 响应报文
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:CompleteStatementResponseBody",
  "properties" : {
    "candidates" : {
      "type" : "array",
      "items" : {
        "type" : "string"
      }
    }
  }
}
  • 示例
# 1、获取session
curl --request POST http://192.168.10.49:8083/v2/sessions
{"sessionHandle":"95e5a02a-be4f-4158-a3b0-adb23407f865"}

# 2、查询
curl --request GET http://192.168.10.49:8083/v2/sessions/95e5a02a-be4f-4158-a3b0-adb23407f865/complete-statement -H "Content-Type: application/json" -d '{"statement": "SELECT 1", "position": 0}'

[root@minio_api_1049 bin]# curl --request GET http://192.168.10.49:8083/v2/sessions/95e5a02a-be4f-4158-a3b0-adb23407f865/complete-statement -H "Content-Type: application/json" -d '{"statement": "SELECT 1", "position": 0}'
{"candidates":[]}


6)、/sessions/:session_handle/configure-session

  • 描述
/sessions/:session_handle/configure-session
http请求方式: POSThttp响应码: 200 OK
使用以下语句配置会话: CREATE TABLE, DROP TABLE, ALTER TABLE, CREATE DATABASE, DROP DATABASE, ALTER DATABASE, CREATE FUNCTION, DROP FUNCTION, ALTER FUNCTION, CREATE CATALOG, DROP CATALOG, USE CATALOG, USE [CATALOG.]DATABASE, CREATE VIEW, DROP VIEW, LOAD MODULE, UNLOAD MODULE, USE MODULE, ADD JAR.
路径参数
  • session_handle - 上文示例中获取的session,作为url中的一个路径参数。
  • 请求报文
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:session:ConfigureSessionRequestBody",
  "properties" : {
    "executionTimeout" : {
      "type" : "integer"
    },
    "statement" : {
      "type" : "string"
    }
  }
}
  • 响应报文
{}
  • 示例
# 1、获取session
# 1、获取session
curl --request POST http://192.168.10.49:8083/v2/sessions
{"sessionHandle":"95e5a02a-be4f-4158-a3b0-adb23407f865"}

# 2、创建数据库和表
curl --request POST http://192.168.10.49:8083/v2/sessions/95e5a02a-be4f-4158-a3b0-adb23407f865/configure-session  -H "Content-Type: application/json" -d '{"statement": " create database db1"'}

curl --request POST http://192.168.10.49:8083/v2/sessions/95e5a02a-be4f-4158-a3b0-adb23407f865/configure-session  -H "Content-Type: application/json" -d "{\"statement\": \"CREATE TABLE myTable2 (id INT,name STRING,score DOUBLE) WITH ('connector' = 'csv','csv.filepath' = '/tmp/test.csv','format' = 'csv');\"}"

# 创建数据库
[root@minio_api_1049 bin]# curl --request POST http://192.168.10.49:8083/v2/sessions/95e5a02a-be4f-4158-a3b0-adb23407f865/configure-session  -H "Content-Type: application/json" -d '{"statement": " create database db1"'}
{}

# 创建表
[root@minio_api_1049 bin]# curl --request POST http://192.168.10.49:8083/v2/sessions/95e5a02a-be4f-4158-a3b0-adb23407f865/configure-session  -H "Content-Type: application/json" -d "{\"statement\": \"CREATE TABLE myTable2 (id INT,name STRING,score DOUBLE) WITH ('connector' = 'csv','csv.filepath' = '/tmp/test.csv','format' = 'csv');\"}"
{}

7)、/sessions/:session_handle/heartbeat

  • 描述
/sessions/:session_handle/heartbeat
http请求方式: POSThttp响应码: 200 OK
触发heartbeat以告知服务器客户端处于活动状态,并在配置的超时值内保持会话的活动状态。
路径参数
  • session_handle - 上文示例中获取的session,作为url中的一个路径参数。
  • 请求报文
{}
  • 响应报文
{}
  • 示例
[root@minio_api_1049 conf]# curl --request POST http://192.168.10.49:8083/v1/sessions/5e2c4c35-12c2-45f2-b87c-66506a32ca48/heartbeat
{}

8)、/sessions/:session_handle/operations/:operation_handle/cancel

  • 描述
/sessions/:session_handle/operations/:operation_handle/cancel
http请求方式: POSThttp响应码: 200 OK
取消操作。
路径参数
  • session_handle - 上文示例中获取的session,作为url中的一个路径参数。
  • operation_handle - 标识操作的OperationHandle。获取方式参考上文中的示例。
  • 请求报文
{}
  • 响应报文
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody",
  "properties" : {
    "status" : {
      "type" : "string"
    }
  }
}
  • 示例
# 1、获取session
curl --request POST http://192.168.10.49:8083/v2/sessions
{"sessionHandle":"34e27e78-a138-4524-986d-31dd632a274d"}

# 2、获取opertional
curl --request POST http://192.168.10.49:8083/v2/sessions/34e27e78-a138-4524-986d-31dd632a274d/statements/ --data '{"statement": "SELECT 1"}'
{"operationHandle":"0ff95159-85e6-488c-ad4c-e113353ccb73"}

# 3、取消操作
curl --request POST http://192.168.10.49:8083/v2/sessions/34e27e78-a138-4524-986d-31dd632a274d/operations/0ff95159-85e6-488c-ad4c-e113353ccb73/cancel
# 日志输出,由于本示例是select 1,状态是已经完成,已经完成的不能取消
org.apache.flink.table.gateway.api.utils.SqlGatewayException: Failed to convert the Operation Status from FINISHED to CANCELED for 0ff95159-85e6-488c-ad4c-e113353ccb73.

9)、/sessions/:session_handle/operations/:operation_handle/close

  • 描述
/sessions/:session_handle/operations/:operation_handle/close
http请求方式: DELETEhttp响应码: 200 OK
Close the operation.
路径参数
  • session_handle - 上文示例中获取的session,作为url中的一个路径参数。
  • operation_handle - 标识操作的OperationHandle。获取方式参考上文中的示例。
  • 请求报文
{}
  • 响应报文
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody",
  "properties" : {
    "status" : {
      "type" : "string"
    }
  }
}
  • 示例
# 1、获取session
curl --request POST http://192.168.10.49:8083/v2/sessions
{"sessionHandle":"34e27e78-a138-4524-986d-31dd632a274d"}

# 2、获取opertional
curl --request POST http://192.168.10.49:8083/v2/sessions/34e27e78-a138-4524-986d-31dd632a274d/statements/ --data '{"statement": "SELECT 1"}'
{"operationHandle":"0ff95159-85e6-488c-ad4c-e113353ccb73"}

# 3、关闭
[root@minio_api_1049 bin]# curl --request DELETE http://192.168.10.49:8083/v2/sessions/34e27e78-a138-4524-986d-31dd632a274d/operations/0ff95159-85e6-488c-ad4c-e113353ccb73/close
{"status":"CLOSED"}

10)、/sessions/:session_handle/operations/:operation_handle/result/:token

  • 描述
/sessions/:session_handle/operations/:operation_handle/result/:token
http请求方式: GEThttp响应码: 200 OK
Fetch results of Operation.
路径参数
  • session_handle - 上文示例中获取的session,作为url中的一个路径参数。
  • operation_handle - 标识操作的OperationHandle。获取方式参考上文中的示例。
  • token - The OperationHandle that identifies a operation.
查询参数
  • rowFormat (mandatory): The row format to serialize the RowData.
  • 请求报文
{}
  • 响应报文
{
  "type" : "any"
}
  • 示例
# session_handle
[root@minio_api_1049 conf]#  curl --request POST http://192.168.10.49:8083/v1/sessions
{"sessionHandle":"5e2c4c35-12c2-45f2-b87c-66506a32ca48"}

# operation_handle
[root@minio_api_1049 conf]# curl --request POST http://192.168.10.49:8083/v1/sessions/5e2c4c35-12c2-45f2-b87c-66506a32ca48/statements/ --data '{"statement": "SELECT 1"}'
{"operationHandle":"e1774434-b657-4b20-ab2b-2579f3c0fc47"}

# /sessions/:session_handle/operations/:operation_handle/result/:token
[root@minio_api_1049 conf]# curl --request GET http://localhost:8083/v1/sessions/5e2c4c35-12c2-45f2-b87c-66506a32ca48/operations/e1774434-b657-4b20-ab2b-2579f3c0fc47/result/0
{
	"resultType": "PAYLOAD",
	"isQueryResult": true,
	"jobID": "49eae7c3a3016df44c36296157958afb",
	"resultKind": "SUCCESS_WITH_CONTENT",
	"results": {
		"columns": [{
			"name": "EXPR$0",
			"logicalType": {
				"type": "INTEGER",
				"nullable": false
			},
			"comment": null
		}],
		"rowFormat": "JSON",
		"data": [{
			"kind": "INSERT",
			"fields": [1]
		}]
	},
	"nextResultUri": "/v1/sessions/5e2c4c35-12c2-45f2-b87c-66506a32ca48/operations/e1774434-b657-4b20-ab2b-2579f3c0fc47/result/1"

# /v1/sessions/5e2c4c35-12c2-45f2-b87c-66506a32ca48/operations/e1774434-b657-4b20-ab2b-2579f3c0fc47/result/1
[root@minio_api_1049 conf]# curl --request GET http://localhost:8083//v1/sessions/5e2c4c35-12c2-45f2-b87c-66506a32ca48/operations/e1774434-b657-4b20-ab2b-2579f3c0fc47/result/1
{
	"resultType": "EOS",
	"isQueryResult": true,
	"jobID": "49eae7c3a3016df44c36296157958afb",
	"resultKind": "SUCCESS_WITH_CONTENT",
	"results": {
		"columns": [{
			"name": "EXPR$0",
			"logicalType": {
				"type": "INTEGER",
				"nullable": false
			},
			"comment": null
		}],
		"rowFormat": "JSON",
		"data": []
	}
}
}


11)、/sessions/:session_handle/operations/:operation_handle/status

  • 描述
/sessions/:session_handle/operations/:operation_handle/status
http请求方式: GEThttp响应码: 200 OK
Get the status of operation.
路径参数
  • session_handle - 上文示例中获取的session,作为url中的一个路径参数。
  • operation_handle - 标识操作的OperationHandle。获取方式参考上文中的示例。
  • 请求报文
{}
  • 响应报文
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:operation:OperationStatusResponseBody",
  "properties" : {
    "status" : {
      "type" : "string"
    }
  }
}
  • 示例
# session_handle
[root@minio_api_1049 conf]#  curl --request POST http://192.168.10.49:8083/v1/sessions
{"sessionHandle":"5e2c4c35-12c2-45f2-b87c-66506a32ca48"}

# operation_handle
[root@minio_api_1049 conf]# curl --request POST http://192.168.10.49:8083/v1/sessions/5e2c4c35-12c2-45f2-b87c-66506a32ca48/statements/ --data '{"statement": "SELECT 1"}'
{"operationHandle":"e1774434-b657-4b20-ab2b-2579f3c0fc47"}

[root@minio_api_1049 conf]# curl --request GET http://localhost:8083//v1/sessions/5e2c4c35-12c2-45f2-b87c-66506a32ca48/operations/e1774434-b657-4b20-ab2b-2579f3c0fc47/status
{"status":"FINISHED"}

12)、/sessions/:session_handle/statements

  • 描述
/sessions/:session_handle/statements
http请求方式: POSThttp响应码: 200 OK
Execute a statement.
路径参数
  • session_handle - 上文示例中获取的session,作为url中的一个路径参数。
  • 请求报文
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:ExecuteStatementRequestBody",
  "properties" : {
    "executionConfig" : {
      "type" : "object",
      "additionalProperties" : {
        "type" : "string"
      }
    },
    "executionTimeout" : {
      "type" : "integer"
    },
    "statement" : {
      "type" : "string"
    }
  }
}
  • 响应报文
{
  "type" : "object",
  "id" : "urn:jsonschema:org:apache:flink:table:gateway:rest:message:statement:ExecuteStatementResponseBody",
  "properties" : {
    "operationHandle" : {
      "type" : "string"
    }
  }
}
  • 示例
# session_handle
[root@minio_api_1049 conf]#  curl --request POST http://192.168.10.49:8083/v1/sessions
{"sessionHandle":"5e2c4c35-12c2-45f2-b87c-66506a32ca48"}

[root@minio_api_1049 conf]# curl --request POST http://localhost:8083/v1/sessions/5e2c4c35-12c2-45f2-b87c-66506a32ca48/statements
{"operationHandle":"81432111-c49a-4c6b-b5cc-c7657e68a1ab"}

4、Data Type Mapping

截至Flink 1.17版本,REST端点支持使用查询参数rowFormat序列化RowData。REST端点使用JSON格式来序列化表对象。请参考映射的JSON格式。
REST端点还支持使用PLAIN_TEXT格式序列化RowData,该格式会自动将所有列强制转换为String。

三、HiveServer2 Endpoint

HiveServer2 Endpoint is compatible with HiveServer2 wire protocol and allows users to interact (e.g. submit Hive SQL) with Flink SQL Gateway with existing Hive clients, such as Hive JDBC, Beeline, DBeaver, Apache Superset and so on.
HiveServer2 Endpoint与HiveServer2有线协议兼容,允许用户与现有的配置单元客户端(如配置单元JDBC、Beeline、DBeaver、Apache Superset等)进行Flink SQL网关交互(例如提交配置单元SQL)。

It suggests to use HiveServer2 Endpoint with Hive Catalog and Hive dialect to get the same experience as HiveServer2. Please refer to the Hive Compatibility for more details.

它建议将HiveServer2 Endpoint与配置单元目录和配置单元方言一起使用,以获得与HiveServer2相同的体验。有关详细信息,请参阅配置单元兼容性。

以上,简单介绍了Flink gateway的启动配置、支持的api以及简单的hiveserver2 endpoint部分,特别是api均以示例进行说明。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/175135.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【opencv】计算机视觉:停车场车位实时识别

目录 目标 整体流程 背景 详细讲解 目标 我们想要在一个实时的停车场监控视频中&#xff0c;看看要有多少个车以及有多少个空缺车位。然后我们可以标记空的&#xff0c;然后来车之后&#xff0c;实时告诉应该停在那里最方便、最近&#xff01;&#xff01;&#xff01;实现…

轻量封装WebGPU渲染系统示例<37>- 多个局部点光源应用于非金属材质形成的效果(源码)

当前示例源码github地址: https://github.com/vilyLei/voxwebgpu/blob/feature/rendering/src/voxgpu/sample/BasePbrMaterialMultiLights.ts 当前示例运行效果: 此示例基于此渲染系统实现&#xff0c;当前示例TypeScript源码如下&#xff1a; export class BasePbrMaterial…

Linux免密登录——A登录B密钥设置(SSH SCP)

密钥登录 密钥登录比帐号密码方式更安全、更方便&#xff0c;并提供了更多的自动化和批处理选项。 安全性&#xff1a;使用非对称加密算法&#xff0c;公钥存在服务器&#xff0c;私钥存在本地计算机&#xff0c;私钥不在网络传输&#xff0c;降低被黑客截获风险。强密码&#…

验证回文串

题目链接 验证回文串 题目描述 注意点 1 < s.length < 200000s 仅由可打印的 ASCII 字符组成将所有大写字符转换为小写字符忽略所有非字母数字字符 解答思路 首先将大写字母转为小写字母&#xff0c;再双指针分别从首尾判断对应位置的字符是否相同&#xff0c;注意当…

基于AVR单片机的视觉追踪算法研究与实现

基于AVR单片机的视觉追踪算法研究与实现是一项复杂而有挑战性的工作&#xff0c;旨在实现单片机对特定目标的实时追踪。本文将介绍基于AVR单片机的视觉追踪算法的原理和实现步骤&#xff0c;并提供相应的代码示例。 1. 概述 视觉追踪是一项涉及图像处理和计算机视觉领域的技术…

数据中台之用户画像

用户画像应用领域较为广泛,适合于各个产品周期,从新用户的引流到潜在用户的挖掘、 从老用户 的培养到流失用户的回流等。通过挖掘用户兴趣、偏好、人口统计特征,可以 直接 作用于提升营销精准 度、推荐匹配度,最终提升产品服务和企业利润。还包括广告投放、产品布局和行业报…

单链表OJ--8.相交链表

8.相交链表 160. 相交链表 - 力扣&#xff08;LeetCode&#xff09; /* 解题思路&#xff1a; 此题可以先计算出两个链表的长度&#xff0c;让长的链表先走相差的长度&#xff0c;然后两个链表同时走&#xff0c;直到遇到相同的节点&#xff0c;即为第一个公共节点 */struct Li…

猫咪不长肉怎么回事?搬空家底的增肥效果好的猫罐头分享

秋冬到了&#xff0c;北方有供暖还好&#xff0c;咱南方的小猫咪全靠一身正气&#xff0c;不囤点脂肪天生怕冷的小猫咪要怎么过冬啊&#xff1f;咋吃都吃不胖的猫可愁怀铲屎官了&#xff0c;想想我新手养猫那些年&#xff0c;为了给我家猫养胖点我是做了不少努力&#xff0c;当…

SpringBoot:ch03 yml 数据绑定示例

前言 Spring Boot 提供了强大的配置能力&#xff0c;通过 YAML 文件进行数据绑定是一种常见且便捷的方式。在本示例中&#xff0c;我们将演示如何利用 Spring Boot 的特性&#xff0c;通过 YAML 文件实现数据绑定。借助于 YAML 的简洁语法和结构化特性&#xff0c;我们能够轻松…

vue3-响应式函数

​&#x1f308;个人主页&#xff1a;前端青山 &#x1f525;系列专栏&#xff1a;Vue篇 &#x1f516;人终将被年少不可得之物困其一生 依旧青山,本期给大家带来vue篇专栏内容:vue3-响应式函数 目录 ref 响应式函数 引言&#xff1a; ref 函数 reactive 函数 Reactive 与…

详解python淘宝秒杀抢购脚本程序实现

文章目录 前言一、官网下载火狐浏览器二、下载geckodriver&#xff0c;并解压到火狐浏览器文件夹根目录三、添加火狐浏览器根目录到系统环境变量四、下载并安装python及pycharm开发工具五、进入淘宝六、使用Pycharm运行脚本&#xff0c;新建python文件&#xff0c;将代码复制到…

英语六级范文模板

目录 现象解释 观点选择 问题解决 六级只考议论文&#xff0c;我们将从现象解释&#xff0c;观点选择&#xff0c;问题解决三个角度给出范文&#xff1a; 多次使用的句子&#xff0c;就可以作为模板记下来~~ 现象解释 In the contemporary world, the ability to meet cha…

单链表OJ题--9.环形链表

9.环形链表 141. 环形链表 - 力扣&#xff08;LeetCode&#xff09; /* 解题思路&#xff1a; 定义快慢指针fast,slow, 如果链表确实有环&#xff0c;fast指针一定会在环内追上slow指针。 */typedef struct ListNode Node; bool hasCycle(struct ListNode *head) {Node* slow …

程序员指南六:数据平面开发套件

PORT HOTPLUG FRAMEWORK 端口热插拔框架为DPDK应用程序提供在运行时附加和分离端口的能力。由于该框架依赖于PMD实现&#xff0c;PMD无法处理的端口超出了该框架的范围。此外&#xff0c;在从DPDK应用程序分离端口后&#xff0c;该框架不提供从系统中移除设备的方法。对于由物…

【代码随想录】刷题笔记Day32

前言 实在不想做项目&#xff0c;周末和npy聊了就业的焦虑&#xff0c;今天多花点时间刷题&#xff01;刷刷刷刷&#xff01; 93. 复原 IP 地址 - 力扣&#xff08;LeetCode&#xff09; 分割startindex类似上一题&#xff0c;难点在于&#xff1a;判断子串合法性(0~255)、&…

锯木棍

题目描述 有一根粗细均匀长度为 L 的木棍&#xff0c;先用红颜色刻度线将它 m 等分&#xff0c;再用蓝色刻度线将 其 n 等分&#xff08; m>n &#xff09;&#xff0c;然后按所有刻度线将该木棍锯成小段&#xff0c;计算并输出长度最长的木棍的长度和根数。 输入格式…

移动机器人,开启智能柔性制造新篇章

智能制造是当今工业发展的必然趋势&#xff0c;而柔性制造则是智能制造的重要组成部分。在这个快速变革的时代&#xff0c;如何提高生产效率、降低成本、增强灵活性成为了制造业的关键挑战。富唯智能移动机器人应运而生&#xff0c;为柔性制造注入了新的活力。 基于富唯智能AI-…

泛型边界的问题

作者简介&#xff1a;大家好&#xff0c;我是smart哥&#xff0c;前中兴通讯、美团架构师&#xff0c;现某互联网公司CTO 联系qq&#xff1a;184480602&#xff0c;加我进群&#xff0c;大家一起学习&#xff0c;一起进步&#xff0c;一起对抗互联网寒冬 我们花了两篇文章讲述了…

40、Flink 的Apache Kafka connector(kafka source的介绍及使用示例)-1

Flink 系列文章 1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接 13、Flink 的table api与sql的基本概念、通用api介绍及入门示例 14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性 15、Flink 的ta…