Flink CDC系列之:基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL

Flink CDC系列之:基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL

  • 一、技术路线
  • 二、MySQL数据库建表
  • 三、PostgreSQL数据库建表
  • 四、在 Flink SQL CLI 中使用 Flink DDL 创建表
  • 五、关联订单数据并且将其写入 Elasticsearch 中
  • 六、Kibana查看商品和物流信息的订单数据
  • 七、修改数据库中表的数据,Kibana查看更新

一、技术路线

在这里插入图片描述

二、MySQL数据库建表

mysql数据库创建数据库和表 products,orders

创建products表

-- MySQL

CREATE DATABASE mydb;USE mydb;

CREATE TABLE products (id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,name VARCHAR(255) NOT NULL,description VARCHAR(512));

ALTER TABLE products AUTO_INCREMENT = 101;

创建orders表

CREATE TABLE orders (
    order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
    order_date DATETIME NOT NULL,
    customer_name VARCHAR(255) NOT NULL,
    price DECIMAL(10, 5) NOT NULL,
    product_id INTEGER NOT NULL,
    order_status BOOLEAN NOT NULL -- Whether order has been placed
 ) AUTO_INCREMENT = 10001;

products表插入数据

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

orders表插入数据

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
(default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
(default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

三、PostgreSQL数据库建表

创建表 shipments

-- PG
CREATE TABLE shipments (
shipment_id SERIAL NOT NULL PRIMARY KEY,
order_id SERIAL NOT NULL,
origin VARCHAR(255) NOT NULL,
destination VARCHAR(255) NOT NULL,
is_arrived BOOLEAN NOT NULL);

插入数据

ALTER SEQUENCE public.shipments_shipment_id_seq RESTART WITH 1001;


ALTER TABLE public.shipments REPLICA IDENTITY FULL;

INSERT INTO shipments
VALUES (default,10001,'Beijing','Shanghai',false),
       (default,10002,'Hangzhou','Shanghai',false),   
       (default,10003,'Shanghai','Hangzhou',false);

四、在 Flink SQL CLI 中使用 Flink DDL 创建表

首先,开启 checkpoint,每隔3秒做一次 checkpoint

-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s;

然后, 对于数据库中的表 products, orders, shipments, 使用 Flink SQL CLI 创建对应的表,用于同步这些底层数据库表的数据

-- Flink SQL

Flink SQL> CREATE TABLE products (
id INT,
name STRING,
description STRING,
PRIMARY KEY (id) NOT ENFORCED) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'products'
);
Flink SQL> CREATE TABLE orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '123456',
'database-name' = 'mydb',
'table-name' = 'orders');

最后,创建 enriched_orders 表, 用来将关联后的订单数据写入 Elasticsearch 中

-- Flink SQL
Flink SQL> CREATE TABLE enriched_orders (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
product_name STRING,
product_description STRING,
shipment_id INT,
origin STRING,
destination STRING,
is_arrived BOOLEAN,
PRIMARY KEY (order_id) NOT ENFORCED) WITH (
'connector' = 'elasticsearch-7',
'hosts' = 'http://localhost:9200',
'index' = 'enriched_orders'
);

五、关联订单数据并且将其写入 Elasticsearch 中

使用 Flink SQL 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中

-- Flink SQL
Flink SQL> INSERT INTO enriched_orders
SELECT o.*, p.name, p.description, s.shipment_id, s.origin, s.destination, s.is_arrived
FROM orders AS o
LEFT JOIN products AS p ON o.product_id = p.id
LEFT JOIN shipments AS s ON o.order_id = s.order_id;

六、Kibana查看商品和物流信息的订单数据

创建 index pattern enriched_orders
在这里插入图片描述
查看写入的数据
在这里插入图片描述

七、修改数据库中表的数据,Kibana查看更新

修改 MySQL 和 Postgres 数据库中表的数据,Kibana中显示的订单数据也将实时更新:

在 MySQL 的 orders 表中插入一条数据

--MySQL

INSERT INTO orders
VALUES (default, '2020-07-30 15:22:00', 'Jark', 29.71, 104, false);

在 Postgres 的 shipment 表中插入一条数据

--PG
INSERT INTO shipmentsVALUES (default,10004,'Shanghai','Beijing',false);

在 MySQL 的 orders 表中更新订单的状态

--MySQL
UPDATE orders SET order_status = true WHERE order_id = 10004;

在 Postgres 的 shipment 表中更新物流的状态

--PG
UPDATE shipments SET is_arrived = true WHERE shipment_id = 1004;

在 MYSQL 的 orders 表中删除一条数据

--MySQL
DELETE FROM orders WHERE order_id = 10004;

每执行一步就刷新一次 Kibana,可以看到 Kibana 中显示的订单数据将实时更新,如下所示:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/80504.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

leetcode611. 有效三角形的个数(java)

有效三角形的个数 有效三角形的个数排序加二分排序 双指针 上期算法 有效三角形的个数 给定一个包含非负整数的数组 nums ,返回其中可以组成三角形三条边的三元组个数。 示例 1: 输入: nums [2,2,3,4] 输出: 3 解释:有效的组合是: 2,3,4 (使用第一个 2) 2,3,4 (使…

如何修复损坏的DOC和DOCX格式Word文件?

我们日常办公中,经常用到Word文档。但是有时会遇到word文件损坏、无法打开的情况。这时该怎么办?接着往下看,小编在这里就给大家带来最简单的Word文件修复方法! 很多时候DOC和DOCX Word文件会无缘无故的损坏无法打开,一…

【C++ 记忆站】引用

文章目录 一、引用概念二、引用特性1、引用在定义时必须初始化2、一个变量可以有多个引用3、引用一旦引用一个实体,再不能引用其他实体 三、常引用四、使用场景1、做参数1、输出型参数2、大对象传参 2、做返回值1、传值返回2、传引用返回 五、传值、传引用效率比较六…

【C语言】每日一题(找到所有数组中消失的数字)

找到所有数组中消失的数字,链接奉上。 这里简单说一下,因为还没有接触到动态内存,数据结构,所以知识有限,也是尽力而为,结合题库的评论区找到了适合我的解法,以后有机会,会补上各种…

图数据库_Neo4j中文版_Centos7.9安装Neo4j社区版3.5.9_基于jdk1.8---Neo4j图数据库工作笔记0012

由于我们在国内使用啊,具体还是要用中文版滴,找了好久这个neo4j,原来还是有中文版的, https://we-yun.com/doc/neo4j-chs/ 中文版下载地址在这里: 所有版本都在这里了,需要哪个自己去下载就可以了,要注意下载以后,参考: https://we-yun.com/blog/prod-56.html 在这个位置下载…

画质提升+带宽优化,小红书音视频团队端云结合超分落地实践

随着视频业务和短视频播放规模不断增长,小红书一直致力于研究:如何在保证提升用户体验质量的同时降低视频带宽成本? 在近日结束的音视频技术大会「LiveVideoStackCon 2023」上海站中,小红书音视频架构视频图像处理算法负责人剑寒向…

2023.8 - java - 对象和类

public class Dog {String breed;int size;String colour;int age;void eat() {}void run() {}void sleep(){}void name(){} } 一个类可以包含以下类型变量: 局部变量:在方法、构造方法或者语句块中定义的变量被称为局部变量。变量声明和初始化都是在方…

实现Java异步调用的高效方法

文章目录 为什么需要异步调用?Java中的异步编程方式1. 使用多线程2. 使用Java异步框架 异步调用的关键细节结论 🎉欢迎来到Java学习路线专栏~实现Java异步调用的高效方法 ☆* o(≧▽≦)o *☆嗨~我是IT陈寒🍹✨博客主页:IT陈寒的博…

LabVIEW开发最小化5G系统测试平台

LabVIEW开发最小化5G系统测试平台 由于具有大量存储能力和数据的应用程序的智能手机的激增,当前一代产品被迫提高其吞吐效率。正交频分复用由于其卓越的品质,如单抽头均衡和具有成本效益的实施,现在被广泛用作物理层技术。这些好处是以严格的…

Azure存储访问层

blob数据的热访问层,冷访问层和存档访问层 Azure Blob 存储是一种托管对象存储服务,可用于存储和访问大量非结构化数据,如文本和二进制数据。Azure Blob 存储提供了三个不同层级的访问方式,以适应不同数据的使用模式和成本效益需…

手把手教学——终端工具xshell与文件传输工具xftp使用步骤及详解

前言 xshell是一款常用于连接本地linux服务以及云服务器的终端远程连接工具,该款终端工具常搭配远程文件传输工具xftp一起使用,由于还有很多小伙伴还不知道这两款终端工具的使用流程及步骤,Darren洋在这里给小伙伴们进行详细讲解。 一、下载工…

proteus结合keil-arm编译器构建STM32单片机项目进行仿真

proteus是可以直接创建设计图和源码的,但是源码编译它需要借助keil-arm编译器,也就是我们安装keil-mdk之后自带的编译器。 下面给出一个完整的示例,主要是做一个LED灯闪烁的效果。 新建工程指定路径,Schematic,PCB layout都选择默…

【马蹄集】第二十三周——进位制专题

进位制专题 目录 MT2186 二进制?不同!MT2187 excel的烦恼MT2188 单条件和MT2189 三进制计算机1MT2190 三进制计算机2 MT2186 二进制?不同! 难度:黄金    时间限制:1秒    占用内存:128M 题目…

推荐一个绘图平台(可替代Visio)

不废话,简易记网址: draw.io 网站会重定向到:https://app.diagrams.net/

《TCP IP网络编程》第十八章

第 18 章 多线程服务器端的实现 18.1 理解线程的概念 线程背景: 第 10 章介绍了多进程服务端的实现方法。多进程模型与 select 和 epoll 相比的确有自身的优点,但同时也有问题。如前所述,创建(复制)进程的工作本身会…

【leetcode 力扣刷题】旋转矩阵(循环过程边界控制)

力扣刷题 旋转矩阵 二维矩阵按圈遍历(顺时针 or 逆时针)遍历59. 旋转矩阵Ⅱ54. 旋转矩阵剑指 Offer 29. 顺时针打印矩阵 二维矩阵按圈遍历(顺时针 or 逆时针)遍历 下面的题目的主要考察点都是,二维数组从左上角开始顺…

【Nginx18】Nginx学习:WebDav文件存储与图片媒体处理模块

Nginx学习:WebDav文件存储与图片媒体处理模块 今天的内容怎么说呢?有两个感觉非常有意思,另外一些就差点意思。有意思的是,咱们可以直接用 Nginx 的 Webdav 功能搭建一个网盘,另外也可以实现动态的图片处理。这两个功能…

MyBatis的入门级环境搭建及增删改查,详细易懂

目录 一.mybatis的简介 二.MyBatis的环境搭建 2.1 导入pom依赖 2.2 数据库文件导入连接 2.3 修改web.xml文件 2.4 安装插件 2.5 配置文件 2.5.1 mybatis.cfg.xml文件 2.5.2 generatorConfig.xml文件 2.6 最后测试生成代码 三.MyBatis的增删改查 3.1 写service类&#xff…

CSS3:图片边框

简介 图片也可以作为边框&#xff0c;以下是实例演示 注意 实现该效果必须添加border样式&#xff0c;且必须位于border-image-socure之前否则不会生效 实例 <html lang"en"><head><style>p {width: 600px;margin: 200px auto;border: 30px soli…

CSS 背景属性

前言 背景属性 属性说明background-color背景颜色background-image背景图background-repeat背景图平铺方式background-position背景图位置background-size背景图缩放background-attachment背景图固定background背景复合属性 背景颜色 可以使用background-color属性来设置背景…