1 数据同步问题
Hive在实际工作中主要用于构建离线数据仓库,定期的从各种数据源中同步采集数据到Hive中,经过分层转换提供数据应用。比如每天需要从MySQL中同步最新的订单信息、用户信息、店铺信息等到数据仓库中,进行订单分析、用户分析。
例如:MySQL中有一张用户表:tb_user,每个用户注册完成以后,就会在用户表中新增该用户的信息.
由于每天都会有用户注册,产生新的用户信息,那么每天都需要将MySQL中的用户数据同步到Hive数据仓库中.
假如在1号已经在hive中创建了表并拉取了数据,但是在2号时MySQL中新增2条用户注册数据,并且有1条用户数据发生更新.
那么我们需要对2号的数据进行同步到hive中,新增的数据会直接加载到Hive表中,但是更新的数据如何存储在Hive表中?
方案一:直接覆盖
使用2号的数据 直接将1号的数据覆盖掉
优点:实现最简单,使用起来最方便
缺点:没有历史状态 想查询008之前的数据查看不到
方案二:根据日期构建一份全量的快照表
1号创建一张表拉取所有数据
2号再创建一张表拉取所有数据
... 每天都创建一张表
优点:记录了所有数据在不同时间的状态
缺点:冗余存储了很多没有发生变化的数据,导致存储的数据量过大
方案三:构建拉链表,通过时间标记发生变化的数据的每种状态的时间周期
拉链表的设计是将更新的数据进行状态记录,没有发生更新的数据不进行状态存储,用于存储所有数据在不同时间上的所有状态,通过时间进行标记每个状态的生命周期,查询时,根据需求可以获取指定时间范围状态的数据,默认用9999-12-31等最大值来表示最新状态。
2 拉链表实现原理
1.增量采集变化数据,放入增量表中
2.将Hive中的拉链表与临时表的数据进行合并,合并结果写入临时表
3.将临时表的数据覆盖写入拉链表中
3 拉链表实现演示
创建拉链表
-- 数据准备
vi zipper.txt
001 186xxxx1234 laoda 0 sh 2021-01-01 9999-12-31
002 186xxxx1235 laoer 1 bj 2021-01-01 9999-12-31
003 186xxxx1236 laosan 0 sz 2021-01-01 9999-12-31
004 186xxxx1237 laosi 1 gz 2021-01-01 9999-12-31
005 186xxxx1238 laowu 0 sh 2021-01-01 9999-12-31
006 186xxxx1239 laoliu 1 bj 2021-01-01 9999-12-31
007 186xxxx1240 laoqi 0 sz 2021-01-01 9999-12-31
008 186xxxx1241 laoba 1 gz 2021-01-01 9999-12-31
009 186xxxx1242 laojiu 0 sh 2021-01-01 9999-12-31
010 186xxxx1243 laoshi 1 bj 2021-01-01 9999-12-31
--创建拉链表
create table dw_zipper
(
userid string,
phone string,
nick string,
gender int,
addr string,
starttime string,
endtime string
) row format delimited fields terminated by '\t';
load data local inpath '/root/zipper.txt' into table dw_zipper;
select * from dw_zipper;
创建增量表
vi update.txt
008 186xxxx1241 laoba 1 sh 2021-01-02 9999-12-31
011 186xxxx1244 laoshi 1 jx 2021-01-02 9999-12-31
012 186xxxx1245 laoshi 0 zj 2021-01-02 9999-12-31
create table ods_update
(
userid string,
phone string,
nick string,
gender int,
addr string,
starttime string,
endtime string
) row format delimited fields terminated by '\t';
load data local inpath '/root/update.txt' overwrite into table ods_update;
select * from ods_update;
创建临时表
create table tmp_zipper
(
userid string,
phone string,
nick string,
gender int,
addr string,
starttime string,
endtime string
) row format delimited fields terminated by '\t';
合并数据到临时表
insert overwrite table tmp_zipper
select
userid,
phone,
nick,
gender,
addr,
starttime,
endtime
from ods_update
union all
--查询原来拉链表的所有数据,并将这次需要更新的数据的endTime更改为更新值的startTime
select
a.userid,
a.phone,
a.nick,
a.gender,
a.addr,
a.starttime,
--如果这条数据没有更新或者这条数据不是要更改的数据,就保留原来的值,否则就改为新数据的开始时间-1
if(b.userid is null or a.endtime < '9999-12-31', a.endtime , date_sub(b.starttime,1)) as endtime
from dw_zipper a left join ods_update b
on a.userid = b.userid ;
覆盖拉链表数据
insert overwrite table dw_zipper
select * from tmp_zipper;