第3.6章:StarRocks数据导入——DataX StarRocksWriter

一、Datax

1.1 DataX 3.0概述

 DataX3.0是一个异构数据源离线同步工具,可以方便的对各种异构数据源进行高效的数据同步。   其github地址为:

https://github.com/alibaba/DataX/blob/master/introduction.mdicon-default.png?t=N7T8https://github.com/alibaba/DataX/blob/master/introduction.md

GitCode - 开发者的代码家园icon-default.png?t=N7T8https://gitcode.com/alibaba/datax/overview

1.2 DataX3.0框架设计

DataX将复杂的网状的同步链路变成了星型数据链路,DataX自身作为中间传输载体负责连接各种数据源,解决了异构数据源同步问题。Datax采用的是

   DataX本身作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中:

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer:Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

1.3 DataX3.0核心架构

    DataX 3.0 开源版本支持单机多线程模式完成同步作业运行。基于DataX作业生命周期的时序图,从整体架构设计角度来阐述DataX各个模块相互关系。

1.3.1 核心模块介绍

  • DataX完成单个数据同步的作业,我们称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  • DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  • 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  • 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  • DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0。

 1.3.2 DataX调度流程

     用户提交了一个DataX作业,并且配置了DataX Channel并发数为20个,需求是将一个100张分表的mysql数据同步到starrocks里面, 则DataX的调度决策思路是:

  • DataXJob根据分库分表切分成了100个Task。
  • 根据20个并发,DataX计算共需要分配4个TaskGroup。
  • 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责以5个并发共计运行25个Task。

二、StarRocksWriter

   DataX基于StarRocks开发的StarRocksWriter插件支持MySQL、Oracle等数据库中的数据导入至 StarRocks。在底层实现上,StarRocksWriter内部将各种reader读取的数据进行缓存攒批(以csv或 json格式),之后采用Stream Load 方式批量导入至 StarRocks。总体的数据流是Source -->Reader -->DataX channel --> Writer ---> StarRocks

 官网文章地址:

使用 DataX 导入 | StarRocks

三、创建配置文件

 为导入作业创建 JSON 格式配置文件, 这里列举几种Datax同步脚本。

(1)同步oracle数据至starrocks:oracle2starrocks.json

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "oraclereader",
          "parameter": {
            "username": "root",
            "password": "root",
            "connection": [
              {
                "querySql": [
                  "select fid,f_diccode,concat(substr(qhcode,1,2),'0000') as partition_no from nannd.test1"
                ],
                "jdbcUrl": [
                  "jdbc:oracle:thin:@192.168.22.115:1521/init"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "starrockswriter",
          "parameter": {
            "username": "root",
            "password": "root",
            "database": "",
            "table": "test2",
            "column": [
              "fid",
              "f_diccode",
              "partition_no"
            ],
            "preSql": ["truncate table des.test2"],
            "postSql": [],
            "jdbcUrl": "jdbc:mysql://192.168.10.103:9030",
            "loadUrl": [
              "192.168.10.101:8030",
              "192.168.10.102:8030",
              "192.168.10.103:8030"
            ],
            "loadProps": {
              "format": "json",
              "strip_outer_array": true
            }
          }
        }
      }
    ]
  }
}
  • OracleReader的配置说明见:

 https://github.com/alibaba/DataX/blob/master/introduction.md

https://github.com/alibaba/DataX/blob/master/oraclereader/doc/oraclereader.md

  • StarRocksWriter的配置说明见:官网

使用 DataX 导入 | StarRocks

(2)同步mysql库的数据至starrocks:mysql2starrocks.json

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "root",
            "column": [
                        "OBJECTID",
                        "xmmc",
                        "shengmc",
                        "shimc",
                        "xianmc",
        
            ],
            "connection": [
              {
                "table": [
                  "init2.test6"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://192.168.22.156:3306/init2"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "starrockswriter",
          "parameter": {
            "username": "root",
            "password": "root",
            "database": "des3",
            "table": "test7",
            "column": [
                         "OBJECTID",
                         "shengmc",
                         "shimc",
                         "xianmc",
            ],
            "preSql": [],
            "postSql": [],
            "jdbcUrl": "",
            "loadUrl": [
              "192.168.10.101:8030",
              "192.168.10.102:8030",
              "192.168.10.103:8030"
            ],
            "loadProps": {
              "format": "json",
              "strip_outer_array": true
            }
          }
        }
      }
    ]
  }
}

  • MysqlReader的配置说明见:

https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

  • StarRocksWriter的配置说明见:官网

(3)同步tidb库的数据至starrocks:tidb2starrocks.json

{
  "job": {
    "setting": {
      "speed": {
        "channel": 1
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0
      }
    },
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "root@sq2023",
            "connection": [
              {
                "querySql": [
                  "select id,member_id,create_time,update_time,now() as run_dt from test2"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://192.168.22.143:4000/init1"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "starrockswriter",
          "parameter": {
            "username": "root",
            "password": "root",
            "database": "des1",
            "table": "test3",
            "column": [
              "id",
              "member_id",
              "create_time",
              "update_time",
              "run_dt"
            ],
            "preSql": [],
            "postSql": [],
            "jdbcUrl": "",
            "loadUrl": [
              "192.168.10.101:8030",
              "192.168.10.102:8030",
              "192.168.10.103:8030"
            ],
            "loadProps": {
              "format": "json",
              "strip_outer_array": true
            }
          }
        }
      }
    ]
  }
}

 ps:从tidb数据读取数据,采用的read插件还是MysqlReder,不赘述。

四、常见问题记录

4.1 常规排查方案

   例如:针对配置文件job.json启动导入任务,设置JVM 调优参数(--jvm="-Xms6G -Xmx6G")以及日志等级(--loglevel=debug),日志等级用来任务失败时打印更详细的作业执行信息

python datax/bin/datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug datax/job/job.json

4.2 时区问题

    如果源数据库与目标数据库时区不同,需要命令行中添加 -Duser.timezone=GMTxxx 选项设置源数据库的时区信息。例如,源库使用 UTC 时区,则启动任务时需添加参数 -Duser.timezone=GMT+0

4.3 性能调优

4.3.1 合理拆分任务

    合理配置任务参数,让DataX任务拆分为多个Task,同时,提升DataX Channel并发数。以mysqlreader为例,就要合理配置splitPk参数,如果splitPk不填写(包括不提供splitPk或者splitPk值为空),DataX会视作使用单通道同步该表数据。

4.3.2 配置堆内存

   当提升DataX Job内Channel并发数时,内存的占用也会显著增加,因为DataX作为数据交换通道,在内存中会缓存较多的数据。例如Channel中会有一个Buffer,作为临时的数据交换的缓冲区,而在部分Reader和Writer的中,也会存在一些Buffer,为了防止OOM等错误,调大JVM的堆内存。调整JVM xms xmx参数的两种方式:一种是直接更改datax.py脚本;另一种是在启动的时候,在命令行添加对应的参数,如下:(xms:初始化堆内存; xmx:堆最大内存)

python datax/bin/datax.py --jvm="-Xms6G -Xmx6G" --loglevel=debug datax/job/job.json

ps:建议将初始化堆内存与堆最大内存配置一致,这样可以让同步数据处理起来更快,也可以避免内存的抖动。

4.3.3 任务限速

  使用DataX进行数据同步的另一个优势是可以限速,进而降低同步过程中对业务库的压力影响。DataX3.0提供了包括通道(并发)、记录流、字节流三种流控模式,可以方便的控制同步作业速度,让同步作业在库可以承受的范围内达到最佳的同步速度。以最常用的字节流限速为例:

  • 修改datax/conf/core.json,限制单个chanel的速度为2M (2*1024*1024= 2097152 byte):

"speed": {

   "byte": 2097152,

   },
  • 同时修改作业json部分的速度限制,例如限制为4M(这样任务会用4/2=2个channel并发进行任务),修改:
    "job": {

        "setting": {

            "speed": {

                "byte" : 4194304
            }
        },
        ...
    }
  • 以及:
"speed": {

   "channel": 5,

   "byte": 1048576,

   "record": 10000

}

4.3.4 读取StarRocks数据

   StarRocks兼容MySQL协议,当我们需要将StarRocks中的数据同步至其他数据库时,可以使用mysqlreader来直接读取,但这种JDBC的方式性能可能不是很好,推荐Flink Connector或者Spark Connector来进行处理。

参考文章:

第3.5章:StarRocks数据导入--DataX StarRocksWriter_datax-starrockswriter-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/404579.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Java代码审计】XSS漏洞

1. XSS漏洞 XSS(Cross Site Scripting,为了和层叠样式表(Cascading Style Sheet,CSS)有所区分,故称XSS)跨站脚本攻击是一种针对网站应用程序的安全漏洞攻击技术。它可以实现用户会话劫持、钓鱼攻击、恶意重…

罗克韦尔AB的PLC实现ModbusTCP和ModbusRTU协议标签方式通讯

本文是通过IGT-DSER智能网关读写AB罗克韦尔Compact、Control系列PLC的标签数据缓存并转为Modbus从站协议,与上位机通讯的案例。 打开智能网关的参数软件(下载地址),通过功能->数据转发与平台对接,再选择数据转发与缓存’,进入以…

浏览器录屏技术:探索网页内容的视觉记录之道

title: 浏览器录屏技术:探索网页内容的视觉记录之道 date: 2024/2/23 14:32:49 updated: 2024/2/23 14:32:49 tags: 浏览器录屏技术原理Web API应用场景用户体验在线教育产品演示 在当今数字化时代,浏览器录屏技术已经成为了一种强大的工具,…

基于springboot+vue的视频网站系统(前后端分离)

博主主页:猫头鹰源码 博主简介:Java领域优质创作者、CSDN博客专家、阿里云专家博主、公司架构师、全网粉丝5万、专注Java技术领域和毕业设计项目实战,欢迎高校老师\讲师\同行交流合作 ​主要内容:毕业设计(Javaweb项目|小程序|Pyt…

力扣随笔之按奇偶排序数组(简单905)

思路1:根据双指针对撞指针的思路,定义一个左指针从数组前端开始遍历,定义一个右指针从后端开始遍历,这时候有四种情况 左奇右偶:这种情况需要将其位置交换,将偶数提前,奇数后移左奇右奇&#xf…

【Lazy ORM】 小工具 acw 本地客户端 你负责点击页面,他负责输出代码

介绍 wu-smart-acw-client 简称acw-client&#xff0c;是一个基于Lazy ORM定制的客户端代码生成小工具 Lazy ORM 小工具 acw 本地客户端 你负责点击页面&#xff0c;他负责输出代码安装 <dependency><groupId>top.wu2020</groupId><artifactId>wu-sma…

Qt_快速安装指南

下载Qt在线安装程序&#xff08;不仔细介绍&#xff09;注册Qt账号&#xff08;不仔细介绍&#xff09;使用快速运行的命令&#xff0c;按照指定的下载地址下载 在Qt指定目录打开cmd命令窗口.\eqt-unified-windows-x86-4.0.1-1-online. exe --mirror https://mirrors.ustc.edu.…

计算机设计大赛 深度学习卷积神经网络垃圾分类系统 - 深度学习 神经网络 图像识别 垃圾分类 算法 小程序

文章目录 0 简介1 背景意义2 数据集3 数据探索4 数据增广(数据集补充)5 垃圾图像分类5.1 迁移学习5.1.1 什么是迁移学习&#xff1f;5.1.2 为什么要迁移学习&#xff1f; 5.2 模型选择5.3 训练环境5.3.1 硬件配置5.3.2 软件配置 5.4 训练过程5.5 模型分类效果(PC端) 6 构建垃圾…

Cesium 展示——加载 tileset.json 格式的模型数据

文章目录 需求分析需求 已给 tileset.json 文件,现需加载该模型文件, 该模型特点:模型上的各模块均可以进行点击设置,且相机视角拉近后可以看到内部隐藏的物件模块 分析 tileset.json :模型数据【模型加载】方法export function init3dTileLayer (option) {var tilesetMo…

【SpringBoot3】Spring Security 常用配置总结

注&#xff1a;本文基于Spring Boot 3.2.1 以及 Spring Security 6.2.1 相关文章 【SpringBoot3】Spring Security 核心概念 【SpringBoot3】Spring Security 常用注解 【SpringBoot3】Spring Security 详细使用实例&#xff08;简单使用、JWT模式&#xff09; 【SpringBoot3】…

ChatGPT调教指南 | 咒语指南 | Prompts提示词教程(二)

在我们开始探索人工智能的世界时&#xff0c;了解如何与之有效沉浸交流是至关重要的。想象一下&#xff0c;你手中有一把钥匙&#xff0c;可以解锁与OpenAI的GPT模型沟通的无限可能。这把钥匙就是——正确的提示词&#xff08;prompts&#xff09;。无论你是AI领域的新手&#…

flinksql 流表转换, 自定义udf/udtf,SQL 内置函数及自定义函数

flinksql 流表转换&#xff0c; 自定义udf/udtf 1、标量函数2、表函数3、聚合函数4、表聚合函数 1、在大多数情况下&#xff0c;用户定义的函数必须先注册&#xff0c;然后才能在查询中使用。不需要专门为 Scala 的 Table API 注册函数。 2、函数通过调用 registerFunction&am…

yolov9目标检测报错AttributeError: ‘list‘ object has no attribute ‘device‘

最近微智启软件工作室在运行yolov9目标检测的detect.py测试代码时&#xff0c;报错&#xff1a; File “G:\down\yolov9-main\yolov9-main\detect.py”, line 102, in run pred non_max_suppression(pred, conf_thres, iou_thres, classes, agnostic_nms, max_detmax_det) Fil…

Python urllib、requests、HTMLParser

HTTP协议 HTTP 协议&#xff1a;一般指HTTP(超文本传输)协议。 HTTP是为Web浏览器和Web服务器之间的通信而设计的&#xff0c;基于TCP/IP通信协议嘞传递数据。 HTTP消息结构 客户端请求消息 客户端发送一个HTTP请求到服务器的请求消息包括以下格式 请求行(request line)请求…

排序算法之——归并排序

归并排序 1. 基本思想2. 数据的分解3. 数据的合并4.归并排序的实现4.1 递归实现4.1.1 一个易错点4.1.2 运行结果 4.2 非递归实现4.2.1 图示思路4.2.2 代码实现4.2.3 一个易错点4.2.4 修改后的代码4.2.5 运行结果 6. 时间复杂度7. 空间复杂度8. 稳定性9. 动图演示 1. 基本思想 …

h-table(表格列表组件的全封装)

文章目录 概要h-table的封装过程查询组件封装 h-highForm结果页右侧工具栏封装RightToolbar结果页列表组件h-table结果页vue页面使用js文件有需要的请私信博主&#xff0c;还请麻烦给个关注&#xff0c;博主不定期更新组件封装&#xff0c;或许能够有所帮助&#xff01;&#x…

如何用GPT进行成像光谱遥感数据处理?

第一&#xff1a;遥感科学 从摄影侦察到卫星图像 遥感的基本原理 遥感的典型应用 第二&#xff1a;ChatGPT ChatGPT可以做什么&#xff1f; ChatGPT演示使用 ChatGPT的未来 第三&#xff1a;prompt 提示词 Prompt技巧&#xff08;大几岁&#xff09; 最好的原则和策…

互动游戏团队如何将性能体验优化做到TOP级别

一、背景 随着互动游戏业务 DAU 量级增加&#xff0c;性能和体验重要性也越发重要&#xff0c;好的性能和体验不仅可以增加用户使用体感&#xff0c;也可以增加用户对于互动游戏的使用粘性。 对现状分析&#xff0c;主要存在首屏渲染速度慢、打开页面存在白屏、页面加载过多资…

app测试必掌握的核心测试:UI、功能测试!

一、UI测试 UI即User Interface (用户界面)的简称。UI 设计则是指对软件的人机交互、操作逻辑、界面美观的整体设计。好的UI设计不仅是让软件变得有个性有品味,还要让软件的操作变得舒适、简单、自由、充分体现软件的定位和特点。手机APP从启动界面开始, 到运行过程,直至退出,…

聊聊mysql的七种日志

进入正题前,可以先简单介绍一下,MySQL的逻辑架构, MySQL的逻辑架构大致可以分为三层: 第一层:处理客户端连接、授权认证,安全校验等。第二层:服务器 server 层,负责对SQL解释、分析、优化、执行操作引擎等。第三层:存储引擎,负责MySQL中数据的存储和提取。我们要知道…