HBase Compaction 原理與線上調(diào)優(yōu)實(shí)踐
一、Compaction 介紹
HBase 是基于一種 LSM-Tree(Log-Structured Merge Tree)體系架構(gòu)的存儲(chǔ)模型設(shè)計(jì)的,寫入時(shí)先寫入 WAL(Write-Ahead-Log)日志,再寫入 Memstore 緩存,滿足一定條件后,會(huì)執(zhí)行 Flush 操作將緩存數(shù)據(jù)刷寫到磁盤,生成一個(gè) HFile 數(shù)據(jù)文件。隨著數(shù)據(jù)不斷寫入,HFile 文件會(huì)越來越多,文件太多導(dǎo)致查詢數(shù)據(jù)時(shí) IO 次數(shù)增加,進(jìn)而影響到 HBase 的查詢性能。為了優(yōu)化讀的性能,采用合并小 HFile 的方法來減少文件數(shù)量,這種合并 HFile 的操作就稱為 Compaction。Compaction 是從一個(gè) Region 的一個(gè) Store 中選擇部分 HFile 文件進(jìn)行合并的過程。合并原理是從這些待合并的數(shù)據(jù)文件中依次讀出 KeyValue,由小到大排序后寫入一個(gè)新的文件中。之后這個(gè)新生成的文件就會(huì)取代之前已合并的所有文件對(duì)外提供服務(wù)。
1.1 Compaction 的分類
HBase 根據(jù)合并規(guī)模將 Compaction 分為兩類:Minor Compaction 和 Major Compaction。
- Minor Compaction 是指選取部分小的、相鄰的 HFile,將它們合并成一個(gè)更大的 HFile;
- Major Compaction 是指將一個(gè)Store 中所有的 HFile 合并成一個(gè) HFile,這個(gè)過程會(huì)清理三種無意義的數(shù)據(jù):TTL 過期數(shù)據(jù)、被刪除的數(shù)據(jù)與版本號(hào)超過設(shè)定版本號(hào)的數(shù)據(jù)。
下圖形象的描述了2種 Compaction 的區(qū)別:
一般情況下,Major Compaction 持續(xù)時(shí)間比較長,整個(gè)過程消耗大量系統(tǒng)資源,因此線上數(shù)據(jù)量較大的業(yè)務(wù)通常推薦關(guān)閉自動(dòng)觸發(fā) Major Compaction 功能,改為在業(yè)務(wù)低峰期手動(dòng)觸發(fā)(或設(shè)置策略自動(dòng)在低峰期觸發(fā))。
1.2 Compaction的意義
- 合并小文件,減少文件數(shù),提升讀取性能,穩(wěn)定隨機(jī)讀延遲;
- 合并的時(shí)候會(huì)讀取遠(yuǎn)程 DataNode 上的文件寫入本地 DataNode,提高數(shù)據(jù)的本地化率;
- 清除過期數(shù)據(jù)和被刪除的數(shù)據(jù),減少表的存儲(chǔ)量。
1.3 Compaction觸發(fā)時(shí)機(jī)
HBase 中觸發(fā) Compaction 的時(shí)機(jī)有很多種,最常見的觸發(fā)時(shí)機(jī)有三種:后臺(tái)線程周期性檢查時(shí)觸發(fā)、MemStore Flush 觸發(fā)以及手動(dòng)觸發(fā)。
(1)后臺(tái)線程周期性檢查:后臺(tái)線程 CompactionChecker 會(huì)定期檢查是否需要執(zhí)行 Compaction,檢查周期為
hbase.server.thread.wakefrequency *hbase.server.compactchecker.interval.multiplier,這里主要考慮的是一段時(shí)間內(nèi)沒有寫入請(qǐng)求導(dǎo)致 Flush 觸發(fā)不了 Compaction 的情況。其中參數(shù) hbase.server.thread.wakefrequency 默認(rèn)值是10s,是 HBase 服務(wù)端線程喚醒時(shí)間間隔,參數(shù) hbase.server.compactchecker.interval.multiplier 默認(rèn)值1000,是 Compaction 操作周期性檢查乘數(shù)因子。10 * 1000 s 約等于 2hrs 46mins 40sec。
(2)MemStore Flush:Compaction 的根源在于 Flush,MemStore 達(dá)到一定閾值就會(huì)觸發(fā) Flush ,將內(nèi)存中的數(shù)據(jù)刷寫到磁盤生成 HFile 文件,隨著 HFile 文件越來越多就需要執(zhí)行 Compaction。HBase 每次 Flush之后,都會(huì)判斷是否需要進(jìn)行 Compaction,一旦滿足 Minor Compaction 或 Major Compaction 的條件便會(huì)觸發(fā)執(zhí)行。
(3)手動(dòng):是指通過 HBase API、HBase Shell 或者 Master UI 界面等方式執(zhí)行 compact、major_compact 等命令。
二、Compaction流程
了解完基本的背景后,接下來介紹 Compaction 的整個(gè)過程。
- RegionServer 啟動(dòng)一個(gè) Compaction 檢查線程,定期對(duì) Region 的 Store 進(jìn)行檢查;
- Compaction 始于特定的觸發(fā)條件。一旦觸發(fā),HBase 會(huì)將該 Compaction 交由一個(gè)獨(dú)立的線程處理;
- 從對(duì)應(yīng)的
Store 中選擇合適的 HFile 文件,這步是整個(gè) Compaction
的核心,選取文件時(shí)需要遵循很多條件,比如文件數(shù)既不能太多也不能太少、文件大小不能太大等,盡可能地選取承載 IO
負(fù)載重的文件集?;诖耍琀Base 實(shí)現(xiàn)了多種文件選取策略:常用的有
RatioBasedCompactionPolicy、
ExploringCompactionPolicy和
StripeCompactionPolicy 等,也支持自定義的 Compaction 算法; - 選出待合并的文件后,會(huì)根據(jù)這些 HFile 文件的總大小選擇對(duì)應(yīng)的線程池來進(jìn)行處理;
- 對(duì)這些文件執(zhí)行具體的 Compaction 操作。
下圖簡單的描述了上述流程。
下面對(duì)圖2中具體的每一步進(jìn)行詳細(xì)說明。
2.1 啟動(dòng) Compaction 定時(shí)線程
在 RegionServer 啟動(dòng)時(shí),會(huì)初始化 CompactSplitThread 線程以及定時(shí)檢查的 CompactionChecker ,默認(rèn)10s執(zhí)行一次。
// Compaction thread
this.compactSplitThread = new CompactSplitThread(this);
// Background thread to check for compactions; needed if region has not gotten updates
// in a while. It will take care of not checking too frequently on store-by-store basis.
this.compactionChecker = new CompactionChecker(this, this.threadWakeFrequency, this);
if (this.compactionChecker != null) choreService.scheduleChore(compactionChecker);
其中 CompactSplitThread 是用來實(shí)現(xiàn) Compaction 以及 Split 流程的類,而 CompactChecker 是用來周期性檢查是否執(zhí)行 Compaction 的。
CompactionChecker 是 ScheduledChore 類型,而 ScheduledChore 是 HBase定期執(zhí)行的一個(gè) Task。
2.2 觸發(fā) Compaction
Compaction 的觸發(fā)時(shí)機(jī)在上面已經(jīng)介紹過,下面對(duì)這3種觸發(fā)機(jī)制進(jìn)行詳細(xì)的介紹。
2.2.1 后臺(tái)線程周期性檢查
后臺(tái)線程 CompactionChecker 定期檢查是否需要執(zhí)行 Compaction,檢查周期為:hbase.regionserver.compaction.check.period(默認(rèn)10s)。
(1)首先檢查文件數(shù)是否大于可執(zhí)行 Compaction 的文件數(shù),一旦大于就會(huì)觸發(fā) Compaction。
(2)如果不滿足,會(huì)接著檢查是否到了 Major Compaction 的執(zhí)行周期。如果當(dāng)前 Store 中 HFile 的最早更新時(shí)間早于某個(gè)值 mcTime,就會(huì)觸發(fā) Major Compaction,其中 mcTime 是一個(gè)浮動(dòng)值,浮動(dòng)區(qū)間默認(rèn)為[7-7*0.2,7+7*0.2],其中7為配置項(xiàng) hbase.hregion.majorcompaction 設(shè)置,0.2為配置項(xiàng) hbase.hregion.
majorcompaction.jitter,所以在7天左右就會(huì)執(zhí)行一次 Major Compaction。用戶如果想禁用 Major Compaction,只需要將參數(shù)hbase.hregion
.majorcompaction 設(shè)為0。
(3)如果到了 Major Compaction 的執(zhí)行周期:
- 首先判斷有幾個(gè) HFile 文件,如果只有1個(gè)文件,會(huì)判斷是否有過期數(shù)據(jù)、本地化率是否比較低,如果都不滿足就不做 Major
Compaction; - 如果大于1個(gè)文件,也會(huì)做 Major
Compaction。
后臺(tái)線程周期性檢查的流程如圖3所示。
下面是該線程的關(guān)鍵代碼:
//ScheduledChore的run方法會(huì)一直調(diào)用chore函數(shù)
@Override
protected void chore() {
// 遍歷instance下的所有online的region 進(jìn)行循環(huán)檢測
// onlineRegions是HRegionServer上存儲(chǔ)的所有能夠提供有效服務(wù)的在線Region集合;
for (HRegion r : this.instance.onlineRegions.values()) {
if (r == null)
continue;
// 取出每個(gè)region的store
for (Store s : r.getStores().values()) {
try {
// 檢查是否需要compact的時(shí)間間隔 hbase.server.compactchecker.interval.multiplier * hbase.server.thread.wakefrequency,multiplier默認(rèn)1000;
long multiplier = s.getCompactionCheckMultiplier();
assert multiplier > 0;
// 未到multiplier的倍數(shù)跳過,每當(dāng)?shù)蜃觟teration為合并檢查倍增器multiplier的整數(shù)倍時(shí),才會(huì)發(fā)起檢查
if (iteration % multiplier != 0) continue;
// 需要合并的話,發(fā)起SystemCompaction請(qǐng)求,此處最終比較的是是否當(dāng)前hfile數(shù)量減去正在compacting的文件數(shù)大于設(shè)置的compact min值。若滿足則執(zhí)行systemcompact
if (s.needsCompaction()) {
// Queue a compaction. Will recognize if major is needed.
this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
+ " requests compaction");
} else if (s.isMajorCompaction()) {
if (majorCompactPriority == DEFAULT_PRIORITY
|| majorCompactPriority > r.getCompactPriority()) {
this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ " requests major compaction; use default priority", null);
} else {
this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ " requests major compaction; use configured priority",
this.majorCompactPriority, null);
}
}
} catch (IOException e) {
LOG.warn("Failed major compaction check on " + r, e);
}
}
}
iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
}
2.2.2 Memstore Flush 觸發(fā)
Memstore Flush 會(huì)產(chǎn)生 HFile 文件,文件越來越多就需要 Compaction。因此在每次執(zhí)行完 Flush 操作之后,都會(huì)對(duì)當(dāng)前 Store 中的文件數(shù)進(jìn)行判斷,一旦文件數(shù)超過 Compaction 的閾值 ,就會(huì)觸發(fā) Compaction。這里需要強(qiáng)調(diào)的是,Compaction 是以 Store 為單位進(jìn)行的,而在 Flush 觸發(fā)條件下,整個(gè) Region 的所有 Store 都會(huì)執(zhí)行 Compaction,所以會(huì)在短時(shí)間內(nèi)可能會(huì)執(zhí)行多次 Compaction。下面是 Flush 操作觸發(fā) Compaction 的代碼。
/**
* Flush a region.
* @param region Region to flush.
* @param emergencyFlush Set if we are being force flushed. If true the region
* needs to be removed from the flush queue. If false, when we were called
* from the main flusher run loop and we got the entry to flush by calling
* poll on the flush queue (which removed it).
* @param forceFlushAllStores whether we want to flush all store.
* @return true if the region was successfully flushed, false otherwise. If
* false, there will be accompanying log messages explaining why the region was
* not flushed.
*/
private boolean flushRegion(final Region region, final boolean emergencyFlush,
boolean forceFlushAllStores) {
synchronized (this.regionsInQueue) {
FlushRegionEntry fqe = this.regionsInQueue.remove(region);
// Use the start time of the FlushRegionEntry if available
if (fqe != null && emergencyFlush) {
// Need to remove from region from delay queue. When NOT an
// emergencyFlush, then item was removed via a flushQueue.poll.
flushQueue.remove(fqe);
}
}
lock.readLock().lock();
try {
// flush
notifyFlushRequest(region, emergencyFlush);
FlushResult flushResult = region.flush(forceFlushAllStores);
// 檢查是否需要compact
boolean shouldCompact = flushResult.isCompactionNeeded();
// We just want to check the size
// 檢查是否需要split
boolean shouldSplit = ((HRegion)region).checkSplit() != null;
if (shouldSplit) {
this.server.compactSplitThread.requestSplit(region);
} else if (shouldCompact) {
// 發(fā)起compact請(qǐng)求
server.compactSplitThread.requestSystemCompaction(
region, Thread.currentThread().getName());
}
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
// section, we get a DroppedSnapshotException and a replay of wal
// is required. Currently the only way to do this is a restart of
// the server. Abort because hdfs is probably bad (HBASE-644 is a case
// where hdfs was bad but passed the hdfs check).
server.abort("Replay of WAL required. Forcing server shutdown", ex);
return false;
} catch (IOException ex) {
LOG.error("Cache flush failed" + (region != null ? (" for region " +
Bytes.toStringBinary(region.getRegionInfo().getRegionName())) : ""),
RemoteExceptionHandler.checkIOException(ex));
if (!server.checkFileSystem()) {
return false;
}
} finally {
lock.readLock().unlock();
wakeUpIfBlocking();
}
return true;
}
2.2.3 手動(dòng)觸發(fā)
手動(dòng)觸發(fā)就是通過命令或者 API 接口手動(dòng)觸發(fā) Compaction,手動(dòng)觸發(fā)的原因有三個(gè):
- 很多業(yè)務(wù)擔(dān)心自動(dòng) Major Compaction 影響讀寫性能,因此會(huì)選擇低峰期手動(dòng)觸發(fā);
- 用戶在執(zhí)行完修改ttl的屬性后希望立刻生效,執(zhí)行手動(dòng)觸發(fā) Major Compaction;
- 硬盤容量不夠的情況下手動(dòng)觸發(fā) Major Compaction 刪除大量過期數(shù)據(jù)。
大多數(shù)都是基于第1點(diǎn)原因進(jìn)行手動(dòng)觸發(fā)。
2.3 選擇待合并的文件
Compaction 的核心就是選擇合適的文件進(jìn)行合并,因?yàn)楹喜⑽募拇笮∫约捌洚?dāng)前承載的 IO 直接決定了 Compaction 的效果。希望能找到這樣的文件:承載了大量 IO 請(qǐng)求但是文件大小很小,這樣 Compaction 本身不會(huì)消耗太多 IO,而且合并完成之后對(duì)讀的性能會(huì)有顯著提升?,F(xiàn)實(shí)情況可能大部分都不會(huì)是這樣。目前 HBase 提供了多種 Minor Compaction 文件選擇策略,通過配置項(xiàng) hbase.hstore.engine.class 設(shè)置。不管哪種策略,在執(zhí)行之前都要做對(duì)文件做一些篩選操作,排除不符合條件的文件,以減少 Compaction 的工作量,減少對(duì)讀寫的影響。
- 排除當(dāng)前正在執(zhí)行 Compaction 的文件;
- 如果一個(gè)文件所有的記錄都已經(jīng)過期,則直接將文件刪除;
- 排除過大的單個(gè)文件,如果文件大小大于 hbase.hstore.compaction.max.size(默認(rèn)Long最大值)則被排除,不排除會(huì)產(chǎn)生大量 IO 消耗。
排除完后剩下的文件稱為候選文件,接下來會(huì)再判斷是否滿足 Major Compaction 條件,如果滿足,就會(huì)選擇全部文件進(jìn)行合并。判斷條件有下面三條,只要滿足其中一條就會(huì)執(zhí)行 Major Compaction:
- 到了 Compaction 自動(dòng)執(zhí)行的周期且候選文件數(shù)小于 hbase.hstore.compaction
.max(默認(rèn)10),如果關(guān)掉自動(dòng) Major Compaction 執(zhí)行則不適用; - Store 中含有 Reference 文件,Reference 文件是 Split Region 產(chǎn)生的臨時(shí)引用文件,在 Compaction 過程中刪除;
- 用戶手動(dòng)執(zhí)行的 Major Compaction。
如果不滿足上述執(zhí)行條件,則為 Minor compaction。Minor Compaction 的策略有很多種,下面重點(diǎn)介紹
RationBasedCompactionPolicy(0.98之前的版本)、ExploringCompactionPolicy(0.98之后默認(rèn)的版本) 和 StripeCompactionPolicy 的執(zhí)行策略。
2.3.1 Compaction文件選擇策略的建模
所謂的 Compaction 文件選擇策略可以建模為下面的問題:
圖中的每個(gè)數(shù)字表示了文件的 Sequence ID,數(shù)字越大則文件越新,很有可能剛剛Flush而成,意味著文件 Size 也可能越小。這樣的文件在 Compaction 時(shí)優(yōu)先選擇,因此 Store下的 Storefile 文件會(huì)依據(jù) Sequence ID 從小到大排序,依次標(biāo)記為 f[0]、f[1]。。。。f[n-1],篩選策略就是要確定一個(gè)連續(xù)范圍 [Start, End] 內(nèi)的 Storefile 參與 Compaction。
Compaction 的目的是減少文件數(shù)量和刪除無用的數(shù)據(jù),優(yōu)化讀性能,Compaction 實(shí)現(xiàn)是將原文件的內(nèi)容重寫到新的文件,如果文件過大意味著 Compaction 的時(shí)間長,Compaction 過程中產(chǎn)生的 IO 放大越明顯,因此文件篩選的準(zhǔn)則是用最小的 IO 代價(jià)去合并減少最多的文件數(shù)。
Compaction 依賴兩個(gè)先決條件:
- 所有 StoreFile 按照順序進(jìn)行排序(此順序?yàn)椋豪衔募谇埃挛募诤螅?/li>
- 參與 Compaction 的文件必須是連續(xù)的。
2.3.2 RationBasedCompactionPolicy
基本思想就是選擇在固定 End 為最后一個(gè)文件的前提下(一般情況),從隊(duì)列頭開始滑動(dòng)尋找 Start,直到 Start 滿足下面的條件之一便停止掃描:
- 當(dāng)前文件大小
< 比當(dāng)前文件新的所有文件大小總和 * ratio,就是滿足公式 f[start].size <= ratio *
(f[start+1].size +.......+ f[end-1].size)。其中 ration 是一個(gè)可變的比例,高峰期 ration
為1.2,非高峰期 ration 為5,非高峰期允許合并更大的文件。
可以通過參數(shù) hbase.offpeak.start.hour 和 hbase.offpeak.end.hour 設(shè)置高峰期時(shí)間段。 - 當(dāng)前所剩候選文件數(shù) >= hbase.store.compaction.min(默認(rèn)為3),因?yàn)橐WC本次 Compaction 的時(shí)候文件個(gè)數(shù)要大于配置的 Compaction 最小值。
下面附上 RationBasedCompactionPolicy 的具體邏輯代碼。
/**
* @param candidates pre-filtrate
* @return filtered subset
* -- Default minor compaction selection algorithm:
* choose CompactSelection from candidates --
* First exclude bulk-load files if indicated in configuration.
* Start at the oldest file and stop when you find the first file that
* meets compaction criteria:
* (1) a recently-flushed, small file (i.e. <= minCompactSize)
* OR
* (2) within the compactRatio of sum(newer_files)
* Given normal skew, any newer files will also meet this criteria
* <p/>
* Additional Note:
* If fileSizes.size() >> maxFilesToCompact, we will recurse on
* compact(). Consider the oldest files first to avoid a
* situation where we always compact [end-threshold,end). Then, the
* last file becomes an aggregate of the previous compactions.
*
* normal skew:
*
* older ----> newer (increasing seqID)
* _
* | | _
* | | | | _
* --|-|- |-|- |-|---_-------_------- minCompactSize
* | | | | | | | | _ | |
* | | | | | | | | | | | |
* | | | | | | | | | | | |
*/
ArrayList<StoreFile> applyCompactionPolicy(ArrayList<StoreFile> candidates,
boolean mayUseOffPeak, boolean mayBeStuck) throws IOException {
if (candidates.isEmpty()) {
return candidates;
}
// we're doing a minor compaction, let's see what files are applicable
int start = 0;
// 獲取文件合并比例:取參數(shù)hbase.hstore.compaction.ratio,默認(rèn)為1.2
double ratio = comConf.getCompactionRatio();
if (mayUseOffPeak) {
// 取參數(shù)hbase.hstore.compaction.ratio.offpeak,默認(rèn)為5.0
ratio = comConf.getCompactionRatioOffPeak();
LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
}
// get store file sizes for incremental compacting selection.
final int countOfFiles = candidates.size();
long[] fileSizes = new long[countOfFiles];
long[] sumSize = new long[countOfFiles];
for (int i = countOfFiles - 1; i >= 0; --i) {
StoreFile file = candidates.get(i);
fileSizes[i] = file.getReader().length();
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
// tooFar表示后移動(dòng)最大文件數(shù)位置的文件大小,也就是剛剛滿足達(dá)到最大文件數(shù)位置的那個(gè)文件,從i至tooFar數(shù)目為合并時(shí)允許的最大文件數(shù)
int tooFar = i + comConf.getMaxFilesToCompact() - 1;
sumSize[i] = fileSizes[i]
+ ((i + 1 < countOfFiles) ? sumSize[i + 1] : 0)
- ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
}
// 倒序循環(huán),如果文件數(shù)目滿足最小合并時(shí)允許的最小文件數(shù),且該位置的文件大小大于合并時(shí)允許的文件最小大小與下一個(gè)文件窗口文件總大小乘以一定比例中的較大者,則繼續(xù);
// 實(shí)際上就是選擇出一個(gè)文件窗口內(nèi)能最小能滿足的文件大小的一組文件
while (countOfFiles - start >= comConf.getMinFilesToCompact() &&
fileSizes[start] > Math.max(comConf.getMinCompactSize(),
(long) (sumSize[start + 1] * ratio))) {
++start;
}
if (start < countOfFiles) {
LOG.info("Default compaction algorithm has selected " + (countOfFiles - start)
+ " files from " + countOfFiles + " candidates");
} else if (mayBeStuck) {
// We may be stuck. Compact the latest files if we can.保證最小文件數(shù)目的要求
int filesToLeave = candidates.size() - comConf.getMinFilesToCompact();
if (filesToLeave >= 0) {
start = filesToLeave;
}
}
candidates.subList(0, start).clear();
return candidates;
}
2.3.3 ExploringCompactionPolicy
該策略繼承自 RatioBasedCompactionPolicy,不同的是 Ration 策略在找到一個(gè)合適的文件集合之后就停止掃描了,而 Exploring 策略會(huì)把 Storefile 列表劃分成多個(gè)子隊(duì)列,從中找出一個(gè)最優(yōu)解參與 Compaction。最優(yōu)解可以理解為:待合并文件數(shù)最多或者待合并文件數(shù)相同的情況下文件較小,這樣有利于減少 Compaction 帶來的 IO 消耗。算法流程可以描述為:
- 從頭到尾遍歷文件,判斷所有符合條件的組合;
- 選擇組合內(nèi)文件數(shù) >= minFiles,且 <= maxFiles;
- 計(jì)算各組合文件的總大小 size,選擇組合 size <= MaxCompactSize,且 >= minCompactSize;
- 每個(gè)組合里面的每一個(gè)文件大小都必須滿足 FileSize(i) <= (sum(0,N,FileSize(_)) - FileSize(i)) * ration,意義在于去掉很大的文件,每次 Compaction 時(shí)應(yīng)該盡量合并一些大小較小的文件;
- 滿足以上 1-4 條件的組合里面選擇文件數(shù)最多,文件數(shù)一樣多時(shí)進(jìn)一步選擇文件總 size 最小的,目的在于盡可能多地合并文件并且 Compaction 帶來的 IO 壓力越小越好。
下面附上 ExploringCompactionPolicy 的具體邏輯代碼。
public List<StoreFile> applyCompactionPolicy(final List<StoreFile> candidates,
boolean mightBeStuck, boolean mayUseOffPeak, int minFiles, int maxFiles) {
final double currentRatio = mayUseOffPeak
? comConf.getCompactionRatioOffPeak() : comConf.getCompactionRatio();
// Start off choosing nothing.
List<StoreFile> bestSelection = new ArrayList<StoreFile>(0);
List<StoreFile> smallest = mightBeStuck ? new ArrayList<StoreFile>(0) : null;
long bestSize = 0;
long smallestSize = Long.MAX_VALUE;
int opts = 0, optsInRatio = 0, bestStart = -1; // for debug logging
// Consider every starting place. 從頭到尾遍歷文件
for (int start = 0; start < candidates.size(); start++) {
// Consider every different sub list permutation in between start and end with min files.
for (int currentEnd = start + minFiles - 1;
currentEnd < candidates.size(); currentEnd++) {
List<StoreFile> potentialMatchFiles = candidates.subList(start, currentEnd + 1);
// Sanity checks
if (potentialMatchFiles.size() < minFiles) {
continue;
}
if (potentialMatchFiles.size() > maxFiles) {
continue;
}
// Compute the total size of files that will
// have to be read if this set of files is compacted. 計(jì)算文件大小
long size = getTotalStoreSize(potentialMatchFiles);
// Store the smallest set of files. This stored set of files will be used
// if it looks like the algorithm is stuck. 總size最小的
if (mightBeStuck && size < smallestSize) {
smallest = potentialMatchFiles;
smallestSize = size;
}
if (size > comConf.getMaxCompactSize(mayUseOffPeak)) {
continue;
}
++opts;
if (size >= comConf.getMinCompactSize()
&& !filesInRatio(potentialMatchFiles, currentRatio)) {
continue;
}
++optsInRatio;
if (isBetterSelection(bestSelection, bestSize, potentialMatchFiles, size, mightBeStuck)) {
bestSelection = potentialMatchFiles;
bestSize = size;
bestStart = start;
}
}
}
if (bestSelection.size() == 0 && mightBeStuck) {
LOG.debug("Exploring compaction algorithm has selected " + smallest.size()
+ " files of size "+ smallestSize + " because the store might be stuck");
return new ArrayList<StoreFile>(smallest);
}
LOG.debug("Exploring compaction algorithm has selected " + bestSelection.size()
+ " files of size " + bestSize + " starting at candidate #" + bestStart +
" after considering " + opts + " permutations with " + optsInRatio + " in ratio");
return new ArrayList<StoreFile>(bestSelection);
}
2.3.4 StripeCompactionPolicy
Stripe Compaction (HBASE-7667)還是為了減少 Major Compaction 的壓力而提出的。其思想是:減少 Major Compaction 壓力最直接辦法是減少 Region 的大小,最好整個(gè)集群都是由很多小 Region 組成,這樣參與 Compaction 的文件總大小就必然不會(huì)太大??墒?Region 設(shè)置過小會(huì)導(dǎo)致 Region 數(shù)量很多,這一方面會(huì)導(dǎo)致 HBase 管理 Region 的開銷很大,另一方面 Region 過多也要求 HBase 能夠分配更多的內(nèi)存作為 Memstore 使用,否則有可能導(dǎo)致整個(gè) RegionServer 級(jí)別的 Flush,進(jìn)而引起長時(shí)間的寫阻塞。因此單純地通過將 Region 大小設(shè)置過小并不能本質(zhì)解決問題。
(1) Level Compaction
社區(qū)開發(fā)者借鑒了 Leveldb 的 Compaction 策略 Level Compaction。Level Compaction 設(shè)計(jì)思路是將 Store 中的所有數(shù)據(jù)劃分為很多層,每一層都會(huì)有一部分?jǐn)?shù)據(jù),如下圖所示:
數(shù)據(jù)組織形式不再按照時(shí)間前后進(jìn)行組織,而是按照 KeyRange 進(jìn)行組織,每個(gè) KeyRange 中會(huì)包含多個(gè)文件,這些文件所有數(shù)據(jù)的 Key 必須分布在同一個(gè)范圍。比如 Key 分布在 Key0~KeyN 之間的所有數(shù)據(jù)都會(huì)落在第一個(gè) KeyRange 區(qū)間的文件中,Key 分布在 KeyN+1~KeyT 之間的所有數(shù)據(jù)會(huì)分布在第二個(gè)區(qū)間的文件中,以此類推。
整個(gè)數(shù)據(jù)體系會(huì)被劃分為很多層,最上層(Level 0)表示最新數(shù)據(jù),最下層(Level 6)表示最舊數(shù)據(jù)。每一層都由大量 KeyRange 塊組成(Level 0除外),KeyRange 之間沒有 Key 重合。而且層數(shù)越大,對(duì)應(yīng)層的每個(gè) KeyRange 塊大小越大,下層 KeyRange 塊大小是上一層大小的10倍。圖中 Range 顏色越深,對(duì)應(yīng)的 Range 塊越大。
數(shù)據(jù)從 Memstore 中 Flush 之后,會(huì)首先落入 Level 0,此時(shí)落入 Level 0 的數(shù)據(jù)可能包含所有可能的 Key。此時(shí)如果需要執(zhí)行 Compaction,只需要將 Level 0 中的 KV 一個(gè)一個(gè)讀出來,然后按照 Key 的分布分別插入 Level 1 中對(duì)應(yīng) KeyRange 塊的文件中,如果此時(shí)剛好 Level 1 中的某個(gè)KeyRange 塊大小超過了一定閾值,就會(huì)繼續(xù)往下一層合并。
Level Compaction 依然會(huì)有 Major Compaction 的概念,發(fā)生 Major Compaction 只需要將 Range 塊內(nèi)的文件執(zhí)行合并就可以,而不需要合并整個(gè) Region 內(nèi)的數(shù)據(jù)文件。
可見,這種 Compaction 在合并的過程中,從上到下只需要部分文件參與,而不需要對(duì)所有文件執(zhí)行 Compaction 操作。另外,Level Compaction 還有另外一個(gè)好處,對(duì)于很多只讀最近寫入數(shù)據(jù)’的業(yè)務(wù)來說,大部分讀請(qǐng)求都會(huì)落到 Level 0,這樣可以使用 SSD 作為上層 Level 存儲(chǔ)介質(zhì),進(jìn)一步優(yōu)化讀。然而,這種 Compaction 因?yàn)?Level 層數(shù)太多導(dǎo)致 Compaction 的次數(shù)明顯增多,經(jīng)過測試,發(fā)現(xiàn)這種 Compaction 并沒有對(duì) IO 利用率有任何提升。
(2)Stripe Compaction
雖然原生的 Level Compaction 并不適用于 HBase,但是這種 Compaction 的思想?yún)s激發(fā)了HBase 研發(fā)者的靈感,再結(jié)合之前提到的小 Region 策略,就形成了 Stripe Compaction。
同 Level Compaction 相同,Stripe Compaction 會(huì)將整個(gè) Store 中的文件按照 Key 劃分為多個(gè) Range,稱為 Stripe,Stripe 的數(shù)量可以通過參數(shù)設(shè)定,相鄰的 Stripe 之間 Key 不會(huì)重合。Stripe 類似于 Sub-Region 的概念,即將一個(gè)大 Region 切分成了很多小的 Sub-Region。
隨著數(shù)據(jù)寫入,Memstore 執(zhí)行 Flush 之后形成 HFile,這些 HFile 并不會(huì)馬上寫入對(duì)應(yīng)的 Stripe,而是放到一個(gè)稱為 L0 的地方,用戶可以配置 L0 可以放置 HFile 的數(shù)量。一旦 L0 放置的文件數(shù)超過設(shè)定值,系統(tǒng)就會(huì)將這些 HFile 寫入對(duì)應(yīng)的 Stripe:首先讀出 HFile 的 KVs,再根據(jù)每個(gè) KV 的 Key 定位到具體的 Stripe,將該 KV 插入對(duì)應(yīng) Stripe 的文件中即可,如圖6所示。由于 Stripe 是個(gè)小的 Region,所以 Compaction 并不會(huì)太多消耗系統(tǒng)資源。另外,讀取數(shù)據(jù)時(shí),根據(jù)對(duì)應(yīng)的 Key 查找到對(duì)應(yīng)的 Stripe,然后在 Stripe 內(nèi)部執(zhí)行查找,因?yàn)?Stripe 內(nèi)數(shù)據(jù)量相對(duì)很小,所以一定程度上也可以提升數(shù)據(jù)查找性能。
2.4 執(zhí)行 Compaction 操作
挑選好待合并文件后,就是執(zhí)行真正的合并。合并流程主要分為以下幾步:
- 按順序讀出待合并所有 HFile 文件的 KV,并順序?qū)懙轿挥?/tmp 目錄下的臨時(shí)文件中;
- 將臨時(shí)文件移動(dòng)到對(duì)應(yīng) Region 的正式數(shù)據(jù)目錄中;
- 將 Compaction 的輸入文件路徑和輸出文件路徑封裝為 KV 寫入 WAL 日志,并打上 Compaction 標(biāo)記,最后強(qiáng)制執(zhí)行 sync;
- 將對(duì)應(yīng) Region 數(shù)據(jù)目錄下的 Compaction 的輸入文件全部刪除。
HBase對(duì)整個(gè) Compaction 的考慮是非常全面的,上述4個(gè)步驟的每一步發(fā)生錯(cuò)誤,都具有很強(qiáng)的容錯(cuò)性和冪等性(執(zhí)行一次和多次的結(jié)果相同)。
- 如果 RS 在步驟2或步驟2之前發(fā)生異常,本次 Compaction 會(huì)被認(rèn)為失敗,如果繼續(xù)進(jìn)行同樣的 Compaction,上次異常對(duì)接下來的 Compaction不會(huì)有任何影響,也不會(huì)對(duì)讀寫有影響,唯一的影響就是多了一份冗余的數(shù)據(jù);
- 如果 RS 在步驟2之后、步驟3或步驟3之前發(fā)生異常,也僅僅會(huì)多一份冗余數(shù)據(jù);
- 如果在步驟3之后、步驟4之前發(fā)生異常,則 RS 在重新打開 Region 之后就會(huì)從 WAL 中看到上次 Compaction 的日志。因?yàn)榇藭r(shí)輸入文件和輸出文件已經(jīng)持久化到 HDFS,因此只需要根據(jù) WAL 日志移除掉 Compaction 的輸入文件即可。
下面附上 Store 的 compact 方法。
public List<StoreFile> compact(CompactionContext compaction,
CompactionThroughputController throughputController, User user) throws IOException {
assert compaction != null;
List<StoreFile> sfs = null;
CompactionRequest cr = compaction.getRequest();
try {
// Do all sanity checking in here if we have a valid CompactionRequest
// because we need to clean up after it on the way out in a finally
// block below
long compactionStartTime = EnvironmentEdgeManager.currentTime();
assert compaction.hasSelection();
Collection<StoreFile> filesToCompact = cr.getFiles();
assert !filesToCompact.isEmpty();
synchronized (filesCompacting) {
// sanity check: we're compacting files that this store knows about
// TODO: change this to LOG.error() after more debugging
// 再次檢查
Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
}
// Ready to go. Have list of files to compact.
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
+ this + " of " + this.getRegionInfo().getRegionNameAsString()
+ " into tmpdir=" + fs.getTempDir() + ", totalSize="
+ TraditionalBinaryPrefix.long2String(cr.getSize(), "", 1));
// Commence the compaction. 開始compact,newFiles是合并后的新文件
List<Path> newFiles = compaction.compact(throughputController, user);
long outputBytes = 0L;
// TODO: get rid of this!
if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
LOG.warn("hbase.hstore.compaction.complete is set to false");
sfs = new ArrayList<StoreFile>(newFiles.size());
final boolean evictOnClose =
cacheConf != null? cacheConf.shouldEvictOnClose(): true;
for (Path newFile : newFiles) {
// Create storefile around what we wrote with a reader on it.
StoreFile sf = createStoreFileAndReader(newFile);
sf.closeReader(evictOnClose);
sfs.add(sf);
}
return sfs;
}
// Do the steps necessary to complete the compaction.
// 將newFiles移動(dòng)到新的位置,返回StoreFile列表
sfs = moveCompatedFilesIntoPlace(cr, newFiles, user);
// 在WAL中寫入Compaction記錄
writeCompactionWalRecord(filesToCompact, sfs);
// 將新生成的StoreFile列表替換到StoreFileManager的storefile中
replaceStoreFiles(filesToCompact, sfs);
// 根據(jù)compact類型,累加相應(yīng)計(jì)數(shù)器
if (cr.isMajor()) {
majorCompactedCellsCount += getCompactionProgress().totalCompactingKVs;
majorCompactedCellsSize += getCompactionProgress().totalCompactedSize;
} else {
compactedCellsCount += getCompactionProgress().totalCompactingKVs;
compactedCellsSize += getCompactionProgress().totalCompactedSize;
}
for (StoreFile sf : sfs) {
outputBytes += sf.getReader().length();
}
// At this point the store will use new files for all new scanners.
// 歸檔舊文件
completeCompaction(filesToCompact, true); // Archive old files & update store size.
long now = EnvironmentEdgeManager.currentTime();
if (region.getRegionServerServices() != null
&& region.getRegionServerServices().getMetrics() != null) {
region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
outputBytes);
}
// 記錄日志信息并返回
logCompactionEndMessage(cr, sfs, now, compactionStartTime);
return sfs;
} finally {
finishCompactionRequest(cr);
}
}
三、Compaction 的限流
上述幾種策略都是根據(jù)不同的業(yè)務(wù)場景設(shè)置對(duì)應(yīng)的文件選擇策略,核心都是減少參與 Compaction 的文件數(shù),縮短整個(gè) Compaction 執(zhí)行的時(shí)間,間接降低 Compaction 的 IO 放大效應(yīng),減少對(duì)業(yè)務(wù)讀寫的延遲影響。但是,如果不對(duì) Compaction 執(zhí)行階段的讀寫吞吐量進(jìn)行限制的話也會(huì)引起短時(shí)間大量系統(tǒng)資源消耗,影響用戶讀寫延遲。HBase 通過限制 Compaction 速度 和 Compaction 的帶寬來對(duì) Compaction 進(jìn)行限流。
3.1 Limit Compaction Speed
該優(yōu)化方案通過感知 Compaction 的壓力情況自動(dòng)調(diào)節(jié)系統(tǒng)的 Compaction 吞吐量,在壓力大的時(shí)候降低合并吞吐量,壓力小的時(shí)候增加合并吞吐量。
基本原理為:
在正常情況下,用戶需要設(shè)置吞吐量下限參數(shù) hbase.hstore.compaction.throughput.lower.bound (默認(rèn)10MB/sec) 和上限參數(shù) hbase.hstore.compaction.throughput.higher.bound (默認(rèn)20MB/sec),實(shí)際會(huì)工作時(shí)吞吐量為 lower + (higer – lower) * ratio,其中 ratio 是一個(gè)取值范圍在0到1的小數(shù),它由當(dāng)前 Store 中待參與 Compation 的 HFile 數(shù)量決定,數(shù)量越多,ratio 越小,反之越大。
如果當(dāng)前 Store中 HFile 的數(shù)量太多,并且超過了參數(shù) blockingFileCount,此時(shí)所有寫請(qǐng)求就會(huì)阻塞等待 Compaction 完成,這種場景下上述限制會(huì)自動(dòng)失效。
3.2 Compaction BandWidth Limit
原理其實(shí)和 Limit Compaction Speed 思路基本一致,它主要涉及兩個(gè)參數(shù):compactBwLimit 和 numOfFilesDisableCompactLimit。
作用分別如下:
- compactBwLimit:一次 Compaction 的最大帶寬使用量,如果 Compaction 所使用的帶寬高于該值,就會(huì)強(qiáng)制令其 sleep 一段時(shí)間。
- numOfFilesDisableCompactLimit:很顯然,在寫請(qǐng)求非常大的情況下,限制 Compaction 帶寬的使用量必然會(huì)導(dǎo)致 HFile 堆積,進(jìn)而會(huì)影響到讀請(qǐng)求響應(yīng)延時(shí)。因此該值意義就很明顯,一旦 Store 中 HFile 數(shù)量超過該設(shè)定值,帶寬限制就會(huì)失效。
// 該方法進(jìn)行Compaction的動(dòng)態(tài)限制
private void tune(double compactionPressure) {
double maxThroughputToSet;
// 壓力大于1,最大限速不受限制
if (compactionPressure > 1.0) {
// set to unlimited if some stores already reach the blocking store file count
maxThroughputToSet = Double.MAX_VALUE;
// 空閑時(shí)間,最大限速為設(shè)置的Compaction最大吞吐量
} else if (offPeakHours.isOffPeakHour()) {
maxThroughputToSet = maxThroughputOffpeak;
} else {
// compactionPressure is between 0.0 and 1.0, we use a simple linear formula to
// calculate the throughput limitation.
// lower + (higher - lower) * ratio
maxThroughputToSet =
maxThroughputLowerBound + (maxThroughputHigherBound - maxThroughputLowerBound)
* compactionPressure;
}
if (LOG.isDebugEnabled()) {
LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to "
+ throughputDesc(maxThroughputToSet));
}
this.maxThroughput = maxThroughputToSet;
}
再來看下獲取 R S的 Compaction 壓力的 getCompactionPressure 方法,其實(shí)就是遍歷每個(gè) Region 的每個(gè) Store,取壓力最大的。
@Override
public double getCompactionPressure() {
double max = 0;
for (Region region : onlineRegions.values()) {
for (Store store : region.getStores()) {
double normCount = store.getCompactionPressure();
if (normCount > max) {
max = normCount;
}
}
}
return max;
}
@Override
public double getCompactionPressure() {
int storefileCount = getStorefileCount();
int minFilesToCompact = comConf.getMinFilesToCompact();
if (storefileCount <= minFilesToCompact) {
return 0.0;
}
return (double) (storefileCount - minFilesToCompact) / (blockingFileCount - minFilesToCompact);
}
HBase 的限流方案通過感知 Compaction 的壓力情況自動(dòng)調(diào)節(jié)系統(tǒng)的 Compaction 吞吐量,在壓力大的時(shí)候降低合并吞吐量,壓力小的時(shí)候增加合并吞吐量。
基本原理為:
在正常情況下,用戶需要設(shè)置吞吐量下限參數(shù) hbase.hstore.compaction.throughput.lower.bound (默認(rèn)10MB/sec) 和上限參數(shù) hbase.hstore.compaction.throughput.higher.bound(默認(rèn)20MB/sec),而實(shí)際會(huì)工作在吞吐量為 lower + (higer – lower) * ratio的情況下,其中 ratio 是一個(gè)取值范圍在0到1的小數(shù),它由當(dāng)前 Store 中待參與 Compation 的 HFile 數(shù)量決定,數(shù)量越多,ratio 越小,反之越大。
如果當(dāng)前 Store中 HFile 的數(shù)量太多,并且超過了 blockingFileCount 的值,該值由參數(shù) hbase.hstore.blockingStoreFiles 配置,此時(shí)所有寫請(qǐng)求就會(huì)阻塞等待 Compaction 完成,這種場景下,上述限制會(huì)自動(dòng)失效。
四、線上遇到的問題及調(diào)優(yōu)方法
由于線上環(huán)境的復(fù)雜性,對(duì) Compaction 模塊做了較多的優(yōu)化,下面選取兩個(gè)典型案例進(jìn)行說明。
4.1 關(guān)閉了自動(dòng)觸發(fā) Major Compaction,但是監(jiān)控中 Major Compaction 隊(duì)列仍然有值進(jìn)而影響讀寫性能
線上集群都是關(guān)閉自動(dòng)觸發(fā) Major Compaction 的功能,在業(yè)務(wù)低峰期由定時(shí)任務(wù)手動(dòng)觸發(fā) Major Compaction。在某次故障中,業(yè)務(wù)反饋?zhàn)x寫性能在非執(zhí)行 Major Compaction 的時(shí)段延遲比較大。查看監(jiān)控發(fā)現(xiàn),監(jiān)控中的 Major Compaction 隊(duì)列的值比較大。
下面是當(dāng)時(shí)的 Major Compaction 隊(duì)列長度和讀寫調(diào)用平均耗時(shí)的監(jiān)控圖,從圖中可以很明顯地看出下面幾點(diǎn):
- Major Compaction 的隊(duì)列長度比較大的時(shí)候,讀寫的耗時(shí)也比較大;
- Major Compaction 的隊(duì)列長度跟入流量有關(guān)系,入流量比較大的時(shí)候,Major Compaction 的隊(duì)列長度就比較大。
這里就產(chǎn)生了疑問,關(guān)閉了自動(dòng) Major Compaction,是什么條件觸發(fā)了 Major Compaction ?
帶著上面的疑問,我們從源碼的層面對(duì)問題進(jìn)行分析。
1)首先查看了 Major Compaction 隊(duì)列長度這個(gè)指標(biāo)的含義,該指標(biāo)表示 longCompaction 線程池的工作隊(duì)列中等待的個(gè)數(shù)。
@Override
public int getLargeCompactionQueueSize() {
//The thread could be zero. if so assume there is no queue.
if (this.regionServer.compactSplitThread == null) {
return 0;
}
return this.regionServer.compactSplitThread.getLargeCompactionQueueSize();
}
public int getLargeCompactionQueueSize() {
return longCompactions.getQueue().size();
}
2)查看 HBase 日志,發(fā)現(xiàn)確實(shí)有做 Major Compaction 的行為。
3)進(jìn)一步排查什么時(shí)候會(huì)去調(diào)用 long Compaction 的線程池,查看 Compaction 選擇 long Compaction 和 small Compaction 隊(duì)列相關(guān)的源碼。
/**
* @param candidateFiles candidate files, ordered from oldest to newest. All files in store.
* @return subset copy of candidate list that meets compaction criteria
* @throws java.io.IOException
*/
public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
final List<StoreFile> filesCompacting, final boolean isUserCompaction,
final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
// Preliminary compaction subject to filters
ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
// Stuck and not compacting enough (estimate). It is not guaranteed that we will be
// able to compact more if stuck and compacting, because ratio policy excludes some
// non-compacting files from consideration during compaction (see getCurrentEligibleFiles).
int futureFiles = filesCompacting.isEmpty() ? 0 : 1;
boolean mayBeStuck = (candidateFiles.size() - filesCompacting.size() + futureFiles)
>= storeConfigInfo.getBlockingFileCount();
candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
LOG.debug("Selecting compaction from " + candidateFiles.size() + " store files, " +
filesCompacting.size() + " compacting, " + candidateSelection.size() +
" eligible, " + storeConfigInfo.getBlockingFileCount() + " blocking");
// If we can't have all files, we cannot do major anyway
boolean isAllFiles = candidateFiles.size() == candidateSelection.size();
if (!(forceMajor && isAllFiles)) {
// 過濾掉大文件
candidateSelection = skipLargeFiles(candidateSelection, mayUseOffPeak);
isAllFiles = candidateFiles.size() == candidateSelection.size();
}
...
}
其中 skipLargeFiles 方法對(duì)待合并文件進(jìn)行過濾,去掉大文件,該閾值是由
maxCompactSize =
conf.getLong(HBASE_HSTORE_COMPACTION_MAX_SIZE_KEY, Long.MAX_VALUE)配置,默認(rèn)是 Long.MAX_VALUE。
/**
* @param candidates pre-filtrate
* @return filtered subset
* exclude all files above maxCompactSize
* Also save all references. We MUST compact them
*/
private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates,
boolean mayUseOffpeak) {
int pos = 0;
while (pos < candidates.size() && !candidates.get(pos).isReference()
&& (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize(mayUseOffpeak))) {
++pos;
}
if (pos > 0) {
LOG.debug("Some files are too large. Excluding " + pos
+ " files from compaction candidates");
candidates.subList(0, pos).clear();
}
return candidates;
}
之后再通過待合并文件的大小來選擇 long Compaction 線程池還是 small Compaction 的線程池。
@Override
public boolean throttleCompaction(long compactionSize) {
return compactionSize > comConf.getThrottlePoint();
}
這個(gè)閾值的計(jì)算方法如下,默認(rèn)是2.5G,就是說如果待合并的文件大小大于2.5G,就會(huì)放到 long Compaction 的線程池中去執(zhí)行。
throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());
4)查看 ReigonServer 該時(shí)間段的日志,發(fā)現(xiàn)有大量大于 2.5G 的文件在 Compaction,這就解釋了為什么RS日志中該時(shí)間段并沒有做 Major Compaction 的日志但是 long Compaction 隊(duì)列有值的問題。
至此,問題原因就找到了,入流量的增加導(dǎo)致單個(gè) HFile 文件比較大,F(xiàn)lush 之后做 Minor Compaction 的時(shí)候如果待合并文件總大小大于2.5G(默認(rèn)值)的時(shí)候,會(huì)將此次 Minor Compaction 放入到 long Compaction 的線程池中執(zhí)行。待合并的文件比較大導(dǎo)致磁盤 IO 消耗比較高,進(jìn)而影響到讀寫性能。
5)措施
我們調(diào)整了 Compaction 的參數(shù)
hbase.hstore.compaction.max.size 將該值修改為2G,表示在 Minor Compaction 的時(shí)候大于 2G 的 HFile 將會(huì)被排除,等到業(yè)務(wù)低峰期的時(shí)候再對(duì)大于2G的文件合并,減少 Compaction 對(duì)磁盤 IO 的影響。
6)效果
調(diào)整之后,在非手動(dòng)觸發(fā) Major Compaction 期間就很少有占用 long Compaction 線程池的情況出現(xiàn)了,讀寫平均耗時(shí)也降到了50ms以下。
4.2 定時(shí)手動(dòng)觸發(fā)的 Major Compation 任務(wù)執(zhí)行時(shí)間過長
業(yè)務(wù)反饋某張表的讀寫性能最近有點(diǎn)慢,通過監(jiān)控查看到該表的存儲(chǔ)一直在增長,存儲(chǔ)單副本達(dá)到了578TB。查看表信息,該表的TTL設(shè)置的15天,該表的輸入流量也沒有明顯的增加。監(jiān)控圖如下:
于是懷疑每天的 Compaction 任務(wù)沒有做完,導(dǎo)致過期數(shù)據(jù)未能完全刪除。查看線上配置,Major Compaction 的線程池大小是1,該表的數(shù)據(jù)量又比較大。于是調(diào)整了 Compaction 線程池的大小為10,并且設(shè)置了集群的空閑時(shí)間 hbase.offpeak.start.hour 與 hbase.offpeak
.end.hour,在這個(gè)時(shí)間段內(nèi) Compaction 的時(shí)候可以增加待合并文件大小。調(diào)整完成后,通過監(jiān)控查看 Compaction 的效果對(duì)比圖,可以看到 Compaction 的工作量明顯增大了。
查看該表所占存儲(chǔ)的大小,可以看到該表已經(jīng)從 578T 下降到了 349T,下降幅度達(dá)到了40%。業(yè)務(wù)的讀寫耗時(shí)也恢復(fù)正常。Compaction 的參數(shù)比較重要, 在調(diào)整的時(shí)候需要考慮對(duì)業(yè)務(wù)是否有影響,調(diào)整之后要多觀察業(yè)務(wù)的耗時(shí)情況,可以循序漸進(jìn)的對(duì)參數(shù)進(jìn)行調(diào)整。
五、Compaction相關(guān)參數(shù)介紹
下面附上 Compaction 相關(guān)的參數(shù),線上環(huán)境可以根據(jù)實(shí)際情況進(jìn)行調(diào)整。
六、總結(jié)
Compaction 是 HBase 提升讀寫性能非常重要的手段,而 Compaction 的邏輯又比較復(fù)雜,并且使用不當(dāng),會(huì)導(dǎo)致寫放大,進(jìn)而會(huì)影響到正常的讀寫請(qǐng)求。本文重點(diǎn)介紹了 Compaction 的觸發(fā)機(jī)制、Compaction 發(fā)展過程中出現(xiàn)的多種合并策略、待合并文件的選擇算法、 Compaction 的限流以及 Compaction 相關(guān)的參數(shù)做了詳細(xì)的描述,最后選擇線上的2個(gè)案例,介紹了具體的分析思路和調(diào)優(yōu)的方法,經(jīng)調(diào)優(yōu)后,性能得到了成倍的提升,保障了業(yè)務(wù)高效、穩(wěn)定的運(yùn)行。