3. Full collection: collect all of the data from the source table
vim students_all.json
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"splitPk": "id",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
"update_time"
],
"connection": [
{
"table": [
"students"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/bigdata31"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://master:9000",
"fileType": "text",
"path": "/data/students_all/dt=${dt}",
"fileName": "students",
"column": [
{
"name": "id",
"type": "STRING"
},
{
"name": "name",
"type": "STRING"
},
{
"name": "age",
"type": "INT"
},
{
"name": "gender",
"type": "STRING"
},
{
"name": "clazz",
"type": "STRING"
},
{
"name": "update_time",
"type": "STRING"
}
],
"writeMode": "truncate",
"fieldDelimiter": ","
}
}
}
]
}
}
# Create the partition directory in HDFS
hdfs dfs -mkdir -p /data/students_all/dt=2024-10-21
# Run the DataX job; the -p option fills the ${dt} placeholder in the JSON
datax.py -p"-Ddt=2024-10-21" students_all.json
# Register the partition in Hive
hive -e "alter table students_all add if not exists partition(dt='2024-10-21');"
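If the job succeeds, the partition directory should contain the exported file and the Hive row count should match the source table. A quick sanity check (assuming a mysql client is available on this host):
# Files written by DataX into the partition directory
hdfs dfs -ls /data/students_all/dt=2024-10-21
# Row counts on both sides should match
hive -e "select count(*) from students_all where dt='2024-10-21';"
mysql -hmaster -uroot -p123456 -e "select count(*) from bigdata31.students;"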
4. Incremental collection: collect only rows that were newly inserted or modified
1. The source table needs an update-time column
CREATE TABLE `students` (
  `id` bigint(20),
  `name` varchar(255),
  `age` bigint(20),
  `gender` varchar(255),
  `clazz` varchar(255),
  `update_time` datetime NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
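The ON UPDATE CURRENT_TIMESTAMP clause is what makes incremental collection possible: MySQL stamps update_time on insert and refreshes it automatically on every update, so changed rows always fall into the current day's extraction window. A minimal demonstration (the row values here are hypothetical):
# update_time is set automatically on insert ...
mysql -hmaster -uroot -p123456 bigdata31 -e "insert into students(id,name,age,gender,clazz) values(1500999999,'test',22,'M','c1');"
# ... and refreshed automatically on update
mysql -hmaster -uroot -p123456 bigdata31 -e "update students set age=23 where id=1500999999; select id,update_time from students where id=1500999999;"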
2. Create the partitioned full table in Hive
create external table if not exists students_all(
id bigint comment 'student id'
,name string comment 'student name'
,age bigint comment 'student age'
,sex string comment 'student gender'
,clazz string comment 'student class'
,update_time string comment 'update time'
) comment 'student info table (full)'
partitioned by (dt string)
row format delimited fields terminated by ','
stored as textfile
location 'hdfs://master:9000/data/students_all';
3. Create the incremental table
create external table if not exists students_acc(
id bigint comment 'student id'
,name string comment 'student name'
,age bigint comment 'student age'
,sex string comment 'student gender'
,clazz string comment 'student class'
,update_time string comment 'update time'
) comment 'student info table (incremental)'
partitioned by (dt string)
row format delimited fields terminated by ','
stored as textfile
location 'hdfs://master:9000/data/students_acc';
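Both DDLs only register metadata; the location directories are populated by the DataX jobs below. To confirm the definitions (the partition lists will be empty at this point):
hive -e "show create table students_all; show partitions students_all;"
hive -e "show create table students_acc; show partitions students_acc;"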
4. Incrementally collect the updated data
vim students_acc.json
{
"job": {
"setting": {
"speed": {
"channel": 1
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"splitPk": "id",
"where": "substr(update_time,1,10)='${dt}'",
"column": [
"id",
"name",
"age",
"gender",
"clazz",
"update_time"
],
"connection": [
{
"table": [
"students"
],
"jdbcUrl": [
"jdbc:mysql://master:3306/bigdata31"
]
}
]
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"defaultFS": "hdfs://master:9000",
"fileType": "text",
"path": "/data/students_acc/dt=${dt}",
"fileName": "students",
"column": [
{
"name": "id",
"type": "STRING"
},
{
"name": "name",
"type": "STRING"
},
{
"name": "age",
"type": "INT"
},
{
"name": "gender",
"type": "STRING"
},
{
"name": "clazz",
"type": "STRING"
},
{
"name": "update_time",
"type": "STRING"
}
],
"writeMode": "truncate",
"fieldDelimiter": ","
}
}
}
]
}
}
# Create the partition directory in HDFS
hdfs dfs -mkdir -p /data/students_acc/dt=2024-10-22
# Run the DataX job; the -p option fills the ${dt} placeholder in the JSON
datax.py -p"-Ddt=2024-10-22" students_acc.json
# Register the partition in Hive
hive -e "alter table students_acc add if not exists partition(dt='2024-10-22');"
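The reader's where clause, substr(update_time,1,10)='${dt}', keeps only rows inserted or modified on the target day, so the new partition should contain just that day's changes:
# Only rows touched on 2024-10-22 should appear here
hive -e "select * from students_acc where dt='2024-10-22' limit 10;"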
5. Merge the data
The merge unions the previous snapshot (students_all, dt='${diff_dt}') with the day's increment (students_acc, dt='${dt}'), keeps the latest version of each id via row_number() ordered by update_time desc, and overwrites the new day's partition of students_all:
vim student_merge.sql
insert overwrite table students_all partition(dt='${dt}')
select
id,
name,
age,
sex,
clazz,
update_time
from
(
select
id,
name,
age,
sex,
clazz,
update_time,
row_number() over (
partition by
id
order by
update_time desc
) as r
from
(
select
*
from
students_all
where
dt = '${diff_dt}'
union all
select
*
from
students_acc
where
dt = '${dt}'
) as a
) as b
where
r = 1;
# Run the merge with Hive:
hive -f student_merge.sql -d dt=2024-10-22 -d diff_dt=2024-10-21
# Or run it with Spark SQL on YARN:
spark-sql \
--master yarn \
--deploy-mode client \
--num-executors 2 \
--executor-cores 1 \
--executor-memory 2G \
--conf spark.sql.shuffle.partitions=1 \
-f student_merge.sql -d dt=2024-10-22 -d diff_dt=2024-10-21
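In production the three steps (DataX export, partition registration, merge) usually run once a day from a scheduler. A minimal wrapper sketch, assuming GNU date and the file names used above (the script name is hypothetical):
vim run_students_daily.sh
#!/bin/bash
# Daily incremental pipeline: export yesterday's changes, register the
# partition, then build a new full snapshot by merging.
dt=$(date -d "-1 day" +%F)        # day being processed, e.g. 2024-10-22
diff_dt=$(date -d "-2 day" +%F)   # previous snapshot day, e.g. 2024-10-21

hdfs dfs -mkdir -p /data/students_acc/dt=${dt}
datax.py -p"-Ddt=${dt}" students_acc.json
hive -e "alter table students_acc add if not exists partition(dt='${dt}');"
hive -f student_merge.sql -d dt=${dt} -d diff_dt=${diff_dt}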