[实时计算flink]基于Paimon的数据库实时入湖快速入门

Apache Paimon是一种流批统一的湖存储格式,支持高吞吐的写入和低延迟的查询。本文通过Paimon Catalog和MySQL连接器,将云数据库RDS中的订单数据和表结构变更导入Paimon表中,并使用Flink对Paimon表进行简单分析。

背景信息

Apache Paimon是一种流批统一的湖存储格式,支持高吞吐的写入和低延迟的查询。目前阿里云实时计算Flink版,以及开源大数据平台E-MapReduce上常见的计算引擎(例如Spark、Hive或Trino)都与Paimon有着较为完善的集成度。您可以借助Apache Paimon快速地在HDFS或者OSS上构建自己的数据湖存储服务,并接入计算引擎实现数据湖的分析。

前提条件

  • 如果您使用RAM用户或RAM角色等身份访问,需要确认已具有Flink控制台相关权限,详情请参见权限管理。

  • 已创建Flink工作空间,详情请参见开通实时计算Flink版。

步骤一:准备数据源

  1. 快速创建RDS MySQL实例。

    说明

    RDS MySQL版实例需要与Flink工作空间处于同一VPC。不在同一VPC下时请参见网络连通性。

  2. 创建数据库和账号。

    创建名称为orders的数据库,并创建高权限账号或具有数据库orders读写权限的普通账号。

  3. 通过DMS登录RDS MySQL,在orders数据库中创建表orders_1和orders_2。

    CREATE TABLE `orders_1` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
    
    CREATE TABLE `orders_2` (
        orderkey BIGINT NOT NULL,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        comment VARCHAR(100),
        PRIMARY KEY (orderkey)
    );
  4. 插入如下测试数据。

    INSERT INTO `orders_1` VALUES (1, 1, 'O', 131251.81, '1996-01-02', '5-LOW', 'Clerk#000000951', 0, 'nstructions sleep furiously among ');
    INSERT INTO `orders_1` VALUES (2, 3, 'O', 40183.29, '1996-12-01', '1-URGENT', 'Clerk#000000880', 0, ' foxes. pending accounts at the pending, silent asymptot');
    INSERT INTO `orders_1` VALUES (3, 6, 'F', 160882.76, '1993-10-14', '5-LOW', 'Clerk#000000955', 0, 'sly final accounts boost. carefully regular ideas cajole carefully. depos');
    INSERT INTO `orders_1` VALUES (4, 6, 'O', 31084.79, '1995-10-11', '5-LOW', 'Clerk#000000124', 0, 'sits. slyly regular warthogs cajole. regular, regular theodolites acro');
    INSERT INTO `orders_1` VALUES (5, 2, 'F', 86615.25, '1994-07-30', '5-LOW', 'Clerk#000000925', 0, 'quickly. bold deposits sleep slyly. packages use slyly');
    INSERT INTO `orders_1` VALUES (6, 2, 'F', 36468.55, '1992-02-21', '4-NOT SPECIFIED', 'Clerk#000000058', 0, 'ggle. special, final requests are against the furiously specia');
    INSERT INTO `orders_1` VALUES (7, 2, 'O', 171488.73, '1996-01-10', '2-HIGH', 'Clerk#000000470', 0, 'ly special requests ');
    INSERT INTO `orders_1` VALUES (8, 6, 'O', 116923.00, '1995-07-16', '2-HIGH', 'Clerk#000000616', 0, 'ise blithely bold, regular requests. quickly unusual dep');
    INSERT INTO `orders_1` VALUES (9, 3, 'F', 99798.76, '1993-10-27', '3-MEDIUM', 'Clerk#000000409', 0, 'uriously. furiously final request');
    INSERT INTO `orders_1` VALUES (10, 3, 'O', 41670.02, '1998-07-21', '3-MEDIUM', 'Clerk#000000223', 0, 'ly final packages. fluffily final deposits wake blithely ideas. spe');
    INSERT INTO `orders_2` VALUES (11, 6, 'O', 148789.52, '1995-10-23', '4-NOT SPECIFIED', 'Clerk#000000259', 0, 'zzle. carefully enticing deposits nag furio');
    INSERT INTO `orders_2` VALUES (12, 5, 'O', 38988.98, '1995-11-03', '1-URGENT', 'Clerk#000000358', 0, ' quick packages are blithely. slyly silent accounts wake qu');
    INSERT INTO `orders_2` VALUES (13, 4, 'F', 113701.89, '1992-06-03', '3-MEDIUM', 'Clerk#000000456', 0, 'kly regular pinto beans. carefully unusual waters cajole never');
    INSERT INTO `orders_2` VALUES (14, 6, 'O', 46366.56, '1996-08-21', '4-NOT SPECIFIED', 'Clerk#000000604', 0, 'haggle blithely. furiously express ideas haggle blithely furiously regular re');
    INSERT INTO `orders_2` VALUES (15, 4, 'O', 219707.84, '1996-09-20', '3-MEDIUM', 'Clerk#000000659', 0, 'ole express, ironic requests: ir');
    INSERT INTO `orders_2` VALUES (16, 1, 'F', 20065.73, '1994-07-16', '3-MEDIUM', 'Clerk#000000661', 0, 'wake fluffily. sometimes ironic pinto beans about the dolphin');
    INSERT INTO `orders_2` VALUES (17, 0, 'P', 65883.92, '1995-03-18', '1-URGENT', 'Clerk#000000632', 0, 'ular requests are blithely pending orbits-- even requests against the deposit');
    INSERT INTO `orders_2` VALUES (18, 6, 'F', 79258.24, '1994-01-20', '5-LOW', 'Clerk#000000743', 0, 'y pending requests integrate');
    INSERT INTO `orders_2` VALUES (19, 2, 'O', 116227.05, '1996-12-19', '4-NOT SPECIFIED', 'Clerk#000000547', 0, 'symptotes haggle slyly around the furiously iron');
    INSERT INTO `orders_2` VALUES (20, 1, 'O', 215135.72, '1998-04-18', '3-MEDIUM', 'Clerk#000000440', 0, ' pinto beans sleep carefully. blithely ironic deposits haggle furiously acro');

步骤二:创建Catalog

  1. 进入元数据管理页面。

    1. 登录实时计算控制台。

    2. 单击目标工作空间操作列下的控制台

    3. 单击左侧的元数据管理

  2. 创建Paimon Catalog。

    1. 单击创建Catalog内置Catalog页签,选择Apache Paimon后,单击下一步。

    2. 填写配置信息。

      image.png

      配置项

      说明

      备注

      catalog name

      您自定义的Paimon Catalog名称。

      填写为自定义的英文名。

      metastore

      Paimon表的元数据存储类型:

      • filesystem:仅将元数据存储在OSS中。

      • dlf:除了将元数据存储在OSS上外,还会将元数据同步到阿里云数据湖构建服务DLF中。

      本文选择filesystem。

      warehouse

      Paimon Catalog的存储根目录,是一个OSS目录。可以选择创建实时计算Flink版时使用的OSS Bucket,也可以使用同一账号同一地域下的其他OSS Bucket。

      格式为oss://<bucket>/<object>。其中:

      • bucket:表示您创建的OSS Bucket名称。

      • object:表示您存放数据的路径。

      您可以在OSS管理控制台上查看您的bucket和object名称。

    3. 单击确定

步骤三:创建Flink作业

  1. 数据开发 > ETL页面,单击新建

  2. 选择空白的流作业草稿,单击下一步

  3. 新建作业草稿对话框,填写作业配置信息。

    作业参数

    说明

    文件名称

    作业的名称。

    说明

    作业名称在当前项目中必须保持唯一。

    存储位置

    指定该作业的存储位置。

    您还可以在现有文件夹右侧,单击

    新建文件夹

    图标,新建子文件夹。

    引擎版本

    当前作业使用的Flink的引擎版本。引擎版本号含义、版本对应关系和生命周期重要时间点详情请参见引擎版本介绍。

  4. 单击创建

  5. 输入以下语句,实时捕获orders数据库中相关表的变化,并同步到Paimon表中。

    -- 使用刚刚创建的Paimon Catalog
    USE CATALOG `test`;
    
    -- 创建一张MySQL临时表,捕获表名符合正则表达式orders_\d+的MySQL表的变化
    CREATE TEMPORARY TABLE mysql_orders (
        orderkey BIGINT,
        custkey BIGINT,
        order_status VARCHAR(100),
        total_price DOUBLE,
        order_date DATE,
        order_priority VARCHAR(100),
        clerk VARCHAR(100),
        ship_priority INT,
        `comment` VARCHAR(100),
        PRIMARY KEY (orderkey) NOT ENFORCED
    ) WITH (
        'connector' = 'mysql',
        'hostname' = 'rm-bp1s1xgll21ey****.mysql.rds.aliyuncs.com',
        'port' = '3306',
        'username' = 'your_username',
        'password' = '${secret_values.mysql_pw}',
        'database-name' = 'orders',
        'table-name' = 'orders_\d+',
        'server-time-zone' = 'Asia/Shanghai'
    );
    
    -- 将MySQL表的变化同步到Paimon表中
    CREATE TABLE IF NOT EXISTS orders AS TABLE mysql_orders;

    ​参数说明如下,您可以根据实际情况进行修改。MySQL连接器更多参数详情请参见MySQL。

    参数

    说明

    备注

    connector

    连接器类型。

    本示例固定值为mysql

    hostname

    MySQL数据库的IP地址或者Hostname。

    本文填写为RDS实例的内网地址。

    username

    MySQL数据库服务的用户名。

    无。

    password

    MySQL数据库服务的密码。

    本示例通过使用名为mysql_pw密钥的方式填写密码值,避免信息泄露,详情请参见变量管理。

    database-name

    MySQL数据库名称。

    本示例填写为步骤一:准备数据源中创建的数据库。

    table-name

    MySQL表名。

    作为源表时,表名支持正则表达式以读取多个表的数据。

    port

    MySQL数据库服务的端口号。

    无。

  6. (可选)单击右上方的深度检查,确认作业Flink SQL语句中是否存在语法错误。

  7. 单击右上方的部署,单击确定

  8. 在左侧导航栏,单击运维中心 > 作业运维,单击目标作业名称,进入作业部署详情页面。

  9. 单击运行参数配置区域右侧的编辑

    本文为了更快观察到任务运行的结果,将系统检查点间隔两次系统检查点之间的最短时间间隔均改为10s,单击保存

    image

  10. 在目标作业部署详情页顶部,单击启动,选择无状态启动

    image.png

  11. 查询Paimon数据。

    1. 数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行

      select custkey, sum(total_price) from `test`.`default`.`orders` group by custkey;
    2. 结果浏览完成后,单击左侧的

      image.png

      停止调试。

      image.png

步骤四:更新MySQL表结构

本部分将演示MySQL表结构变更同步到Paimon表的功能。

  1. 登录云数据库RDS控制台。

  2. 在orders数据库,输入如下SQL语句,然后单击执行,为两张数据表添加一列,并填充一些数据。

    ALTER TABLE `orders_1` ADD COLUMN quantity BIGINT; 
    ALTER TABLE `orders_2` ADD COLUMN quantity BIGINT; 
    UPDATE `orders_1` SET `quantity` = 100 WHERE `orderkey` < 5;
    UPDATE `orders_2` SET `quantity` = 100 WHERE `orderkey` > 15;
  3. 在实时计算控制台数据开发 > 数据查询页面的查询脚本页签,将如下代码拷贝到查询脚本后,选中目标片段后单击左侧代码行上的运行

    select * from `test`.`default`.`orders` where `quantity` is not null;

    结果如下,浏览完成后,可单击左侧的

    image.png

    停止调试。

    Image 32

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/899816.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

深度学习之梯度下降法 | Chapter 2 | Deep Learning | 3Blue1Brown

目录 前言1. 总览2. 回顾3. 训练数据的使用4. 代价函数5. 梯度下降法6. 梯度向量7. 梯度下降小结8. 分析网络9. 网络如何学习的研究相关资料结语 前言 3Blue1Brown 视频笔记&#xff0c;仅供自己参考 这个章节主要介绍梯度下降的思想&#xff0c;之后进一步探索网络的能力以及隐…

助农贷款、保险精准定价,背后的“星绽”机密计算全球开源

文&#xff5c;白 鸽 编&#xff5c;王一粟 河南平顶山种植日本引进白草莓的李朝阳&#xff0c;和山东临沂种植山楂和桃子的李东旭&#xff0c;都是网商银行“农户秒贷”项目的受益者。 “发果农工资&#xff0c;收购水果&#xff0c;遇上天气灾害时周转应急时&#xff0c;“…

E/MicroMsg.SDK.WXMediaMessage:checkArgs fail,thumbData is invalid 图片资源太大导致分享失败

1、微信分享报&#xff1a; 2、这个问题是因为图片太大导致&#xff1a; WXWebpageObject webpage new WXWebpageObject();webpage.webpageUrl qrCodeUrl;//用 WXWebpageObject 对象初始化一个 WXMediaMessage 对象WXMediaMessage msg new WXMediaMessage(webpage);msg.tit…

MySQL-12.DQL-条件查询

一.DQL-条件查询 -- DQL:条件查询 -- 1.查询 姓名 为 杨逍 的员工 select id, username, password, name, gender, image, job, entrydate, create_time, update_timefrom tb_emp where name 杨逍;-- 2.查询 id小于等于5 的员工信息 select * from tb_emp where id < 5;-…

Flutter 小技巧之 equatable 包解析以及宏编程解析

今天我们聊聊 equatable 包的实现&#xff0c;并通过 equatable 去理解 Dart 宏编程的作用和实现&#xff0c;对于 Flutter 开发者来说&#xff0c;Dart 宏编程可以说是「望眼欲穿」。 equatable 正如 equatable 这个包名所示&#xff0c;它的功能很简单&#xff0c;主要是用…

LeetCode 热题100之双指针

1.移动零 思路分析1&#xff08;纯模拟&#xff09; 定义指针j&#xff0c;用来收集不是0的数&#xff1b;收集完毕之后&#xff0c;再把剩下位置处置为0即可。 具体实现代码&#xff08;详解版&#xff09;&#xff1a; class Solution { public:void moveZeroes(vector<…

前端vue框架配置基础信息详解分析

前端vue2、vue3框架是我们最近常用的框架&#xff0c;今天我们分析一下配置基础信息、详解其中的功能含义。 1、vue.config.js 文件分析 这个 vue.config.js 文件是 Vue CLI 项目中用于配置项目构建行为和开发环境设置的文件。它能够让开发者定制打包、代理、路径、样式等方面…

国产单片机及其特点

国产单片机在近年来取得了显著的发展&#xff0c;不仅在技术上不断突破&#xff0c;还在市场上占据了越来越重要的位置。 主要国产单片机品牌及特点 兆易创新&#xff08;GD&#xff09; 主要系列&#xff1a;GD32系列&#xff0c;基于ARM Cortex-M内核。特点&#xff1a;高性能…

Android 中的串口开发

一&#xff1a;背景 本文着重讲安卓下的串口。 由于开源的Android在各种智能设备上的使用越来越多&#xff0c;如车载系统等。在我们的认识中&#xff0c;Android OS的物理接口一般只有usb host接口和耳机接口&#xff0c;但其实安卓支持各种各样的工业接口&#xff0c;如HDM…

ResNet18果蔬图像识别分类

1. 项目简介 本项目的目标是开发一个基于ResNet18深度学习模型的果蔬图像分类系统。随着现代农业与人工智能的结合&#xff0c;智能果蔬分类技术在供应链、生产和销售管理中扮演了越来越重要的角色。本项目的背景源于提升果蔬分类效率的需求&#xff0c;通过使用计算机视觉技术…

基于SSM+微信小程序的酒店管理系统1

&#x1f449;文末查看项目功能视频演示获取源码sql脚本视频导入教程视频 1、项目介绍 基于微信小程序开发的酒店管理系统管理员&#xff0c;酒店管理员以及用户。 1、管理员功能可以管理个人中心&#xff0c;用户信息管理&#xff0c;酒店管理员管理&#xff0c;房间类型管…

YOLO11改进 | 注意力机制 | 添加SE注意力机制

秋招面试专栏推荐 &#xff1a;深度学习算法工程师面试问题总结【百面算法工程师】——点击即可跳转 &#x1f4a1;&#x1f4a1;&#x1f4a1;本专栏所有程序均经过测试&#xff0c;可成功执行&#x1f4a1;&#x1f4a1;&#x1f4a1; 本文介绍了YOLOv11添加SE注意力机制&…

Redis中String类型数据扩容原理分析

大家好&#xff0c;我是 V 哥。在 Java 中&#xff0c;我们有动态数组ArrayList&#xff0c;当插入新元素空间不足时&#xff0c;会进行扩容&#xff0c;好奇 Redis 中的 String 类型&#xff0c;C 语言又是怎样的实现策略&#xff0c;带着疑问&#xff0c;咱们来了解一下。 最…

Python酷库之旅-第三方库Pandas(167)

目录 一、用法精讲 766、pandas.Interval.open_left属性 766-1、语法 766-2、参数 766-3、功能 766-4、返回值 766-5、说明 766-6、用法 766-6-1、数据准备 766-6-2、代码示例 766-6-3、结果输出 767、pandas.Interval.open_right属性 767-1、语法 767-2、参数 …

[LeetCode] 78. 子集

题目描述&#xff1b; 给你一个整数数组 nums &#xff0c;数组中的元素 互不相同 。返回该数组所有可能的子集&#xff08;幂集&#xff09;。 解集 不能 包含重复的子集。你可以按 任意顺序 返回解集。 示例 1&#xff1a; 输入&#xff1a;nums [1,2,3] 输出&#xff1…

Windows通过netsh控制安全中心防火墙和网络保护策略

Windows通过netsh控制安全中心防火墙和网络保护策略 1. 工具简介 【1】. Windows安全中心 【2】. netsh工具 netsh(Network Shell) 是一个Windows系统本身提供的功能强大的网络配置命令行工具。 2. 开启/关闭防火墙策略 在设置端口&#xff08;禁用/启用&#xff09;前&am…

传输层协议UDP详解

目录 一. 知识准备 1.1 传输层 1.2 重识端口号 二. UDP协议 三. UDP协议特点 一. 知识准备 1.1 传输层 前面已经讲过&#xff0c;HTTP协议是应用层协议&#xff0c;在此之前&#xff0c;我们短暂的认为HTTP是直接通过应用层与外界通信的。但是我们要知道&…

DOTween动画插件超详解(保姆级巨细)

文章目录 一、前言二、DOTween简介与安装&#xff08;一&#xff09;什么是DOTween&#xff1f;&#xff08;二&#xff09;下载安装 三、DOTween 的使用&#xff08;基础&#xff09;&#xff08;一&#xff09;使用前注意事项1. 引入命名空间2. 进行初始化3. 清除遗留4. 设置…

基于Java的电商书城系统源码带本地搭建教程

技术框架&#xff1a;jQuery MySQL5.7 mybatis jsp shiro 运行环境&#xff1a;jdk8 IntelliJ IDEA maven3 宝塔面板 系统功能介绍 该系统分为前台展示和后台管理两大模块&#xff0c;前台主要是为消费者服务。该子系统实现了注册&#xff0c;登录&#xff0c; 以及…

asp.net core mvc发布时输出视图文件Views

var builder WebApplication.CreateBuilder(args); builder.Services.AddRazorPages();builder.Services.AddControllersWithViews(ops > {//全局异常过滤器&#xff0c;注册ops.Filters.Add<ExceptionFilter>(); })// Views视图文件输出到发布目录&#xff0c;视图文…