参考https://hadoopsters.com/how-random-sampling-in-hive-works-and-how-to-use-it-7cdb975aa8e2,可以直接查看原文,下面只是对原文进行概括和实际性能测试。
- 1.distribute by + sort by
- 2.测试
- 3.map端数据过滤优化采样
在说数据采样之前,需要先了解下hivesql中几个... by
的区别,也是面试中比较容易问的问题。
1)group by:分组。
2)cluster by:cluster by=distribute by+sort by,唯一区别在于cluster by数据分发和排序的字段或函数只能是同一个,而distribute by+sort by可以不同。
3)distribute by:仅数据分发,相同的字段值或函数值会被分发到同一个reducer,不保证reducer中的结果顺序。
4)sort by:局部(reducer)排序,只保证同一个reducer中的数据有序,不保证全局顺序。
5)order by:全局排序,将所有数据拉取到一个reducer中排序。
以上参考:https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SortBy#LanguageManualSortBy-SyntaxofClusterByandDistributeBy
因为分布式环境并不保证每次返回的结果顺序,因此好像直接limit进行随机采样也不是不可以,但是直接limit采样有个非常明显的弊端:采样数据分布不均。
举例来说,select * from tb limit 10
,假设key分别为a
、b
、c
,量级分别为5000,3000,2000的数据分布在3个reducer上,则hive在满足limit量级限制的情况下会优先从单个reducer采样,当单个reducer数据量不够采样量级时再从其它reducer取,这样key为a
、b
、c
的数据量级就和原有每个key的总量级不匹配,不能很好的代表整体。所以说limit在分布式环境中只能算是一种伪随机。
下面这个sql的执行结果可以印证上面的过程(表中std_tag为购物;超市
总数据量为1375218)。
drop view if exists sample;
create temporary view sample as
select * from poi_data.poi_res where std_tag in ('房地产;内部楼栋', '公司企业;公司', '购物;超市')
distribute by std_tag sort by rand() limit 2000000;
select std_tag, count(*) from sample group by std_tag order by count(*) desc;
1.distribute by + sort by
从上面可以看到造成结果伪随机的原因就是每个reducer中的数据不随机,相同的key数据都在同一个reducer,因此可以通过distribute by + 随机数
的方式对数据随机分发,保证了reducer中数据的随机性。
每个reducer内部中,再通过sort by + 随机数
的方式对数据局部随机排序,这样就能保证数据完全无序,样本不同key的量级也能代表整体(以上也可直接简写为cluster by 随机数
)。
order by + 随机数
也行,但是分布式环境中理论上没有distribute by 随机数 sort by 随机数
这种方式快,后者多个reducer同时处理更好的利用了集群资源。
2.测试
测试数据包含下面三类数据及量级。
房地产;内部楼栋(7819533)
公司企业;公司(4110450)
购物;超市(1375218)
1)distribute by + sort by
drop view if exists sample;
create temporary view sample as
select * from poi_data.poi_res where std_tag in ('房地产;内部楼栋', '公司企业;公司', '购物;超市')
distribute by rand() sort by rand() limit 100000;
select std_tag, count(*) from sample group by std_tag order by count(*) desc;
2)cluster by
drop view if exists sample;
create temporary view sample as
select * from poi_data.poi_res where std_tag in ('房地产;内部楼栋', '公司企业;公司', '购物;超市')
cluster by rand() limit 100000;
select std_tag, count(*) from sample group by std_tag order by count(*) desc;
3)order by
drop view if exists sample;
create temporary view sample as
select * from poi_data.poi_res where std_tag in ('房地产;内部楼栋', '公司企业;公司', '购物;超市')
order by rand() limit 100000;
select std_tag, count(*) from sample group by std_tag order by count(*) desc;
4)仅limit
drop view if exists sample;
create temporary view sample as
select * from poi_data.poi_res where std_tag in ('房地产;内部楼栋', '公司企业;公司', '购物;超市')
limit 100000;
select std_tag, count(*) from sample group by std_tag order by count(*) desc;
3.map端数据过滤优化采样
思想就是在map端就过滤一部分数据,减少shuffle的数据量。
eg:
drop view if exists sample;
create temporary view sample as
select * from poi_data.poi_res where std_tag in ('房地产;内部楼栋', '公司企业;公司', '购物;超市')
and rand() <= 0.01
distribute by rand() sort by rand() limit 100000;
select std_tag, count(*) from sample group by std_tag order by count(*) desc;
rand()
用于生成[0, 1]的随机数,<=0.01的概率为1%,总数据量1300w+,那么理论上到达reducer的数据量有13w+,因此不影响最终的采样结果。
如果像下面这样将阈值设置为rand()<=0.0001
,到达reducer的数据量占总数据量的0.1%(约1.3w),虽然最终结果的量级占比正确,但总量级不够采样数量。
drop view if exists sample;
create temporary view sample as
select * from poi_data.poi_res where std_tag in ('房地产;内部楼栋', '公司企业;公司', '购物;超市')
and rand() <= 0.001
distribute by rand() sort by rand() limit 100000;
select std_tag, count(*) from sample group by std_tag order by count(*) desc;
因此要注意阈值的合理设置,设置大了优化效果不明显,设置小了影响采样结果。