【大数据】Flink 测试利器:DataGen

Flink 测试利器:DataGen

  • 1.什么是 FlinkSQL ?
  • 2.什么是 Connector ?
  • 3.DataGen Connector
    • 3.1 Demo
    • 3.2 支持的类型
    • 3.3 连接器属性
  • 4.DataGen 使用案例
    • 4.1 场景一:生成一亿条数据到 Hive 表
    • 4.2 场景二:持续每秒生产 10 万条数到消息队列
  • 5.思考

1.什么是 FlinkSQL ?

Flink SQL 是基于 Apache Calcite 的 SQL 解析器和优化器构建的,支持 ANSI SQL 标准,允许使用标准的 SQL 语句来处理流式和批处理数据。通过 Flink SQL,可以以声明式的方式描述数据处理逻辑,而无需编写显式的代码。使用 Flink SQL,可以执行各种数据操作,如 过滤聚合连接转换 等。它还提供了 窗口操作时间处理复杂事件处理 等功能,以满足流式数据处理的需求。

Flink SQL 提供了许多扩展功能和语法,以适应 Flink 的流式和批处理引擎的特性。它是 Flink 最高级别的抽象,可以与 DataStream API 和 DataSet API 无缝集成,利用 Flink 的分布式计算能力和容错机制。

在这里插入图片描述
使用 Flink SQL 处理数据的基本步骤:

  • 定义输入表:使用 CREATE TABLE 语句定义输入表,指定表的模式(字段和类型)和数据源(如 Kafka、文件等)。
  • 执行 SQL 查询:使用 SELECT、INSERT INTO 等 SQL 语句来执行数据查询和操作。您可以在 SQL 查询中使用各种内置函数、聚合操作、窗口操作和时间属性等。
  • 定义输出表:使用 CREATE TABLE 语句定义输出表,指定表的模式和目标数据存储(如 Kafka、文件等)。
  • 提交作业:将 Flink SQL 查询作为 Flink 作业提交到 Flink 集群中执行。Flink 会根据查询的逻辑和配置自动构建执行计划,并将数据处理任务分发到集群中的任务管理器进行执行。

总而言之,我们可以通过 Flink SQL 查询和操作来处理流式和批处理数据。它提供了一种简化和加速数据处理开发的方式,尤其适用于熟悉 SQL 的开发人员和数据工程师。

2.什么是 Connector ?

Flink Connector 是指 用于连接外部系统和数据源的组件。它允许 Flink 通过特定的连接器与不同的数据源进行交互,例如数据库、消息队列、文件系统等。它负责处理与外部系统的通信、数据格式转换、数据读取和写入等任务。无论是作为输入数据表还是输出数据表,通过使用适当的连接器,可以在 Flink SQL 中访问和操作外部系统中的数据。目前实时平台提供了很多常用的连接器:

例如:

  • JDBC:用于与关系型数据库(如 MySQL、PostgreSQL)建立连接,并支持在 Flink SQL 中读取和写入数据库表的数据。
  • JDQ:用于与 JDQ 集成,可以读取和写入 JDQ 主题中的数据。
  • Elasticsearch:用于与 Elasticsearch 集成,可以将数据写入 Elasticsearch 索引或从索引中读取数据。
  • File Connector:用于读取和写入各种文件格式(如 CSV、JSON、Parquet)的数据。
  • ……

还有如 HBase、JMQ4、Doris、Clickhouse,Jimdb,Hive 等,用于与不同的数据源进行集成。通过使用 Flink SQL Connector,我们可以轻松地与外部系统进行数据交互,将数据导入到 Flink 进行处理,或 将处理结果导出到外部系统

在这里插入图片描述

3.DataGen Connector

DataGen 是 Flink SQL 提供的一个内置连接器,用于生成模拟的测试数据,以便在开发和测试过程中使用。

使用 DataGen,可以生成具有不同数据类型和分布的数据,例如整数、字符串、日期等。这样可以模拟真实的数据场景,并帮助验证和调试 Flink SQL 查询和操作。

3.1 Demo

以下是一个使用 DataGen 函数的简单示例:

-- 创建输入表
CREATE TABLE input_table (
	order_number BIGINT,
	price DECIMAL(32,2),
	buyer ROW <first_name STRING,last_name STRING>,
	order_time TIMESTAMP(3)
) WITH (
	'connector' = 'datagen',
);

在上面的示例中,我们使用 DataGen 连接器创建了一个名为 input_table 的输入表。该表包含了 order_numberpricebuyerorder_time 四个字段。默认是 Random 随机生成对应类型的数据,生产速率是 10000 10000 10000 条/秒,只要任务不停,就会源源不断的生产数据。当然也可以指定一些参数来定义生成数据的规则,例如每秒生成的行数、字段的数据类型和分布。

生成的数据样例:

{
    "order_number": -6353089831284155505,
    "price": 253422671148527900374700392448,
    "buyer": {
        "first_name": "6e4df4455bed12c8ad74f03471e5d8e3141d7977bcc5bef88a57102dac71ac9a9dbef00f406ce9bddaf3741f37330e5fb9d2",
        "last_name": "d7d8a39e063fbd2beac91c791dc1024e2b1f0857b85990fbb5c4eac32445951aad0a2bcffd3a29b2a08b057a0b31aa689ed7"
    },
    "order_time": "2023-09-21 06:22:29.618"
}
{
    "order_number": 1102733628546646982,
    "price": 628524591222898424803263250432,
    "buyer": {
        "first_name": "4738f237436b70c80e504b95f0d9ec3d7c01c8745edf21495f17bb4d7044b4950943014f26b5d7fdaed10db37a632849b96c",
        "last_name": "7f9dbdbed581b687989665b97c09dec1a617c830c048446bf31c746898e1bccfe21a5969ee174a1d69845be7163b5e375a09"
    },
    "order_time": "2023-09-21 06:23:01.69"
}

3.2 支持的类型

字段类型数据生成方式
BOOLEANrandom
CHARrandom / sequence
VARCHARrandom / sequence
STRINGrandom / sequence
DECIMALrandom / sequence
TINYINTrandom / sequence
SMALLINTrandom / sequence
INTrandom / sequence
BIGINTrandom / sequence
FLOATrandom / sequence
DOUBLErandom / sequence
DATErandom
TIMErandom
TIMESTAMPrandom
TIMESTAMP_LTZrandom
INTERVAL YEAR TO MONTHrandom
INTERVAL DAY TO MONTHrandom
ROWrandom
ARRAYrandom
MAPrandom
MULTISETrandom

3.3 连接器属性

属性是否必填默认值类型
描述
connectorrequired(none)String‘datagen’
rows-per-secondoptional 10000 10000 10000Long数据生产速率
number-of-rowsoptional(none)Long指定生产的数据条数,默认是不限制
fields.#.kindoptionalrandomString指定字段的生产数据的方式 random 还是 sequence
fields.#.minoptional(Minimum value of type)(Type of field)random 生成器的指定字段 # 最小值,支持数字类型
fields.#.maxoptional(Maximum value of type)(Type of field)random 生成器的指定字段 # 最大值,支持数字类型
fields.#.lengthoptional 100 100 100Integerchar / varchar / string / array / map / multiset 类型的长度
fields.#.startoptional(none)(Type of field)sequence 生成器的开始值
fields.#.endoptional(none)(Type of field)sequence 生成器的结束值

4.DataGen 使用案例

4.1 场景一:生成一亿条数据到 Hive 表

CREATE TABLE dataGenSourceTable (
	order_number BIGINT,
	price DECIMAL(10, 2),
	buyer STRING,
	order_time TIMESTAMP(3)
) WITH ( 
	'connector'='datagen', 
	'number-of-rows'='100000000',
	'rows-per-second' = '100000'
);


CREATE CATALOG myhive
WITH (
	'type'='hive',
	'default-database'='default'
);
USE CATALOG myhive;
USE dev;
SET table.sql-dialect=hive;
CREATE TABLE if not exists shipu3_test_0932 (
	order_number BIGINT,
	price DECIMAL(10, 2),
	buyer STRING,
	order_time TIMESTAMP(3)
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
	'partition.time-extractor.timestamp-pattern'='$dt',
	'sink.partition-commit.trigger'='partition-time',
	'sink.partition-commit.delay'='1 h',
	'sink.partition-commit.policy.kind'='metastore,success-file'
);
SET table.sql-dialect=default;
insert into myhive.dev.shipu3_test_0932
select order_number, price, buyer, order_time, cast(CURRENT_DATE as varchar)
from default_catalog.default_database.dataGenSourceTable;

当每秒生产 10 万条数据的时候,17 分钟左右就可以完成,当然我们可以通过增加 Flink 任务的计算节点、并行度、提高生产速率 rows-per-second 的值等来更快速的完成大数据量的生产。

4.2 场景二:持续每秒生产 10 万条数到消息队列

CREATE TABLE dataGenSourceTable (
	order_number BIGINT,
	price INT,
	buyer ROW <first_name STRING,last_name STRING>,
	order_time TIMESTAMP(3),
	col_array ARRAY <STRING>,
	col_map map <STRING,STRING>
) WITH ( 
	'connector'='datagen', --连接器类型
	'rows-per-second'='100000', --生产速率
	'fields.order_number.kind'='random', --字段order_number的生产方式
	'fields.order_number.min'='1', --字段order_number最小值
	'fields.order_number.max'='1000', --字段order_number最大值
	'fields.price.kind'='sequence', --字段price的生产方式
	'fields.price.start'='1', --字段price开始值
	'fields.price.end'='1000', --字段price最大值
	'fields.col_array.element.length'='5', --每个元素的长度
	'fields.col_map.key.length'='5', --map key的长度
	'fields.col_map.value.length'='5' --map value的长度
);

CREATE TABLE jdqsink1 (
	order_number BIGINT,
	price DECIMAL(32, 2),
	buyer ROW <first_name STRING,last_name STRING>,
	order_time TIMESTAMP(3),
	col_ARRAY ARRAY <STRING>,
	col_map map <STRING,STRING>
) WITH (
	'connector'='jdq',
	'topic'='jrdw-fk-area_info__1',
	'jdq.client.id'='xxxxx',
	'jdq.password'='xxxxxxx',
	'jdq.domain'='db.test.group.com',
	'format'='json'
);

INSERTINTO jdqsink1
SELECT * FROM dataGenSourceTable;

5.思考

通过以上案例可以看到,通过 Datagen 结合其他连接器可以模拟各种场景的数据。

  • 性能测试:我们可以利用 Flink 的高处理性能,来调试任务的外部依赖的阈值(超时,限流等)到一个合适的水位,避免自己的任务有过多的外部依赖出现木桶效应。
  • 边界条件测试:我们通过使用 Flink DataGen 生成特殊的测试数据,如最小值、最大值、空值、重复值等来验证 Flink 任务在边界条件下的正确性和鲁棒性。
  • 数据完整性测试:我们通过 Flink DataGen 可以生成包含错误或异常数据的数据集,如无效的数据格式、缺失的字段、重复的数据等。从而可以测试 Flink 任务对异常情况的处理能力,验证 Flink 任务在处理数据时是否能够正确地保持数据的完整性。

总之,Flink DataGen 是一个强大的工具,可以帮助测试人员构造各种类型的测试数据。通过合理的使用 ,测试人员可以更有效地进行测试,并发现潜在的问题和缺陷。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/333460.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

FunTester 性能测试中获取 JVM 资源信息

在以往性能测试中&#xff0c;通常施压机的硬件资源不会成为压力瓶颈&#xff0c;但是在多任务并行的场景中&#xff0c;如果一个任务占用当前机器资源过多&#xff0c;会影响其他任务执行。或者当前用例本身存在问题&#xff0c;导致性能无法进一步提升&#xff0c;影响了性能…

鸿蒙开发系列教程(四)--ArkTS语言:基础知识

1、ArkTS语言介绍 ArkTS是HarmonyOS应用开发语言。它在保持TypeScript&#xff08;简称TS&#xff09;基本语法风格的基础上&#xff0c;对TS的动态类型特性施加更严格的约束&#xff0c;引入静态类型。同时&#xff0c;提供了声明式UI、状态管理等相应的能力&#xff0c;让开…

脏牛漏洞(CVE-2016-5195)复现过程(详细完整版)

1、实验环境 KaLi 攻击机 Linux靶机 靶场 实验目的&#xff1a; 掌握漏洞利用的方法 掌握脏牛漏洞的原理 提高对内核安全性的认识 2、靶场搭建 VMware导入靶场 靶场地址&#xff1a;链接&#xff1a;百度网盘 请输入提取码百度网盘为您提供文件的网络备份、同步和分享服务。…

SEO品牌推广的核心步骤

在当今数字化的商业环境中&#xff0c;SEO&#xff08;搜索引擎优化&#xff09;品牌推广已经成为企业不可或缺的一部分。通过优化网站&#xff0c;提高在搜索引擎结果中的排名&#xff0c;企业能够更好地吸引潜在客户&#xff0c;提升品牌知名度。本文将专心分享如何做好SEO品…

线性表的顺序存储实现

前言 线性表的顺序存储及基本操作的实现 一、线性表 线性表&#xff08;List&#xff09;是由同类型数据元素构成有序序列的线性结构&#xff0c;用户处理线性表数据时常常需要初始化、查找、插入、删除、计算数据长度等操作。 线性表还包含以下几个要素&#xff1a; 表中元…

C语言编译和链接

翻译环境和运行环境 在ANSI C的任何一种实现中&#xff0c;存在两个不同的环境 .第一种是翻译环境&#xff0c;在这个环境中源代码被转换为可执行的机器指令 .第二种是执行环境&#xff0c;它用于实际执行代码 翻译环境 翻译环境是由编译和链接两个大过程组成&#xff0c;而…

交叉编译工具 aarch64-linux-gnu-gcc 的介绍与安装

AArch64 是随 ARMv8 ISA 一起引入的 64 位架构&#xff0c;用于执行 A64 指令的计算机。而且在 AArch64 状态下执行的代码只能使用 A64 指令集。&#xff0c;而不能执行 A32 或 T32 指令。但是&#xff0c;与 AArch32 中不同&#xff0c;在64位状态下&#xff0c;指令可以访问 …

ArcGIS Pro控件汇总

控件来源 我们对其一一进行查看是否有控件 查看位置 控件展示 ribbonControls 展示 代码 <controls:ProWindow x:Class"ProAppModule9.ProWindowRibbon"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:controls"clr-…

集成学习算法(Bagging 思想、Boosting思想)及具体案例

概述&#xff1a;是机器学习中的一种思想&#xff0c;通过多个模型的组合形成一个精度更高的模型&#xff0c;参与组合的模型称为弱学习器 1、Bagging 思想 有放回的抽样&#xff08;booststrap抽样&#xff09;产生不同的训练集&#xff0c;从而训练不同的学习器&#xff1b;…

FairGuard游戏安全2023年度报告

导 读&#xff1a;2023年&#xff0c;游戏行业摆脱了疫情带来诸多负面影响&#xff0c;国内游戏市场收入与用户规模双双实现突破&#xff0c;迎来了历史新高点。但游戏黑灰产规模也在迅速扩大&#xff0c;不少游戏饱受其侵扰&#xff0c;游戏厂商愈发重视游戏安全问题。 为帮助…

重磅发布!基于百度飞桨的《人工智能基础及应用》书籍正式上线

科技日新月异的今天&#xff0c;人工智能已经成为引领未来的核心驱动力。为了帮助大家更好地深入理解人工智能的理论和技术&#xff0c;为未来发展做好准备&#xff0c;百度飞桨教材编写组联合北京交通大学王方石教授、北京邮电大学杨煜清特聘副研究员共同撰写推出了《人工智能…

使用 FFmpeg 轻松调整视频的大小/缩放/更改分辨率

在此 FFmpeg 教程中&#xff0c;我们学习使用 FFmpeg 的命令行工具更改视频的分辨率&#xff08;或调整视频的大小/缩放&#xff09;。 更改视频的分辨率&#xff08;也称为调整大小或缩放&#xff09;是视频编辑、处理和压缩中非常常见的操作。对于 ABR 视频流尤其如此&#…

【笔记】Helm-3 主题-6 Chart仓库指南

Chart仓库指南 本节介绍如何创建和使用chart仓库。在高层级中&#xff0c;chart仓库是打包的chart存储和分享的位置。 社区的Helm chart仓位于 Artifact Hub &#xff0c;欢迎加入。不过Helm也可以创建并运行您自己的chart仓库。该指南将介绍如何操作。 Artifact Hub 先决条…

防爆气象站需要如何维护

TH-FBCQX2 在工业生产中&#xff0c;防爆气象站是保障安全生产的重要设备之一。由于其特殊的使用环境和功能&#xff0c;防爆气象站的维护保养工作显得尤为重要。 一、日常维护保养 清洁&#xff1a;防爆气象站的外部和内部组件需要定期清洁&#xff0c;以去除灰尘、油渍和杂质…

快速入门:搭建宠物用品小程序商城的必备知识

小程序商城逐渐成为商家展示和销售产品的重要渠道。对于宠物用品商家来说&#xff0c;搭建一个宠物用品小程序商城不仅可以提高品牌知名度&#xff0c;还能吸引更多的潜在客户。本文将介绍如何通过乔拓云平台搭建宠物用品小程序商城。 首先&#xff0c;商家需要登录乔拓云平台后…

GaussDB与openGauss有什么相同和不同?

众所周知&#xff0c;GaussDB是华为自主创新研发的分布式关系型数据库&#xff0c;为企业提供功能全面、稳定可靠、扩展性强、性能优越的企业级数据库服务&#xff0c;openGauss是开源数据库&#xff0c;两者之间又是什么样的关系&#xff0c;有什么相同和不同&#xff0c;让我…

Git教程学习:03 记录每次更新到仓库

文章目录 1 检查当前文件状态2 跟踪新文件3 暂存已修改的文件4 状态简览5 忽略文件6 查看已暂存和未暂存的修改7 提交更新8 跳过使用暂存区域9 移除文件10 移动文件 现在我们的机器上有了一个 真实项目 的 Git 仓库&#xff0c;并从这个仓库中检出了所有文件的 工作副本。 通常…

【技术科普】什么是AI视频识别分析?干货满满!

视频AI识别分析是指利用人工智能技术对视频数据进行智能化检测、分析和提取有用信息的过程。通过视频AI分析&#xff0c;可以自动化地识别、检测和理解视频中的对象、动作、场景等元素&#xff0c;并进行标记或者相关处理&#xff0c;最后形成相应事件的处理和告警信息。这项技…

软件是什么?前端,后端,数据库

软件是什么&#xff1f; 由于很多东西没有实际接触&#xff0c;很难理解&#xff0c;对于软件的定义也是各种各样。但是我还是不理解&#xff0c;软件开发中的前端&#xff0c;后端&#xff0c;数据库到底有什么关系呢&#xff01; 这个问题足足困扰了三年半&#xff0c;练习时…

分布式ID(2):雪花算法生成ID

1 雪花算法简介 这种方案大致来说是一种以划分命名空间(UUID也算,由于比较常见,所以单独分析)来生成ID的一种算法,这种方案把64-bit分别划分成多段,分开来标示机器、时间等,比如在snowflake中的64-bit分别表示如下图(图片来自网络)所示: 41-bit的时间可以表示(1L&l…