目录
1. Spark Load导入Hive非分区表数据
2. Spark Load 导入Hive分区表数据
3. 注意事项
进入正文之前,欢迎订阅专题、对博文点赞、评论、收藏,关注IT贫道,获取高质量博客内容!
宝子们订阅、点赞、收藏不迷路!抓紧订阅专题!
1. Spark Load导入Hive非分区表数据
1) 在node3hive客户端,准备向Hive表加载的数据
hive_data1.txt:
1,zs,18,100
2,ls,19,101
3,ww,20,102
4,ml,21,103
5,tq,22,104
2) 启动Hive,在Hive客户端创建Hive表并加载数据
#配置Hive 服务端$HIVE_HOME/conf/hive-site.xml
<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
</property>
注意:此配置项为关闭metastore版本验证,避免在doris中读取hive外表时报错。
#在node1节点启动hive metastore
[root@node1 ~]# hive --service metastore &
#在node3节点进入hive客户端建表并加载数据
create table hive_tbl (id int,name string,age int,score int) row format delimited fields terminated by ',';
load data local inpath '/root/hive_data1.txt' into table hive_tbl;
#查看hive表中的数据
hive> select * from hive_tbl;
1 zs 18 100
2 ls 19 101
3 ww 20 102
4 ml 21 103
5 tq 22 104
3) 在Doris中创建Hive 外部表
使用Spark Load 将Hive非分区表中的数据导入到Doris中时,需要先在Doris中创建hive 外部表,然后通过Spark Load 加载这张外部表数据到Doris某张表中。
#Doris中创建Hive 外表
CREATE EXTERNAL TABLE example_db.hive_doris_tbl
(
id INT,
name varchar(255),
age INT,
score INT
)
ENGINE=hive
properties
(
"dfs.nameservices"="mycluster",
"dfs.ha.namenodes.mycluster"="node1,node2",
"dfs.namenode.rpc-address.mycluster.node1"="node1:8020",
"dfs.namenode.rpc-address.mycluster.node2"="node2:8020",
"dfs.client.failover.proxy.provider.mycluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider",
"database" = "default",
"table" = "hive_tbl",
"hive.metastore.uris" = "thrift://node1:9083"
);
注意:
- 在Doris中创建Hive外表不会将数据存储到Doris中,查询hive外表数据时会读取HDFS中对应hive路径中的数据来展示,向hive表中插入数据时,doris中查询hive外表也能看到新增数据。
- 如果Hive表中是分区表,doris创建hive表将分区列看成普通列即可。
以上hive外表结果如下:
mysql> select * from hive_doris_tbl;
+------+------+------+-------+
| id | name | age | score |
+------+------+------+-------+
| 1 | zs | 18 | 100 |
| 2 | ls | 19 | 101 |
| 3 | ww | 20 | 102 |
| 4 | ml | 21 | 103 |
| 5 | tq | 22 | 104 |
+------+------+------+-------+
4) 创建Doris表
#创建Doris表
create table spark_load_t2(
id int,
name varchar(255),
age int,
score double
)
ENGINE = olap
DUPLICATE KEY(id)
DISTRIBUTED BY HASH(`id`) BUCKETS 8;
5) 创建Spark Load导入任务
创建Spark Load任务后,底层Spark Load转换成Spark任务进行数据导入处理时,需要连接Hive,所以需要保证在Spark node1-node3节点客户端中$SPARK_HOME/conf/目录下有hive-site.xml配置文件,以便找到Hive ,另外,连接Hive时还需要MySQL 连接依赖包,所以需要在Yarn NodeManager各个节点保证$HADOOP_HOME/share/hadoop/yarn/lib路径下有mysql-connector-java-5.1.47.jar依赖包。
#把hive客户端hive-site.xml 分发到Spark 客户端(node1-node3)节点$SPARK_HOME/conf目录下
[root@node3 ~]# scp /software/hive-3.1.3/conf/hive-site.xml node1:/software/spark-2.3.1/conf/
[root@node3 ~]# scp /software/hive-3.1.3/conf/hive-site.xml node2:/software/spark-2.3.1/conf/
[root@node3 ~]# cp /software/hive-3.1.3/conf/hive-site.xml /software/spark-2.3.1/conf/
#将mysql-connector-java-5.1.47.jar依赖分发到NodeManager 各个节点$HADOOP_HOME/share/hadoop/yarn/lib路径中
[root@node3 ~]# cp /software/hive-3.1.3/lib/mysql-connector-java-5.1.47.jar /software/hadoop-3.3.3/share/hadoop/yarn/lib/
[root@node3 ~]# scp /software/hive-3.1.3/lib/mysql-connector-java-5.1.47.jar node4:/software/hadoop-3.3.3/share/hadoop/yarn/lib/
[root@node3 ~]# scp /software/hive-3.1.3/lib/mysql-connector-java-5.1.47.jar node5:/software/hadoop-3.3.3/share/hadoop/yarn/lib/
编写Spark Load任务,如下:
LOAD LABEL example_db.label2
(
DATA FROM TABLE hive_doris_tbl
INTO TABLE spark_load_t2
)
WITH RESOURCE 'spark1'
(
"spark.executor.memory" = "1g",
"spark.shuffle.compress" = "true"
)
PROPERTIES
(
"timeout" = "3600"
);
6) Spark Load任务查看
登录Yarn Web UI查看对应任务执行情况:
执行命令查看Spark Load 任务执行情况:
mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 37128
Label: label2
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: SPARK
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=0
TaskInfo: cluster:spark1; timeout(s):3600; max_filter_ratio:0.0
ErrorMsg: NULL
CreateTime: 2023-03-10 18:13:19
EtlStartTime: 2023-03-10 18:13:34
EtlFinishTime: 2023-03-10 18:15:27
LoadStartTime: 2023-03-10 18:15:27
LoadFinishTime: 2023-03-10 18:15:30
URL: http://node1:8088/proxy/application_1678424784452_0007/
JobDetails: {"Unfinished backends":{"0-0":[]},"ScannedRows":0,"TaskNumber":1,"LoadBytes":0,"All backends":{"0-0":[-1]},"FileNumber":0,"FileSi
ze":0} TransactionId: 24081
ErrorTablets: {}
1 row in set (0.00 sec)
7) 查看Doris结果
mysql> select * from spark_load_t2;
+------+------+------+-------+
| id | name | age | score |
+------+------+------+-------+
| 5 | tq | 22 | 104 |
| 4 | ml | 21 | 103 |
| 1 | zs | 18 | 100 |
| 3 | ww | 20 | 102 |
| 2 | ls | 19 | 101 |
+------+------+------+-------+
2. Spark Load 导入Hive分区表数据
导入Hive分区表数据到对应的doris分区表就不能在doris中创建hive外表这种方式导入,因为hive分区列在hive外表中就是普通列,所以这里我们使用Spark Load 直接读取Hive分区表在HDFS中的路径,将数据加载到Doris分区表中。
1) 在node3 hive客户端,准备向Hive表加载的数据
hive_data2.txt:
1,zs,18,100,2023-03-01
2,ls,19,200,2023-03-01
3,ww,20,300,2023-03-02
4,ml,21,400,2023-03-02
5,tq,22,500,2023-03-02
2) 创建Hive分区表并,加载数据
#在node3节点进入hive客户端建表并加载数据
create table hive_tbl2 (id int, name string,age int,score int) partitioned by (dt string) row format delimited fields terminated by ','
load data local inpath '/root/hive_data2.txt' into table hive_tbl2;
#查看hive表中的数据
hive> select * from hive_tbl2;
OK
1 zs 18 100 2023-03-01
2 ls 19 200 2023-03-01
3 ww 20 300 2023-03-02
4 ml 21 400 2023-03-02
5 tq 22 500 2023-03-02
hive> show partitions hive_tbl2;
OK
dt=2023-03-01
dt=2023-03-02
当hive_tbl2表创建完成后,我们可以在HDFS中看到其存储路径格式如下:
3) 创建Doris分区表
create table spark_load_t3(
dt date,
id int,
name varchar(255),
age int,
score double
)
ENGINE = olap
DUPLICATE KEY(dt,id)
PARTITION BY RANGE(`dt`)
(
PARTITION `p1` VALUES [("2023-03-01"),("2023-03-02")),
PARTITION `p2` VALUES [("2023-03-02"),("2023-03-03"))
)
DISTRIBUTED BY HASH(`id`) BUCKETS 8;
4) 创建Spark Load导入任务
创建Spark Load任务后,底层Spark Load转换成Spark任务进行数据导入处理时,需要连接Hive,所以需要保证在Spark node1-node3节点客户端中$SPARK_HOME/conf/目录下有hive-site.xml配置文件,以便找到Hive ,另外,连接Hive时还需要MySQL 连接依赖包,所以需要在Yarn NodeManager各个节点保证$HADOOP_HOME/share/hadoop/yarn/lib路径下有mysql-connector-java-5.1.47.jar依赖包。
#把hive客户端hive-site.xml 分发到Spark 客户端(node1-node3)节点$SPARK_HOME/conf目录下
[root@node3 ~]# scp /software/hive-3.1.3/conf/hive-site.xml node1:/software/spark-2.3.1/conf/
[root@node3 ~]# scp /software/hive-3.1.3/conf/hive-site.xml node2:/software/spark-2.3.1/conf/
[root@node3 ~]# cp /software/hive-3.1.3/conf/hive-site.xml /software/spark-2.3.1/conf/
#将mysql-connector-java-5.1.47.jar依赖分发到NodeManager 各个节点$HADOOP_HOME/share/hadoop/yarn/lib路径中
[root@node3 ~]# cp /software/hive-3.1.3/lib/mysql-connector-java-5.1.47.jar /software/hadoop-3.3.3/share/hadoop/yarn/lib/
[root@node3 ~]# scp /software/hive-3.1.3/lib/mysql-connector-java-5.1.47.jar node4:/software/hadoop-3.3.3/share/hadoop/yarn/lib/
[root@node3 ~]# scp /software/hive-3.1.3/lib/mysql-connector-java-5.1.47.jar node5:/software/hadoop-3.3.3/share/hadoop/yarn/lib/
编写Spark Load任务,如下:
LOAD LABEL example_db.label3
(
DATA INFILE("hdfs://node1:8020/user/hive/warehouse/hive_tbl2/dt=2023-03-02/*")
INTO TABLE spark_load_t3
COLUMNS TERMINATED BY ","
FORMAT AS "csv"
(id,name,age,score)
COLUMNS FROM PATH AS (dt)
SET
(
dt=dt,
id=id,
name=name,
age=age
)
)
WITH RESOURCE 'spark1'
(
"spark.executor.memory" = "1g",
"spark.shuffle.compress" = "true"
)
PROPERTIES
(
"timeout" = "3600"
);
注意:
- 以上HDFS路径不支持HA模式,需要手动指定Active NameNode节点
- 读取HDFS文件路径中的分区路径需要写出来,不能使用*代表,这与Broker Load不同。
- 目前版本测试存在问题:当Data INFILE中指定多个路径时有时会出现只导入第一个路径数据。
5) Spark Load任务查看
执行命令查看Spark Load 任务执行情况:
mysql> show load order by createtime desc limit 1\G;
*************************** 1. row ***************************
JobId: 39432
Label: label3
State: FINISHED
Progress: ETL:100%; LOAD:100%
Type: SPARK
EtlInfo: unselected.rows=0; dpp.abnorm.ALL=0; dpp.norm.ALL=3
TaskInfo: cluster:spark1; timeout(s):3600; max_filter_ratio:0.0
ErrorMsg: NULL
CreateTime: 2023-03-10 20:11:19
EtlStartTime: 2023-03-10 20:11:36
EtlFinishTime: 2023-03-10 20:12:21
LoadStartTime: 2023-03-10 20:12:21
LoadFinishTime: 2023-03-10 20:12:22
URL: http://node1:8088/proxy/application_1678443952851_0026/
JobDetails: {"Unfinished backends":{"0-0":[]},"ScannedRows":3,"TaskNumber":1,"LoadBytes":0,"All backends":{"0-0":[-1]},"FileNumber":2,"FileSi
ze":60} TransactionId: 25529
ErrorTablets: {}
1 row in set (0.02 sec)
6) 查看Doris结果
mysql> select * from spark_load_t3;
+------------+------+------+------+-------+
| dt | id | name | age | score |
+------------+------+------+------+-------+
| 2023-03-02 | 3 | ww | 20 | 300 |
| 2023-03-02 | 4 | ml | 21 | 400 |
| 2023-03-02 | 5 | tq | 22 | 500 |
+------------+------+------+------+-------+
3. 注意事项
1) 现在Spark load 还不支持 Doris 表字段是String类型的导入,如果你的表字段有String类型的请改成varchar类型,不然会导入失败,提示 type:ETL_QUALITY_UNSATISFIED; msg:quality not good enough to cancel
2) 使用 Spark Load 时如果没有在 spark 客户端的 spark-env.sh 配置 HADOOP_CONF_DIR 环境变量,会报 When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment. 错误。
3) 使用Spark Load时spark_home_default_dir配置项没有指定spark客户端根目录。提交 Spark job 时用到 spark-submit 命令,如果 spark_home_default_dir 设置错误,会报 Cannot run program "xxx/bin/spark-submit": error=2, No such file or directory 错误。
4) 使用 Spark load 时 spark_resource_path 配置项没有指向打包好的zip文件。如果 spark_resource_path 没有设置正确,会报 File xxx/jars/spark-2x.zip does not exist 错误。
5) 使用 Spark load 时 yarn_client_path 配置项没有指定 yarn 的可执行文件。如果 yarn_client_path 没有设置正确,会报 yarn client does not exist in path: xxx/yarn-client/hadoop/bin/yarn 错误
6) 使用Spark load 时没有在 yarn 客户端的 hadoop-config.sh 配置 JAVA_HOME 环境变量。如果 JAVA_HOME 环境变量没有设置,会报 yarn application kill failed. app id: xxx, load job id: xxx, msg: which: no xxx/lib/yarn-client/hadoop/bin/yarn in ((null)) Error: JAVA_HOME is not set and could not be found 错误
7) 关于FE配置
下面配置属于 Spark load 的系统级别配置,也就是作用于所有 Spark load 导入任务的配置。主要通过修改 fe.conf来调整配置值。
- enable_spark_load
开启 Spark load 和创建 resource 功能。默认为 false,关闭此功能。
- spark_load_default_timeout_second
任务默认超时时间为259200秒(3天)。
- spark_home_default_dir
spark客户端路径 (fe/lib/spark2x) 。
- spark_resource_path
打包好的spark依赖文件路径(默认为空)。
- spark_launcher_log_dir
spark客户端的提交日志存放的目录(fe/log/spark_launcher_log)。
- yarn_client_path
yarn二进制可执行文件路径 (fe/lib/yarn-client/hadoop/bin/yarn) 。
- yarn_config_dir
yarn配置文件生成路径 (fe/lib/yarn-config) 。
8) 关于Spark Load支持Kerberos认证配置看考官网:Spark Load - Apache Doris
9) 使用Spark Load 导入文件数据时,必须指定format ,否则Spark Load 执行最后会报错“spark etl job run failed java.lang.NullPointerException”
🏡个人主页:IT贫道的博客_CSDN博客-Apache Doris,Kerberos安全认证,随笔领域博主 主页包含各种IT体系技术
📌订阅:拥抱独家专题,你的订阅将点燃我的创作热情!
👍点赞:赞同优秀创作,你的点赞是对我创作最大的认可!
⭐️ 收藏:收藏原创博文,让我们一起打造IT界的荣耀与辉煌!
✏️评论:留下心声墨迹,你的评论将是我努力改进的方向!