Flink之Table API SQL连接器

连接器

  • Table API & SQL连接器
    • 1.概述
    • 2.支持连接器
  • DataGen连接器
    • 1.概述
    • 2.SQL客户端执行
    • 3.Table API执行
  • FileSystem连接器
    • 1.创建FileSystem映射表
    • 2.创建source数据源表
    • 3.写入数据
    • 4.解决异常
    • 5.查询fileTable
    • 6.查看HDFS
  • Kafka连接器
    • 1.添加kafka连接器依赖
    • 2.重启yarn-session、sql-client
    • 3.创建Kafka映射表
    • 4.创建source数据源表
    • 5.插入Kafka表
    • 6.查询Kafka表
  • Upsert Kafka连接器
    • 1.添加kafka连接器依赖
    • 2.重启yarn-session、sql-client
    • 3.创建upsert-kafka的映射表
    • 4.创建source数据源表
    • 5.插入upsert-kafka表
    • 6.查询upsert-kafka表
  • JDBC连接器
    • 1.添加JDBC连接器依赖
    • 2.重启flink集群和sql-client
    • 3.创建JDBC表
    • 4.创建JDBC映射表
    • 5.写入数据
    • 6.查询数据
  • MongoDB连接器
    • 1.添加依赖
    • 2.示例
    • 3.验证
  • Elasticsearch连接器
    • 1.添加依赖
    • 2.示例
    • 3.验证

Table API & SQL连接器

1.概述

Flink的Table API 和 SQL 程序可以连接到其他外部系统,以读写批处理和流式表。表源提供对存储在外部系统(例如数据库、键值存储、消息队列或文件系统)中的数据的访问。表接收器将表发送到外部存储系统。根据源和接收器的类型,它们支持不同的格式,例如CSV、Avro、Parquet 或 ORC。

2.支持连接器

Flink原生支持各种连接器,以下是所有可用的连接器。

名称版本数据源数据接收器
文件系统有界和无界扫描流式接收器,批处理接收器
Elasticsearch6.x & 7.x不支持流式接收器,批处理接收器
Opensearch1.x & 2.x不支持流式接收器,批处理接收器
Apache Kafka0.10+无界扫描流式接收器,批处理接收器
Amazon DynamoDB不支持流式接收器,批处理接收器
Amazon Kinesis Data Streams无界扫描流式接收器
Amazon Kinesis Data Firehose不支持流式接收器
JDBC有界扫描,查找流式接收器,批处理接收器
Apache HBase1.4.x & 2.2.x有界扫描,查找流式接收器,批处理接收器
Apache Hive支持的版本无界扫描,有界扫描,查找流式接收器,批处理接收器
MongoDB有界扫描,查找流式接收器,批处理接收器

注意:这些连接器可以使用FlinkTable API操作,也可以使用Flink SQL客户端操作。

DataGen连接器

1.概述

DataGen 连接器允许按数据生成规则进行读取。

每个列,都有两种生成数据的方法:
1.随机生成器:

默认的生成器,您可以指定随机生成的最大和最小值。char、varchar、binary、varbinary, string (类型)可以指定长度。它是无界的生成器。

2.序列生成器:

可以指定序列的起始和结束值。它是有界的生成器,当序列数字达到结束值,读取结束。

连接器参数说明:

参数描述
connector参数必须,指定要使用的连接器,这里是 ‘datagen’
rows-per-second可选参数,默认值10000,每秒生成的行数,用以控制数据发出速率
fields.#.kind可选参数,默认值random,指定 ‘#’ 字段的生成器。可以是 ‘sequence’ 或 ‘random’
fields.#.min可选参数随机生成器的最小值,适用于数字类型
fields.#.max可选参数随机生成器的最大值,适用于数字类型
fields.#.length可选参数,默认值100,随机生成器生成字符的长度,适用于 char、varchar、binary、varbinary、string
fields.#.start可选参数,序列生成器的起始值
fields.#.end可选参数,序列生成器的结束值

2.SQL客户端执行

在 Flink SQL客户端中创建DataGen表

Flink SQL> CREATE TABLE datagen (
 id INT,
 name STRING,
 age INT,
 ts AS localtimestamp,
 WATERMARK FOR ts AS ts
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='5',
 'fields.f_sequence.kind'='sequence',
 'fields.id.start'='1',
 'fields.id.end'='50',
 'fields.age.min'='1',
 'fields.age.max'='150',
 'fields.name.length'='10'
)

查询表

Flink SQL> show tables;
+------------+
| table name |
+------------+
|  datagen   |
+------------+
1 row in set

执行查询

Flink SQL> select * from datagen;
+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |           1 |                     f82aac1d2e |          97 |
| +I |           2 |                     09bc2c62a6 |          75 |
| +I |           3 |                     9e3e0cca2f |         146 |
| +I |           4 |                     05bca80edc |          61 |
| +I |           5 |                     93bfca82f7 |          54 |

3.Table API执行

使用Table API操作,需要引入相关依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--负责Table API和下层DataStream API的连接支持-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--在本地的集成开发环境里运行Table APISQL的支持-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>

在程序中通过Table API操作DataGen SQL连接器

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建表
        tableEnv.executeSql("CREATE TABLE datagen (\n" +
                " id INT,\n" +
                " name STRING,\n" +
                " age INT,\n" +
                " ts AS localtimestamp,\n" +
                " WATERMARK FOR ts AS ts\n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='5',\n" +
                " 'fields.id.kind'='sequence',\n" +
                " 'fields.id.start'='1',\n" +
                " 'fields.id.end'='50',\n" +
                " 'fields.age.min'='1',\n" +
                " 'fields.age.max'='150',\n" +
                " 'fields.name.length'='10'\n" +
                ");");

        // 执行查询
        tableEnv.executeSql("show tables;").print();
        Table select = tableEnv.from("datagen").select($("id"), $("name"), $("age"));
        // 打印
        select.execute().print();
    }

控制台执行结果如下:

+------------+
| table name |
+------------+
|    datagen |
+------------+
1 row in set
+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |           1 |                     f82aac1d2e |          97 |
| +I |           2 |                     09bc2c62a6 |          75 |
| +I |           3 |                     9e3e0cca2f |         146 |
| +I |           4 |                     05bca80edc |          61 |

FileSystem连接器

文件系统连接器,不需要添加额外的依赖。相应的 jar 包可以在 Flink 工程项目的 /lib 目录下找到。从文件系统中读取或者向文件系统中写入行时,需要指定相应的 format。

1.创建FileSystem映射表

文件系统连接器允许从本地或分布式文件系统进行读写。文件系统表可以定义为:

CREATE TABLE fileTable( id int, name string, age int )
WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://node01:9000/flink/fileTable',
  'format' = 'json'
);

2.创建source数据源表

CREATE TABLE datagen (
 id INT,
 name  string,
 age  INT
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='5',
 'fields.id.kind'='sequence',
 'fields.id.start'='1',
 'fields.id.end'='1000',
 'fields.age.min'='1',
 'fields.age.max'='100',
 'fields.name.length'='10'
);

3.写入数据

Flink SQL> insert into fileTable select * from datagen;
[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.DialectFactory

4.解决异常

在执行插入操作出现如下异常:

[ERROR] Could not execute SQL statement. Reason:
java.lang.ClassNotFoundException: org.apache.flink.table.planner.delegation.DialectFactory

解决方案有两种:

1.由于flink/lib目录下存在flink-sql-connector-hive-2.3.9_2.12-1.17.0.jar导致,因此删除该Jar包

 mv lib/flink-sql-connector-hive-2.3.9_2.12-1.17.0.jar  lib/flink-sql-connector-hive-2.3.9_2.12-1.17.0.jar.back

重启Flink与SQL客户端,然后执行

Flink SQL> insert into fileTable select * from datagen;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c449c029599125c92da98514b512b0de

2.更换flink/lib目录下的flink-table-planner-loader-1.17.0.jar

通常会使用flink-sql-connector-hive连接器,所以不可能任意删除的,故先恢复

 mv lib/flink-sql-connector-hive-2.3.9_2.12-1.17.0.jar.back  lib/flink-sql-connector-hive-2.3.9_2.12-1.17.0.jar

注意:只有在使用Hive方言或HiveServer2时需要移动Jar操作,是Hive集成的推荐设置。

[root@node01 flink]# mv lib/flink-table-planner-loader-1.17.0.jar opt/

[root@node01 flink]# mv opt/flink-table-planner_2.12-1.17.0.jar lib/

重启Flink与SQL客户端,然后执行

Flink SQL> insert into fileTable select * from datagen;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 7a2bc2fcb7407cec33530fb086f4242f

5.查询fileTable


Flink SQL> select * from fileTable where id=1;
+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |           1 |                     6a296add45 |           8 |
+----+-------------+--------------------------------+-------------+
Received a total of 1 row

6.查看HDFS

在这里插入图片描述

Kafka连接器

Kafka 连接器提供从 Kafka topic 中消费和写入数据的能力。

1.添加kafka连接器依赖

下载连接器:flink-sql-connector-kafka

将flink-sql-connector-kafka-1.17.0.jar上传到flink的lib目录

cp ./flink-sql-connector-kafka-1.17.0.jar /usr/local/program/flink/lib

2.重启yarn-session、sql-client

bin/start-cluster.sh

bin/sql-client.sh

3.创建Kafka映射表

CREATE TABLE KafkaTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
  --当列名和元数据名一致可以省略 FROM 'xxxx', VIRTUAL表示只读 
  `partition` BIGINT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL
) WITH (
  'connector' = 'kafka',
  'topic' = 'testTopic',
  'properties.bootstrap.servers' = 'node01:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  --消息体时使用的格式
  'format' = 'json',
  --fixed为flink实现的分区器,一个并行度只写往kafka一个分区
  'sink.partitioner' = 'fixed'
);

查询表信息

Flink SQL> show tables;
+------------+
| table name |
+------------+
| KafkaTable |
+------------+
1 row in set

Flink SQL> desc KafkaTable;
+-----------+--------------+------+-----+-------------------------------+-----------+
|      name |         type | null | key |                        extras | watermark |
+-----------+--------------+------+-----+-------------------------------+-----------+
|   user_id |       BIGINT | TRUE |     |                               |           |
|   item_id |       BIGINT | TRUE |     |                               |           |
|  behavior |       STRING | TRUE |     |                               |           |
|        ts | TIMESTAMP(3) | TRUE |     |     METADATA FROM 'timestamp' |           |
| partition |       BIGINT | TRUE |     |              METADATA VIRTUAL |           |
|    offset |       BIGINT | TRUE |     |              METADATA VIRTUAL |           |
+-----------+--------------+------+-----+-------------------------------+-----------+
7 rows in set

4.创建source数据源表

CREATE TABLE datagen (
 user_id  INT,
 item_id  INT,
 behavior STRING
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='5',
 'fields.user_id.kind'='sequence',
 'fields.user_id.start'='1',
 'fields.user_id.end'='1000',
 'fields.item_id.min'='1',
 'fields.item_id.max'='1000',
 'fields.behavior.length'='10'
);

5.插入Kafka表

Flink SQL> insert into KafkaTable(user_id,item_id,behavior) select * from datagen;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: e4679f9e14823079d2ce355d592cae93

6.查询Kafka表

Flink SQL> select * from KafkaTable;
+----+----------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+
| op |              user_id |              item_id |                       behavior |                      ts |            partition |               offset |
+----+----------------------+----------------------+--------------------------------+-------------------------+----------------------+----------------------+
| +I |                    1 |                   92 |                     c3baec3158 | 2023-07-11 23:22:45.564 |                    0 |                    0 |
| +I |                    2 |                  516 |                     804b7f09a2 | 2023-07-11 23:22:45.565 |                    0 |                    1 |
| +I |                    3 |                  784 |                     9940556819 | 2023-07-11 23:22:45.566 |                    0 |                    2 |
| +I |                    4 |                   62 |                     053ec345db | 2023-07-11 23:22:45.566 |                    0 |                    3 |
| +I |                    5 |                  375 |                     b4aa55a998 | 2023-07-11 23:22:45.566 |                    0 |                    4 |
| +I |                    6 |                  507 |                     b31794a773 | 2023-07-11 23:22:45.566 |                    0 |                    5 |

Upsert Kafka连接器

Upsert Kafka 连接器支持以upsert方式从Kafka topic中读取数据并将数据写入Kafka topic。如果当前表存在更新操作,那么普通的kafka连接器将无法满足,此时可以使用Upsert Kafka连接器。

1.作为source

upsert-kafka连接器生产changelog流,其中每条数据记录代表一个更新或删除事件。数据记录中的 value被解释为同一 key的最后一个value的UPDATE,如果不存在相应的key,则该更新被视为INSERT。

changelog流中的数据记录被解释为UPSERT,也称为INSERT/UPDATE,因为任何具有相同key的现有行都被覆盖。另外,value为空的消息将会被视作为DELETE 消息。

2.作为sink

upsert-kafka连接器可以消费changelog流。它会将 INSERT/UPDATE_AFTER数据作为正常的 Kafka 消息写入,并将 DELETE 数据以 value 为空的 Kafka 消息写入(表示对应 key 的消息被删除)。

Flink 将根据主键列的值对数据进行分区,从而保证主键上的消息有序,因此同一主键上的更新/删除消息将落在同一分区中。

1.添加kafka连接器依赖

下载连接器:flink-sql-connector-kafka

将flink-sql-connector-kafka-1.17.0.jar上传到flink的lib目录

cp ./flink-sql-connector-kafka-1.17.0.jar /usr/local/program/flink/lib

2.重启yarn-session、sql-client

bin/start-cluster.sh

bin/sql-client.sh

3.创建upsert-kafka的映射表

注意:必须定义主键

CREATE TABLE kafka( 
    user_id int , 
    item_id int ,
    behavior STRING,
    primary key (item_id) NOT ENFORCED 
)
WITH (
  'connector' = 'upsert-kafka',
  'properties.bootstrap.servers' = 'node01:9092',
  'topic' = 'upsertTopic',
  'key.format' = 'json',
  'value.format' = 'json'
)

4.创建source数据源表

CREATE TABLE datagen (
 user_id  INT,
 item_id  INT,
 behavior STRING
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='5',
 'fields.user_id.kind'='sequence',
 'fields.user_id.start'='1',
 'fields.user_id.end'='1000',
 'fields.item_id.min'='1',
 'fields.item_id.max'='1000',
 'fields.behavior.length'='10'
);

5.插入upsert-kafka表

注意:当使用 GROUP BY 子句时,SELECT 子句中可以出现的列只能是分组键列或聚合函数应用的列。

insert into kafka 
select sum(user_id) as user_id, item_id, behavior 
from datagen 
group by item_id, behavior;

6.查询upsert-kafka表

upsert-kafka无法从指定的偏移量读取,只会从主题的源读取。如此,才知道整个数据的更新过程。并且通过-U+U+I等符号来显示数据的变化过程。


Flink SQL> select * from kafka;
+----+-------------+-------------+--------------------------------+
| op |     user_id |     item_id |                       behavior |
+----+-------------+-------------+--------------------------------+
| +I |          73 |         362 |                     bc998a7d48 |
| +I |          74 |         945 |                     2237c26098 |
| +I |          75 |         755 |                     2537957698 |
| -U |          70 |         604 |                     6dce4c7081 |
| +U |          76 |         604 |                     288d5d98f7 |
| +I |          77 |         228 |                     a6b76e7a0d |
| +I |          78 |         568 |                     5dcd3a80f3 |
| -U |          65 |         623 |                     9e2c52dec9 |
| +U |          79 |         623 |                     ba177640f7 |
| +I |          80 |         699 |                     9fa8198fbd |
| +I |          81 |         489 |                     2061468e63 |
| +I |          82 |         124 |                     8d2405e54c |
| +I |          83 |         115 |                     34d418f37c |

JDBC连接器

JDBC 连接器允许使用 JDBC 驱动向任意类型的关系型数据库读取或者写入数据。

Flink将数据写入外部数据库时,如果使用DDL中定义的主键,则连接器以upsert模式与外部系统交换 UPDATE/DELETE消息,否则连接器以append模式与外部系统交换消息且不支持消费 UPDATE/DELETE 消息。

在upsert模式下,Flink会根据主键插入新行或更新现有行,这样可以保证幂等性。

在追加模式下,Flink将所有记录解释为INSERT消息,如果底层数据库中发生了主键或唯一约束违反,则INSERT操作可能会失败。

为了保证输出结果符合预期,建议为表定义主键,并确保主键是底层数据库表的唯一键集或主键之一。

这里使用MySQL JDBC连接器为例。

1.添加JDBC连接器依赖

下载:flink-connector-jdbc

下载:MySQL驱动包:mysql-connector-j

上传jdbc连接器的jar包和mysql的连接驱动包到flink/lib下

cp ./flink-connector-jdbc-3.1.0-1.17.jar /usr/local/program/flink/lib

cp ./mysql-connector-j/8.0.33/mysql-connector-j-8.0.33.jar /usr/local/program/flink/lib

2.重启flink集群和sql-client

bin/start-cluster.sh

bin/sql-client.sh

3.创建JDBC表

在MySQL中的demo数据库中创建表

CREATE TABLE `flinkTable` (
  `id` bigint(20) NOT NULL,
  `name` varchar(20) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

4.创建JDBC映射表

在 Flink SQL 中注册一张 MySQL 表

CREATE TABLE mysqlTable(
  id BIGINT,
  name STRING,
  age INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://node01:3306/demo?useUnicode=true&characterEncoding=UTF-8',
    'username' = 'root',
    'password' = '123456',
	-- 连接到 JDBC 表的名称
    'table-name' = 'flinkTable',
    -- 最大重试超时时间,以秒为单位且不应该小于 1 秒
    'connection.max-retry-timeout' = '60s',
    --维表缓存的最大行数,若超过该值,则最老的行记录将会过期。
    'sink.buffer-flush.max-rows' = '500',
    --flush 间隔时间,超过该时间后异步线程将 flush 数据。
    'sink.buffer-flush.interval' = '2s',
    --查询数据库失败的最大重试次数
    'sink.max-retries' = '3',
    --用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度
    'sink.parallelism' = '1'
);

5.写入数据

Flink SQL> insert into mysqlTable values(1,'flink',20);
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 78eb69d8f16fd239bbb24ea4e13aac5c

6.查询数据

Flink SQL> select * from mysqlTable ;
+----+----------------------+--------------------------------+-------------+
| op |                   id |                           name |         age |
+----+----------------------+--------------------------------+-------------+
| +I |                    1 |                          flink |          20 |
+----+----------------------+--------------------------------+-------------+
Received a total of 1 row

MongoDB连接器

MongoDB 连接器提供了从 MongoDB 中读取和写入数据的能力。

连接器可以在 upsert 模式下运行,使用在 DDL 上定义的主键与外部系统交换 UPDATE/DELETE 消息

如果 DDL 上没有定义主键,则连接器只能以 append 模式与外部系统交换 INSERT 消息且不支持消费 UPDATE/DELETE 消息

1.添加依赖

使用Table API操作,需要额外引入MongoDB连接器依赖

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-mongodb</artifactId>
            <version>1.0.1-1.17</version>
        </dependency>

2.示例

public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建source数据源表
        tableEnv.executeSql("CREATE TABLE datagen (\n" +
                " id STRING,\n" +
                " name STRING,\n" +
                " age INT \n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='5',\n" +
                " 'fields.id.kind'='sequence',\n" +
                " 'fields.id.start'='1',\n" +
                " 'fields.id.end'='10',\n" +
                " 'fields.age.min'='1',\n" +
                " 'fields.age.max'='150',\n" +
                " 'fields.name.length'='10'\n" +
                ");");

		// 创建mongodb映射表
        tableEnv.executeSql("CREATE TABLE tb_mongodb (\n" +
                "  _id STRING,\n" +
                "  name STRING,\n" +
                "  age INT,\n" +
                "  PRIMARY KEY (_id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "   'connector' = 'mongodb',\n" +
                "   'uri' = 'mongodb://IP:27017',\n" +
                "   'database' = 'my_db',\n" +
                "   'collection' = 'users'\n" +
                ");");

        // 执行查询
        tableEnv.executeSql("show tables;").print();

        tableEnv.executeSql(" insert into tb_mongodb select * from datagen;").print();
        Table tb_mongodb = tableEnv.from("tb_mongodb").select($("_id"), $("name"), $("age"));
        // 打印
        tb_mongodb.execute().print();
    }

3.验证

查看MongoDB
在这里插入图片描述

Elasticsearch连接器

Elasticsearch连接器允许将数据写入到 Elasticsearch 引擎的索引中。

连接器可以工作在 upsert 模式,使用 DDL 中定义的主键与外部系统交换 UPDATE/DELETE 消息

如果 DDL 中没有定义主键,那么连接器只能工作在 append 模式,只能与外部系统交换 INSERT 消息

1.添加依赖

有2个版本

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch6</artifactId>
  <version>3.0.1-1.17</version>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch7</artifactId>
  <version>3.0.1-1.17</version>
</dependency>

注意:需要引入flink-json,否则将出现如下异常

Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'json' that implements 'org.apache.flink.table.factories.SerializationFormatFactory' in the classpath.
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>

2.示例

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 创建表
        tableEnv.executeSql("CREATE TABLE datagen (\n" +
                " id STRING,\n" +
                " name STRING,\n" +
                " age INT \n" +
                ") WITH (\n" +
                " 'connector' = 'datagen',\n" +
                " 'rows-per-second'='5',\n" +
                " 'fields.id.kind'='sequence',\n" +
                " 'fields.id.start'='1',\n" +
                " 'fields.id.end'='10',\n" +
                " 'fields.age.min'='1',\n" +
                " 'fields.age.max'='150',\n" +
                " 'fields.name.length'='10'\n" +
                ");");


        tableEnv.executeSql("CREATE TABLE tb_es (\n" +
                "  id STRING,\n" +
                "  name STRING,\n" +
                "  age BIGINT,\n" +
                "  PRIMARY KEY (id) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'elasticsearch-7',\n" +
                "  'hosts' = 'http://IP:9200',\n" +
                "  'index' = 'users'\n" +
                ");");

        // 执行查询
        tableEnv.executeSql("show tables;").print();
		// 插入数据
        tableEnv.executeSql(" insert into tb_es select * from datagen;").print();
    }

注意:由于es的表不支持source,故不能查询,查询会报如下错误

Caused by: org.apache.flink.table.api.ValidationException: Connector 'elasticsearch-7' can only be used as a sink. It cannot be used as a source.

3.验证

使用ElasticSearch Head查看连接查看

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/133238.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

vue.cli 中怎样使用自定义的组件

目录 创建自定义组件 注册并使用自定义组件 全局注册自定义组件 使用 Props 传递数据 总结 前言 在Vue CLI中使用自定义组件是构建交互式和模块化Web应用的重要一环。Vue CLI为开发者提供了使用自定义组件的灵活性和简便性。通过Vue CLI&#xff0c;你可以创建、注册和使…

【算法练习Day45】最长公共子序列不相交的线最大子数组和

​&#x1f4dd;个人主页&#xff1a;Sherry的成长之路 &#x1f3e0;学习社区&#xff1a;Sherry的成长之路&#xff08;个人社区&#xff09; &#x1f4d6;专栏链接&#xff1a;练题 &#x1f3af;长路漫漫浩浩&#xff0c;万事皆有期待 文章目录 最长公共子序列不相交的线最…

开发者眼中的向量数据库应用领域

目录 引言向量数据库概念向量数据库优势应用领域亚马逊云科技向量数据库向量数据库的使用步骤最后 引言 随着人工智能和大数据技术的快速发展&#xff0c;越来越多的技术倾向于数据存储方面&#xff0c;数据库领域也随着人工智能和大数据的发展而发展&#xff0c;尤其是向量…

月销破30万辆后,比亚迪整了波大的

最近乘联会公布了 2023 年 10 月新能源乘用车厂商销量榜单。 其中最为亮眼犹如鹤立鸡群的榜首&#xff0c;没错依然是我们熟悉的那个迪子&#xff01; 单月销量超 30 万辆&#xff0c;相较去年同期暴涨 38.4%&#xff0c;创下了比亚迪有史以来新高。 同时也成为了国内首个月销…

PEFT概述:最先进的参数高效微调技术

了解参数高效微调技术&#xff0c;如LoRA&#xff0c;如何利用有限的计算资源对大型语言模型进行高效适应。 PEFT概述&#xff1a;最先进的参数高效微调技术 什么是PEFT什么是LoRA用例使用PEFT训练LLMs入门PEFT配置4位量化封装基础Transformer模型保存模型加载模型推理 结论 什…

Java学习 9.Java-数组 讲解及习题

一、数组的定义与使用 1.数组的基本概念 1.1 为什么要使用数组 数组是最简单的一种数据结构 组织一组相同类型数据的集合 数据结构本身是来描述和组织数据的 数据加结构 1.2.1 数组的创建 代码实现 new int 可省略&#xff1b; char[] chars{a,b,c};//定义一个整形类型数…

在线免费语音克隆工具,1分钟,复制你的声音

hi&#xff0c;同学们&#xff0c;我是赤辰。玩自媒体这么多年&#xff0c;总结出凡是用自己的声音做短视频&#xff0c;既有识别度&#xff0c;也更容易上热门&#xff0c;但是录制音频的艰难&#xff0c;试过的都知道&#xff0c;市面上也有很多克隆工具&#xff0c;但是基本…

【Git】Git分支与标签掌握这些技巧让你成为合格的码农

&#x1f389;&#x1f389;欢迎来到我的CSDN主页&#xff01;&#x1f389;&#x1f389; &#x1f3c5;我是Java方文山&#xff0c;一个在CSDN分享笔记的博主。&#x1f4da;&#x1f4da; &#x1f31f;推荐给大家我的专栏《Git》。&#x1f3af;&#x1f3af; &#x1f449…

Qt——连接mysql增删查改(仓库管理极简版)

目录 UI布局设计 .pro文件 mainwindow.h main.cpp UI布局设计 .pro文件 QT core gui QT core gui sql QT sqlgreaterThan(QT_MAJOR_VERSION, 4): QT widgetsCONFIG c11# The following define makes your compiler emit warnings if you use # any …

【算法-链表4】环形链表2的两种解法

今天&#xff0c;带来链表相关算法的讲解。文中不足错漏之处望请斧正&#xff01; 理论基础点这里 环形链表 1. 思路 利用链表相交 我们在环内任意一处断开&#xff0c;然后判断断开处的下一个位置和head是否相交&#xff0c;如果相交&#xff0c;相交处就是环口。 公式法 …

ArcGIS10.8 连接 PostgreSQL 及遇到的两个问题

前提 以前同事用过我的电脑连PostgreSQL&#xff0c;失败了。当时不知道原因&#xff0c;只能使用GeoServer来发布数据了。现在终于搞明白了&#xff0c;原因是ArcGIS10.2版本太老&#xff0c;无法连接PostgreSQL9.4。参考这里 为了适应时代的发展&#xff0c;那我就用新的Ar…

Spark的转换算子和操作算子

1 Transformation转换算子 1.1 Value类型 1&#xff09;创建包名&#xff1a;com.shangjack.value 1.1.1 map()映射 参数f是一个函数可以写作匿名子类&#xff0c;它可以接收一个参数。当某个RDD执行map方法时&#xff0c;会遍历该RDD中的每一个数据项&#xff0c;并依次应用f函…

python Flask框架,调用MobileNetV2图像分类模型,实现前端上传图像分类

python Flask框架&#xff0c;调用MobileNetV2图像分类模型&#xff0c;实现前端上传图像分类 今天博主介绍一个图像分类的小项目 基于flask 和mobileNetV2模型的前端图像分类项目 环境配置如下&#xff1a; python版本3.7.6 安装库的版本如下&#xff1a; tensorflow 2.11.…

LabVIEW中NIGPIB设备与驱动程序不相关的MAX报错

LabVIEW中NIGPIB设备与驱动程序不相关的MAX报错 当插入GPIB-USB设备时&#xff0c;看到了NI MAX中列出该设备&#xff0c;但却显示了黄色警告指示&#xff0c;并且指出Windows没有与您的设备相关的驱动程序。 解决方案 需要安装能兼容的NI-488.2驱动程序。 通过交叉参考以下有…

STM32--时钟树

一、什么是时钟&#xff1f; 时钟是单片机的脉搏&#xff0c;是系统工作的同步节拍。单片机上至CPU&#xff0c;下至总线外设&#xff0c;它们工作时序的配合&#xff0c;都需要一个同步的时钟信号来统一指挥。时钟信号是周期性的脉冲信号。 二、什么是时钟树&#xff1f; S…

“Git实践指南:深入探索开发测试上线、分支管理与标签“

文章目录 引言一、Git的分支的使用1.分支2.标签3.分支与标签的关系4. 分支在实际中的作用5. 四个环境以及各自的功能特点6. 分支策略分支应用场景 二、Git的标签3.1 标签的基本使用3.3 标签的共享与推送 总结 引言 在现代软件开发中&#xff0c;版本控制是一个关键的环节&…

2023年【危险化学品经营单位主要负责人】免费试题及危险化学品经营单位主要负责人证考试

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年危险化学品经营单位主要负责人免费试题为正在备考危险化学品经营单位主要负责人操作证的学员准备的理论考试专题&#xff0c;每个月更新的危险化学品经营单位主要负责人证考试祝您顺利通过危险化学品经营单位主…

【扩散模型】万字长文全面理解与应用Stable Diffusion

万字长文全面理解与应用Stable Diffusion 1. Stable Diffusion简介1.1 基本概念1.2 主体结构1.3 训练细节1.4 模型评测1.5 模型应用1.6 模型版本1.7 其他类型的条件生成模型1.8 使用DreamBooth进行微调 2. 实战Stable Diffusion2.1 环境准备2.2 从文本生成图像2.3 Stable Diffu…

LIBGDX实时绘制字符、实时绘制中文

LIBGDX实时绘制字符、实时绘制中文 转自&#xff1a;https://lingkang.top/archives/libgdx-shi-shi-hui-zhi-zi-fu 注意&#xff0c;相比于贴图字体&#xff0c;实时绘制会有一定的失真、模糊 Maven项目依赖&#xff1a; <properties><maven.compiler.source>…

抢量双11!抖音商城「官方立减」 缘何成为“爆单神器”?

10月20日抖音商城双11好物节正式开跑&#xff0c;仅仅三天&#xff0c;抖音商城整体GMV对比去年同期提升了200%&#xff0c;而在开跑一周后&#xff0c;一些品牌的销售额已经超过了今年整个618&#xff0c;可谓增势迅猛。其中&#xff0c;平台官方特别推出的「官方立减」玩法&a…