DataX介绍

一、介绍

DataX 是一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。
github地址
详细文档
操作手册

支持数据框架如下:
在这里插入图片描述
在这里插入图片描述在这里插入图片描述
在这里插入图片描述在这里插入图片描述

架构
在这里插入图片描述
Reader:为数据采集模块,负责采集数据源的数据,将数据发送给Framework。
Writer:为数据写入模块,负责不断向Framework取数据,并将数据写入到目的端。
Framework:用于连接reader和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。

二、使用

  1. 下载

下载地址:https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202308/datax.tar.gz

  1. 解压缩
# 解压缩
tar -zxvf datax.tar.gz -C /opt/module/
  1. 编写数据同步任务
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 10,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,datax"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 5
       }
    }
  }
}

  1. 启动任务
python /opt/module/datax/bin/datax.py /opt/module/datax/job/stream_to_stream.json
  1. 执行结果

在这里插入图片描述

  1. 配置说明
参数说明
job.setting设置全局配置参数
job.setting.speed控制任务速度配置参数,包括:channel(通道(并发))、record(字节流)、byte(记录流)等三种模式
job.setting.speed.channel并发数
job.setting.speed.record字节流
job.setting.speed.byte记录流
job.setting.errorLimit设置错误限制
job.setting.errorLimit.record指定允许的最大错误记录数
job.setting.errorLimit.percentage指定允许的最大错误记录百分比
job.setting.dirtyDataPath设置错误限制
job.setting.dirtyDataPath.path设置错误限制
job.setting.log设置错误限制
job.setting.log.level设置错误限制
job.setting.log.dir设置错误限制
content任务配置参数
readerReader配置
nameReader类型
parameterReader具体配置(具体配置查看具体Reader)
writerWriter配置
nameWriter类型
parameterWriter具体配置(具体配置查看具体Writer)

三、常用配置

3.1、MysqlReader

{
    "job": {
        "setting": {
            "speed": {
                 "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "name"
                        ],
                        "splitPk": "db_id",
                        "connection": [
                            {
                                "table": [
                                    "table"
                                ],
                                "jdbcUrl": [
     "jdbc:mysql://127.0.0.1:3306/database"
                                ]
                            }
                        ]
                    }
                },
               "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print":true
                    }
                }
            }
        ]
    }
}

配置说明:
jdbcUrl:链接地址
username:mysql用户名
password:mysql密码
table:待同步的表名
column:所配置的表中需要同步的列名集合,可以使用使用*代表所有字段
splitPk:使用splitPk代表的字段进行数据分片,DataX因此会启动并发任务进行数据同步,这样可以大大提供数据同步的效能
where:筛选条件
querySql:sql语句,可以替代column和where配置

3.2、MysqlWriter

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                 "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "column" : [
                            {
                                "value": "DataX",
                                "type": "string"
                            },
                            {
                                "value": 19880808,
                                "type": "long"
                            },
                            {
                                "value": "1988-08-08 08:08:08",
                                "type": "date"
                            },
                            {
                                "value": true,
                                "type": "bool"
                            },
                            {
                                "value": "test",
                                "type": "bytes"
                            }
                        ],
                        "sliceRecordCount": 1000
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "root",
                        "column": [
                            "id",
                            "name"
                        ],
                        "session": [
                        	"set session sql_mode='ANSI'"
                        ],
                        "preSql": [
                            "delete from test"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/datax?useUnicode=true&characterEncoding=gbk",
                                "table": [
                                    "test"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

配置说明:
jdbcUrl:链接地址
username:mysql用户名
password:mysql密码
table:待同步的表名
column:所配置的表中需要同步的列名集合,可以使用使用*代表所有字段
preSql:写入数据到目的表前,会先执行这里的标准语句
postSql:写入数据到目的表后,会执行这里的标准语句
writeMode:控制写入数据到目标表采用insert into或者replace into或者 ON DUPLICATE KEY UPDATE语句
batchSize:一次性批量提交的记录数大小

3.3、HdfsReader

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "/user/hive/warehouse/mytable01/*",
                        "defaultFS": "hdfs://xxx:port",
                        "column": [
                               {
                                "index": 0,
                                "type": "long"
                               },
                               {
                                "index": 1,
                                "type": "boolean"
                               },
                               {
                                "type": "string",
                                "value": "hello"
                               },
                               {
                                "index": 2,
                                "type": "double"
                               }
                        ],
                        "fileType": "orc",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ","
                    }

                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": true
                    }
                }
            }
        ]
    }
}

配置说明:
path:文件路径
defaultFS:namenode节点地址
fileType:文件的类型,目前支持:”text”、”orc”、”rc”、”seq”、”csv”
column:读取字段列表
fieldDelimiter:读取的字段分隔符
encoding:读取文件的编码配置
nullFormat:文本文件中无法使用标准字符串定义null(空指针),DataX提供nullFormat定义哪些字符串可以表示为null
haveKerberos:是否有Kerberos认证,默认false
kerberosKeytabFilePath:Kerberos认证keytab文件路径,且为绝对路径
kerberosPrincipal:Kerberos认证Principal名
hadoopConfig:hadoop相关的一些高级参数

3.4、HdfsWriter

{
    "setting": {},
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/Users/shf/workplace/txtWorkplace/job/dataorcfull.txt"],
                        "encoding": "UTF-8",
                        "column": [
                            {
                                "index": 0,
                                "type": "long"
                            },
                            {
                                "index": 1,
                                "type": "long"
                            },
                            {
                                "index": 2,
                                "type": "long"
                            },
                            {
                                "index": 3,
                                "type": "long"
                            },
                            {
                                "index": 4,
                                "type": "DOUBLE"
                            },
                            {
                                "index": 5,
                                "type": "DOUBLE"
                            },
                            {
                                "index": 6,
                                "type": "STRING"
                            },
                            {
                                "index": 7,
                                "type": "STRING"
                            },
                            {
                                "index": 8,
                                "type": "STRING"
                            },
                            {
                                "index": 9,
                                "type": "BOOLEAN"
                            },
                            {
                                "index": 10,
                                "type": "date"
                            },
                            {
                                "index": 11,
                                "type": "date"
                            }
                        ],
                        "fieldDelimiter": "\t"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://xxx:port",
                        "fileType": "orc",
                        "path": "/user/hive/warehouse/writerorc.db/orcfull",
                        "fileName": "xxxx",
                        "column": [
                            {
                                "name": "col1",
                                "type": "TINYINT"
                            },
                            {
                                "name": "col2",
                                "type": "SMALLINT"
                            },
                            {
                                "name": "col3",
                                "type": "INT"
                            },
                            {
                                "name": "col4",
                                "type": "BIGINT"
                            },
                            {
                                "name": "col5",
                                "type": "FLOAT"
                            },
                            {
                                "name": "col6",
                                "type": "DOUBLE"
                            },
                            {
                                "name": "col7",
                                "type": "STRING"
                            },
                            {
                                "name": "col8",
                                "type": "VARCHAR"
                            },
                            {
                                "name": "col9",
                                "type": "CHAR"
                            },
                            {
                                "name": "col10",
                                "type": "BOOLEAN"
                            },
                            {
                                "name": "col11",
                                "type": "date"
                            },
                            {
                                "name": "col12",
                                "type": "TIMESTAMP"
                            }
                        ],
                        "writeMode": "append",
                        "fieldDelimiter": "\t",
                        "compress":"NONE"
                    }
                }
            }
        ]
    }
}

配置说明:
path:存储到Hadoop Hdfs文件系统的路径信息
defaultFS:namenode节点地址
fileType:文件的类型,目前支持:“text”或“orc”
fileName:文件名
column:写入字段列表
fieldDelimiter:读取的字段分隔符
compress:文件压缩类型

3.5、FtpReader

{
    "setting": {},
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {
                    "name": "ftpreader",
                    "parameter": {
                        "protocol": "sftp",
                        "host": "127.0.0.1",
                        "port": 22,
                        "username": "xx",
                        "password": "xxx",
                        "path": [
                            "/home/hanfa.shf/ftpReaderTest/data"
                        ],
                        "column": [
                            {
                                "index": 0,
                                "type": "long"
                            },
                            {
                                "index": 1,
                                "type": "boolean"
                            },
                            {
                                "index": 2,
                                "type": "double"
                            },
                            {
                                "index": 3,
                                "type": "string"
                            },
                            {
                                "index": 4,
                                "type": "date",
                                "format": "yyyy.MM.dd"
                            }
                        ],
                        "encoding": "UTF-8",
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "ftpWriter",
                    "parameter": {
                        "path": "/home/hanfa.shf/ftpReaderTest/result",
                        "fileName": "shihf",
                        "writeMode": "truncate",
                        "format": "yyyy-MM-dd"
                    }
                }
            }
        ]
    }
}

配置说明:
protocol:ftp服务器协议,目前支持传输协议有ftp和sftp
host:ftp服务器地址
port:ftp服务器端口
timeout:连接ftp服务器连接超时时间,单位毫秒,默认:60000
connectPattern:连接模式(主动模式或者被动模式)
username:用户名
password:密码
path:路径
column:读取字段列表
fieldDelimiter:读取的字段分隔符

3.6、FtpWriter

{
    "setting": {},
    "job": {
        "setting": {
            "speed": {
                "channel": 2
            }
        },
        "content": [
            {
                "reader": {},
                "writer": {
                    "name": "ftpwriter",
                    "parameter": {
                        "protocol": "sftp",
                        "host": "***",
                        "port": 22,
                        "username": "xxx",
                        "password": "xxx",
                        "timeout": "60000",
                        "connectPattern": "PASV",
                        "path": "/tmp/data/",
                        "fileName": "yixiao",
                        "writeMode": "truncate|append|nonConflict",
                        "fieldDelimiter": ",",
                        "encoding": "UTF-8",
                        "nullFormat": "null",
                        "dateFormat": "yyyy-MM-dd",
                        "fileFormat": "csv",
			"suffix": ".csv",
                        "header": []
                    }
                }
            }
        ]
    }
}

配置说明:
protocol:ftp服务器协议,目前支持传输协议有ftp和sftp
host:ftp服务器地址
port:ftp服务器端口
timeout:连接ftp服务器连接超时时间,单位毫秒,默认:60000
connectPattern:连接模式(主动模式或者被动模式)
username:用户名
password:密码
path:路径
fileName:文件名
fieldDelimiter:读取的字段分隔符

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/363501.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

RT-Thread:STM32的PB3,PB4 复用IO配置为GPIO

说明:在使用 STM32F103CBT6 配置了 PB3 为IO,测试时发现读取这个IO的电平时钟是0,即便单管脚上的电平是1,读取的数据任然是0,查规格书后发现PB3,PB4是JTAG复用口,要当普通IO用需要配置。 配置工具:STM32Cu…

51单片机编程应用(C语言):独立按键

目录 1.独立按键介绍 2.独立按键控制LED亮灭 1.1按下时LED亮,松手LED灭(按一次执行亮灭) 1.2首先按下时无操作,松手时LED亮(再按下无操作,所以LED亮),松手LED灭(松手时…

LeetCode:206反转链表

206. 反转链表 - 力扣(LeetCode) 不难,小细节是单写一个循环,把特殊情况包含进去, 单链表核心:上一个结点,当前结点,下一个结点, 代码:注释(算是…

系统分析师-22年-下午答案

系统分析师-22年-下午答案 更多软考知识请访问 https://ruankao.blog.csdn.net/ 试题一必答,二、三、四、五题中任选其中两题作答 试题一 (25分) 说明 某软件公司拟开发一套博客系统,要求能够向用户提供一个便捷发布自已心得,及时有效的…

安装jar包到maven本地仓库的基本步骤

1.jar名字和所在目录 2.输入导包脚本 mvn install:install-file -DfileE:\resources\6、SpringBoot3Vue3全套教程\02_资料\02_Bean注册资料\common-pojo-1.0-SNAPSHOT.jar -DgroupIdcn.itcast -DartifactIdcommon-pojo -Dversion1.0 -Dpackagingjar3.打开命令行输入脚本就可以…

Linux安装aria2出现No package aria2 available.的解决方案

大家好,我是爱编程的喵喵。双985硕士毕业,现担任全栈工程师一职,热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。喜欢通过博客创作的方式对所学的…

【C++】 C++入门—内联函数

C入门 1 内联函数1.1 定义1.2 查看方式1.3 注意 Thanks♪(・ω・)ノ谢谢阅读下一篇文章见!!! 1 内联函数 1.1 定义 程序在执行一个函数前需要做准备工作:要将实参、局部变量、返回地址以及若干寄存…

代理模式详解(重点解析JDK动态代理)

- 定义 在解析动态代理模式之前,先简单看下整个代理模式。代理模式分为普通代理、强制模式、动态代理模式。其中动态代理模式主要实现方式为Java JDK提供的JDK动态代理,第三方类库提供的,例如CGLIB动态代理。 代理模式就是为其他对象提供一种…

Attack Lab:Phase1~Phase5【缓冲区溢出实验】

注:本实验所用文件不是csapp官网给出的,是学校下发的。可以参考我的思路。 phase 1 本阶段目标是使getbuf调用结束后,控制权交给touch1函数。 则我们要知道两件事:一是缓冲区大小,二是touch1在虚拟内存中的位置。 用…

山海鲸智慧教育方案:教育数据的未来

作为山海鲸可视化软件的开发者,我们深知数据可视化在教育领域的重要价值。山海鲸智慧教育解决方案正是在这样的背景下应运而生,致力于为教育行业提供高效、直观的数据可视化解决方案。 随着教育信息化的深入推进,教育数据呈爆炸式增长。如何…

vue3中使用了keep-alive来缓存页面使用onActivated和onDeactivated生命周期

1.说明 要求从实单订单列表跳转到物流账单列表时通过订单号(orderSn)进行筛选。现在出现的问题是所以第一次跳转到物流账单列表页面时是可以实现通过订单号进行筛选数据。在没有关闭物流账单列表页面就进行第二次跳转 2.出现的问题 3.keep-alive缓存页面特有的生命周期 vue2…

【算法与数据结构】300、LeetCode最长递增子序列

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析&#xff1a;   程序如下&#xff1a; class Solution { public:int lengthOfLIS(vector<int>& nums)…

IP 了解

参考&#xff1a;5.1 IP 基础知识全家桶 | 小林coding IP 在 TCP/IP 参考模型中处于第三层&#xff0c;也就是网络层。 网络层的主要作用是&#xff1a;实现主机与主机之间的通信&#xff0c;也叫点对点&#xff08;end to end&#xff09;通信。 什么是 IP 地址&#xff1f…

线上品牌展厅有哪些优点,如何打造线上品牌展厅

引言&#xff1a; 在当今数字化时代&#xff0c;品牌展示的方式也在不断演变&#xff0c;线上品牌展厅作为一种新型的展示方式&#xff0c;正逐渐成为品牌宣传的新宠。但是为什么需要线上品牌展厅&#xff0c;线上品牌展厅有哪些优势呢&#xff1f; 一&#xff0e;为什么需要线…

【开源】基于JAVA+Vue+SpringBoot的用户画像活动推荐系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 兴趣标签模块2.3 活动档案模块2.4 活动报名模块2.5 活动留言模块 三、系统设计3.1 用例设计3.2 业务流程设计3.3 数据流程设计3.4 E-R图设计 四、系统展示五、核心代码5.1 查询兴趣标签5.2 查询活动推荐…

[C++历练之路]C++多态底层逻辑知多少

W...Y的主页 &#x1f60a; 代码仓库分享&#x1f495; 前言&#x1f354;&#xff1a;学习了继承与多态&#xff0c;我相信大家对其底层的运用逻辑非常之好奇&#xff0c;今天我们就来探索一下多态中的底层逻辑&#xff0c;话不多说&#xff0c;我们现在开始&#xff01; 目…

【C++】 C++入门 — auto关键字

C入门 auto 关键字1 介绍2 使用细则3 注意事项 Thanks♪(&#xff65;ω&#xff65;)&#xff89;谢谢阅读下一篇文章见&#xff01;&#xff01;&#xff01; auto 关键字 1 介绍 编程时常常需要把表达式的值赋给变量&#xff0c;这就要求在声明变量时清楚地知道表达式的类…

系统移植--无法启动Linux内核--报错VFS--挂载nfs失败

问题 找信息&#xff1a;VFS 可能的原因 1、开发板上内核启动参数中的虚拟机ubuntu IP和真实的 虚拟机的IP不一致 2、开发板上内核启动参数中虚拟机的共享目录和虚拟机 ubuntu上配置的nfs服务器上的共享目录不一致 3、nfs配置文件(/etc/exports)路径错误 与自己的共享文件…

基于ARM的餐厅点餐系统的设计与实现

基于ARM的餐厅点餐系统的设计与实现 系统简介 本设计主要将 STM32F103ZET6 芯片作为无线订购系统主要控制芯片&#xff0c;分为顾客终端和厨师终端。顾客通过 LCD 显示屏浏览菜单并点击触摸屏选择自己所需菜单&#xff0c;并经过有线连接到 PC 端上位机&#xff0c;将订餐信息…

80.如何评估一台服务器能承受的最大TCP连接数

文章目录 一、一个服务端进程最多能支持多少条 TCP 连接&#xff1f;二、一台服务器最大最多能支持多少条 TCP 连接&#xff1f;三、总结 一个服务端进程最大能支持多少条 TCP 连接&#xff1f; 一台服务器最大能支持多少条 TCP 连接&#xff1f; 很多朋友可能第一反应就是端…