使用DataX实现mysql与hive数据互相导入导出

一、概论

1.1 什么是DataX

         DataX 是阿里巴巴开源的一个异构数据源离线同步工具,致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等各种异构数据源之间稳定高效的数据同步功能。

1.2 DataX 的设计

         为了解决异构数据源同步问题,DataX 将复杂的网状的同步链路变成了星型数据链路,DataX 作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到 DataX,便能跟已有的数据源做到无缝数据同步
在这里插入图片描述

1.3 框架设计

在这里插入图片描述

  • Reader:数据采集模块,负责采集数据源的数据,将数据发给Framework。
  • Wiriter: 数据写入模块,负责不断向Framwork取数据,并将数据写入到目的端。
  • Framework:用于连接read和writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
    运行原理
    在这里插入图片描述
  • Job:单个作业的管理节点,负责数据清理、子任务划分、TaskGroup监控管理。
  • Task:由Job切分而来,是DataX作业的最小单元,每个Task负责一部分数据的同步工作。
  • Schedule:将Task组成TaskGroup,单个TaskGroup的并发数量为5。
  • TaskGroup:负责启动Task。

1.4 Datax所支持的渠道

类型数据源读者作家(写)文件
RDBMS关系型数据库MySQL读,写
           甲骨文        √        √    读,写
SQL服务器读,写
PostgreSQL的读,写
DRDS读,写
通用RDBMS(支持所有关系型数据库)读,写
阿里云数仓数据存储ODPS读,写
美国存托凭证
开源软件读,写
OCS读,写
NoSQL数据存储OTS读,写
Hbase0.94读,写
Hbase1.1读,写
凤凰4.x读,写
凤凰5.x读,写
MongoDB读,写
蜂巢读,写
卡桑德拉读,写
无结构化数据存储文本文件读,写
的FTP读,写
HDFS读,写
弹性搜索
时间序列数据库OpenTSDB
技术开发局读,写

二、快速入门

2.1 环境搭建

下载地址: http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
源码地址: https://github.com/alibaba/DataX

配置要求:

  • Linux
  • JDK(1.8以上 建议1.8) 下载
  • Python(推荐 Python2.6.X)下载
    安装:

1) 将下载好的datax.tar.gz上传到服务器的任意节点,我这里上传到node01上的/exprot/soft
2)解压到/export/servers/

[root@node01 soft]# tar -zxvf datax.tar.gz  -C ../servers/

3)运行自检脚本

出现以下结果说明你得环境没有问题

[/opt/module/datax/plugin/reader/._hbase094xreader/plugin.json]不存在. 请检查您的配置文件.
在这里插入图片描述

2.2搭建环境注意事项

[/opt/module/datax/plugin/reader/._hbase094xreader/plugin.json]不存在. 请检查您的配置文件.

参考:

find ./* -type f -name ".*er"  | xargs rm -rf
find: paths must precede expression: |
Usage: find [-H] [-L] [-P] [-Olevel] [-D help|tree|search|stat|rates|opt|exec] [path...] [expression]


find /datax/plugin/reader/ -type f -name "._*er" | xargs rm -rf
find /datax/plugin/writer/ -type f -name "._*er" | xargs rm -rf

这里的/datax/plugin/writer/要改为你自己的目录

原文链接:https://blog.csdn.net/dz77dz/article/details/127055299

2.3读取Mysql中的数据写入到HDFS

准备
创建数据库和表并加载测试数据

create database test;
use test;
create table c_s(
   id   varchar(100) null,
    c_id int          null,
    s_id varchar(20)  null
);
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 1, '201967');
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 2, '201967');
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 3, '201967');
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 5, '201967');
INSERT INTO test.c_s (id, c_id, s_id) VALUES ('123', 6, '201967');

查看官方提供的模板

[root@node01 datax]# bin/datax.py -r mysqlreader -w hdfswriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the mysqlreader document:
     https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

Please refer to the hdfswriter document:
     https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md

Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": [],
                                "table": []
                            }
                        ],
                        "password": "",
                        "username": "",
                        "where": ""
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [],
                        "compress": "",
                        "defaultFS": "",
                        "fieldDelimiter": "",
                        "fileName": "",
                        "fileType": "",
                        "path": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

根据官网模板进行修改

[root@node01 datax]# vim job/mysqlToHDFS.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "column": [
                            "id",
                            "c_id",
                            "s_id"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": [
                                    "jdbc:mysql://node02:3306/test"
                                ],
                                "table": [
                                    "c_s"
                                ]
                            }
                        ],
                        "password": "123456",
                        "username": "root"
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
                            {
                                "name": "id",
                                "type": "string"
                            },
                            {
                                "name": "c_id",
                                "type": "int"
                            },
                            {
                                "name": "s_id",
                                "type": "string"
                            }
                        ],
                        "defaultFS": "hdfs://node01:8020",
                        "fieldDelimiter": "\t",
                        "fileName": "c_s.txt",
                        "fileType": "text",
                        "path": "/",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

HDFS的端口号注意版本,2.7.4 是9000;hdfs://node01:9000

MySQL的参数介绍
在这里插入图片描述
HDFS参数介绍
在这里插入图片描述
运行脚本

[root@node01 datax]# bin/datax.py  job/mysqlToHDFS.json
2020-10-02 16:12:16.358 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /export/servers/datax/hook
2020-10-02 16:12:16.359 [job-0] INFO  JobContainer -
         [total cpu info] =>
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu
                -1.00%                         | -1.00%                         | -1.00%


         [total gc info] =>
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime
                 PS MarkSweep         | 1                  | 1                  | 1                  | 0.245s             | 0.245s             | 0.245s
                 PS Scavenge          | 1                  | 1                  | 1                  | 0.155s             | 0.155s             | 0.155s

2020-10-02 16:12:16.359 [job-0] INFO  JobContainer - PerfTrace not enable!
2020-10-02 16:12:16.359 [job-0] INFO  StandAloneJobContainerCommunicator - Total 5 records, 50 bytes | Speed 5B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2020-10-02 16:12:16.360 [job-0] INFO  JobContainer -
任务启动时刻                    : 2020-10-02 16:12:04
任务结束时刻                    : 2020-10-02 16:12:16
任务总计耗时                    :                 12s
任务平均流量                    :                5B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   5
读写失败总数                    :                   0

2.4 读取HDFS中的数据写入到Mysql

准备工作

create database test;
use test;
create table c_s2(
   id   varchar(100) null,
    c_id int          null,
    s_id varchar(20)  null
);

查看官方提供的模板

[root@node01 datax]# bin/datax.py -r hdfsreader -w mysqlwriter

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


Please refer to the hdfsreader document:
     https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md

Please refer to the mysqlwriter document:
     https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md

Please save the following configuration as a json file and  use
     python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": [],
                        "defaultFS": "",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ",",
                        "fileType": "orc",
                        "path": ""
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [],
                        "connection": [
                            {
                                "jdbcUrl": "",
                                "table": []
                            }
                        ],
                        "password": "",
                        "preSql": [],
                        "session": [],
                        "username": "",
                        "writeMode": ""
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": ""
            }
        }
    }
}

根据官方提供模板进行修改

[root@node01 datax]# vim job/hdfsTomysql.json
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "column": [
                            "*"
                        ],
                        "defaultFS": "hdfs://node01:8020",
                        "encoding": "UTF-8",
                        "fieldDelimiter": "\t",
                        "fileType": "text",
                        "path": "/c_s.txt"
                    }
                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "column": [
                            "id",
                            "c_id",
                            "s_id"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://node02:3306/test",
                                "table": [
                                    "c_s2"
                                ]
                            }
                        ],
                        "password": "123456",
                        "username": "root",
                        "writeMode": "replace"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

脚本运行

[root@node01 datax]# bin/datax.py job/hdfsTomysql.json

         [total cpu info] =>
                averageCpu                     | maxDeltaCpu                    | minDeltaCpu
                -1.00%                         | -1.00%                         | -1.00%


         [total gc info] =>
                 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime
                 PS MarkSweep         | 1                  | 1                  | 1                  | 0.026s             | 0.026s             | 0.026s
                 PS Scavenge          | 1                  | 1                  | 1                  | 0.015s             | 0.015s             | 0.015s

2020-10-02 16:57:13.152 [job-0] INFO  JobContainer - PerfTrace not enable!
2020-10-02 16:57:13.152 [job-0] INFO  StandAloneJobContainerCommunicator - Total 5 records, 50 bytes | Speed 5B/s, 0 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.033s | Percentage 100.00%
2020-10-02 16:57:13.153 [job-0] INFO  JobContainer -
任务启动时刻                    : 2020-10-02 16:57:02
任务结束时刻                    : 2020-10-02 16:57:13
任务总计耗时                    :                 11s
任务平均流量                    :                5B/s
记录写入速度                    :              0rec/s
读出记录总数                    :                   5
读写失败总数                    :                   0

2.5将Mysql表导入Hive

1.在hive中建表

-- hive建表
CREATE TABLE student2 (
	classNo string,
	stuNo string,
	score int) 
row format delimited fields terminated by ',';


-- 构造点mysql数据
create table if not exists student2(
    classNo varchar ( 50 ),
    stuNo   varchar ( 50 ),
    score    int 
)
insert into student2 values('1001','1012ww10087',63);
insert into student2 values('1002','1012aa10087',63);
insert into student2 values('1003','1012bb10087',63);
insert into student2 values('1004','1012cc10087',63);
insert into student2 values('1005','1012dd10087',63);
insert into student2 values('1006','1012ee10087',63);

2.编写mysql2hive.json配置文件

{
    "job": {
        "setting": {
            "speed": {
                "channel": 1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "connection": [
                            {
                                "table": [
                                    "student2"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://192.168.43.10:3306/mytestmysql"
                                ]
                            }
                        ],
                        "column": [
                            "classNo",
                            "stuNo",
                            "score"
                        ]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://192.168.43.10:9000",
                        "path": "/hive/warehouse/home/myhive.db/student2",
                        "fileName": "myhive",
                        "writeMode": "append",
                        "fieldDelimiter": ",",
                        "fileType": "text",
                        "column": [
                            {
                                "name": "classNo",
                                "type": "string"
                            },
                            {
                                "name": "stuNo",
                                "type": "string"
                            },
                            {
                                "name": "score",
                                "type": "int"
                            }
                        ]
                    }
                }
            }
        ]
    }
}

3.运行脚本

bin/datax.py job/mysql2hive.json 

4.查看hive表是否有数据

2.6将Hive表数据导入Mysql

1.要先在mysql建好表

create table if not exists student(
    classNo varchar ( 50 ),
    stuNo   varchar ( 50 ),
    score    int 
)

2.hive2mysql.json配置文件

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hdfsreader",
                    "parameter": {
                        "path": "/hive/warehouse/home/myhive.db/student/*",
                        "defaultFS": "hdfs://192.168.43.10:9000",
                        "column": [
                               {
                                "index": 0,
                                "type": "string"
                               },
                                                           {
                                "index": 1,
                                "type": "string"
                               },
                               {
                                "index": 2,
                                "type": "Long"
                               }
                        ],
                        "fileType": "text",
                        "encoding": "UTF-8",
                        "fieldDelimiter": ","
                    }

                },
                "writer": {
                    "name": "mysqlwriter",
                    "parameter": {
                        "writeMode": "insert",
                        "username": "root",
                        "password": "root",
                        "column": [
                            "classNo",
                            "stuNo",
                            "score"
                        ],
                        "preSql": [
                            "delete from student"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:mysql://192.168.43.10:3306/mytestmysql?useUnicode=true&characterEncoding=utf8",
                                "table": [
                                    "student"
                                ]
                            }
                        ]
                    }
                }
            }
        ]
    }
}

注意事项:

在Hive的ODS层建表语句中,以“,”为分隔符;
fields terminated by ','
在DataX的json文件中,也以“,”为分隔符。
"fieldDelimiter": "," 与hive表里面的分隔符保持一致即可

由于DataX不能完全支持所有Hive表的数据类型,应将DataX启动文件中的hdfsreader中的column字段的类型改成DataX支持的类型

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/52645.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

HTML5前端开发工程师的岗位职责说明(合集)

HTML5前端开发工程师的岗位职责说明1 职责 1、根据产品设计文档和视觉文件,利用HTML5相关技术开发移动平台的web前端页面; 2、基于HTML5.0标准进行页面制作,编写可复用的用户界面组件; 3、持续的优化前端体验和页面响应速度,并保证兼容性和…

C# NDArray System.IO.FileLoadException报错原因分析

C# NDArray System.IO.FileLoadException 报错原因分析: 1.NuGet程序包版本有冲突 2.统一项目版本 1.打开解决方案NuGet程序包设置 2.查看是否有版本冲突 3.统一版本冲突

【数据结构与算法】基数排序

基数排序 基数排序(Radix Sort)属于“分配式排序”,又称“桶子法”或 bin sort,顾名思义,它是通过键值的各个位的值,将要排序的元素分配至某些“桶”中,达到排序的作用。基数排序法是属于稳定性…

【UE5】快速认识入门

目录 🌟1. 快速安装🌟2. 简单快捷键操作🌟3. 切换默认的打开场景🌟4. 虚幻引擎术语 🌟1. 快速安装 进入Unreal Engine 5官网进行下载即可:UE5 📝官方帮助文档 打开后在启动器里创建5.2.1引擎…

Java并发系列之一:JVM线程模型

什么是线程模型: Java字节码运行在JVM中,JVM运行在各个操作系统上。所以当JVM想要进行线程创建回收这种操作时,势必需要调用操作系统的相关接口。也就是说,JVM线程与操作系统线程之间存在着某种映射关系,这两种不同维…

Qt信号与槽机制的本质

引入 对象与对象之间的通信有多个方式,如果我们要提供一种对象之间的通信机制。这种机制,要能够给两个不同对象中的函数建立映射关系,前者被调用时后者也能被自动调用。 再深入一些,两个对象如果都互相不知道对方的存在&#xff…

搭建网站 --- 快速WordPress个人博客并内网穿透发布到互联网

文章目录 快速WordPress个人博客并内网穿透发布到互联网 快速WordPress个人博客并内网穿透发布到互联网 我们能够通过cpolar完整的搭建起一个属于自己的网站,并且通过cpolar建立的数据隧道,从而让我们存放在本地电脑上的网站,能够为公众互联…

【phaser微信抖音小游戏开发002】hello world!

执行效果: 将以下代码文本内容,放入到game.js中即可。目录结构如下图 import ./js/libs/weapp-adapter import ./js/libs/symbolGameGlobal.window.scrollTo () > { };//防止真机出错 import Phaser from ./js/phaser//引入Phaservar {windowWidth, …

在线阅读版:《2023中国软件供应链安全分析报告》全文

聚焦源代码安全,网罗国内外最新资讯! 专栏供应链安全 数字化时代,软件无处不在。软件如同社会中的“虚拟人”,已经成为支撑社会正常运转的最基本元素之一,软件的安全性问题也正在成为当今社会的根本性、基础性问题。 随…

28_计算机网络(Computer Networks)基础

本篇介绍计算机网络的基础知识。 文章目录 1. 计算机网络历史2. 以太网" (Ethernet)2.1 以太网" (Ethernet)的简单形式及概念2.2 指数退避解决冲突问题2.3 利用交换机减少同一载体中设备2.4 互联网(The Internet)2.5 路由(routing)2.6 数据包…

基于SpringBoot+Vue的财务管理系统设计与实现(源码+LW+部署文档等)

博主介绍: 大家好,我是一名在Java圈混迹十余年的程序员,精通Java编程语言,同时也熟练掌握微信小程序、Python和Android等技术,能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架…

Docker基础命令(一)

Docker使用1 一、运行终端 打开终端,输入docker images ,如果运行正常,表示docker已经可以在本电脑上使用了 二、docker常用命令 指令说明docker images查看已下载的镜像docker rmi 镜像名称:标签名删除已下载的镜像docker search 镜像从官…

STM32 LWIP UDP 一对一 一对多发送

STM32 LWIP UDP通信 前言设置 IP 地址UDP函数配置实验结果单播发送,一对一发送广播发送,一对多发送 可能遇到的问题总结 前言 之前没有接触过网络的通信,工作需要 UDP 接收和发送通信,在网上没有找到一对一、一对多的相关例程&am…

Kata Containers

Kata Containers(简称 Kata 或 Kata Containers)是一个开源的容器运行时项目,它提供了一种轻量级的虚拟化解决方案,用于在容器内运行应用程序。Kata Containers 结合了虚拟机(VM)和容器的优势,旨…

【容器编排】初识 Kubernetes

目录 1.简介 2.为什么需要 k8s 3.k8s 能做什么? 4.k8s 不是什么? 1.简介 摘取官网: 概述 | Kubernetes Kubernetes 这个名字源于希腊语,意为舵手或飞行员。k8s 这个缩写是因为 k 和 s 之间有八个字符的关系。 Google 在 2014 年开源了 Kubernetes 项目。 Kub…

gti 远程操作

目录 一. 分布式版本控制管理系统 1. 理解分布式版本控制管理系统 二. 创建远程仓库 ​编辑 ​编辑 三. 克隆远程仓库_HTTP 四. 克隆远程仓库_SSH 配置公钥 添加公钥 五. git 向远程仓库推送 六. 拉取远程仓库 七. 忽略特殊文件 八. 配置别名 一. 分布式版本控制管理…

【腾讯云 Cloud Studio 实战训练营】体验搭建软件系统,无经验也能做开发

文章目录 前言IDE 解放开发者的生产力功能强大的 IDE快速搭建 Vue 开发环境注册 Cloud Studio进入Vue预置开发环境安装相关依赖包主文件 main.js 引入相关库和包首页增加移动端默认样式增加主要代码 IDE 的适用场景总结 前言 云计算技术的不断发展为代码开发带来了全新的体验&…

git本地库和远程库的相关操作命令

目录 一、分支概念&#xff1a; 二、 本地库分支管理&#xff1a; 1. 查看分支情况&#xff1a; 命令1&#xff1a;git branch 2. 新建分支 命令1&#xff1a; git branch <分支名> 命令2&#xff1a; git branch <新建分支名> <源分支名> 命令3&…

打印Winform控件实现简陋版的分页打印(C#)

本文的代码可以从这里获取&#xff1a;winformDemo.rar 张祥裕/分享的资源名称 - Gitee.com 作者的水平有限&#xff0c;如有错误&#xff0c;望指正。 为了简单起见&#xff0c;纸张大小&#xff0c;打印机等信息按照默认的来&#xff0c;本文的实现方案是&#xff1a;打印Pa…

RN 设置背景图片(使用ImageBackground组件)

在RN版本0.46版本的时候添加了ImageBackground控件。ImageBackground可以设置背景图片&#xff0c;使用方法和image一样&#xff0c;里面嵌套了其他的组件 import React from "react"; import { ImageBackground, StyleSheet, Text, View } from "react-native…