每个人的生活都是一个世界,即使最平凡的人也要为他那个世界的存在而战斗。
——《平凡的世界》
目录
一、sqoop简介
1.1 导入流程
1.2 导出流程
二、使用sqoop
2.1 sqoop的常用参数
2.2 连接参数列表
2.3 操作hive表参数
2.4 其它参数
三、sqoop应用 - 导入
3.1 准备测试数据
3.2 sqoop查看数据
3.3 创建Hive表
3.4 多map条件查询导入HDFS
3.5 全量导入数据
3.6 增量数据导入
四、sqoop应用 - 导出
4.1 Hive中数据导出到MySQL中
五、总结
一、sqoop简介
sqoop是Apache旗下的一款 hadoop和关系型数据库服务器之间传送数据
的工具
主要的功能:
- 导入数据
- MySQL、Oracle(关系型数据库)导入数据到hadoop的HDFS、Hive以及Hbase等数据存储系统
- 导出数据
- 从Hadoop的文件系统(HDFS等)中导出数据到关系型数据库(MySQL、PostgreSQL)中
1.1 导入流程
1. 首先通过JDBC读取关系型数据库元数据信息,获取到表结构
2. 根据元数据信息生成Java类
3. 启动import程序,通过JDBC读取关系型数据库数据,并通过上一步的Java类进行序列化
4. MapReduce并行写数据到Hadoop中,并使用Java类进行反序列化
1.2 导出流程
1.sqoop通过JDBC读取关系型数据库元数据,获取到表结构信息,生成Java类
2.MapReduce并行读取HDFS数据,并且通过Java类进行序列化
3.export程序启动,通过Java类反序列化,同时启动多个map任务,通过JDBC将数据写入到关系型数据库中
二、使用sqoop
环境:CDH 6.2.1
快速体验sqoop
# 前提是你已经下载好了sqoop
# 直接在命令行中输入以下命令(这个命令类似于你在MySQL中执行 show databases;)
# 格式:sqoop list-databases --connect jdbc:mysql://localhost:3306/ --username 用户名 --password 密码
sqoop list-databases --connect jdbc:mysql://localhost:3306/ --username root --password 123456
# 查询指定库下面所有表(这个命令类似于你在MySQL中指定库后执行 show tables;)
# 格式:sqoop list-tables --connect jdbc:mysql://localhost:3306/库名 --username 用户名 --password 密码
sqoop list-tables --connect jdbc:mysql://localhost:3306/ecut --username root --password 123456
2.1 sqoop的常用参数
- 指令
sqoop 命令选项 参数
命令名称 | 对应类 | 命令说明 |
---|---|---|
import | ImportTool | 将关系型数据库数据导入到HDFS、HIVE、HBASE |
export | ExportTool | 将HDFS上的数据导出到关系型数据库 |
codegen | CodeGenTool | 获取数据库中某张表数据生成Java并打成Jar包 |
create-hive-table | CreateHiveTableTool | 创建hive的表 |
eval | EvalSqlTool | 查看SQL的执行结果 |
list-databases | ListDatabasesTool | 列出所有数据库 |
list-tables | ListTablesTool | 列出某个数据库下的所有表 |
help | HelpTool | 打印sqoop帮助信息 |
version | VersionTool | 打印sqoop版本信息 |
2.2 连接参数列表
参数 | 说明 |
---|---|
–connect | 连接关系型数据库的URL |
–help | 打印帮助信息 |
–username | 连接数据库的用户名 |
–password | 连接数据库的密码 |
–verbose | 在控制台打印出详细信息 |
2.3 操作hive表参数
参数 | 说明 |
---|---|
–hcatalog-database | 指定hive表的数据库名称。如果未指定,则使用默认数据库名称(default) |
–hcatalog-table | 指定hive表名,该–hcatalog-table选项的存在表示导入或导出作业是使用HCatalog表完成的,并且是HCatalog作业的必需选项。 |
2.4 其它参数
参数 | 含义 |
---|---|
–num-mappers N | 指定启动N个map进程 |
–table | 指定数据库表名 |
–query | 编写sql语句,将查询的结果导入,如果查询中有where条件,则条件后必须加上conditions关键字。如果使用双引号包含sql,则condition关键字前要加上*$CONDITIONS* 以完成转义: |
–target-dir | 指定HDFS路径 |
–delete-target-dir | 若hdfs存放目录已存在,则自动删除 |
–fields-terminated-by | 设置字段分隔符 |
–export-dir | 导出到指定HDFS的目录路径 |
三、sqoop应用 - 导入
需求:使用sqoop上传字典表数据到hive中与我们的数据进行关联查询。
3.1 准备测试数据
- 在MySQL中创建测试数据(库名test_ecut,表名products,总共54条数据)
-- 在MySQL客户端或者图形化工具里执行下面代码
drop database if exists test_ecut;
create database if not exists test_ecut char set utf8;
use test_ecut; -- 使用该数据库
create table test_ecut.products (
id int auto_increment primary key,
product_name varchar(255),
price decimal(10, 2)
);
-- 插入一些正常数据
insert into test_ecut.products (product_name, price) values ('商品A', 19.99);
insert into test_ecut.products (product_name, price) values ('商品B', 29.99);
insert into test_ecut.products (product_name, price) values ('商品C', 9.99);
insert into test_ecut.products (product_name, price) values ('商品D', 49.99);
insert into test_ecut.products (product_name, price) values ('商品E', 39.99);
-- 插入一些包含空值的数据(这里假设price字段允许为空,实际需根据你的表结构定义来确定是否合理)
insert into test_ecut.products (product_name, price) values ('商品F', null);
insert into test_ecut.products (product_name, price) values ('商品G', null);
-- 插入一些重复数据
insert into test_ecut.products (product_name, price) values ('商品A', 19.99);
insert into test_ecut.products (product_name, price) values ('商品B', 29.99);
-- 继续插入更多不同情况的数据以凑够45条示例(以下为随机模拟更多数据情况)
insert into test_ecut.products (product_name, price) values ('商品H', 59.99);
insert into test_ecut.products (product_name, price) values ('商品I', 15.99);
insert into test_ecut.products (product_name, price) values ('商品J', 25.99);
insert into test_ecut.products (product_name, price) values ('商品K', 69.99);
insert into test_ecut.products (product_name, price) values ('商品L', 89.99);
insert into test_ecut.products (product_name, price) values ('商品M', null);
insert into test_ecut.products (product_name, price) values ('商品N', 35.99);
insert into test_ecut.products (product_name, price) values ('商品O', 45.99);
insert into test_ecut.products (product_name, price) values ('商品P', 79.99);
insert into test_ecut.products (product_name, price) values ('商品Q', 99.99);
insert into test_ecut.products (product_name, price) values ('商品R', 10.99);
insert into test_ecut.products (product_name, price) values ('商品S', 12.99);
insert into test_ecut.products (product_name, price) values ('商品T', 14.99);
insert into test_ecut.products (product_name, price) values ('商品U', 16.99);
insert into test_ecut.products (product_name, price) values ('商品V', 18.99);
insert into test_ecut.products (product_name, price) values ('商品W', 20.99);
insert into test_ecut.products (product_name, price) values ('商品X', 22.99);
insert into test_ecut.products (product_name, price) values ('商品Y', 24.99);
insert into test_ecut.products (product_name, price) values ('商品Z', 26.99);
insert into test_ecut.products (product_name, price) values ('商品AA', 28.99);
insert into test_ecut.products (product_name, price) values ('商品AB', 30.99);
insert into test_ecut.products (product_name, price) values ('商品AC', 32.99);
insert into test_ecut.products (product_name, price) values ('商品AD', 34.99);
insert into test_ecut.products (product_name, price) values ('商品AE', 36.99);
insert into test_ecut.products (product_name, price) values ('商品AF', 38.99);
insert into test_ecut.products (product_name, price) values ('商品AG', 40.99);
insert into test_ecut.products (product_name, price) values ('商品AH', 42.99);
insert into test_ecut.products (product_name, price) values ('商品AI', 44.99);
insert into test_ecut.products (product_name, price) values ('商品AJ', 46.99);
insert into test_ecut.products (product_name, price) values ('商品AK', 48.99);
insert into test_ecut.products (product_name, price) values ('商品AL', 50.99);
insert into test_ecut.products (product_name, price) values ('商品AM', 52.99);
insert into test_ecut.products (product_name, price) values ('商品AN', 54.99);
insert into test_ecut.products (product_name, price) values ('商品AO', 56.99);
insert into test_ecut.products (product_name, price) values ('商品AP', 58.99);
insert into test_ecut.products (product_name, price) values ('商品AQ', 60.99);
insert into test_ecut.products (product_name, price) values ('商品AR', 62.99);
insert into test_ecut.products (product_name, price) values ('商品AS', 64.99);
insert into test_ecut.products (product_name, price) values ('商品AT', 66.99);
insert into test_ecut.products (product_name, price) values ('商品AU', 68.99);
insert into test_ecut.products (product_name, price) values ('商品AV', 70.99);
insert into test_ecut.products (product_name, price) values ('商品AW', 72.99);
insert into test_ecut.products (product_name, price) values ('商品AX', 74.99);
insert into test_ecut.products (product_name, price) values ('商品AY', 76.99);
insert into test_ecut.products (product_name, price) values ('商品AZ', 78.99);
select count(1) from test_ecut.products;
3.2 sqoop查看数据
- 可以借助sqoop中eval查看结果
# 通过eval查看:test_ecut库下的products表前5条数据
sqoop eval \
--connect jdbc:mysql://localhost:3306/test_ecut \
--username root \
--password 123456 \
--query "select * from products limit 5"
3.3 创建Hive表
前提:你需要启动hadoop集群(hdfs和yarn),以及hive服务(hiveserver2和metastore)
- 1:在hive中你需要先建库
-- 通过图形化工具(datagrip等),执行以下命令
create database hive_ecut;
- 2:使用create-hive-table创建hive表
# 基于MySQL表创建hive表
sqoop create-hive-table \
--connect jdbc:mysql://localhost:3306/test_ecut \
--username root \
--password 123456 \
--table products \
--hive-table hive_ecut.goods_table
- 3:然后通过datagrip工具,查看hive中是否存在表
3.4 多map条件查询导入HDFS
# 语法
sqoop import \
--connect 数据库连接字符串 \
--username 数据库用户名 \
--password 数据库密码 \
--target-dir HDFS位置 \
--delete-target-dir 若hdfs存放目录以及存在,则自动删除 \
--fields-terminated-by "\t" \
--num-mappers 3 \
--split-by 切分数据依据 \
--query 'select SQL where 查询条件 and $CONDITIONS'
解释:
query
将查询结果的数据导入,使用时必须伴随参--target-dir
或--hive-table
,如果查询中有where条件
,则条件后必须加上$CONDITIONS关键字当
sqoop
使用--query
执行多个maptask并行运行导入数据时,每个maptask将执行一部分数据的导入,原始数据需要使用--split-by 某个字段'
来切分数据,不同的数据交给不同的maptask去处理
maptask
执行sql脚本时,需要在where条件中添加$CONDITIONS条件,这个是linux系统的变量,可以根据sqoop
对边界条件的判断,来替换成不同的值,这就是说若split-by id
,则sqoop
会判断id
的最小值和最大值判断id
的整体区间,然后根据maptask的个数来进行区间拆分,每个maptask执行一定id
区间范围的数值导入任务,如下为示意图。
- 1:导入文本文件
sqoop import \
--connect jdbc:mysql://localhost:3306/test_ecut"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 123456 \
--target-dir /user/hive/warehouse/hive_ecut.db/goods_table \
--delete-target-dir \
--fields-terminated-by "\001" \
--num-mappers 3 \
--split-by id \
--query 'select * from products where id < 10 and $CONDITIONS'
3.5 全量导入数据
补充: 导入数据可以分为两步
第一步,将数据导入到HDFS,默认的临时目录是/user/当前操作用户/mysql表名;
第二步,将导入到HDFS的数据迁移到Hive表,如果hive表不存在,sqoop会自动创建内部表;(我们的是在/user/root/products,通过查看job的configuration的outputdir属性得知)
第二步很重要,因为有时候报错并不是你的代码脚本问题,而是临时文件存在,在调度的时候运行的其实是临时文件中的配置job,需要删除才可以(.Trash和.staging 别删)
- 导入刚刚的商品数据,如果表不存在会自动创建内部表
# 导入命令
sqoop import \
--connect jdbc:mysql://localhost:3306/test_ecut"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 123456 \
--table products \
--num-mappers 1 \
--delete-target-dir \
--hive-import \
--fields-terminated-by "\001" \
--hive-overwrite \
--hive-table hive_ecut.goods_table_test
3.6 增量数据导入
增量数据导入的两种方法
方法1:append方式
方法2:lastmodified方式,必须要加–append(追加)或者–merge-key(合并,一般填主键)
- 1:按照id增量导入数据
-- MySQL添加一条新的数据
insert into test_ecut.products(product_name, price) values ('无敌绝世小学生',999999)
# 按照id增量导入
sqoop import \
--connect jdbc:mysql://localhost:3306/test_ecut"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 123456 \
--table products \
--num-mappers 1 \
--target-dir /user/hive/warehouse/hive_ecut.db/goods_table_test \
--fields-terminated-by "\001" \
--incremental append \
--check-column id \
--last-value 54
参数解释:
1)incremental:append或lastmodified,使用lastmodified方式导入数据要指定增量数据是要–append(追加)还是要–merge-key(合并)
2)check-column:作为增量导入判断的列名
3)last-value:指定某一个值,用于标记增量导入的位置,这个值的数据不会被导入到表中,只用于标记当前表中最后的值。(可以看到sqoop脚本中,我设置的id为54,也就意味着要跳过54而直接从55开始存)
- 2:按照时间增量导入数据
–incremental lastmodified
–append
–check-column 日期字段
在MySQL中重新建表,需要时间字段
-- mysql中新建products_update表
create table if not exists test_ecut.products_update(
id int auto_increment primary key,
product_name varchar(255),
price decimal(10, 2),
last_update_time datetime default current_timestamp on update current_timestamp
);
insert into test_ecut.products_update (product_name, price) values ('商品H', 59.99);
insert into test_ecut.products_update (product_name, price) values ('商品I', 15.99);
insert into test_ecut.products_update (product_name, price) values ('商品J', 25.99);
导入数据到hive中
# 在命令行中执行,然后在datagrip中查看数据
sqoop import \
--connect jdbc:mysql://localhost:3306/test_ecut"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 123456 \
--table products_update \
--num-mappers 1 \
--delete-target-dir \
--hive-import \
--fields-terminated-by "\001" \
--hive-overwrite \
--hive-table hive_ecut.goods_update_table
隔一段时间,新增一条数据
-- 在MySQL中,新增
insert into test_ecut.products_update (product_name, price) values ('无敌绝世小学生', 999999);
增量导入更新的数据
# 在命令行中执行,在datagrip中查看
sqoop import \
--connect jdbc:mysql://localhost:3306/test_ecut"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 123456 \
--table products_update \
--num-mappers 1 \
--target-dir /user/hive/warehouse/hive_ecut.db/goods_update_table \
--fields-terminated-by "\001" \
--incremental lastmodified \
--check-column last_update_time \
--last-value '2024-12-28 13:18:00' \
--append
# 注意:last-value 的设置是把包括 2024-12-28 13:18:00 时间的数据做增量导入。(所以我给2024-12-28 13:17:59加了1秒)
- 3:按照时间增量并按照主键合并导入
–incremental lastmodified
–merge-key 用法
如果之前的数据有修改的话可以使用–incremental lastmodified --merge-key进行数据合并执行修改的SQL
更改字段,从而更新时间
-- 在MySQL中更新
update test_ecut.products_update set product_name = '萌神想' where product_name='无敌绝世小学生';
进行合并导入(如果报错,可能是因为/user/root/_sqoop存在了很多临时文件,需要删除这些临时文件)
sqoop import \
--connect jdbc:mysql://localhost:3306/test_ecut"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 123456 \
--table products_update \
--num-mappers 1 \
--target-dir /user/hive/warehouse/hive_ecut.db/goods_update_table \
--fields-terminated-by "\001" \
--incremental lastmodified \
--check-column last_update_time \
--last-value '2024-12-28 13:20:24' \
--merge-key id
# --incremental lastmodified --merge-key的作用:修改过的数据和新增的数据(前提是满足last-value的条件)都会导入进来,并且重复的数据(不需要满足last-value的条件)都会进行合并
四、sqoop应用 - 导出
4.1 Hive中数据导出到MySQL中
sqoop的export命令支持 insert、update到关系型数据库,但是不支持merge
- 1:查看需要导出表的数据
- 2:新建MySQL表用于接收hive中的数据
create table if not exists test_ecut.get_hive_data(
id int auto_increment primary key,
product_name varchar(255),
price decimal(10, 2),
last_update_time datetime default current_timestamp on update current_timestamp
);
- 3:导出到MySQL中
# 导出命令
sqoop export \
--connect jdbc:mysql://localhost:3306/test_ecut"?useUnicode=true&characterEncoding=UTF-8" \
--username root \
--password 123456 \
--table get_hive_data \
--export-dir /user/hive/warehouse/hive_ecut.db/goods_update_table \
--num-mappers 1 \
--fields-terminated-by '\001'
补充:sqoop的作用就是负责导入和导出的,我上面所写的虽然都在虚拟机上运行,但只要改一下localhost就可以实现不同主机之间的数据传输(前提是有映射,且可以互通)
五、总结
看完上面的操作之后,很容易发现一个特点,Sqoop其实就是个脚本,而且命令很固定,只需改改参数就可以使用,门槛并不高,能用就行,具体它底层怎么实现的,可以去官网看看(Sqoop已经不更新,虽然是apache的项目,但已经被打入冷宫了),值得一提的是,Sqoop 通常只会使用 Map 任务来完成数据的传输,不会启动 Reduce 任务