前言
一次我发现业务他们在用 datax数据同步工具,我尤记得曾经 19 年使用过,并且基于当时的版本还修复了个 BUG并且做了数据同步管道的集成开发。没想到时间过的飞快,业务方基于海豚调度 2.0.6 的版本中有在使用,由于业务方还没有一个 web页面(虽然有开源的 DataX Web),一切又得从新开始,以下是拾遗的一部分,算作记录,后面也会对优秀的 seatunnel做些验证,有人做过 BenchMark,据说性能提升不止一点点。
搞了这么久的数据 ETL,现在的轮子真的是越来越好用了,开箱即用,能满足各种数据源的同步,本节还是讲 Datax的初步使用,后续有时间在出分析源码的文章。
扯了一些废话,以下是正文。
DataX是阿里巴巴开发的一款分布式离线数据同步工具,旨在实现多种数据源之间高效的数据交换。作为一个开源项目,DataX不仅支持多种数据源(如关系型数据库、NoSQL数据库、云存储等),还提供了灵活的插件架构,便于扩展和定制化开发。其核心功能包括数据的高效传输、增量和全量数据同步、断点续传等,这使得DataX在大数据处理中扮演了重要角色。
目前而言,结合DataX与海豚调度,可以构建一个高效、稳定的数据同步与调度平台。在实际应用中,用户可以利用DataX完成数据的抽取、转换和加载(ETL),并通过海豚调度对这些ETL任务进行统一管理和调度。这样不仅提高了数据同步的效率,还增强了数据处理流程的可靠性和可维护性。
DataX是什么?
DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
详细内容请看官方介绍
一、DataX原理
DataX的实现原理是基于以下机制来实现的,
-
插件机制: DataX采用插件机制,主要包括Reader插件(数据读取模块)和Writer插件(数据写入模块)。通过这种插件化的设计,DataX可以轻松扩展支持新的数据源。常见的Reader和Writer插件包括MySQLReader、MySQLWriter、HDFSReader、HDFSWriter等。
-
配置驱动: DataX使用JSON格式的配置文件来定义数据同步任务。配置文件中包含Reader和Writer的具体配置信息(如连接方式、查询语句、目标表名等),以及任务的执行规则(如并行度、错误重试等)。
-
任务编排与执行: 每次DataX启动时,会根据配置文件创建并执行一个或多个同步任务。DataX的任务执行引擎负责协调Reader和Writer插件的执行:
-
Reader:读取源数据并封装成中间数据格式传递给执行引擎。
-
Transform(可选):执行数据转换的逻辑(如字段映射、数据转换等),在Reader和Writer之间传递数据。
-
Writer:接收经过映射转换的数据并写入到目标数据源。
并发与数据分片: DataX支持并发和数据分片,以提高数据同步的效率。具体方法包括: -
并发任务:DataX可以将一个大任务拆分成多个小任务,并发执行。
-
数据分片:对于大数据量的同步,DataX会将数据按一定规则分片(比如按ID范围或时间范围分片),并生成多个数据同步子任务,每个子任务并行运行。
-
容错与重试: DataX提供了多级容错机制:
- 任务级重试:如果任务执行失败,可以设置重试次数。
- 记录级容错:在数据同步过程中,单条记录的异常不会导致整个任务失败,而是会记录下错误信息并继续同步其他数据。
-
监控与日志: DataX提供详细的监控和日志功能,记录同步任务的执行情况,包括数据量、任务状态、错误信息等。这些信息有助于运维人员实时了解任务的执行情况,及时发现和解决问题。
总体来说,DataX通过插件化设计、配置驱动、并发数据同步和多级容错机制,保证了数据同步的灵活性、高效性和可靠性。
二、如何使用?
在此之前,我们先创一个外部表作为映射路径:
以TEXTFILE为例:
CREATE external TABLE IF NOT EXISTS test.crm_fund_test1 (
id bigint comment 'id',
uid STRING COMMENT 'UID',
cust_no STRING COMMENT '客户号',
data_type TINYINT COMMENT '数据类型'
) STORED as textfile
location 'hdfs://localhost:9000/tmp/fund_ext'
1. mysql2hive text格式 测试
创建同步配置文件:mysql2hive.json
如下(外部表,追加方式):
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "root",
"column": [
"id",
"uid",
"cust_no",
"data_type"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8",
"table": [
"crm_fund_threshold"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"path": "/tmp/fund_ext",
"defaultFS": "hdfs://localhost:9000",
"fileType": "text",
"writeMode": "append",
"compress":"NONE",
"encoding": "UTF-8",
"fieldDelimiter": ",",
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "uid",
"type": "string"
},
{
"name": "cust_no",
"type": "string"
},
{
"name": "data_type",
"type": "tinyint"
}
]
}
}
}
]
}
}
遇到报错如下:
2024-06-30 17:10:54.163 [job-0] ERROR JobContainer - Exception when job run
java.lang.ClassCastException: java.lang.String cannot be cast to java.util.List
at com.alibaba.datax.common.util.Configuration.getList(Configuration.java:435) ~[datax-common-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.plugin.rdbms.reader.util.OriginalConfPretreatmentUtil.dealJdbcAndTable(OriginalConfPretreatmentUtil.java:85) ~[plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.plugin.rdbms.reader.util.OriginalConfPretreatmentUtil.simplifyConf(OriginalConfPretreatmentUtil.java:59) ~[plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.plugin.rdbms.reader.util.OriginalConfPretreatmentUtil.doPretreatment(OriginalConfPretreatmentUtil.java:33) ~[plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader$Job.init(CommonRdbmsReader.java:55) ~[plugin-rdbms-util-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.plugin.reader.mysqlreader.MysqlReader$Job.init(MysqlReader.java:37) ~[mysqlreader-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.core.job.JobContainer.initJobReader(JobContainer.java:673) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.core.job.JobContainer.init(JobContainer.java:303) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.core.job.JobContainer.start(JobContainer.java:113) ~[datax-core-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.core.Engine.start(Engine.java:86) [datax-core-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.core.Engine.entry(Engine.java:168) [datax-core-0.0.1-SNAPSHOT.jar:na]
at com.alibaba.datax.core.Engine.main(Engine.java:201) [datax-core-0.0.1-SNAPSHOT.jar:na]
2024-06-30 17:10:54.167 [job-0] INFO StandAloneJobContainerCommunicator - Total 0 records, 0 bytes | Speed 0B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 0.00%
注意我们这里是从 mysql同步到 hive,怀疑是没有加MySQL 驱动导致的问题。
下载并放到datax安装目录的 lib下:
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.33/mysql-connector-java-8.0.30.jar
再次尝试报错依旧,查看下模板文件做下对比试试:
➜ bin python3 ./datax.py -r mysqlreader -w hdfswriter
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"setting": {
"content": [
"name": "mysqlreader",
"username": "root",
}
"column": [],
"connection": [
"setting": {
}
"name": "mysqlreader",
"username": "root",
"crm_fund_threshold"
{
"jdbcUrl": [],
"table": []
}
],
"password": "",
"username": "",
"where": ""
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [],
"compress": "",
"defaultFS": "",
"fieldDelimiter": "",
"fileName": "",
"fileType": "",
"path": "",
"writeMode": ""
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
仔细对比了下,应该是JdbcUrl这里出的问题, java.lang.String cannot be cast to java.util.List ,也就是这里需要的是一个数组,于是修改。
再次又遇到新的报错:
2024-06-30 17:32:05.790 [job-0] ERROR JobContainer - Exception when job run
com.alibaba.datax.common.exception.DataXException: Code:[HdfsWriter-02], Description:[您填写的参数值不合法.]. - 目前TEXT FILE仅支持GZIP、BZIP2 两种压缩, 不支持您配置的 compress 模式 : [NONE]
显然,不存在 NONE 的压缩类型,修改为 GZIP 这一次成功同步,如下:
➜ job python3 ../bin/datax.py mysql2hive.json
# 最终正确的版本:
2024-06-30 17:36:57.357 [main] INFO Engine -
{
"setting":{
"speed":{
"channel":1
}
},
"content":[
{
"reader":{
"name":"mysqlreader",
"parameter":{
"where":"",
"username":"root",
"password":"****",
"column":[
"id",
"uid",
"cust_no",
"data_type"
],
"connection":[
{
"jdbcUrl":[
"jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8"
],
"table":[
"crm_fund_threshold"
]
}
]
}
},
"writer":{
"name":"hdfswriter",
"parameter":{
"path":"/tmp/fund_ext",
"defaultFS":"hdfs://localhost:9000",
"fileType":"text",
"writeMode":"append",
"compress":"GZIP",
"encoding":"UTF-8",
"fileName":"fund_crm_ext",
"fieldDelimiter":",",
"column":[
{
"name":"id",
"type":"bigint"
},
{
"name":"uid",
"type":"string"
},
{
"name":"cust_no",
"type":"string"
},
{
"name":"data_type",
"type":"tinyint"
}
]
}
}
}
]
}
2024-06-30 17:37:09.020 [job-0] INFO JobContainer - PerfTrace not enable!
2024-06-30 17:37:09.021 [job-0] INFO StandAloneJobContainerCommunicator - Total 1 records, 14 bytes | Speed 1B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2024-06-30 17:37:09.022 [job-0] INFO JobContainer -
任务启动时刻 : 2024-06-30 17:36:57
任务结束时刻 : 2024-06-30 17:37:09
任务总计耗时 : 11s
任务平均流量 : 1B/s
记录写入速度 : 0rec/s
读出记录总数 : 1
读写失败总数 : 0
不幸的是虽然同步成功了,但是结果却是 NULL,why ?
hive> select * from test.crm_fund_test1;
OK
NULL NULL NULL NULL
这是因我们指定的上传文件的分割形式是逗号分割,但是建表的时候又没有指定,所以就出现了这个问题,解决很简单。
hive> CREATE external TABLE IF NOT EXISTS test.crm_fund_test2(
> id bigint comment 'id',
> uid STRING COMMENT 'UID',
> cust_no STRING COMMENT '客户号',
> data_type TINYINT COMMENT '数据类型'
> ) row format delimited fields terminated by ','
> location 'hdfs://localhost:9000/tmp/fund_ext';
OK
Time taken: 0.219 seconds
再次运行同步命令测试,注意之前虽然我们删除了表重新创建,但由于是外部表并不会删除数据,又因为是追加模式,所以出现了多条同样的的数据,如下。
hive> select * from crm_fund_test2;
OK
23 22 123132132 2
23 22 123132132 2
23 22 123132132 2
Time taken: 0.12 seconds, Fetched: 3 row(s)
我这里修改为 truncate后变为了 1 条(MySQL 原表就只有一条)
hive> select * from crm_fund_test2;
OK
23 22 123132132 2
Time taken: 0.104 seconds, Fetched: 1 row(s)
且看日志,同名的文件会被覆盖:
2024-06-30 18:09:42.526 [job-0] INFO JobContainer - DataX Writer.Job [hdfswriter] do prepare work .
2024-06-30 18:09:42.649 [job-0] INFO HdfsWriter$Job - 由于您配置了writeMode truncate, [/tmp/fund_ext] 下面的内容将被覆盖重写
2024-06-30 18:09:42.649 [job-0] INFO HdfsWriter$Job - delete file [hdfs://localhost:9000/tmp/fund_ext/fund_crm_ext__7c76b99c_2322_40a8_9b30_cb0a84ac47a1.gz].
2024-06-30 18:09:42.657 [job-0] INFO HdfsWriter$Job - delete file [hdfs://localhost:9000/tmp/fund_ext/fund_crm_ext__9be79eb4_39ae_47a5_9634_ea43a5a73aed.gz].
2024-06-30 18:09:42.659 [job-0] INFO HdfsWriter$Job - delete file [hdfs://localhost:9000/tmp/fund_ext/fund_crm_ext__ca1346f6_2d23_4cea_ae1c_1d8f961aee8f.gz].
2024-06-30 18:09:42.660 [job-0] INFO JobContainer - jobContainer starts to do split ...
2024-06-30 18:09:42.661 [job-0] INFO JobContainer - Job set Channel-Number to 1 channels.
2. mysql2hive orc
代码如下(示例):
{
"job": {
"setting": {
"speed": {
"channel": 1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"where": "",
"username": "root",
"password": "root",
"column": [
"id",
"uid",
"cust_no",
"data_type"
],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8"],
"table": [
"crm_fund_threshold"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"path": "/tmp/fund_ext2",
"defaultFS": "hdfs://localhost:9000",
"fileType": "orc",
"writeMode": "append",
"compress":"NONE",
"encoding": "UTF-8",
"fileName":"fund_crm_ext2",
"fieldDelimiter": ",",
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "uid",
"type": "string"
},
{
"name": "cust_no",
"type": "string"
},
{
"name": "data_type",
"type": "tinyint"
}
]
}
}
}
]
}
}
测试结果:
CREATE external TABLE IF NOT EXISTS test.crm_fund_test2(
id bigint comment 'id',
uid STRING COMMENT 'UID',
cust_no STRING COMMENT '客户号',
data_type TINYINT COMMENT '数据类型'
) STORED as orc
location 'hdfs://localhost:9000/tmp/fund_ext2';
# ORC 格式验证没有问题
hive> select * from test.crm_fund_test2;
OK
23 22 123132132 2
最后看下文件形式:
hive> dfs -ls /tmp/fund_ext;
Found 1 items
-rw-r--r-- 3 mac supergroup 36 2024-06-30 18:13 /tmp/fund_ext/fund_crm_ext__81ce986b_6ff2_4bca_b044_d863447698cc.gz
从上面可见,文件存储采用了我们配置的 GZIP 压缩,并且使用了我们自定义的文件名前缀。
总结
本节内容的一个目标就是快速将 datax用起来,当然如果你有海豚,可以自行配置下结合起来测测看,后面我会将这块内容更新上。我尝试将 datax的 lib下放 MySQL 驱动包,基于当前最新的版本从实际测来看,真的不需要,因为 mysqlreader的插件已经包含了mysql-connector-java-5.1.47.jar,如下所示:
➜ mysqlreader ll libs
total 18288
-rw-r--r--@ 1 mac staff 506K 8 21 2023 commons-collections-3.0.jar
-rw-r--r--@ 1 mac staff 181K 8 21 2023 commons-io-2.4.jar
-rw-r--r--@ 1 mac staff 403K 8 21 2023 commons-lang3-3.3.2.jar
-rw-r--r--@ 1 mac staff 1.5M 8 21 2023 commons-math3-3.1.1.jar
-rw-r--r--@ 1 mac staff 115K 8 21 2023 datax-common-0.0.1-SNAPSHOT.jar
-rw-r--r--@ 1 mac staff 1.9M 8 21 2023 druid-1.0.15.jar
-rw-r--r--@ 1 mac staff 1.7M 8 21 2023 fastjson2-2.0.23.jar
-rw-r--r--@ 1 mac staff 913K 8 21 2023 guava-r05.jar
-rw-r--r--@ 1 mac staff 44K 8 21 2023 hamcrest-core-1.3.jar
-rw-r--r--@ 1 mac staff 258K 8 21 2023 logback-classic-1.0.13.jar
-rw-r--r--@ 1 mac staff 409K 8 21 2023 logback-core-1.0.13.jar
-rw-r--r--@ 1 mac staff 984K 8 21 2023 mysql-connector-java-5.1.47.jar
-rw-r--r--@ 1 mac staff 93K 8 21 2023 plugin-rdbms-util-0.0.1-SNAPSHOT.jar
-rw-r--r--@ 1 mac staff 31K 8 21 2023 slf4j-api-1.7.10.jar