站浏览量 站访问人数
目录
  1. 1. hive的笛卡尔积
  2. 2. 优化历程
  3. 3. 代码实现
  4. 4. 参数设置
  5. 5. smbJoin
  6. 6. 后话

hive的笛卡尔积

hive中默认不使用笛卡尔积,需要开启才能运行,hive不建议运行笛卡尔积,但是,在实践过程中,难免需要使用到,这时就得开启该功能;然后,hive中做笛卡尔积的结果就是最后的数据运算都集中在一个reduce上,亲测确实如此。那么,该怎么优化呢?

优化历程

笛卡尔积计算需要把数据全部遍历一遍才行,使用笛卡尔积理论上数据需要保存进内存,如果有多个任务运行,那么就得把数据复制多份,然而实时却并非如此;因为,笛卡尔积的计算最终是计算出结果,因此,会把数据积压到一个reduce上,而不是多份(虽然难以理解,但是,实践测试多次,还真心如此)。
因此,想要优化,必须加大reduce的个数,这时需要有分区(或者分块)的概念,即把数据copy成多份,进行join操作,在计算时把小部分数据与全部数据进行join,达到笛卡尔积的效果。

代码实现

1,获得表a
2,copya表成b,注意是拓展的copy,即分区成5份(看资源情况可设置成更多份):

drop table if exists b;
create table b
as
select from
(
select aa.
,’1,2,3,4,5’ as old_part a aa
)b lateral view explode(split(old_part,’,’)) b as part
;

3,把表a的记录进行随机打散:

drop table if exists aa;
create table aa
as
select aaa.
,cast(round(rand()
4+1.0,0) as int) as part from a aaa
;

4,做一般的join操作

drop table if exists aab;
create table aab
as
select
a.id
,b.id as id_tj
from aa a
join b b
on a.part=b.part
;

5,这样就可5个reduce计算,比单独的一个reduce快的多

参数设置

数据倾斜
set hive.optimize.skewjoin = true;
set hive.skewjoin.key = skew_key_threshold (default = 100000)

使用mapjoin
set hive.mapred.mode=nonstrict;(当只有一个reducer时不使用迪卡尔积)
自动识别,判断小的表存入内容
set hive.auto.convert.join=true;

smbJoin

当两个大表做笛卡尔积时,可将数据进行分桶,把数据切块,然后每个桶中进行join操作,最后再进行merge。但是,为了加快速度,每个桶中的记录条数不能过多,否则就起不到优化的效果,因此,设置桶数量时,要根据数据总量来变动设置,保证每个桶的记录数在1万左右。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
先设置一个分桶的空表,注意需要分区和排序,

use 库名;
drop table if exists 表名;
create table 表名
(
a int,
b int,
c int
)
clustered by(a,b) sorted by(a,b) into 8 buckets
row format delimited fields terminated by '\t'
stored as textfile;
;

再把数据进行插入,插入数据时需要设置bucket等参数,否则元数据存储的文件数量还是1.

set mapreduce.job.reduces=60;
set hive.enforce.bucketing = true;
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
set hive.enforce.sortmergebucketmapjoin=false;
set hive.auto.convert.sortmerge.join=true;

use 库名;
insert overwrite table 表名
select a
,b
,c
from 来源表
;


最后做正常的join操作即可。
select /*+ Mapjoin(a) */
.....
from 表 a
join 表 b on (a.user_id=b.user_id)
where ....
group by ...

后话

1, hive是分布式计算结构化数据的一个工具,本质上使用map-reduce计算框架;
2,hive操作与mysql类似,但有不同,比如表的in操作等;
3,使用map-reduce计算的方式,都需要考虑并行和分区概念。
4,上述方法将数据进行分区,类似于hive的分箱操作,但是,在大量数据比如20万条数据下做笛卡尔积,虽然使用了分区扩展,运行起来也是十分慢,因为20万笛卡尔积需要每条数据计算20万次,所以,这时需要做一些业务上或者牺牲精度上选择,减少每次计算的次数,比如每条数据尽量不要与全部的20万条计算,可以加一些过滤条件,这些条件在计算笛卡尔积前就设置好,然后做join时在on条件带上;
5,除了使用hive的笛卡尔积计算记录两两关系,还可以使用spark或flink下计算,这时可以操纵计算出的结果值,避免后续排序再选前100条时不用全量排序再选。这样做一个job就变成了两个(spark计算完,再从hdfs导入hive里),虽然麻烦,但是时间优化很快。