Flink SQL DataGen Connector 示例
1、概述
使用 Flink SQL DataGen Connector,可以快速地生成符合规则的测试数据,可以在不依赖真实数据的情况下进行开发和测试。
2、使用示例
创建一个名为 “users” 的表,包含 6 个字段:id、name、age、email、created_at 和 updated_at。
在表的定义中,指定各个字段的规则:
id 字段使用序列生成器,生成的范围从 1 到 1000;
name 字段的长度为 10 个字符;
age 字段的范围从 18 到 60 岁;
email 字段的长度为随机的10个字符;
created_at 和 updated_at 字段使用随机时间生成器,时间范围从 2022 年 1 月 1 日到 2022 年 12 月 31 日。
3、官网参数介绍
1)数据类型注释
Type | Supported Generators | Notes |
---|---|---|
BOOLEAN | random | |
CHAR | random / sequence | |
VARCHAR | random / sequence | |
STRING | random / sequence | |
DECIMAL | random / sequence | |
TINYINT | random / sequence | |
SMALLINT | random / sequence | |
INT | random / sequence | |
BIGINT | random / sequence | |
FLOAT | random / sequence | |
DOUBLE | random / sequence | |
DATE | random | Always resolves to the current date of the local machine. |
TIME | random | Always resolves to the current time of the local machine. |
TIMESTAMP | random | Always resolves to the current timestamp of the local machine. |
TIMESTAMP_LTZ | random | Always resolves to the current timestamp of the local machine. |
INTERVAL YEAR TO MONTH | random | |
INTERVAL DAY TO MONTH | random | |
ROW | random | Generates a row with random subfields. |
ARRAY | random | Generates an array with random entries. |
MAP | random | Generates a map with random entries. |
MULTISET | random | Generates a multiset with random entries. |
2)连接器参数:
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 必须 | (none) | String | 指定要使用的连接器,这里是 ‘datagen’。 |
rows-per-second | 可选 | 10000 | Long | 每秒生成的行数,用以控制数据发出速率。 |
fields.#.kind | 可选 | random | String | 指定 ‘#’ 字段的生成器。可以是 ‘sequence’ 或 ‘random’。 |
fields.#.min | 可选 | (Minimum value of type) | (Type of field) | 随机生成器的最小值,适用于数字类型。 |
fields.#.max | 可选 | (Maximum value of type) | (Type of field) | 随机生成器的最大值,适用于数字类型。 |
fields.#.length | 可选 | 100 | Integer | 随机生成器生成字符的长度,适用于 char、varchar、string。 |
fields.#.start | 可选 | (none) | (Type of field) | 序列生成器的起始值。 |
fields.#.end | 可选 | (none) | (Type of field) | 序列生成器的结束值。 |
4、代码示例
CREATE TABLE users (
id BIGINT,
name STRING,
age INT,
text STRING,
created_at TIMESTAMP(3),
updated_at TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
'fields.id.kind' = 'sequence',
'fields.id.start' = '1',
'fields.id.end' = '1000',
'fields.name.length' = '10',
'fields.age.min' = '18',
'fields.age.max' = '60',
'fields.text.length' = '5'
);
测试结果:
select * from users;