什么是数据同步利器DataX,如何使用?

转载至我的博客 https://www.infrastack.cn ,公众号:架构成长指南

今天给大家分享一个阿里开源的数据同步工具DataX,在Github拥有14.8k的star,非常受欢迎,官网地址:https://github.com/alibaba/DataX

什么是 Datax?

DataX 是阿里云 DataWorks数据集成 的开源版本,使用Java 语言编写,在阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、OceanBase、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS, databend 等各种异构数据源之间高效的数据同步功能。

应用场景有那些?

  1. 数据仓库同步:DataX 可以帮助将数据从一个数据仓库(如关系型数据库、大数据存储系统等)同步到另一个数据仓库,实现数据的迁移、备份或复制。
  2. 数据库迁移:当我们需要将数据从一个数据库平台迁移到另一个数据库平台时,DataX 可以帮助完成数据的转移和转换工作
  3. 数据集成与同步:DataX 可以用作数据集成工具,用于将多个数据源的数据进行整合和同步。它支持多种数据源,包括关系型数据库、NoSQL 数据库、文件系统等,可以将这些数据源的数据整合到一个目标数据源中。
  4. 数据清洗与转换:DataX 提供了丰富的数据转换能力,可以对数据进行清洗、过滤、映射、格式转换等操作。这对于数据仓库、数据湖和数据集市等数据存储和分析平台非常有用,可以帮助提高数据质量和一致性。
  5. 数据备份与恢复:DataX 可以用于定期备份和恢复数据。通过配置定时任务,可以将数据从源端备份到目标端,并在需要时进行数据恢复。

DataX支持那些数据源?

架构设计

DataX作为离线数据同步框架,采用Framework + plugin架构构建。将数据源读取和写入抽象成为Reader/Writer插件,纳入到整个同步框架中。

  • Reader:Reader为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
  • Writer: Writer为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
  • Framework:Framework用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

DataX 开源版本支持单机多线程模式完成同步作业运行,如下图

  1. DataX完成单个数据同步的作业,称之为Job,DataX接受到一个Job之后,将启动一个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一个Task都会负责一部分数据的同步工作。
  3. 切分多个Task之后,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一个TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步工作。
  5. DataX作业运行起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值非0

DataX调度流程

举例来说,用户提交了一个DataX作业,并且配置了20个并发,目的是将一个100张表的mysql数据同步到odps里面。 DataX的调度决策是:

  1. Job根据分表切分成了100个Task。
  2. 根据20个并发,DataX计算需要分配4个TaskGroup。
  3. 4个TaskGroup平分切分好的100个Task,每一个TaskGroup负责5个并发共计运行25个Task。

如何使用 Datax?

点击datax 下载,下载后解压至本地某个目录,如下图

image-20240203222845753

用例说明

这里为了方便演示,我们同步MySQL的user_info表至MySQL的ods_test_mysql_user_info_m,同步条件为更新时间字段,如下

在实际工作中你可以选择不同类型的数据源测试


drop table ods_test_mysql_user_info_m

CREATE TABLE `user_info` (
  `id` int NOT NULL COMMENT 'ID',
  `name` varchar(50) NOT NULL COMMENT '名称',
  `sex` tinyint NOT NULL COMMENT '性别 1男 2女',
  `phone` varchar(11) COMMENT '手机',
	`address` varchar(1000)  COMMENT '地址',
	`age` int  COMMENT '年龄',
	`create_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT '创建时间',
  `update_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '修改时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='用户信息表';

CREATE TABLE `ods_test_mysql_user_info_m` (
  `id` int NOT NULL COMMENT 'ID',
  `name` varchar(50) NOT NULL COMMENT '名称',
  `sex` tinyint NOT NULL COMMENT '性别 1男 2女',
  `phone` varchar(11) COMMENT '手机',
	`address` varchar(1000)  COMMENT '地址',
	`age` int  COMMENT '年龄',
	`create_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT '创建时间',
  `update_time` datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6) COMMENT '修改时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='用户信息数仓表';

在user_info表中插入数据如下

创建作业的配置文件(json格式)

在 datax 的 script 目录,创建ods_test_mysql_user_info_m.json文件,配置如下,mysqlreader表示读取端,mysqlwriter表示写入端

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": ["id","name","sex","phone","address","age","create_time","update_time"],
            		         "splitPk": "id",
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false"],
                                "table": ["user_info"]
                            }
                        ],
                        "password": "root",
                        "username": "root",
                        "where": "update_time > '${updateTime}' "
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                       "writeMode": "replace",
                        "column": ["id","name","sex","phone","address","age","create_time","update_time"],
                        "connection": [
                            {
                                "jdbcUrl":"jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=false",
                                "table": ["ods_test_mysql_user_info_m"]
                            }
                        ],
                        "username": "root",
                        "password": "root",
                        "preSql": [],
                        "session": [
                          "set session sql_mode='ANSI'"
                        ]

                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "5"
            }
        }
    }
}

创建执行脚本

为了更贴合实际,写一个调度脚本sync.sh支持动态参数来执行任务

#!/bin/bash
## 执行示例 sh /Users/weizhao.dong/Documents/soft/datax/datax-script/call.sh /Users/weizhao.dong/Documents/soft/datax/datax-script/dwd_g2park_inout_report_s.json 1
jsonScript=$1
echo '执行脚本:'$jsonScript
interval=$2
echo "时间间隔(分钟):"$interval
now_time=$(date '+%Y-%m-%d %H:%M:%S')
echo "当前时间:"$now_time
update_time=$(date -v -${interval}M  '+%Y-%m-%d %H:%M:%S')
#linux 更新时间获取
#update_time=$(date -d "${now_time} $interval minute ago" +"%Y-%m-%d %H:%M:%S")
echo "更新时间:"$update_time
#执行
python3 /Users/weizhao.dong/Documents/soft/datax/bin/datax.py $jsonScript -p "-DupdateTime='${update_time}'"

假设我们要执以上ods_test_mysql_user_info_m.json脚本,并且同步十分钟之前的数据,如下

./sync.sh ods_test_mysql_user_info_m.json 10
测试

执行./sync.sh ods_test_mysql_user_info_m.json 10进行同步

以上结果可能有些人有疑问,就三条数据执行时间为 10s,其实这个 10s主要是初始化时间,耗时过长,同步的数据量多了优势就体现出来了,以下为实际生产同步数据结果,可以看到同步63102条耗时22s

推荐用法

以上我们只是通过一个简单的示例来演示了dataX如何使用,如果只是一次性同步,没问题,但是如果是周期性进行同步,有以下几种方式推荐

crontab调度

这种方式是最简单的,可以使用操作系统中的crontab定时调度,通过crontab -e编辑corn 任务,添加对应脚本即可

海豚调度器

在种方式在大数据领域用的比较多,典型场景就是 mysql 同步到数仓,海豚调度器内置了 datax 并且提供了图形化配置界面,配置起来非常方便

同时每次执行都有记录,并且都有对应的日志

定时任务框架(elasticjob/xxl-job)

在我们实际使用的业务系统定时调度框架都支持调度 shell 脚本,通过传入对应参数也可执行

扫描下面的二维码关注我们的微信公众帐号,在微信公众帐号中回复◉加群◉即可加入到我们的技术讨论群里面共同学习。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/391024.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Java程序设计】【C00265】基于Springboot的地方废物回收机制管理系统(有论文)

基于Springboot的地方废物回收机制管理系统(有论文) 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的地方废物回收机构管理系统 本系统分为管理员功能模块以及员工功能模块。 管理员功能模块:管理员登录系统后…

spring boot3登录开发-1(整合jwt)

⛰️个人主页: 蒾酒 🔥系列专栏:《spring boot实战》 🌊山高路远,行路漫漫,终有归途。 目录 前置条件 jwt简介 导依赖 编写jwt工具类 1.配置项直接嵌入代码,通过类名.静态方法使用 2.配置项写到…

算法刷题:无重复字符的最长字串

无重复字符的最长字串 .题目链接题目详情算法原理题目解析滑动窗口定义指针进窗口判断出窗口更新结果 我的答案 . 题目链接 无重复字符的最长字串 题目详情 算法原理 题目解析 首先,为了使字符串遍历的更加方便,我们选择将字符串转换为数组 题目要求子串中不能有重复的字符…

LaTeX中的documentclass命令:指定文档的类型和整体布局

诸神缄默不语-个人CSDN博文目录 documentclass 是 LaTeX 中一个基础且重要的命令,用于定义文档的整体布局和样式。这个命令告诉 LaTeX 编译器文档是属于哪一类的,比如是文章、报告、书籍等,每一类都有其预定义的格式和结构。 文章目录 基本语…

MongoDB从入门到实战之.NET Core使用MongoDB开发ToDoList系统(2)-Swagger框架集成

Swagger是什么? Swagger是一个规范且完整API文档管理框架,可以用于生成、描述和调用可视化的RESTful风格的 Web 服务。Swagger 的目标是对 REST API 定义一个标准且和语言无关的接口,可以让人和计算机拥有无须访问源码、文档或网络流量监测就…

JDBC 核心 API

引入 mysql-jdbc 驱动 驱动 jar 版本的选择:推荐使用 8.0.25,省略时区设置java 工程导入依赖 项目创建 lib 文件夹导入驱动依赖 jar 包jar 包右键 - 添加为库 JDBC 基本使用步骤 注册驱动获取连接创建发送 sql 语句对象发送 sql 语句,并获…

清华AutoGPT:掀起AI新浪潮,与GPT4.0一较高下

引言: 随着人工智能技术的飞速发展,自然语言处理(NLP)领域迎来了一个又一个突破。最近,清华大学研发的AutoGPT成为了业界的焦点。这款AI模型以其出色的性能,展现了中国在AI领域的强大实力。 目录 引言&…

字符串拼接 - 华为OD统一考试(C卷)

OD统一考试(C卷) 分值: 200分 题解: Java / Python / C 题目描述 给定 M 个字符( a-z ) ,从中取出任意字符(每个字符只能用一次)拼接成长度为 N 的字符串,要求相同的字符不能相邻。 计算出给定的字符列表…

突发!亚马逊创始人贝索斯抛售60亿美元股票,外网疑其或加仓比特币

号外:2.16教链内参《内参:OpenAI Sora惊艳发布,加密圈有人获利超700倍》 前世界首富、全球知名电商平台亚马逊(amazon)创始人杰夫贝索斯(Jeff Bezos)最近一周以来接连抛售自家公司股票&#xff…

BulingBuling[Beyond the To-Do List] - 《让金钱为你服务》 [ Make Money Work for You ]

与《财务自由: 赚到足够的钱的有效方法》作者Grant的简短访谈 让钱为你工作 超越待办事项清单 主持人:Erik Fisher Make Money Work for You Beyond the To-Do List Hosted by Erik Fisher 与Erik Fisher一起探索如何确定你生活中最大的财务杠杆以及使用它们的最佳方…

【Linux系统化学习】文件重定向

目录 文件内核对象 文件描述符的分配规则 重定向 重定向的概念 dup2系统调用 输出重定向 追加重定向 输入重定向 stderr解析 重定向到同一个文件中 分离常规输出和错输出 文件内核对象 上篇文章中我们介绍到了操作系统中的文件,操作系统为了方…

什么是智慧公厕,智慧公厕有哪些功能

1.什么是智慧公厕? 随着智慧城市的快速发展,公共厕所作为城市基础设施的一部分,也在逐步升级转型。那么,什么是智慧公厕?智慧公厕作为智慧城市的重要组成部分,将公共厕所的建设、设计、使用、运营和管理等…

报错405(errAxiosError: Request failed with status code 405)

errAxiosError: Request failed with status code 405 前端调用接口的方法跟后台定义接口的方法不一致

docker (四)-docker网络

默认网络 docker会自动创建三个网络,bridge,host,none bridge桥接网络 如果不指定,新创建的容器默认将连接到bridge网络。 默认情况下,使用bridge网络,宿主机可以ping通容器ip,容器中也能ping通宿主机。 容器之间只…

UE4学习笔记 FPS游戏制作5 动画蒙太奇制作开枪动画

创建一个蒙太奇 选择角色的骨骼,并重命名 编辑蒙太奇 将我们需要的动画拖动到Default下的两个白杠的上边那个里 然后在下方的Sections节点中,点击Preview后的Default,选中后,再点击PreviewAllScetions上百年的长的绿色的Defalut&…

使用miniconda管理Python环境

之前经常使用pipenv管理虚拟环境,但是有一个问题就是代码给别人使用的时候,别人使用的Python版本和自己的不一致时,安装依赖包的时候会有问题。所以现在使用miniconda来管理虚拟环境,不仅小巧方便,还能为每个环境指定不…

Gitee入门之工具的安装

一、gitee是什么? Gitee(码云)是由开源中国社区在2013年推出的一个基于Git的代码托管平台,它提供中国本土化的代码托管服务。它旨在为个人、团队和企业提供稳定、高效、安全的云端软件开发协作平台,具备代码质量分析、…

LeetCode 100题目(python版本)待续...

一.哈希 1.两数之和 题目 给定一个整数数组 nums 和一个整数目标值 target,请你在该数组中找出 和为目标值 target 的那 两个 整数,并返回它们的数组下标。 你可以假设每种输入只会对应一个答案。但是,数组中同一个元素在答案里不能重复…

【LeetCode: 103. 二叉树的锯齿形层序遍历 + BFS】

🚀 算法题 🚀 🌲 算法刷题专栏 | 面试必备算法 | 面试高频算法 🍀 🌲 越难的东西,越要努力坚持,因为它具有很高的价值,算法就是这样✨ 🌲 作者简介:硕风和炜,…

QGIS004:【10栅格地形分析工具箱】-坡度、坡向、山体阴影

摘要:QGIS栅格地形分析工具箱常用工具有坡度、坡向、山体阴影等选项,本文介绍各选项的基本操作。 实验数据: 链接:https://pan.baidu.com/s/1gYZ_om4AlSdal0bts2mt-A?pwd4rrn 提取码:4rrn 一、坡度 工具功能&…