數(shù)據(jù)分析工具篇-HQL原理及優(yōu)化
HQL是數(shù)據(jù)分析過(guò)程中的必備技能,隨著數(shù)據(jù)量增加,這一技能越來(lái)越重要,熟練應(yīng)用的同時(shí)會(huì)帶來(lái)效率的問(wèn)題,動(dòng)輒十幾億的數(shù)據(jù)量如果處理不完善的話有可能導(dǎo)致一個(gè)作業(yè)運(yùn)行幾個(gè)小時(shí),更嚴(yán)重的還有可能因占用過(guò)多資源而引發(fā)生產(chǎn)問(wèn)題,所以HQL優(yōu)化就變得非常重要,本文我們就深入HQL的原理中,探索HQL優(yōu)化的方法和邏輯。
group by的計(jì)算原理
代碼為:
- SELECT uid, SUM(COUNT) FROM logs GROUP BY uid;
可以看到,group by本身不是全局變量,任務(wù)會(huì)被分到各個(gè)map中進(jìn)行分組,然后再在reduce中聚合。
默認(rèn)設(shè)置了hive.map.aggr=true,所以會(huì)在mapper端先group by一次,最后再把結(jié)果merge起來(lái),為了減少reducer處理的數(shù)據(jù)量。注意看explain的mode是不一樣的。mapper是hash,reducer是mergepartial。如果把hive.map.aggr=false,那將groupby放到reducer才做,他的mode是complete。
優(yōu)化點(diǎn):
Group by主要是面對(duì)數(shù)據(jù)傾斜的問(wèn)題。
很多聚合操作可以現(xiàn)在map端進(jìn)行,最后在Reduce端完成結(jié)果輸出:
- Set hive.map.aggr = true; # 是否在Map端進(jìn)行聚合,默認(rèn)為true;
- Set hive.groupby.mapaggr.checkinterval = 1000000; # 在Map端進(jìn)行聚合操作的條目數(shù)目;
當(dāng)使用Group by有數(shù)據(jù)傾斜的時(shí)候進(jìn)行負(fù)載均衡:
- Set hive.groupby.skewindata = true; # hive自動(dòng)進(jìn)行負(fù)載均衡;
策略就是把MR任務(wù)拆分成兩個(gè)MR Job:第一個(gè)先做預(yù)匯總,第二個(gè)再做最終匯總;
第一個(gè)Job:
Map輸出結(jié)果集中緩存到maptask中,每個(gè)Reduce做部分聚合操作,并輸出結(jié)果,這樣處理的結(jié)果是相同Group by Key有可能被分到不同的reduce中,從而達(dá)到負(fù)載均衡的目的;
第二個(gè)Job:
根據(jù)第一階段處理的數(shù)據(jù)結(jié)果按照group by key分布到reduce中,保證相同的group by key分布到同一個(gè)reduce中,最后完成最終的聚合操作。
join的優(yōu)化原理
代碼為:
- SELECT a.id,a.dept,b.age FROM a join b ON (a.id = b.id);
1)Map階段:
讀取源表的數(shù)據(jù),Map輸出時(shí)候以Join on條件中的列為key,如果Join有多個(gè)關(guān)聯(lián)鍵,則以這些關(guān)聯(lián)鍵的組合作為key;
Map輸出的value為join之后所關(guān)心的(select或者where中需要用到的)列;同時(shí)在value中還會(huì)包含表的Tag信息,用于標(biāo)明此value對(duì)應(yīng)哪個(gè)表;
按照key進(jìn)行排序;
2)Shuffle階段:
根據(jù)key的值進(jìn)行hash,并將key/value按照hash值推送至不同的reduce中,這樣確保兩個(gè)表中相同的key位于同一個(gè)reduce中。
3)Reduce階段:
根據(jù)key的值完成join操作,期間通過(guò)Tag來(lái)識(shí)別不同表中的數(shù)據(jù)。
在多表join關(guān)聯(lián)時(shí):
如果 Join 的 key 相同,不管有多少個(gè)表,都會(huì)合并為一個(gè) Map-Reduce,例如:
- SELECT pv.pageid, u.age
- FROM page_view p
- JOIN user u
- ON (pv.userid = u.userid)
- JOIN newuser x
- ON (u.userid = x.userid);
如果 Join 的 key不同,Map-Reduce 的任務(wù)數(shù)目和 Join 操作的數(shù)目是對(duì)應(yīng)的,例如:
- SELECT pv.pageid, u.age
- FROM page_view p
- JOIN user u
- ON (pv.userid = u.userid)
- JOIN newuser x
- on (u.age = x.age);
優(yōu)化點(diǎn):
1)應(yīng)該將條目少的表/子查詢放在 Join 操作符的左邊。
2)我們知道文件數(shù)目小,容易在文件存儲(chǔ)端造成瓶頸,給 HDFS 帶來(lái)壓力,影響處理效率。對(duì)此,可以通過(guò)合并Map和Reduce的結(jié)果文件來(lái)消除這樣的影響。用于設(shè)置合并屬性的參數(shù)有:
- 合并Map輸出文件:hive.merge.mapfiles=true(默認(rèn)值為真)
- 合并Reduce端輸出文件:hive.merge.mapredfiles=false(默認(rèn)值為假)
- 合并文件的大?。篽ive.merge.size.per.task=256*1000*1000(默認(rèn)值為 256000000)
3) Common join即普通的join,性能較差,因?yàn)樯婕暗搅藄huffle的過(guò)程(Hadoop/spark開發(fā)的過(guò)程中,有一個(gè)原則:能避免不使用shuffle就不使用shuffle),可以轉(zhuǎn)化成map join。
- hive.auto.convert.join=true;# 表示將運(yùn)算轉(zhuǎn)化成map join方式
使用的前提條件是需要的數(shù)據(jù)在 Map 的過(guò)程中可以訪問(wèn)到。
1)啟動(dòng)Task A:Task A去啟動(dòng)一個(gè)MapReduce的local task;通過(guò)該local task把small table data的數(shù)據(jù)讀取進(jìn)來(lái);之后會(huì)生成一個(gè)HashTable Files;之后將該文件加載到分布式緩存(Distributed Cache)中來(lái);
2)啟動(dòng)MapJoin Task:去讀大表的數(shù)據(jù),每讀一個(gè)就會(huì)去和Distributed Cache中的數(shù)據(jù)去關(guān)聯(lián)一次,關(guān)聯(lián)上后進(jìn)行輸出。
整個(gè)階段,沒(méi)有reduce 和 shuffle,問(wèn)題在于如果小表過(guò)大,可能會(huì)出現(xiàn)OOM。
Union與union all優(yōu)化原理
union將多個(gè)結(jié)果集合并為一個(gè)結(jié)果集,結(jié)果集去重。代碼為:
- select id,name
- from t1
- union
- select id,name
- from t2
- union
- select id,name
- from t3
對(duì)應(yīng)的運(yùn)行邏輯為:

union all將多個(gè)結(jié)果集合并為一個(gè)結(jié)果集,結(jié)果集不去重。使用時(shí)多與group by結(jié)合使用,代碼為:
- select all.id, all.name
- from(
- select id,name
- from t1
- union all
- select id,name
- from t2
- union all
- select id,name
- from t3
- )all
- group by all.id ,all.name
對(duì)應(yīng)的運(yùn)行邏輯為:

從上面的兩個(gè)邏輯圖可以看到,第二種寫法性能要好。union寫法每?jī)煞輸?shù)據(jù)都要先合并去重一次,再和另一份數(shù)據(jù)合并去重,會(huì)產(chǎn)生較多次的reduce。第二種寫法直接將所有數(shù)據(jù)合并再一次性去重。
對(duì)union all的操作除了與group by結(jié)合使用還有一些細(xì)節(jié)需要注意:
1)對(duì) union all 優(yōu)化只局限于非嵌套查詢。
原代碼:job有3個(gè):
- SELECT *
- FROM
- (
- SELECT *
- FROM t1
- GROUP BY c1,c2,c3
- UNION ALL
- SELECT *
- FROM t2
- GROUP BY c1,c2,c3
- )t3
- GROUP BY c1,c2,c3
這樣的結(jié)構(gòu)是不對(duì)的,應(yīng)該修改為:job有1個(gè):
- SELECT *
- FROM
- (
- SELECT *
- FROM t1
- UNION ALL
- SELECT *
- FROM t2
- )t3
- GROUP BY c1,c2,c3
這樣的修改可以減少job數(shù)量,進(jìn)而提高效率。
2)語(yǔ)句中出現(xiàn)count(distinct …)結(jié)構(gòu)時(shí):
原代碼為:
- SELECT *
- FROM
- (
- SELECT * FROM t1
- UNION ALL
- SELECT c1,c2,c3,COUNT(DISTINCT c4)
- FROM t2 GROUP BY c1,c2,c3
- ) t3
- GROUP BY c1,c2,c3;
修改為:(采用臨時(shí)表消滅 COUNT(DISTINCT)作業(yè)不但能解決傾斜問(wèn)題,還能有效減少jobs)。
- INSERT t4 SELECT c1,c2,c3,c4 FROM t2 GROUP BY c1,c2,c3;
- SELECT c1,c2,c3,SUM(income),SUM(uv) FROM
- (
- SELECT c1,c2,c3,income,0 AS uv FROM t1
- UNION ALL
- SELECT c1,c2,c3,0 AS income,1 AS uv FROM t2
- ) t3
- GROUP BY c1,c2,c3;
Order by的優(yōu)化原理
如果指定了hive.mapred.mode=strict(默認(rèn)值是nonstrict),這時(shí)就必須指定limit來(lái)限制輸出條數(shù),原因是:所有的數(shù)據(jù)都會(huì)在同一個(gè)reducer端進(jìn)行,數(shù)據(jù)量大的情況下可能不能出結(jié)果,那么在這樣的嚴(yán)格模式下,必須指定輸出的條數(shù)。
所以數(shù)據(jù)量大的時(shí)候能不用order by就不用,可以使用sort by結(jié)合distribute by來(lái)進(jìn)行實(shí)現(xiàn)。
sort by是局部排序;
distribute by是控制map怎么劃分reducer。
- cluster by=distribute by + sort by
被distribute by設(shè)定的字段為KEY,數(shù)據(jù)會(huì)被HASH分發(fā)到不同的reducer機(jī)器上,然后sort by會(huì)對(duì)同一個(gè)reducer機(jī)器上的每組數(shù)據(jù)進(jìn)行局部排序。

例如:
- select mid, money, name
- from store
- cluster by mid
- select mid, money, name
- from store
- distribute by mid
- sort by mid
如果需要獲得與上面的中語(yǔ)句一樣的效果:
- select mid, money, name
- from store
- cluster by mid
- sort by money
注意被cluster by指定的列只能是降序,不能指定asc和desc。
不過(guò)即使是先distribute by然后sort by這樣的操作,如果某個(gè)分組數(shù)據(jù)太大也會(huì)超出reduce節(jié)點(diǎn)的存儲(chǔ)限制,常常會(huì)出現(xiàn)137內(nèi)存溢出的錯(cuò)誤,對(duì)大數(shù)據(jù)量的排序都是應(yīng)該避免的。
Count(distinct …)優(yōu)化
如下的sql會(huì)存在性能問(wèn)題:
- SELECT COUNT( DISTINCT id ) FROM TABLE_NAME WHERE ...;
主要原因是COUNT這種“全聚合(full aggregates)”計(jì)算時(shí),它會(huì)忽略用戶指定的Reduce Task數(shù),而強(qiáng)制使用1,這會(huì)導(dǎo)致最終Map的全部輸出由單個(gè)的ReduceTask處理。這唯一的Reduce Task需要Shuffle大量的數(shù)據(jù),并且進(jìn)行排序聚合等處理,這使得它成為整個(gè)作業(yè)的IO和運(yùn)算瓶頸。
圖形如下:

為了避免這一結(jié)構(gòu),我們采用嵌套的方式優(yōu)化sql:
- SELECT COUNT(*)
- FROM (
- SELECT DISTINCT id FROM TABLE_NAME WHERE …
- ) t;
這一結(jié)構(gòu)會(huì)將任務(wù)切分成兩個(gè),第一個(gè)任務(wù)借用多個(gè)reduce實(shí)現(xiàn)distinct去重并進(jìn)行初步count計(jì)算,然后再將計(jì)算結(jié)果輸出到第二個(gè)任務(wù)中進(jìn)行計(jì)數(shù)。
另外,再有的方法就是用group by()嵌套代替count(distinct a)。
如果能用group by的就盡量使用group by,因?yàn)間roup by性能比distinct更好。
HiveSQL細(xì)節(jié)優(yōu)化
1) 設(shè)置合理的mapreduce的task數(shù),能有效提升性能。
- set mapred.reduce.tasks=n
2) 在sql中or的用法需要加括號(hào),否則可能引起無(wú)分區(qū)限制:
- Select x
- from t
- where ds=d1
- and (province=’gd’ or province=’gx’)
3) 對(duì)運(yùn)算結(jié)果進(jìn)行壓縮:
- set hive.exec.compress.output=true;
4) 減少生成的mapreduce步驟:
4.1)使用CASE…WHEN…代替子查詢;
4.2)盡量盡早地過(guò)濾數(shù)據(jù),減少每個(gè)階段的數(shù)據(jù)量,對(duì)于分區(qū)表要加分區(qū),同時(shí)只選擇需要使用到的字段;
5) 在map階段讀取數(shù)據(jù)前,F(xiàn)ileInputFormat會(huì)將輸入文件分割成split。split的個(gè)數(shù)決定了map的個(gè)數(shù)。
- mapreduce.input.fileinputformat.split.minsize 默認(rèn)值 0
- mapreduce.input.fileinputformat.split.maxsize 默認(rèn)值 Integer.MAX_VALUE
- dfs.blockSize 默認(rèn)值 128M,所以在默認(rèn)情況下 map的數(shù)量=block數(shù)
6) 常用的參數(shù):
- hive.exec.reducers.bytes.per.reducer=1000000;
設(shè)置每個(gè)reduce處理的數(shù)據(jù)量,reduce個(gè)數(shù)=map端輸出數(shù)據(jù)總量/參數(shù);
- set hive.mapred.mode=nonstrict;
- set hive.exec.dynamic.partition=true;
- set hive.exec.dynamic.partition.mode=nonstrict;
- set mapred.job.name=p_${v_date};
- set mapred.job.priority=HIGH;
- set hive.groupby.skewindata=true;
- set hive.merge.mapredfiles=true;
- set hive.exec.compress.output=true;
- set mapred.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
- set mapred.output.compression.type=BLOCK;
- set mapreduce.map.memory.mb=4096;
- set mapreduce.reduce.memory.mb=4096;
- set hive.hadoop.supports.splittable.combineinputformat=true;
- set mapred.max.split.size=16000000;
- set mapred.min.split.size.per.node=16000000;
- set mapred.min.split.size.per.rack=16000000;
- set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
- set hive.exec.reducers.bytes.per.reducer=128000000;
7)設(shè)置map個(gè)數(shù):
map個(gè)數(shù)和來(lái)源表文件壓縮格式有關(guān),.gz格式的壓縮文件無(wú)法切分,每個(gè)文件會(huì)生成一個(gè)map;
- set hive.hadoop.supports.splittable.combineinputformat=true; 只有這個(gè)參數(shù)打開,下面的3個(gè)參數(shù)才能生效
- set mapred.max.split.size=16000000; 每個(gè)map負(fù)載;
- set mapred.min.split.size.per.node=100000000; 每個(gè)節(jié)點(diǎn)map的最小負(fù)載,這個(gè)值必須小于set mapred.max.split.size的值;
- set mapred.min.split.size.per.rack=100000000; 每個(gè)機(jī)架map的最小負(fù)載;
- set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;