Hive的几种Join方式

Hive中的Join与RDBMS类似,都是为了操纵多张表的数据而进行连接操作,不同的是Hive使用HDFS作为底层数据存储,实际使用Map/Reduce执行引擎进行处理,这里面就涉及特殊的场景需要一些特殊的优化Join。


Left/Right/Full Outer Join(Full Join)

  • left outer join 右边无匹配则置为NULL
  • right outer join 左边无匹配则置为NULL
  • full outer join 得到两张表的全部记录,同时,左边无匹配则为NULL, 右边无匹配则为NULL
1
2
3
4
select a.*
from tablename1 a
left outer join tablename2 b
on a.id = b.id

Left Semi Join

left semi join在Hive中是用来替代 IN/EXISTS子查询的, 其查询相比IN/EXISTS而言更加高效。
left semi join右边的表只能在ON中设置过滤条件,在where,select子句或其它地方都不能设置过滤,且查询的结果不能有右边的表的字段。
例如IN写法:

1
2
3
4
5
6
select a.*
from tablename1 a
where a.id in (
select b.id
from tablename2 b
)

使用 left semin join改写:

1
2
3
4
select a.*
from tablename1 a
left semi join tablename2 b
on a.id = b.id

Cross Join

即最常见的JOIN查询,cross join的缺点是,两个表会做笛卡尔(Cartesian)积,如果有where,再做对做了笛卡尔积之后的结果过滤,这样大大增加了开销:

join 和 cross join都一样:

1
2
3
4
5
6
7
8
9
select a.*
from tablename1 a
join tablename2 b
where a.id=1
select a.*
from tablename1 a
cross join tablename2 b
on a.id = b.id

map-side join

有一种场景是一个大表(很多行)join 一个小表,此时可以使用hive的map-side join提高join的效率。
在map-side join中,小表缓存到内存中(要注意小表的大小)将小表复制到mapper中,大表通过mapper进行流式传输,与小表进行关联。
采用这种方式,Hive将在mapper端完成join,而不用使用reducer。

有两种设置方式:

使用 /+ MAPJOIN(smalltable)/ hint

1
2
3
4
select /*+ MAPJOIN(smalltable)*/ a.*, b.*
from bigtable a
join smalltable b
on a.id = b.id;

设置属性,然后join:

1
2
3
4
5
6
set hive.auto.convert.join=true;
select a.*, b.*
from bigtable a
join smalltable b
on a.id = b.id;

另一个数据hive.mapjoin.smalltable.filesize=25000000可以设置表的大小,作为开启和关闭map-join的阈值

使用map-join存在以下限制,即下属场景不支持map-join:

  • union后进行map-join
  • Lateral view 之后进行map-join
  • ruduce sink(group by/join/sort by/cluster by/distribute by)之后进行map-join
  • map-join之后进行union
  • map-join之后进join
  • map-join之后进行map-join

bucket map join

使用bucket map join时,一个表的bucket(可以理解为篮子,桶)数必须是另一个表的倍数。例如一个表有2个bucket,那么另一个表的bucket数必须是(2,4,6…)。
满足上述条件,则在join直接在mapper端就可以完成,这意味着只有查询所需要的bucket会被加载到mapper中,而不是整个表,也即只会把smalltable中匹配的bucket复制到mapper中,提高查询效率。不满足上述条件则使用inner join(交集)。

使用时必须先设置属性:

1
set hive.optimize.bucketmapjoin = true

在插入数据的时候,我们也必须设置:

1
set hive.enforce.bucketing = true;

建表:

1
2
3
CREATE TABLE weblog (user_id INT, url STRING, age INT, source_ip STRING)
PARTITIONED BY (dt STRING)
CLUSTERED BY (user_id) SORTED BY (age) INTO 96 BUCKETS;

插入数据:

1
2
3
4
INSERT OVERWRITE TABLE weblog PARTITION (dt='2009-02-25')
SELECT user_id, url, source_ip
FROM tmp_cdn_log
WHERE dt='2009-02-25' SORTED BY age;

bucket map join查询:

1
2
3
4
5
6
SELECT /*+ MAPJOIN(Sales_orc, Location) */ a.*, b.*, c.*
FROM Sales a
JOIN Sales_orc b
ON a.id = b.id
JOIN Location
ON a.id = c.id;

tips:

bucket是在partition的基础上进一步划分数据,通过使用hash函数计算user_id的值属于哪个bucket,便将该数据放到指定的bucket中(HDFS中表现为一个文件)
同时,每个bucket里的数据也是可以排序的


bucket sort merge map join

bucket map join的高级版,如果表中的数据是sorted且在join的列上进行了bucketed。所有的表必须有相同的bucket数,每个mapper会读取从每个table中读取一个bucket执行sort merge map join。

设置:

1
2
3
4
Set hive.enforce.sorting = true;
set hive.input.format = org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;

查询:

1
2
3
4
5
SELECT /*+ MAPJOIN(smalltable1, smalltable2) */ a.*, b.*, c.*
FROM bigtable a
JOIN smalltable1 b
ON a.id = b.id
JOIN smalltable2 ON a.id = c.id;

skew join

skew join 用于处理一个表在join column上有大量skew data(倾斜数据的情况)。比如,一个表中join column的 name=’join’的记录数相比其它的数据而言非常多,就会导致
name=’join’的数据被分配到单独的reducer,而系统必须等待这个reducer完成,但由于数据量巨大造成瓶颈。而skew join则可以解决该问题。

比如有两张表A(id, name, age), B(id, aid, ..), A表在id=10上有大量数据(数据倾斜),b表中id=10的并不多(可以加载到内存中,)skew join时,会在mapper(多个)中读取A表的skew key(即id),与内存中的A表的id=10的数据进行join,直接得到结果输出,而不用将数据传输到reducer。A剩余的key(即id<>10),仅会使用一个Map/Reduce进行处理。

设置:

1
2
set hive.optimize.skewjoin=true;
set hive.skewjoin.key=100000;

查询:

1
SELECT a.* FROM skewdatatable a JOIN smalltable b ON a.id = b.id;
坚持原创技术分享,您的支持将鼓励我继续创作!