KDP数据分析实战:从0到1完成数据实时采集处理到可视化

智领云自主研发的开源轻量级Kubernetes数据平台,即Kubernetes Data Platform (简称KDP),能够为用户提供在Kubernetes上的一站式云原生数据集成与开发平台。在最新的v1.1.0版本中,用户可借助 KDP 平台上开箱即用的 Airflow、AirByte、Flink、Kafka、MySQL、ClickHouse、Superset 等开源组件快速搭建实时、半实时或批量采集、处理、分析的数据流水线以及可视化报表展示,可视化展示效果如下:

247984561aa6a7370c0c0741a330aded.png

以下我们将介绍一个实时订单数据流水线从数据采集到数据处理,最后到可视化展示的详细建设流程。

 1.流水线设计

借助 KDP 平台的开源组件 Airflow、MySQL、Flink、Kafka、ClickHouse、Superset 完成数据实时采集处理及可视化分析,架构如下: 

8ea91c86309be540823ed486fe2b0dce.jpeg

1.1 数据流

  • 直接使用Flink构建实时数仓,由Flink进行清洗加工转换和聚合汇总,将各层结果集写入Kafka中;

  • ClickHouse从Kafka分别订阅各层数据,将各层数据持久化到ClickHouse中,用于之后的查询分析。

1.2 数据表

本次分析数据基于mock数据,包含数据实时采集处理及可视化分析:

  • 消费者表:customers

字段

字段说明

id

用户ID

name

姓名

age

年龄

gender

性别

  • 订单表:orders

字段

字段说明

order_id

订单ID

order_revenue

订单金额

order_region

下单地区

customer_id

用户ID

create_time

下单时间

1.3 环境说明

在 KDP 页面安装如下组件并完成组件的 QuickStart:

  • MySQL: 实时数据数据源及 Superset/Airflow 元数据库,安装时需要开启binlog

  • Kafka: 数据采集sink

  • Flink: 数据采集及数据处理

  • ClickHouse: 数据存储

  • Superset: 数据可视化

  • Airflow: 作业调度

2. 数据集成与处理

文中使用的账号密码信息请根据实际集群配置进行修改。

2.1 创建MySQL表

2.2 创建 Kafka Topic

进入Kafka broker pod,执行命令创建 Topic,也可以通过Kafka manager 页面创建,以下为进入pod并通过命令行创建的示例:

export BOOTSTRAP="kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092" 


bin/kafka-topics.sh --create \
  --topic ods-order \
  --replication-factor 3 \
  --partitions 10 \
  --bootstrap-server $BOOTSTRAP 


bin/kafka-topics.sh --create \
  --topic ods-customers \
  --replication-factor 3 \
  --partitions 10 \
  --bootstrap-server $BOOTSTRAP


bin/kafka-topics.sh --create \
  --topic dwd-order-customer-valid \
  --replication-factor 3 \
  --partitions 10 \
  --bootstrap-server $BOOTSTRAP


bin/kafka-topics.sh --create \
  --topic dws-agg-by-region \
  --replication-factor 3 \
  --partitions 10 \
  --bootstrap-server $BOOTSTRAP

2.3 创建 ClickHouse 表

进入clickhouse pod,使用`clickhouse-client`执行命令创建表,以下为建表语句:

CREATE DATABASE IF NOT EXISTS kdp_demo;
USE kdp_demo;


-- kafka_dwd_order_customer_valid
CREATE TABLE IF NOT EXISTS kdp_demo.dwd_order_customer_valid (
  order_id Int32,
  order_revenue Float32,
  order_region String,
  create_time DateTime,
  customer_id Int32,
  customer_age Float32,
  customer_name String,
  customer_gender String
) ENGINE = MergeTree()
ORDER BY order_id;


CREATE TABLE kdp_demo.kafka_dwd_order_customer_valid (
  order_id Int32,
  order_revenue Float32,
  order_region String,
  create_time DateTime,
  customer_id Int32,
  customer_age Float32,
  customer_name String,
  customer_gender String
) ENGINE = Kafka
SETTINGS
  kafka_broker_list = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
  kafka_topic_list = 'dwd-order-customer-valid',
  kafka_group_name = 'clickhouse_group',
  kafka_format = 'JSONEachRow',
  kafka_row_delimiter = '\n';


CREATE MATERIALIZED VIEW kdp_demo.mv_dwd_order_customer_valid TO kdp_demo.dwd_order_customer_valid AS
SELECT
  order_id,
  order_revenue,
  order_region,
  create_time,
  customer_id,
  customer_age,
  customer_name,
  customer_gender
FROM kdp_demo.kafka_dwd_order_customer_valid;


-- kafka_dws_agg_by_region
CREATE TABLE IF NOT EXISTS kdp_demo.dws_agg_by_region (
  order_region String,
  order_cnt Int64,
  order_total_revenue Float32
) ENGINE = ReplacingMergeTree()
ORDER BY order_region;


CREATE TABLE kdp_demo.kafka_dws_agg_by_region (
  order_region String,
  order_cnt Int64,
  order_total_revenue Float32
) ENGINE = Kafka
SETTINGS
  kafka_broker_list = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
  kafka_topic_list = 'dws-agg-by-region',
  kafka_group_name = 'clickhouse_group',
  kafka_format = 'JSONEachRow',
  kafka_row_delimiter = '\n';


CREATE MATERIALIZED VIEW kdp_demo.mv_dws_agg_by_region TO kdp_demo.dws_agg_by_region AS
SELECT
  order_region,
  order_cnt,
  order_total_revenue
FROM kdp_demo.kafka_dws_agg_by_region;

2.4 创建 Flink SQL 作业

2.4.1 SQL部分

CREATE DATABASE IF NOT EXISTS `default_catalog`.`kdp_demo`;


-- create source tables
CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`orders_src`(
    `order_id` INT NOT NULL,
    `order_revenue` FLOAT NOT NULL,
    `order_region` STRING NOT NULL,
    `customer_id` INT NOT NULL,
    `create_time` TIMESTAMP,
    PRIMARY KEY(`order_id`) NOT ENFORCED
) with (
    'connector' = 'mysql-cdc',
    'hostname' = 'kdp-data-mysql',
    'port' = '3306',
    'username' = 'bdos_dba',
    'password' = 'KdpDba!mysql123',
    'database-name' = 'kdp_demo',
    'table-name' = 'orders'
);


CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`customers_src` (
    `id` INT NOT NULL,
    `age` FLOAT NOT NULL,
    `name` STRING NOT NULL,
    `gender` STRING NOT NULL,
    PRIMARY KEY(`id`) NOT ENFORCED
) with (
    'connector' = 'mysql-cdc',
    'hostname' = 'kdp-data-mysql',
    'port' = '3306',
    'username' = 'bdos_dba',
    'password' = 'KdpDba!mysql123',
    'database-name' = 'kdp_demo',
    'table-name' = 'customers'
);


-- create ods dwd and dws tables
CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`ods_order_table` (
    `order_id` INT,
    `order_revenue` FLOAT,
    `order_region` VARCHAR(40),
    `customer_id` INT,
    `create_time` TIMESTAMP,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'ods-order',
    'properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);


CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`ods_customers_table` (
    `customer_id` INT,
    `customer_age` FLOAT,
    `customer_name` STRING,
    `gender` STRING,
    PRIMARY KEY (customer_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'ods-customers',
    'properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);


CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`dwd_order_customer_valid` (
    `order_id` INT,
    `order_revenue` FLOAT,
    `order_region` STRING,
    `create_time` TIMESTAMP,
    `customer_id` INT,
    `customer_age` FLOAT,
    `customer_name` STRING,
    `customer_gender` STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dwd-order-customer-valid',
    'properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);


CREATE TABLE IF NOT EXISTS `default_catalog`.`kdp_demo`.`dws_agg_by_region` (
    `order_region` VARCHAR(40),
    `order_cnt` BIGINT,
    `order_total_revenue` FLOAT,
    PRIMARY KEY (order_region) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'topic' = 'dws-agg-by-region',
    'properties.bootstrap.servers' = 'kafka-3-cluster-kafka-0.kafka-3-cluster-kafka-brokers.kdp-data.svc.cluster.local:9092',
    'key.format' = 'json',
    'value.format' = 'json'
);


USE kdp_demo;
-- EXECUTE STATEMENT SET
-- BEGIN
INSERT INTO ods_order_table SELECT * FROM orders_src;
INSERT INTO ods_customers_table SELECT * FROM customers_src;
INSERT INTO
    dwd_order_customer_valid
SELECT
    o.order_id,
    o.order_revenue,
    o.order_region,
    o.create_time,
    c.id as customer_id,
    c.age as customer_age,
    c.name as customer_name,
    c.gender as customer_gender
FROM
    customers_src c
        JOIN orders_src o ON c.id = o.customer_id
WHERE
    c.id <> -1;
INSERT INTO
    dws_agg_by_region
SELECT
    order_region,
    count(*) as order_cnt,
    sum(order_revenue) as order_total_revenue
FROM
    dwd_order_customer_valid
GROUP BY
    order_region;
-- END;

2.4.2 使用 StreamPark 创建 Flink SQL 作业

具体使用参考 StreamPark 文档。

maven 依赖:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mysql-cdc</artifactId>
    <version>3.0.1</version>
</dependency>

2.5 创建 Airflow DAG

2.5.1 DAG 文件部分

import random
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago




default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}


dag = DAG(
    'kdp_demo_order_data_insert',
    description='Insert into orders by using random data',
    schedule_interval=timedelta(minutes=1),
    start_date=days_ago(1),
    catchup=False,
    tags=['kdp-example'],
)


# MySQL connection info
mysql_host = 'kdp-data-mysql'
mysql_db = 'kdp_demo'
mysql_user = 'bdos_dba'
mysql_password = 'KdpDba!mysql123'
mysql_port = '3306'
cities = ["北京", "上海", "广州", "深圳", "成都", "杭州", "重庆", "武汉", "西安", "苏州", "天津", "南京", "郑州",
          "长沙", "东莞", "青岛", "宁波", "沈阳", "昆明", "合肥", "大连", "厦门", "哈尔滨", "福州", "济南", "温州",
          "佛山", "南昌", "长春", "贵阳", "南宁", "金华", "石家庄", "常州", "泉州", "南通", "太原", "徐州", "嘉兴",
          "乌鲁木齐", "惠州", "珠海", "扬州", "兰州", "烟台", "汕头", "潍坊", "保定", "海口"]
city = random.choice(cities)
consumer_id = random.randint(1, 100)
order_revenue = random.randint(1, 100)
# 插入数据的 BashOperator
insert_data_orders = BashOperator(
    task_id='insert_data_orders',
    bash_command=f'''
    mysql -h {mysql_host} -P {mysql_port} -u {mysql_user} -p{mysql_password} {mysql_db} -e "
    INSERT INTO orders(order_revenue,order_region,customer_id) VALUES({order_revenue},'{city}',{consumer_id});"
    ''',
    dag=dag,
)
insert_data_orders

2.5.2 DAG 说明及执行

当前Airflow安装时,需要指定可访问的git 仓库地址,因此需要将 Airflow DAG 提交到 Git 仓库中。每分钟向orders表插入一条数据。

2.6 数据验证

使用ClickHouse验证数据:

(1)进入ClickHouse客户端

clickhouse-client 
# default pass: ckdba.123

(2)执行查询

SELECT * FROM kdp_demo.dwd_order_customer_valid;
SELECT count(*) FROM kdp_demo.dwd_order_customer_valid;

(3)对比验证MySQL中数据是否一致

select count(*) from kdp_demo.orders;

3. 数据可视化

在2.6中数据验证通过后,可以通过Superset进行数据可视化展示。使用账号`admin/admin`登录Superset页面(注意添加本地 Host 解析):http://superset-kdp-data.kdp-e2e.io

3.1 创建图表

导入我们制作好的图表:

  1. 下载面板:https://gitee.com/linktime-cloud/example-datasets/raw/main/superset/dashboard_export_20240607T100739.zip

  2. 导入面板

(1)选择下载的文件导入

eed49ffb69952693ad5a46da6be81f08.png

(2)输入 ClickHouse 的用户`default`的默认密码`ckdba.123`:

4c07d54c7ad0f2f66085481d5bfe77ab.png

3.2 效果展示

最终的实时订单数据图表展示如下,随着订单数据的更新,图表中的数据也会实时更新:

57bd6fa097e66e1da7dc8aa103a599b4.png

快速体验

🚀GitHub项目:

https://github.com/linktimecloud/kubernetes-data-platform

欢迎您参与开源社区的建设🤝

 - FIN -       

1ad0c68fe5ea3d59a392d306012eef0b.png

更多精彩推

  • 我们开源啦!一键部署免费使用!Kubernetes上直接运行大数据平台!

  • 开源 KDP  v1.1.0 版本正式发布,新增数据集成开发应用场景

  • 在 KubeSphere 上快速安装和使用 KDP 云原生数据平台

  • 在 Rancher 上快速安装和使用 KDP 云原生数据平台

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/780353.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

14-35 剑和诗人9 - 普及 Agentic RAG

好吧&#xff0c;让我们直接进入正题——了解 Agentic RAG&#xff08;检索增强生成&#xff09;方法以及它如何彻底改变我们处理信息的方式。系好安全带&#xff0c;因为这将变得疯狂&#xff01; Agentic RAG 的核心在于为 RAG 框架注入智能和自主性。这就像对常规 RAG 系统…

阶段三:项目开发---搭建项目前后端系统基础架构:任务10:SpringBoot框架的原理和使用

任务描述 1、熟悉SpringBoot框架的原理及使用 2、使用IDEA创建基于SpringBoot、MyBatis、MySQL的Java项目 3、当前任务请在client节点上进行 任务指导 1、SpringBoot框架的选择和原理 2、MyBatis-Plus的选择和原理 3、使用IDEA创建基于SpringBootMyBatis-PlusMySQL的Jav…

PCIe驱动开发(1)— 开发环境搭建

PCIe驱动开发&#xff08;1&#xff09;— 开发环境搭建 一、前言 二、Ubuntu安装 参考: VMware下Ubuntu18.04虚拟机的安装 三、QEMU安装 参考文章&#xff1a;QEMU搭建X86_64 Ubuntu虚拟系统环境 四、安装Ubuntu 下载地址&#xff1a;https://old-releases.ubuntu.com…

QWidget窗口抗锯齿圆角的一个实现方案(支持子控件)2

QWidget窗口抗锯齿圆角的一个实现方案&#xff08;支持子控件&#xff09;2 本方案使用了QGraphicsEffect&#xff0c;由于QGraphicsEffect对一些控件会有渲染问题&#xff0c;比如列表、表格等&#xff0c;所以暂时仅作为研究&#xff0c;优先其他方案 在之前的文章中&#…

k8s_集群搭建_在主节点中加入node节点_k8s集群自恢复能力演示_token过期重新生成令牌---分布式云原生部署架构搭建016

然后安装好了master节点以后,我们再来看如何把node节点加入进来,可以看到 只需要执行,命令行中提示的命令就可以了 比如上面的 Your Kubernetes control-plane has initialized successfully!To start using your cluster, you need to run the following as a regular user:…

人脸识别课堂签到系统【PyQt5实现】

人脸识别签到系统 1、运用场景 课堂签到,上班打卡,进出门身份验证。 2、功能类别 人脸录入,打卡签到,声音提醒,打卡信息导出,打包成exe可执行文件 3、技术栈 python3.8,sqlite3,opencv,face_recognition,PyQt5,csv 4、流程图 1、导入库 2、编写UI界面 3、打…

商家店铺电商小程序模板源码

橙色通用的商家入驻&#xff0c;商户商家&#xff0c;商家店铺&#xff0c;购物商城&#xff0c;商家购物平台app小程序网页模板。包含&#xff1a;商家主页、优先商家、商品详情、购物车、结算订单、个人中心、优惠券、会员卡、地址管理等功能页面。 商家店铺电商小程序模板源…

100359.统计X和Y频数相等的子矩阵数量

1.题目描述 给你一个二维字符矩阵 grid&#xff0c;其中 grid[i][j] 可能是 X、Y 或 .&#xff0c;返回满足以下条件的子矩阵数量&#xff1a; 包含 grid[0][0]X 和 Y 的频数相等。至少包含一个 X。 示例 1&#xff1a; 输入&#xff1a; grid [["X","Y",…

算法刷题笔记 滑动窗口(C++实现,非常详细)

文章目录 题目描述基本思路实现代码 题目描述 给定一个大小为n ≤ 10^6的数组。有一个大小为k的滑动窗口&#xff0c;它从数组的最左边移动到最右边。你只能在窗口中看到k个数字。每次滑动窗口向右移动一个位置。以下是一个例子&#xff1a; 该数组为 [1 3 -1 -3 5 3 6 7]&…

leetcode 66. 加一

leetcode 66. 加一 题解 刚开始只是以为在最后一位上加一就可以了 &#xff0c; 没想到还有进位呢&#xff0c; 比如说9的话&#xff0c; 加上1就是10&#xff0c; 返回的数组就是[1. 0],把进位的情况考虑进去就可以了。 class Solution { public:vector<int> plusOne(…

Vue3+.NET6前后端分离式管理后台实战(二十八)

1&#xff0c;Vue3.NET6前后端分离式管理后台实战(二十八)

Raw Socket(一)实现TCP三次握手

实验环境&#xff1a; Windows物理机&#xff1a;192.168.1.4 WSL Ubuntu 20.04.6 LTS&#xff1a;172.19.32.196 Windows下的一个http服务器&#xff1a;HFS&#xff0c;大概长这个样子&#xff1a; 客户端就是Ubuntu&#xff0c;服务端就是这个…

[图解]SysML和EA建模住宅安全系统-12-内部块图

1 00:00:00,580 --> 00:00:02,770 接下来我们来画流了 2 00:00:03,100 --> 00:00:05,050 首先第一个是站点状态 3 00:00:05,140 --> 00:00:08,130 从这里到这里&#xff0c;我们画一个过来 4 00:00:10,290 --> 00:00:11,890 这里流到这里 5 00:00:11,900 -->…

多粒度封锁-封锁粒度、多粒度封锁模式

一、引言 1、若采用封锁技术实现并发控制&#xff0c;事务在访问数据库对象前要在数据库对象上加锁&#xff0c;为提高事务的并发程度&#xff0c;商用DBMS会采用一种多粒度封锁方法 2、事务可访问的数据库对象可以是逻辑单元&#xff0c;包括关系、关系中的元组、关系的属性…

Python学习笔记31:进阶篇(二十)pygame的使用之图形绘制

前言 基础模块的知识通过这么长时间的学习已经有所了解&#xff0c;更加深入的话需要通过完成各种项目&#xff0c;在这个过程中逐渐学习&#xff0c;成长。 我们的下一步目标是完成python crash course中的外星人入侵项目&#xff0c;这是一个2D游戏项目。在这之前&#xff…

Debezium报错处理系列之第111篇:Can‘t compare binlog filenames with different base names

Debezium报错处理系列之第111篇:Cant compare binlog filenames with different base names 一、完整报错二、错误原因三、解决方法Debezium从入门到精通系列之:研究Debezium技术遇到的各种错误解决方法汇总: Debezium从入门到精通系列之:百篇系列文章汇总之研究Debezium技…

论文辅助笔记:ST-LLM

1 时间嵌入 2 PFA&#xff08;Partial Frozen Architecture&#xff09; 3 ST_LLM 3.1 初始化 3.2 forward

蓝桥杯开发板STM32G431RBT6高阶HAL库学习FreeRtos——FreeRTOS任务调度方式

一、任务调度方式 1.1抢占式调度&#xff08;不同优先级&#xff09; 主要是针对优先级不同的任务&#xff0c;每个任务都有一个优先级&#xff0c; 优先级高的任务可以抢占优先级低的任务。1.2时间片调度&#xff08;同优先级&#xff09; 主要针对优先级相同的任务&#x…

视频技术助力智慧城市一网统管:视频资源整合与智能化管理

随着信息技术的飞速发展&#xff0c;智慧城市已成为现代城市发展的重要方向。在智慧城市建设中&#xff0c;一网统管作为城市管理的重要策略&#xff0c;通过整合各类信息资源&#xff0c;实现资源的优化配置和问题的快速响应。其中&#xff0c;视频技术作为一网统管场景中的关…

【Linux系统】动态库和静态库 动态库加载

认识动态库静态库 我们有没有使用过库呢&#xff1f;-- 用过c、c的标准库 c的各种函数&#xff0c;c的各种STL容器&#xff0c;我们使用他们内部必须得有具体实现。 Linux: .so(动态库) .a(静态库) Windows: .dll(动态库) .lib(静态库) 库是拿来给别人使用的&#xff0c;所…