数据集成工具 ---- datax 3.0

1、datax:

        是一个异构数据源离线同步工具,致力于实现关系型数据库(mysql、oracle等)hdfs、hive、hbase等各种异构数据源之间的数据同步

2、参考网址文献:

https://github.com/alibaba/DataX/blob/master/introduction.mdhttps://github.com/alibaba/DataX/blob/master/userGuid.mdhttps://github.com/alibaba/DataX/blob/master/introduction.md

3、Datax的框架设计:

Datax作为离线数据同步工具,主要的是采用了Framework+plugin架构构成,将数据源的读数据和写数据封装成对应的Reader和Writer插件,纳入到整体的同步框架中。

        1、Reader:作为数据的采集模块,负责采集数据源的数据,将数据发送给Framework

        2、Writer:作为数据写入模块,负责不断的向Framework取出数据,将数据写入到对应的目的端

        3、Framework:用于连接Reader和Writer,作为两者的数据传输通道,并处理缓冲、数据转换等核心技术问题。

4、Datax的核心架构:

Datax3.0 开源版本是支持单机多线程来完成同步作业运行,因为底层是使用java做开发。整体的架构:

模块的核心介绍:

        1、Datax完成单个数据同步做作业,被称之为job,Datax接收到一个job时就会启动一个进程来完成数据同步工作,所以Datax job 模块是单个作业的中枢管理中心,主要是承担数据清理,子任务切分、TaskGroup 管理。

        2、当Datax启动后,Datax job会根据不同的源数据将job切分成不同的Task,所以Task是Datax的最小作业单位,每一个Task都会负责一部分的数据同步。

        3、切分成多个Task后,Datax job 就会调用scheduler模块,根据配置的并发数量,将拆分的Task重新组合,组装成TaskGroup,每一个TaskGroup都负责一定的Task任务的执行,默认TaskGroup并发数量数5个。

        4、每一个Task都是由TaskGroup所监控执行启动,每一个Task启动后都会按照Reader---Channel---Writer的执行顺序执行。

        5、当任务启动后,Datax job就会监控所有的TaskGroup的执行情况,当所有的TaskGroup任务完成后,job就会退出,当出现异常,就会异常退出并且进程退出值非0.

5、Datax的核心优势:

        1、可靠的数据质量监控

        2、丰富的数据转换功能

        3、精准的控制速度

        4、容错机制:

                1、线程内部重试

                        DataX的核心插件都经过团队的全盘review,不同的网络交互方式都有不同的重试策略。

                2、线程级别重试

                        目前DataX已经可以实现TaskFailover,针对于中间失败的Task,DataX框架可以做到整个Task级别的重新调度。

        5、极简的体验

6、Datax与Sqoop的区别:
功能Dataxsqoop
运行模式单进程多线程MR
分布式是不支持分布式支持
流控需要定制
统计信息支持不支持,分布式的数据收集不方便
数据校验只有core部分有校验功能不支持,分布式的数据收集不方便
监控需要定制需要定制
7、Datax部署:

1、下载jar包:

        下载路径:https://github.com/alibaba/DataX

2、解压文件,配置环境变量:  

#解压jar包
tar -zxvf datax.tar.gz

#配置环境变量:
vim /etc/profile


export   DATAX_HOME=/user/loacl/soft/datax
export   PATH=.:$PATH:$DATAX_HOME/bin

#配置好环境变量,让配置文件生效
source /etc/profile

3、使执行文件拥有执行权:

添加执行权:

chmod +x  data.py
8、Datax的使用:
在datax中会自动的生成模板的命令:

datax.py -r streamreader -w streamwriter
        1、streamreader  to  streamwriter,数据打印在控制台上面

参数说明:

"sliceRecordCount": 100  #指定打印的个数

"channel": 1  #指定并发度
#创建json文件:

vim streamreadertostreamwriter.json


{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader", 
                    "parameter": {
                        "column": [
                            {
                                "type":"string",
                                "value":"wyz"
                            },
                            {
                                "type":"int",
                                "value":"18"
                            }
                        ], 
                        "sliceRecordCount": 100  #指定打印的个数
                    }
                }, 
                "writer": {
                    "name": "streamwriter", 
                    "parameter": {
                        "encoding": "", 
                        "print": true
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1  #指定并发度
            }
        }
    }
}



#脚本执行命令:
datax.py streamreadertostreamwriter.json
        2、mysql to mysql
1、可以通过命令获取模板:
datax.py  -r mysqlreader -w mysqlwriter 

2、可以通过github上的模板进行编写:分别是mysqlreader和mysqlwriter,参数会比较详细


3、在插入数据的需要注意是在将数据写入的时候如果出现在数据,那么此时可能是创建的表出了问题
    例如:表中的某个字段是主键,主键唯一
vim mysqlreaderTomysqlwriter.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": [
                                "id",
							 "name",
							 "age",
							 "clazz",
							 "gender"
                        
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://192.168.226.1:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": ""      #不是必须要写的,作用是可以在读数据时进行一次过滤
                    }
                }, 
                "writer": {
                    "name": "mysqlwriter", 
                    "parameter": {
                        "column": [
                                "id",
							 "name",
							 "age",
							 "clazz",
							 "gender"
                        ], 
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.226.1:3306/bigdata25", 
                                "table": ["data_test"]
                            }
                        ], 
                        "password": "123456", 
                        "preSql": [],   #不是必须写的,作用是再写入数据前可以执行该sql
                        "session": [], 
                        "username": "root", 
                        "writeMode": "insert"   #必选,指定数据写入的模式,分成三种:insert(一般默认)、replace、update
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}




#执行脚本:
datax.py   mysqlreaderTomysqlwriter.json

 将数据写入到mysql时,写入的表是需要提前创建的。

        3、mysql to hdfs

参数解释:

使用datax的时候hdfswriter只支持两种文件形式,分别是text和orc

"fileType": "text",  #支持两种方式:text和orc,text表示的是textfile,orc表示的orcfile

"compress": "", #指定文件的压缩形式,不指定代表不用压缩,
                text支持的压缩方式:gzip,bzip2,
                orc支持的压缩方式有NONE和SNAPPY

 "writeMode": "append"   #表示的是数据在写入的操作,分成三种:
                            append,写入前不做任何处理  
                            nonconflit 如果文件存在,直接报错  
                            truncate:如果文件存在,那就先删除在写入

"path": "/bigdata25/datax/datax_mysqltohdfs/"  #文件不存在是需要提前创建的

vim mysqlTohdfs.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id","name","age","clazz","gender"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name":"id",
                                "type":"string"
                            },
                            {
                                "name":"stu_name",
                                "type":"string" 
                            },
                            {
                                "name":"age",
                                "type":"string"
                            },
                            {
                                
                                "name":"clazz",
                                "type":"string"
                            },
                            {
                                "name":"gender",
                                "type":"string" 
                            }
                        ], 
                        "compress": "", 
                        "defaultFS": "hdfs://master:9000", 
                        "fieldDelimiter": ",", 
                        "fileName": "stu_mysqltohdfs", 
                        "fileType": "text",  
                        "path": "/bigdata25/datax/datax_mysqltohdfs/", 
                        "writeMode": "append"                         												
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}

#执行脚本:
datax.py mysqlTohdfs.json
        4、mysql to hive

 原理思想:

实际上还是将数据存入到hdfs上面,hive通过记录元数据信息来获取数据
原理:
	创建好hive表在保存在hdfs上,是有文件路径,然后通过写入到指定的hdfs文件路径就能将数据写入到hive表中
当开启hive的时候,在hive中创建的表会默认的存储hdfs的/user/hive/warehouse/目录下

前期准备:

前期准备:
启动hive(后台启动):nohup hive --service metastore &
连接hive:hive

创建hive表(在没有说明的情况下一般在都是创建一个外部表)
创建一个datax数据库:
	create database datax;
切换数据库:use datax
创建外部表:
	create external table if not exists datax_mysqltohive(
    	 id string,
         name string,
         age int,
         clazz string,
		gender string
    		)
row format delimited  fields terminated  by ',' stored as textfile

编写脚本:

vim mysqlTohive.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id","name","age","clazz","gender"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": ""
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name":"id",
                                "type":"string"
                            },
                            {
                                "name":"stu_name",
                                "type":"string" 
                            },
                            {
                                "name":"age",
                                "type":"string"
                            },
                            {
                                
                                "name":"clazz",
                                "type":"string"
                            },
                            {
                                "name":"gender",
                                "type":"string" 
                            }
                        ], 
                        "compress": "", 
                        "defaultFS": "hdfs://master:9000", 
                        "fieldDelimiter": ",", 
                        "fileName": "datax_mysqltohive1", 
                        "fileType": "text", 
                        "path": "/user/hive/warehouse/datax.db/datax_mysqltohive", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}


#执行脚本命令:
datax.py  mysqlTohive.json
        5、mysql to hbase(Hbase11XWriter)

参数解释:

"mode": "normal" 写hbase的模式,目前只支持normal模式

"hbaseConfig"  {"hbase.zookeeper.quorum": "***"} 描述:连接HBase集群需要的配置信息,JSON格式

"table": "writer" 表的名称,大小写比较敏感

"encoding" 编码方式

"rowkeyColumn" 描述:要写入的hbase的rowkey列。index:指定该列对应reader端column的索引,从0开始,若为常量index为-1;type:指定写入数据类型,用于转换HBase byte[];value:配置常量,常作为多个字段的拼接符。hbasewriter会将rowkeyColumn中所有列按照配置顺序进行拼接作为写入hbase的rowkey,不能全为常量


"versionColumn" 表示指定写入hbase的时间戳,支持当前时间、指定时间列、指定时间

前期准备:

前期准备:

启动zookeeper:
	zkServer.sh start(每一个节点上都是需要启动的)
查看zk的状态:
	zkServer.sh status 

启动hbase:
	start-hbase.sh

连接hbase:
	sqlline.py master,node1,node2

进入hbase的客户端:hbase shell 

hbase中查看表的命令:!table
退出命令 !quit
查看表:list
在hbase中创建创建表,指定表名和列簇:create 'student','cf1'
				

编写脚本:

vim mysqlTohbase.json



{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id","name","age","clazz","gender"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": ""
                    }
                }, 
                 "writer": {
                      "name": "hbase11xwriter",
                      "parameter": {
                        "hbaseConfig": {
                          "hbase.zookeeper.quorum": "master:2181,node1:2181,node2:2181"
                        },
                        "table": "NEW_STU",
                        "mode": "normal",
                        "rowkeyColumn": [
                            {
                              "index":0,
                              "type":"string"
                            },
                            {
                              "index":-1,
                              "type":"string",
                              "value":"_"
                            }
                        ],
                        "column": [
                          {
                            "index":1,
                            "name": "cf1:name",
                            "type": "string"
                          },
                          {
                            "index":2,
                            "name": "cf1:age",
                            "type": "int"
                          },
                          {
                            "index":3,
                            "name": "cf1:clazz",
                            "type": "string"
                          },
                          {
                            "index":4,
                            "name": "cf1:gender",
                            "type": "string"
                          }
                        ],
                        "versionColumn":{
                          "index": -1,
                          "value":"123456789"
                        },
                        "encoding": "utf-8"
                      
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 1
            }
        }  
    }
}

#执行脚本
datax.py mysqlTohbase.json
        6、mysql增量同步数据到hive中。

最主要的工作就是在原先的mysql数据导入的hive中的基础上进行where过滤

vim mysqlTohive.json

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader", 
                    "parameter": {
                        "column": ["id","name","age","clazz","gender"], 
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://master:3306/bigdata25"], 
                                "table": ["stu"]
                            }
                        ], 
                        "password": "123456", 
                        "username": "root", 
                        "where": "id >20"
                    }
                }, 
                "writer": {
                    "name": "hdfswriter", 
                    "parameter": {
                        "column": [
                            {
                                "name":"id",
                                "type":"string"
                            },
                            {
                                "name":"stu_name",
                                "type":"string" 
                            },
                            {
                                "name":"age",
                                "type":"string"
                            },
                            {
                                
                                "name":"clazz",
                                "type":"string"
                            },
                            {
                                "name":"gender",
                                "type":"string" 
                            }
                        ], 
                        "compress": "", 
                        "defaultFS": "hdfs://master:9000", 
                        "fieldDelimiter": ",", 
                        "fileName": "datax_mysqltohive1", 
                        "fileType": "text", 
                        "path": "/user/hive/warehouse/datax.db/datax_mysqltohive", 
                        "writeMode": "append"
                    }
                }
            }
        ], 
        "setting": {
            "speed": {
                "channel": 1
            }
        }
    }
}


#执行脚本命令:
datax.py  mysqlTohive.json
9、在使用datax过程中出现的错误:

1、配置文件出现错误,脚本不完整:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/454327.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

代码随想录算法训练营Day46 ||leetCode 139.单词拆分 || 322. 零钱兑换 || 279.完全平方数

139.单词拆分 class Solution { public:bool wordBreak(string s, vector<string>& wordDict) {unordered_set<string> wordSet(wordDict.begin(), wordDict.end());vector<bool> dp(s.size() 1, false);dp[0] true;for (int i 1; i < s.size(); …

【Linux】-Linux下的软件商店yum工具介绍(linux和windows互传文件仅仅一个拖拽搞定!!!!)

目录 1.Linux 软件包管理器yum 1.1快速认识yum 1.2 yumz下载方式&#xff08;如何使用yum进行下载&#xff0c;注意下载一定要是root用户或者白名单用户&#xff08;可提权&#xff09;&#xff09; 1.2.1下载小工具rzsz 1.2.2 rzsz使用 1.2.2查看软件包 1.3软件的卸载 2.yum生…

三、HarmonyOS 应用开发入门之运行Hello World

目录 1、课程对象 1.1、有移动端开发经验 1.2、无移动端开发经验 1.3、对 HarmonyOS 感兴趣 2、DevEco Studio 的使用 2.1、DevEco Studio 的关键特性 智能代码编辑 低代码开发 多段双向实时预览 多端模拟仿真 2.2、安装配置 DevEco Studio 2.2.1、官网开发工具下载地…

蓝桥杯真题讲解:三国游戏(贪心)

蓝桥杯真题讲解&#xff1a;三国游戏&#xff08;贪心&#xff09; 一、视频讲解二、正解代码 一、视频讲解 蓝桥杯真题讲解&#xff1a;三国游戏&#xff08;贪心&#xff09; 二、正解代码 //三国游戏&#xff1a;贪心 #include<bits/stdc.h> #define int long lon…

哪些订单预计会亏?一张报表告诉你

各位数据的朋友&#xff0c;大家好&#xff0c;我是老周道数据&#xff0c;和你一起&#xff0c;用常人思维数据分析&#xff0c;通过数据讲故事。 销售订单一般是企业在销售活动中重要的单据&#xff0c;当我们接到一个客户的订单时&#xff0c;就需要在系统中录入一个销售订…

jQuery模态框弹窗提示代码

jQuery模态框弹窗提示代码 下载地址 jQuery模态框弹窗提示代码

Volatile与JMM

被Volatile修饰的变量有两大特点 可见性 有序性&#xff08;禁重排&#xff09; 如何保证的&#xff1f;内存屏障 Volatile的内存语义 当写一个Volatile变量的时候&#xff0c;JMM会把该线程对应的本地内存共享变量值立即刷新回主内存。 当读一个Volatile变量的时候&…

【Java语言】遍历List元素时删除集合中的元素

目录 前言 实现方式 1.普通实现 1.1 使用【for循环】 方式 1.2 使用【迭代器】方式 2.jdk1.8新增功能实现 2.1 使用【lambda表达式】方式 2.2 使用【stream流】方式 注意事项 1. 使用【for循环】 方式 2. 不能使用增强for遍历修改元素 总结 前言 分享几种从List中移…

程序语言设计

一、程序设计语言及其构成 1.程序设计语言 2.高级程序设计语言划分 3.常见的高级程序语言 4.标记语言 5.程序设计语言的构成 二、表达式 表达式的类型及转换规则 三、传值和传址调用 1.数据类型 2.传值和传址调用 四、语言处理程序 1.语言处理程序 语言处理程序&#xff1…

【JS】浅谈浅拷贝与深拷贝

浅拷贝与深拷贝 前言一、浅拷贝&#xff1f;1.1是什么&#xff1f;1.2做什么&#xff1f;1.3为什么使用&#xff1f;1.4实现方式&#xff1f;1.5 应用场景&#xff1f; 二、深拷贝&#xff1f;2.1是什么&#xff1f;2.2做什么&#xff1f;2.3为什么使用&#xff1f;2.4实现方式…

成都产业园排名出炉!金牛区这个园区成数字产业聚集地

近日&#xff0c;成都产业园排名榜单正式发布&#xff0c;可以看出金牛区成数字产业聚集地&#xff0c;其中&#xff0c;备受瞩目的国际数字影像产业园荣登榜首。这一排名不仅彰显了国际数字影像产业园在数字产业领域的卓越表现&#xff0c;更凸显了成都作为西部重要城市在科技…

51单片机系列-单片机定时器

&#x1f308;个人主页&#xff1a;会编辑的果子君 &#x1f4ab;个人格言:“成为自己未来的主人~” 软件延时的缺点 延时过程中&#xff0c;CPU时间被占用&#xff0c;无法进行其他任务&#xff0c;导致系统效率降低&#xff0c;延时时间越长&#xff0c;该缺点就越明显&…

HBuilder发行微信小程序

首先需要完善mainifest.json中的基本配置 这个需要组测dcloud才可以获取&#xff0c;注册后点击重新获取就可以。 然后发行前还需要完成dcloud的信息&#xff0c;这个他会给你网址 点击连接完成信息填写就可以了 然后就可以发行了。 发行成功后会自动跳转微信小程序&#xff…

day02vue学习

day02 一、今日学习目标 1.指令补充 指令修饰符v-bind对样式增强的操作v-model应用于其他表单元素 2.computed计算属性 基础语法计算属性vs方法计算属性的完整写法成绩案例 3.watch侦听器 基础写法完整写法 4.综合案例 &#xff08;演示&#xff09; 渲染 / 删除 / 修…

Flutter第四弹:Flutter图形渲染性能

目标&#xff1a; 1&#xff09;Flutter图形渲染性能能够媲美原生&#xff1f; 2&#xff09;Flutter性能优于React Native? 一、Flutter图形渲染原理 1.1 Flutter图形渲染原理 Flutter直接调用Skia。 Flutter不使用WebView&#xff0c;也不使用操作系统的原生控件,而是…

如何深度学习

信息爆炸时代&#xff0c;诞生了很多新的学习方式&#xff0c;非常轻松就能掌握知识&#xff0c;比如&#xff0c;每天听一本书&#xff0c;半个小时就能学习一本书的精华&#xff0c;比如订阅名家专栏或者课程&#xff0c;在不长的时间内内就能学到很多知识。 很多人认为这样…

jenkins 使用k8s插件连接k8s集群

jenkins 安装k8s 插件 配置k8s节点 填写k8s 配置信息 生成秘钥 在服务器上面 查看地址 Kubernetes 服务证书 key cat /root/..kube/config 查看秘钥 对秘钥进行base64 位 加密 echo "秘钥内容" | base64 -d -----BEGIN CERTIFICATE----- MIIDITCCAgmgAwIB…

【node】模块化与包(二)

1、模块化的基本概念 模块化是指解决一个复杂的问题时&#xff0c;自顶向下逐层把系统划分成若干模块的过程。对于整个系统来说&#xff0c;模块是可组合、分解和更换的单元。 &#xff08;1&#xff09;模块化的优点 遵循固定规则&#xff0c;把大文件拆分成对立并相互依赖…

【Axure高保真原型】下拉列表切换图表

今天和大家分享通过下拉列表动态切换统计图表的原型模板&#xff0c;我们可以通过下拉列表选择要显示的图表&#xff0c;包括柱状图、条形图、饼图、环形图、折线图、曲线图、面积图、阶梯图、雷达图&#xff1b;而且图表数据可以在左侧表格中动态维护&#xff0c;包括增加修改…

数码管的动态显示(二)

1.原理 这个十六进制是右边的dp为高位。 数码管的动态显示&#xff0c;在第一个计数周期显示个位&#xff0c;在第二个周期显示十位&#xff0c;在第三个周期显示百位由于人眼的视觉和数码管的特性&#xff0c;感觉就是显示了234&#xff0c;每个数码管的显示需要从输入的数据里…