[实时计算flink]数据摄入YAML作业快速入门

实时计算Flink版基于Flink CDC,通过开发YAML作业的方式有效地实现了将数据从源端同步到目标端的数据摄入工作。本文介绍如何快速构建一个YAML作业将MySQL库中的所有数据同步到StarRocks中。

前提条件

  • 已创建Flink工作空间,详情请参见开通实时计算Flink版。

  • 上下游存储

    • 已创建RDS MySQL实例,详情请参见快速创建RDS MySQL实例。

    • 已创建StarRocks实例,详情请参见步骤一:创建存算一体版StarRocks实例。

    说明

    RDS MySQL和StarRocks需要与Flink工作空间在相同VPC下,否则需要打通网络和配置RDS MySQL的IP白名单,详情请参见如何访问跨VPC的其他服务?、实时计算Flink版如何访问公网?和操作指导。

背景信息

假设MySQL实例中有一个order_dw_mysql库,里面有名称为orders、orders_pay和product_catalog的3张业务表。此时,如果您希望开发一个数据摄入YAML作业,将这些表和数据都同步到StarRocks的order_dw_sr数据库中,则可以按照以下步骤进行:

  1. 步骤一:准备RDS MySQL测试数据

  2. 步骤二:开发数据摄入YAML作业

  3. 步骤三:启动数据摄入YAML作业

  4. 步骤四:在StarRocks上查看同步结果

步骤一:准备RDS MySQL测试数据

  1. 创建数据库和账号。

    为目标实例创建名称为order_dw_mysql数据库和具有对应数据库读写权限的普通账号。具体操作请参见创建数据库和账号和管理数据库。

  2. 通过DMS登录RDS MySQL。

    详情请参见通过DMS登录RDS MySQL。

  3. 在已登录的SQL Console窗口,输入如下命令后单击执行,创建数据库和三张业务表,并插入数据。

    CREATE TABLE `orders` (
      order_id bigint not null primary key,
      user_id varchar(50) not null,
      shop_id bigint not null,
      product_id bigint not null,
      buy_fee numeric(20,2) not null,   
      create_time timestamp not null,
      update_time timestamp not null default now(),
      state int not null 
    );
    
    
    CREATE TABLE `orders_pay` (
      pay_id bigint not null primary key,
      order_id bigint not null,
      pay_platform int not null, 
      create_time timestamp not null
    );
    
    
    CREATE TABLE `product_catalog` (
      product_id bigint not null primary key,
      catalog_name varchar(50) not null
    );
    
    -- 准备数据
    INSERT INTO product_catalog VALUES(1, 'phone_aaa'),(2, 'phone_bbb'),(3, 'phone_ccc'),(4, 'phone_ddd'),(5, 'phone_eee');
    
    INSERT INTO orders VALUES
    (100001, 'user_001', 12345, 1, 5000.05, '2023-02-15 16:40:56', '2023-02-15 18:42:56', 1),
    (100002, 'user_002', 12346, 2, 4000.04, '2023-02-15 15:40:56', '2023-02-15 18:42:56', 1),
    (100003, 'user_003', 12347, 3, 3000.03, '2023-02-15 14:40:56', '2023-02-15 18:42:56', 1),
    (100004, 'user_001', 12347, 4, 2000.02, '2023-02-15 13:40:56', '2023-02-15 18:42:56', 1),
    (100005, 'user_002', 12348, 5, 1000.01, '2023-02-15 12:40:56', '2023-02-15 18:42:56', 1),
    (100006, 'user_001', 12348, 1, 1000.01, '2023-02-15 11:40:56', '2023-02-15 18:42:56', 1),
    (100007, 'user_003', 12347, 4, 2000.02, '2023-02-15 10:40:56', '2023-02-15 18:42:56', 1);
    
    INSERT INTO orders_pay VALUES
    (2001, 100001, 1, '2023-02-15 17:40:56'),
    (2002, 100002, 1, '2023-02-15 17:40:56'),
    (2003, 100003, 0, '2023-02-15 17:40:56'),
    (2004, 100004, 0, '2023-02-15 17:40:56'),
    (2005, 100005, 0, '2023-02-15 18:40:56'),
    (2006, 100006, 0, '2023-02-15 18:40:56'),
    (2007, 100007, 0, '2023-02-15 18:40:56');

步骤二:开发数据摄入YAML作业

  1. 登录实时计算管理控制台。

  2. 在左侧导航栏选择数据开发 > 数据摄入

  3. 单击新建,选择MySQL到Starrocks数据同步,单击下一步

  4. 填写作业名称存储位置和选择引擎版本后,单击确定

  5. 配置YAML作业代码信息。

    将MySQL中order_dw_mysql数据库下的所有表同步到starrocks的order_dw_sr数据库中,代码示例如下。

    source:
      type: mysql
      hostname: rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com
      port: 3306
      username: ${secret_values.mysqlusername}
      password: ${secret_values.mysqlpassword}
      tables: order_dw_mysql.\.*
      server-id: 5405-5415
    
    sink:
      type: starrocks
      name: StarRocks Sink
      jdbc-url: jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030
      load-url: fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030
      username: ${secret_values.starrocksusername}
      password: ${secret_values.starrockspassword}
      table.create.properties.replication_num: 1
      
    route:
      - source-table: order_dw_mysql.\.*
        sink-table: order_dw_sr.<>
        replace-symbol: <>
        description: route all tables in source_db to sink_db
    
    pipeline:
      name: Sync MySQL Database to StarRocks

    关于MySQL和Starrocks的本示例需要的配置信息说明如下表所示,数据摄入更多参数详情请参见MySQL和StarRocks。

    类别

    参数

    说明

    示例值

    source

    hostname

    MySQL数据库的IP地址或者Hostname。

    建议填写专有网络VPC地址。

    rm-bp1rk934iidc3****.mysql.rds.aliyuncs.com

    port

    MySQL数据库服务的端口号。

    3306

    username

    MySQL数据库服务的用户名和密码。填写您步骤一:准备RDS MySQL测试数据中创建的账号和密码信息。

    说明

    本示例使用变量,可以避免明文展示密码等信息,详情请参见变量管理。

    ${secret_values.mysqlusername}

    password

    ${secret_values.mysqlpassword}

    tables

    MySQL表名。支持正则表达式以读取多个表的数据。

    本文将同步order_dw_mysql数据库所有表及数据。

    order_dw_mysql.\.*

    server-id

    数据库客户端的一个数字ID。

    5405-5415

    sink

    jdbc-url

    JDBC连接的URL。

    指定FE(Front End)的IP和查询端口,格式为jdbc:mysql://ip:port

    您可以在E-MapReduce控制台实例详情页签,查看目标实例的FE内网地址查询端口

     jdbc:mysql://fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:9030

    load-url

    连接到FE节点的HTTP服务URL。

    您可以在E-MapReduce控制台实例详情页签,查看目标实例的FE内网地址HTTP端口

    fe-c-b76b6aa51807****-internal.starrocks.aliyuncs.com:8030

    username

    StarRocks连接用户名和密码。

    此处需要填写为您开通StarRocks时填写的用户名和密码信息。

    说明

    本示例使用变量,可以避免明文展示密码等信息,详情请参见变量管理。

    ${secret_values.starrocksusername}

    password

    ${secret_values.starrockspassword}

    route

    source-table

    指定生效上游表。

    order_dw_mysql.\.*

    sink-table

    指定数据路由的目标位置。

    order_dw_sr.<>

    replace-symbol

    在使用模式匹配功能时,用于指代上游表名的字符串。

    <>

  6. 单击部署

步骤三:启动数据摄入YAML作业

  1. 数据摄入页面,单击部署后,在弹出的对话框中,单击确定

  2. 运维中心 > 作业运维页面,单击目标YAML作业操作中的启动

  3. 单击启动

    本示例选择为无状态启动,参数配置详情请参见作业启动。作业启动后,您可以在作业运维页面观察作业的运行信息和状态。

步骤四:在StarRocks上查看同步结果

当YAML作业处于运行中后,您就可以在StarRocks查看数据同步情况。

  1. 通过EMR StarRocks Manager连接StarRocks实例。

  2. 在左侧导航栏,单击SQL Editor,在数据库页签,单击

    image

    按钮。

    您会看到default_catalog下出现名称为order_dw_sr的数据库。

  3. 查询列表页签,单击+文件,新建查询脚本后,输入以下SQL语句,单击运行

    SELECT * FROM default_catalog.order_dw_sr.orders order by order_id;
    SELECT * FROM default_catalog.order_dw_sr.orders_pay order by pay_id;
    SELECT * FROM default_catalog.order_dw_sr.product_catalog order by product_id;
  4. 在命令下方查看同步结果。

    您会看到StarRocks中已存在和MySQL数据库中相同名称的表及数据。

    image

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/899205.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Jenkins配置CI/CD开发环境(理论到实践的完整流程)

目录 一、对于CI/CD的理解1.1、什么是CI&#xff08;持续集成&#xff09;1.2、CI 的主要特点&#xff1a;1.3、CI 的优势&#xff1a;**实际开发中的场景举例:** 1.4、什么是 CD&#xff08;持续交付和持续部署&#xff09;1.5、持续交付&#xff08;Continuous Delivery&…

鸿蒙HarmonyOS NEXT 5.0开发(2)—— ArkUI布局组件

文章目录 布局Column&#xff1a;从上往下的布局Row&#xff1a;从左往右的布局Stack&#xff1a;堆叠布局Flex&#xff1a;自动换行或列 组件Swiper各种选择组件 华为官方教程B站视频教程 布局 主轴和交叉轴的概念&#xff1a; 对于Column布局而言&#xff0c;主轴是垂直方…

如何区分真假Facebook三不限海外户?

对于需要推广到海外的企业来说&#xff0c;Facebook是一个重要的渠道。由于Facebook对国内普通企业户的风控极为严格&#xff0c;让不少出海广告主都很头疼&#xff0c;一到要出量的时刻就限额、挂户各种问题&#xff0c;根本没有办法跑起来量&#xff0c;白白错过好时机。对于…

R语言统计分析——置换检验3

参考资料&#xff1a;R语言实战【第2版】 列联表的独立性 通过chisq_test()或cmh_test()函数&#xff0c;我们可以用置换检验判断两类别型变量的独立性。当数据可根据第三个类别型变量进行分层时&#xff0c;需要使用后一个函数。若变量都是有序型&#xff0c;可使用lbl_test(…

047_python基于Hadoop的租房数据分析系统的设计与实现

目录 系统展示 开发背景 代码实现 项目案例 获取源码 博主介绍&#xff1a;CodeMentor毕业设计领航者、全网关注者30W群落&#xff0c;InfoQ特邀专栏作家、技术博客领航者、InfoQ新星培育计划导师、Web开发领域杰出贡献者&#xff0c;博客领航之星、开发者头条/腾讯云/AW…

基于 Konva 实现Web PPT 编辑器(三)

完善公式 上一节我们简单讲述了公式的使用&#xff0c;并没有给出完整的样例&#xff0c;下面还是完善下相关步骤&#xff0c;我们是默认支持公式的编辑功能的哈&#xff0c;因此&#xff0c;我们只需要提供必要的符号即可&#xff1a; 符号所表达的含义是 mathlive 的command命…

socket套接字

1.IP地址 IP地址是在IP协议中, 用来标识网络中不同主机的地址。由点分十进制组成&#xff0c;在数据传输中IP地址是一直不变的。 在IP数据包头部中, 有两个IP地址&#xff0c; 分别叫做源IP地址和目的IP地址。 2.端口号 由2字节16位整数组成&#xff0c;标识当前主机的唯一…

YOLO V3 网络构架解析

YOLO V3&#xff08;You Only Look Once version 3&#xff09;是由Joseph Redmon等人于2018年提出的一种基于深度学习的目标检测算法。它在速度和精度上相较于之前的版本有了显著提升&#xff0c;成为计算机视觉领域的一个重要里程碑。本文将详细解析YOLO V3的网络架构&#x…

关于WPF项目降低.Net版本

本来有项目是.NET Framework 4.8的&#xff0c;为了兼容升级到.NET 8.0&#xff0c;后期又为了兼容放弃.NET 8.0&#xff0c;升级的步骤&#xff1a;利用vs2022 的 .NET Upgrade Assistant 扩展&#xff0c;磕磕绊绊也升级完成了&#xff1b; 扩展链接&#xff1a; Upgrading…

前端拥抱AI:LangChain.js 入门遇山开路之PromptTemplate

PromptTemplate是什么 PromptTemplate是一个可重复使用的模板&#xff0c;用于生成引导模型生成特定输出的文本。与Prompt的区别: PromptTemplate相对于普通Prompt的优势&#xff0c;即其灵活性和可定制性。 简单了解PromptTemplate后&#xff0c;咱们就来聊聊LangChain里的P…

Hadoop 安装教程——单节点模式和分布式模式配置

文章目录 一、预备知识1.1 Hadoop 发行版本1.2 部署方式 二、预备条件2.1 环境准备2.2 创建新用户(可选)2.3 配置 SSH 无密码登录2.4 下载 Hadoop2.5 编辑 hadoop-env.sh 脚本2.6 编辑 dfs 和 yarn 脚本 三、单节点模式部署3.1 官方使用案例3.2 查看运行结果 四、伪分布模式部署…

golang 手动解析 epub 电子书格式

如题&#xff0c;本篇简单分析如何使用go语言解析epub格式的电子书&#xff0c;获取其内部资源内容。 EPUB格式 首先我们需要了解epub格式具有哪些特点。 已知的是&#xff0c;epub是一种类似doc或者pdf&#xff0c;可以提供图文并茂电子书的格式。 那么我们首先使用二进制…

重学SpringBoot3-集成Hazelcast

重学SpringBoot3-集成Hazelcast 1. Hazelcast 的作用2. Spring Boot 3 整合 Hazelcast 的步骤2.1 添加 Hazelcast 依赖2.2 配置 Hazelcast 实例 3. 集成 Hazelcast 与 Spring Boot 缓存4. 验证 Hazelcast 缓存5. Hazelcast 集群配置6. 总结 Hazelcast 是一个流行的开源内存数据…

Linux重点yum源配置

1.配置在线源 2.配置本地源 3.安装软件包 4.测试yum源配置 5.卸载软件包

浅谈人工智能之基于阿里云使用vllm搭建Llama3

浅谈人工智能之基于阿里云使用vllm搭建Llama3 引言 随着人工智能技术的迅速发展&#xff0c;Llama3作为一个先进的语言模型&#xff0c;受到广泛关注。本文将介绍如何在阿里云上使用VLLM搭建Llama3&#xff0c;为用户提供一套完整的技术流程。 环境准备 阿里云账户 确保您…

记录:网鼎杯2024赛前热身WEB01

目录扫描&#xff0c;发现上传点&#xff0c;判断可能存在文件上传漏洞&#xff0c;并根据文件后缀判断网站开发语言为php 编写蚁剑一句话木马直接上传 蚁剑连接 这里生成 的flag是随机的&#xff0c;因为烽火台反作弊会随机生成环境&#xff0c;在一顿查找后&#xff0c;在hom…

【部署篇】RabbitMq-03集群模式部署

一、准备主机 准备3台主机用于rabbitmq部署&#xff0c;文章中是在centos7上安装部署rabbitmq3.8通过文章中介绍的方式可以同样在centos8、centos9上部署&#xff0c;只需下载对应的版本进行相同的操作。 主机IP角色说明192.168.128.31种子节点192.168.128.32普通节点192.16…

【达梦数据库】两台或多台服务器之间免密登录设置-【dmdba用户】

目录 背景1、服务器A免密登录本机1.1、生成私钥&#xff08;id_rsa&#xff09;和公钥&#xff08;id_rsa.pub&#xff09;1.2、追加公钥到服务器A的密码登录权限管理文件1.3、结果验证 2、服务器A免密登录服务器B2.1、确认服务器B有目的文件夹2.2、服务器A的公钥复制到服务器B…

网安加·百家讲坛 | 徐一丁:金融机构网络安全合规浅析

作者简介&#xff1a;徐一丁&#xff0c;北京小西牛等保软件有限公司解决方案部总监&#xff0c;网络安全高级顾问。2000年开始从事网络安全工作&#xff0c;主要领域为网络安全法规标准研究、金融行业安全咨询与解决方案设计、信息科技风险管理评估等。对国家网络安全法规标准…

react18中的合成事件与浏览器中的原生事件

React 通过将事件 normalize 以让他们在不同浏览器中拥有一致的属性。 合成事件 SyntheticEvent 实例将被传递给你的事件处理函数&#xff0c;它是浏览器的原生事件的跨浏览器包装器。除兼容所有浏览器外&#xff0c;它还拥有和浏览器原生事件相同的接口&#xff0c;包括 stopP…