第一种
1.从ODS层获取增量数据(上一天新增和更新的数据)
2.拿着DWD原始拉链表数据 left join 增量数据 ,修改原始拉链中历史数据的结束时间
3.拿着left join 的结果集 union all 增量数据
4.把最新的拉链数据优先保存到DWD对应的临时表中
5.使用insert+select 方式把临时表中数据灌入DWD拉链表中
案例
create table tb_mysql(
id int,
name varchar(20) comment '姓名',
address varchar(20) comment '地址',
create_time datetime not null default current_timestamp comment '创建时间',
update_time datetime not null default current_timestamp on update current_timestamp comment '更新时间'
);
insert into tb_mysql values (1,'李四','北京',timestampadd(day,-12,current_timestamp),update_time),
(2,'王五','上海',timestampadd(day,-8,current_timestamp),update_time),
(3,'赵六','广州',timestampadd(day,-1,current_timestamp),update_time),
(4,'孙悟空','深圳',timestampadd(day,-3,current_timestamp),update_time),
(5,'猪八戒','天津',timestampadd(day,-5,current_timestamp),update_time),
(6,'沙和尚','重庆',timestampadd(day,-6,current_timestamp),update_time);
create table tb_dw(
id int,
name varchar(20) comment '姓名',
address varchar(20) comment '地址',
create_time datetime comment '创建时间',
update_time varchar(20) comment '更新时间',
start_time varchar(20) comment '起始时间',
end_time varchar(20) comment '结束时间'
);
-- 第一全量导入
insert into tb_dw select *,current_date,'9999-99-99' from tb_mysql;
-- mysql原始表中新增数据和更新数据
insert into tb_mysql values (7,'唐僧','西安',current_timestamp,update_time);
update tb_mysql set address='南京' where name='李四';
-- 第二天对昨天的数据进行数数据导入
-- 查询新增的数据
-- 修改原来的表数据
-- 增加新的数据
with tb_add_new as(
-- 查询新增的数据
select *,'2024-5-12' as start_time,'9999-99-99' as end_time from tb_mysql where date(create_time)=current_date or date(update_time) =current_date
),
tb_left_join as(
-- 筛序哪个更新
select
tb_dw.id,
tb_dw.name,
tb_dw.address,
tb_dw.create_time,
tb_dw.update_time,
tb_dw.start_time,
if(tb_add_new.id is null,tb_dw.end_time,'2024-5-12') as end_time
from tb_dw left join tb_add_new on tb_dw.id = tb_add_new.id
)
-- 增加新的数据
select * from tb_left_join union all select * from tb_add_new;