DataX Learning Notes
DataX 3.0 Overview
DataX is an offline data synchronization tool for heterogeneous data sources. It aims to provide stable and efficient data synchronization between a wide range of heterogeneous sources, including relational databases (MySQL, Oracle, etc.), HDFS, Hive, ODPS, HBase, and FTP. For details, see the official documentation (https://github.com/alibaba/DataX).
Design philosophy:
To solve the problem of synchronizing heterogeneous data sources, DataX turns the complex mesh of point-to-point synchronization links into a star-shaped topology, with DataX acting as the central transport hub that connects all data sources. When a new data source needs to be integrated, it only has to be connected to DataX, after which it can synchronize data seamlessly with every source already in place.
DataX Framework Design
- Reader: the data collection module. It reads data from the source and sends it to the Framework.
- Writer: the data writing module. It continuously pulls data from the Framework and writes it to the destination.
- Framework: connects the Reader and the Writer, serving as the data transfer channel between them and handling core concerns such as buffering, flow control, concurrency, and data conversion.
DataX Plugin System
DataX Core Architecture
Six Core Strengths of DataX
Reliable data quality monitoring
Rich data transformation features
Precise speed control
Strong synchronization performance
Robust fault tolerance
Minimal, simple user experience
DataX vs. Sqoop
Feature | DataX | Sqoop |
---|---|---|
Execution mode | Single process, multi-threaded | MapReduce |
Distributed | Not supported; can be worked around with a scheduling system | Supported |
Flow control | Built-in flow control | Needs customization |
Statistics | Some statistics already available; reporting needs customization | None; collecting them across a distributed job is inconvenient |
Data validation | Validation available in the core module | None; collecting it across a distributed job is inconvenient |
Monitoring | Needs customization | Needs customization |
DataX Deployment
1. Download the DataX package
URL: (https://github.com/alibaba/DataX?tab=readme-ov-file)
2. Upload the package to the server, extract it, and configure the environment variables
tar -zxvf datax.tar.gz
# add the environment variables
DATAX_HOME=/usr/local/soft/datax
export PATH=$DATAX_HOME/bin:$PATH
# apply the changes
source /etc/profile
3. Self-check
# grant execute permission to datax.py under the bin directory
chmod +x datax.py
# verify that the installation succeeded
python ./bin/datax.py ./job/job.json
Using DataX
Create a datax_jsons folder under the bigdata30 directory to store the job files
mkdir datax_jsons
A small example: create stream2stream.json
{
"job": {
"setting": {
"speed": {
"channel":1
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
{
"value": "我是",
"type": "string"
},
{
"value": "大",
"type": "string"
},
{
"value": "帅",
"type": "string"
},
{
"value": "哥",
"type": "string"
},
{
"value": "哈哈哈",
"type": "string"
}
],
"sliceRecordCount": 10 // 打印次数
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true, //是否打印
"encoding": "UTF-8"
}
}
}
]
}
}
Run: datax.py stream2stream.json
Result:
DataX Configuration File Format
Steps for writing the JSON file:
1. Use the command to generate the template that matches the reader and writer you have chosen
2. Look up on GitHub how the parameter values should be written, mainly checking whether each parameter is required and what effect it has
If you are not sure how to write the template, check the official documentation, or use the following command to print a DataX configuration template.
datax.py -r mysqlreader -w hdfswriter #taking mysql --> hdfs as an example
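The template is printed to standard output, so it can be saved to a file and then trimmed down to just the JSON before editing (a sketch; the file name is only an example):
datax.py -r mysqlreader -w hdfswriter > mysql2hdfs_template.json  # save the printed template
# open the file and delete the non-JSON banner lines, then fill in the parameters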
Generated template:
Explanation: the outermost element of the JSON is a job. A job contains two parts, setting and content: setting configures the job as a whole, while content configures the data source and the destination.
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [],
"connection": [
{
"jdbcUrl": [],
"table": []
}
],
"password": "",
"username": "",
"where": ""
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [],
"compress": "",
"defaultFS": "",
"fieldDelimiter": "",
"fileName": "",
"fileType": "",
"path": "",
"writeMode": ""
}
}
}
],
"setting": {
"speed": {
"channel": ""
}
}
}
}
Then the template can be modified based on the samples given in the official documentation:
The parameter descriptions also tell you which parameters are required, their default values, and so on:
Type conversion
mysql2hdfs
Modified template:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"CId",
"Cname",
"Tid"
],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=UTF-8&useSSL=false"],
"table": ["Course"]
}
],
"password": "123456",
"username": "root"
// where is optional, so it is simply removed
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "CId",
"type": "String"
},
{
"name": "Cname",
"type": "String"
},
{
"name": "Tid",
"type": "String"
}
],
//"compress": "NONE", // 是否压缩 不是必选项删掉
"defaultFS": "hdfs://master:9000",
"fieldDelimiter": ",", // 列分隔符
"fileName": "Course",
"fileType": "text", // 文件类型
"path": "/bigdata30/dataxout1",
"writeMode": "append" //hdfswriter写入前数据清理处理模式
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
Running it reports an error: the path does not exist, which means the path has to be created in advance
hdfs dfs -mkdir /bigdata30/dataxout1
Run:
datax.py mysql2hdfs.json
Success; the output file appears in HDFS:
View the file contents:
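For example, the output can be inspected with the HDFS command line (a quick sketch, using the output path from the config above):
hdfs dfs -ls /bigdata30/dataxout1   # hdfswriter appends a random suffix to the configured fileName
hdfs dfs -cat /bigdata30/dataxout1/*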
mysql2mysql
Scenario: synchronize the Course table from the Y1 database into the datax1 database
Generate the template for MySQL-to-MySQL synchronization:
datax.py -r mysqlreader -w mysqlwriter
Check the mysqlwriter template in the official documentation and write your own version
Create the Course table in the datax1 database
create table if not exists datax1.Course(
CId varchar(10),
Cname nvarchar(10),
TId varchar(10)
);
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"CId",
"Cname",
"Tid"
],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=UTF-8&useSSL=false"],
"table": ["Course"]
}
],
"password": "123456",
"username": "root"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"CId",
"Cname",
"Tid"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/datax1?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
"table": ["Course"]
}
],
"password": "123456",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
Run:
datax.py mysql2mysql.json
Result: data appears in the Course table in the datax1 database:
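A quick check from the command line (a sketch, assuming the mysql client is installed on the node and using the credentials from the config):
mysql -h master -uroot -p123456 -e "select * from datax1.Course;"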
mysql2hive
Scenario:
Synchronize the data of the Student table in MySQL into the Students table in Hive
View the template
datax.py -r mysqlreader -w hdfswriter
Create the table in Hive
create table bigdata30.Students(
Sid STRING,
Sname STRING,
Sage DATE,
Ssex STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
Write the configuration file mysql2hive.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"SId",
"Sname",
"Sage",
"Ssex"
],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://master:3306/Y1?useUnicode=true&characterEncoding=UTF-8&useSSL=false"],
"table": ["Student"]
}
],
"password": "123456",
"username": "root"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": [
{
"name": "SId",
"type": "string"
},
{
"name": "Sname",
"type": "string"
},
{
"name": "Sage",
"type": "Date"
},
{
"name": "Ssex",
"type": "string"
}
],
"defaultFS": "hdfs://master:9000",
"fieldDelimiter": ",",
"fileName": "students",
"fileType": "text",
"path": "/user/hive/warehouse/bigdata30.db/students",
"writeMode": "append"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
Run:
datax.py mysql2hive.json
Data appears in the students table in Hive
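This can be confirmed from the Hive CLI (a sketch; hive -e runs a single statement and exits):
hive -e "select * from bigdata30.students limit 10;"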
hdfs2mysql == hive2mysql
Scenario:
Load the data of the t_movie1 table in Hive into MySQL
View the template
datax.py -r hdfsreader -w mysqlwriter
Write the configuration file according to the official documentation
DataX internal type | Hive data type |
---|---|
Long | TINYINT, SMALLINT, INT, BIGINT |
Double | FLOAT, DOUBLE |
String | STRING, CHAR, VARCHAR, STRUCT, MAP, ARRAY, UNION, BINARY |
Boolean | BOOLEAN |
Date | DATE, TIMESTAMP |
First create an output table in MySQL
create table if not exists datax1.datax_out1(
id bigint,
name varchar(100),
types varchar(100),
shengfen varchar(10)
)CHARSET = utf8 COLLATE utf8_general_ci;
Edit the configuration file
{
"job": {
"content": [
{
"reader": {
"name": "hdfsreader",
"parameter": {
"column": [
{
"index": 0,
"type": "long"
},
{
"index": 1,
"type": "string"
},
{
"index": 2,
"type": "string"
},
{
"type": "string", // 给一个定值
"value":"学生"
}
],
"defaultFS": "hdfs://master:9000",
"encoding": "UTF-8",
"fieldDelimiter": ",",
"fileType": "text",
"path": "/user/hive/warehouse/bigdata30.db/t_movie1/*"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"id",
"name",
"types",
"shengfen"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/datax1?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
"table": ["datax_out1"]
}
],
"password": "123456",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
Run it; data appears in the datax_out1 table in MySQL
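A quick row-count check on the MySQL side (a sketch, using the same connection details as the writer config):
mysql -h master -uroot -p123456 -e "select count(*) from datax1.datax_out1;"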
mysql2hbase
Scenario
Write the data of the datax_out1 table in MySQL into datax_tb1 in HBase
View the template
datax.py -r mysqlreader -w hbase20xsqlwriter #the HBase version here is 2.2.7
HBase20xsqlwriter writes data in batches into HBase SQL tables (Phoenix). Because Phoenix encodes the rowkey, writing directly through the HBase API would require manual data conversion, which is tedious and error-prone. This plugin writes to Phoenix tables directly through SQL.
- Data can only be imported through the Phoenix QueryServer, so the Phoenix QueryServer service must be running before this plugin can be used
So we do not use that writer here; instead we use hbase11xwriter and pick out the required parameters from the official documentation
Official documentation: hbase11xwriter/doc/hbase11xwriter.md
Create the table in HBase
create 'datax_tb1','info'
Write the configuration file mysql2hbase.json
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": [
"id",
"name",
"types",
"shengfen"
],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://master:3306/datax1?useUnicode=true&characterEncoding=UTF-8&useSSL=false"],
"table": ["datax_out1"]
}
],
"password": "123456",
"username": "root"
}
},
"writer": {
"name": "hbase11xwriter",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
},
"table": "datax_tb1",
"mode": "normal",
"rowkeyColumn": [
{
"index":0,
"type":"long"
}
],
"column": [
{
"index":1,
"name": "info:name",
"type": "string"
},
{
"index":2,
"name": "info:types",
"type": "string"
},
{
"index":3,
"name": "info:shengfen",
"type": "string"
}
],
"encoding": "utf-8"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
Run it, then scan the datax_tb1 table:
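For example, from the HBase shell (a sketch; LIMIT keeps the output short):
echo "scan 'datax_tb1', {LIMIT => 5}" | hbase shell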
hbase2mysql
Scenario
Write the data of the datax_tb1 table in HBase into datax_out2 in MySQL
View the template
datax.py -r hbase11xreader -w mysqlwriter
Official documentation: hbase11xreader/doc/hbase11xreader.md
First create the table in MySQL:
create table if not exists datax1.datax_out2(
id bigint,
name varchar(100),
types varchar(100),
shengfen varchar(10)
)CHARSET = utf8 COLLATE utf8_general_ci;
Write the configuration file hbase2mysql.json
{
"job": {
"content": [
{
"reader": {
"name": "hbase11xreader",
"parameter": {
"column": [
{
"name": "rowkey",
"type": "long"
},
{
"name": "info: name",
"type": "string"
},
{
"name": "info: types",
"type": "string"
},
{
"name": "info: shengfen",
"type": "string"
}
],
"encoding": "utf-8",
"hbaseConfig": {"hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"},
"mode": "normal",
"table": "datax_tb1"
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"column": [
"Id",
"name",
"types",
"shengfen"
],
"connection": [
{
"jdbcUrl": "jdbc:mysql://master:3306/datax1?useUnicode=true&characterEncoding=UTF-8&useSSL=false",
"table": ["datax_out2"]
}
],
"password": "123456",
"username": "root",
"writeMode": "insert"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
Run it; data appears in datax_out2 in MySQL:
MySQL incremental sync to Hive
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
"last_mod"
],
"splitPk": "age",
"where": "id > 9", //条件筛选
"connection": [
{
"table": [
"student"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/datax1"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://master01:9000",
"fileType": "text",
"path": "/user/hive/warehouse/bigdata30.db/dataxstudent",
"fileName": "student",
"column": [
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "string"
},
{
"name": "age",
"type": "INT"
},
{
"name": "gender",
"type": "string"
},
{
"name": "clazz",
"type": "string"
},
{
"name": "last_mod",
"type": "string"
}
],
"writeMode": "append", // 增量导入 全量导入 truncate
"fieldDelimiter": ","
}
}
}
],
"setting": {
"speed": {
"channel": 6
}
}
}
}
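As written, the where condition (id > 9) is hard-coded, so every run would pick up the same boundary. datax.py supports passing variables with -p, which makes the boundary easy to change between runs. A sketch, where incremental.json and lastid are made-up names and the where clause in the JSON would be changed to reference the variable:
# in the json: "where": "id > ${lastid}"
datax.py incremental.json -p "-Dlastid=9"   # pass in the current high-water mark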