Caffeine源碼解讀-緩存過期淘汰相關(guān)算法
本文轉(zhuǎn)載自微信公眾號「肌肉碼農(nóng)」,作者鄒學(xué)。轉(zhuǎn)載本文請聯(lián)系肌肉碼農(nóng)公眾號。
引子:
上一篇通過使用示例講解了Caffeine的框架部分 Caffeine源碼解讀-架構(gòu)篇,這一篇繼續(xù)通過示例講解緩存過期相關(guān)算法部分,來看看它與guava cache有什么不一樣的設(shè)計。
使用示例:
繼續(xù)使用相同的例子,不過是從PUT、GET開始說起,了解了它的工作流程自然會知道它的緩存過期邏輯:
- //初始化
- Cache<String, String> cache = Caffeine.newBuilder().maximumSize(100)
- .expireAfterWrite(1, TimeUnit.SECONDS).build();
- //PUT
- cache.put("a", "b");
- //GET
- System.out.println(cache.getIfPresent("a"));
guava是在put時候進(jìn)行過期淘汰,那Caffeine也會是一樣嗎?
put/get:
大部分情況創(chuàng)建的是有界cache,put方法會進(jìn)入BoundedLocalCache的這個方法中:put(K key, V value, boolean notifyWriter, boolean onlyIfAbsent),當(dāng)Cache之前不包含該元素時會執(zhí)行以下的代碼:
- //從cache中取出之前的值
- Node<K, V> prior = data.get(nodeFactory.newLookupKey(key));
- if (prior == null) {
- //prior =null 表示之前元素不存在
- //因為不存在該元素,所以需要根據(jù)key、value創(chuàng)建一個新的node
- //這里有null的判斷是這部分邏輯外層是個循環(huán),用循環(huán)的原因是后面的異步操作需要保證成功。
- if (node == null) {
- //新建node
- node = nodeFactory.newNode(key, keyReferenceQueue(),
- value, valueReferenceQueue(), newWeight, now);
- //設(shè)置Node的初始時間,用于過期策略
- setVariableTime(node, expireAfterCreate(key, value, now));
- setAccessTime(node, now);
- setWriteTime(node, now);
- }
- if (notifyWriter && hasWriter()) {
- ............................
- } else { //如果還未完成該key的寫
- //將新建的node寫入到data中
- prior = data.putIfAbsent(node.getKeyReference(), node);
- if (prior == null) {
- //當(dāng)之前不存在該值時,執(zhí)行afterWrite操作,并執(zhí)行AddTask任務(wù)
- afterWrite(new AddTask(node, newWeight));
- return null;
- }
- }
- }
因為它的保持一致性代碼比較多,所以只需先讀中文注釋部分,從代碼可以看出寫緩存操作還是比較簡單:new一個node然后寫到data中去,最后觸發(fā)afterWrite后返回null.
最后一步afterWrite方法做了什么?
首先看一下AddTask是什么?
- final class AddTask implements Runnable {
- final Node<K, V> node;
- final int weight;
- AddTask(Node<K, V> node, int weight) {
- this.weight = weight;
- this.node = node;
- }
- .................................
- }
AddTask實現(xiàn)了runnable接口,也就是說完成add操作后,會異步執(zhí)行一個add任務(wù),這個就是它與guava最大的不同點-異步, 我們先把同步部分看完,畢竟它還是put操作返回null前要執(zhí)行這部分的,afterWrite方法如下:
- void afterWrite(Runnable task) {
- if (buffersWrites()) {
- for (int i = 0; i < WRITE_BUFFER_RETRIES; i++) {
- if (writeBuffer().offer(task)) {
- //觸發(fā)寫后調(diào)度
- scheduleAfterWrite();
- return;
- }
- scheduleDrainBuffers();
- }
- ..........
- } else {
- scheduleAfterWrite();
- }
- }
從上面代碼來看,該方法觸發(fā)了寫后調(diào)度,寫后調(diào)度最終后異步執(zhí)行drainBuffersTask,這個任務(wù)會整理cache中各node狀態(tài)并做出處理:
- voidscheduleDrainBuffers() {
- if (drainStatus() >= PROCESSING_TO_IDLE) {
- return;
- }
- if (evictionLock.tryLock()) {
- try {
- //獲得狀態(tài)
- int drainStatus = drainStatus();
- //只允許存在三種狀態(tài)
- if (drainStatus >= PROCESSING_TO_IDLE) {
- return;
- }
- lazySetDrainStatus(PROCESSING_TO_IDLE);
- //異步調(diào)用內(nèi)存調(diào)整任務(wù) drainBuffersTask
- executor().execute(drainBuffersTask);
- } catch (Throwable t) {
- logger.log(Level.WARNING, "Exception thrown when submitting maintenance task", t);
- maintenance(/* ignored */ null);
- } finally {
- evictionLock.unlock();
- }
- }
- }
從上面步驟來看,put流程是這樣的:先將元素寫入到cache,然后觸發(fā)調(diào)度,調(diào)度會根據(jù)閑忙狀態(tài)判斷是否執(zhí)行異步drainBuffersTask。
get的流程與put之差不多,因為get會改變key的使用情況影響過期結(jié)果,所以最終也可能會觸發(fā)drainBuffersTask執(zhí)行maintenance方法來清理緩存:
- void maintenance(@Nullable Runnable task) {
- lazySetDrainStatus(PROCESSING_TO_IDLE);
- try {
- //排出讀緩存
- drainReadBuffer();
- //排出寫緩存
- drainWriteBuffer();
- if (task != null) {
- task.run();
- }
- //排出key引用
- drainKeyReferences();
- //排出value引用
- drainValueReferences();
- //過期entry
- expireEntries();
- //淘汰entry
- evictEntries();
- } finally {
- if ((drainStatus() != PROCESSING_TO_IDLE) || !casDrainStatus(PROCESSING_TO_IDLE, IDLE)) {
- lazySetDrainStatus(REQUIRED);
- }
- }
- }
數(shù)據(jù)結(jié)構(gòu)
上一篇文章有講到Caffeine使用一個ConcurrencyHashMap來保存所有數(shù)據(jù),而這一節(jié)主要講過期淘汰策略所采用的數(shù)據(jù)結(jié)構(gòu),其中寫過期是使用writeOrderDeque,這個比較簡單無需多說,而讀過期相對復(fù)雜很多,使用W-TinyLFU的結(jié)構(gòu)與算法。
網(wǎng)絡(luò)上有很多文章介紹W-TinyLFU結(jié)構(gòu)的,大家可以去查一下,這里主要是從源碼來分析,總的來說它使用了三個雙端隊列:accessOrderEdenDeque,accessOrderProbationDeque,accessOrderProtectedDeque,使用雙端隊列的原因是支持LRU算法比較方便。
accessOrderEdenDeque屬于eden區(qū),緩存1%的數(shù)據(jù),其余的99%緩存在main區(qū)。
accessOrderProbationDeque屬于main區(qū),緩存main內(nèi)數(shù)據(jù)的20%,這部分是屬于冷數(shù)據(jù),即將補淘汰。
accessOrderProtectedDeque屬于main區(qū),緩存main內(nèi)數(shù)據(jù)的20%,這部分是屬于熱數(shù)據(jù),是整個緩存的主存區(qū)。
我們先看一下淘汰方法入口:
- void evictEntries() {
- if (!evicts()) {
- return;
- }
- //先從edn區(qū)淘汰
- int candidates = evictFromEden();
- //eden淘汰后的數(shù)據(jù)進(jìn)入main區(qū),然后再從main區(qū)淘汰
- evictFromMain(candidates);
- }
accessOrderEdenDeque對應(yīng)W-TinyLFU的W(window),這里保存的是最新寫入數(shù)據(jù)的引用,它使用LRU淘汰,這里面的數(shù)據(jù)主要是應(yīng)對突發(fā)流量的問題,淘汰后的數(shù)據(jù)進(jìn)入accessOrderProbationDeque.代碼如下:
- int evictFromEden() {
- int candidates = 0;
- Node<K, V> node = accessOrderEdenDeque().peek();
- while (edenWeightedSize() > edenMaximum()) {
- // The pending operations will adjust the size to reflect the correct weight
- if (node == null) {
- break;
- }
- Node<K, V> next = node.getNextInAccessOrder();
- if (node.getWeight() != 0) {
- node.makeMainProbation();
- //先從eden區(qū)移除
- accessOrderEdenDeque().remove(node);
- //移除的數(shù)據(jù)加入到main區(qū)的probation隊列
- accessOrderProbationDeque().add(node);
- candidates++;
- lazySetEdenWeightedSize(edenWeightedSize() - node.getPolicyWeight());
- }
- node = next;
- }
- return candidates;
- }
數(shù)據(jù)進(jìn)入probation隊列后,繼續(xù)執(zhí)行以下代碼:
- void evictFromMain(int candidates) {
- int victimQueue = PROBATION;
- Node<K, V> victim = accessOrderProbationDeque().peekFirst();
- Node<K, V> candidate = accessOrderProbationDeque().peekLast();
- while (weightedSize() > maximum()) {
- // Stop trying to evict candidates and always prefer the victim
- if (candidates == 0) {
- candidate = null;
- }
- // Try evicting from the protected and eden queues
- if ((candidate == null) && (victim == null)) {
- if (victimQueue == PROBATION) {
- victim = accessOrderProtectedDeque().peekFirst();
- victimQueue = PROTECTED;
- continue;
- } else if (victimQueue == PROTECTED) {
- victim = accessOrderEdenDeque().peekFirst();
- victimQueue = EDEN;
- continue;
- }
- // The pending operations will adjust the size to reflect the correct weight
- break;
- }
- // Skip over entries with zero weight
- if ((victim != null) && (victim.getPolicyWeight() == 0)) {
- victim = victim.getNextInAccessOrder();
- continue;
- } else if ((candidate != null) && (candidate.getPolicyWeight() == 0)) {
- candidate = candidate.getPreviousInAccessOrder();
- candidates--;
- continue;
- }
- // Evict immediately if only one of the entries is present
- if (victim == null) {
- candidates--;
- Node<K, V> evict = candidate;
- candidate = candidate.getPreviousInAccessOrder();
- evictEntry(evict, RemovalCause.SIZE, 0L);
- continue;
- } else if (candidate == null) {
- Node<K, V> evict = victim;
- victim = victim.getNextInAccessOrder();
- evictEntry(evict, RemovalCause.SIZE, 0L);
- continue;
- }
- // Evict immediately if an entry was collected
- K victimKey = victim.getKey();
- K candidateKey = candidate.getKey();
- if (victimKey == null) {
- Node<K, V> evict = victim;
- victim = victim.getNextInAccessOrder();
- evictEntry(evict, RemovalCause.COLLECTED, 0L);
- continue;
- } else if (candidateKey == null) {
- candidates--;
- Node<K, V> evict = candidate;
- candidate = candidate.getPreviousInAccessOrder();
- evictEntry(evict, RemovalCause.COLLECTED, 0L);
- continue;
- }
- // Evict immediately if the candidate's weight exceeds the maximum
- if (candidate.getPolicyWeight() > maximum()) {
- candidates--;
- Node<K, V> evict = candidate;
- candidate = candidate.getPreviousInAccessOrder();
- evictEntry(evict, RemovalCause.SIZE, 0L);
- continue;
- }
- // Evict the entry with the lowest frequency
- candidates--;
- //最核心算法在這里:從probation的頭尾取出兩個node進(jìn)行比較頻率,頻率更小者將被remove
- if (admit(candidateKey, victimKey)) {
- Node<K, V> evict = victim;
- victim = victim.getNextInAccessOrder();
- evictEntry(evict, RemovalCause.SIZE, 0L);
- candidate = candidate.getPreviousInAccessOrder();
- } else {
- Node<K, V> evict = candidate;
- candidate = candidate.getPreviousInAccessOrder();
- evictEntry(evict, RemovalCause.SIZE, 0L);
- }
- }
- }
上面的代碼邏輯是從probation的頭尾取出兩個node進(jìn)行比較頻率,頻率更小者將被remove,其中尾部元素就是上一部分從eden中淘汰出來的元素,如果將兩步邏輯合并起來講是這樣的:在eden隊列通過lru淘汰出來的”候選者“與probation隊列通過lru淘汰出來的“被驅(qū)逐者“進(jìn)行頻率比較,失敗者將被從cache中真正移除。下面看一下它的比較邏輯admit:
- boolean admit(K candidateKey, K victimKey) {
- int victimFreq = frequencySketch().frequency(victimKey);
- int candidateFreq = frequencySketch().frequency(candidateKey);
- //如果候選者的頻率高就淘汰被驅(qū)逐者
- if (candidateFreq > victimFreq) {
- return true;
- //如果被驅(qū)逐者比候選者的頻率高,并且候選者頻率小于等于5則淘汰者
- } else if (candidateFreq <= 5) {
- // The maximum frequency is 15 and halved to 7 after a reset to age the history. An attack
- // exploits that a hot candidate is rejected in favor of a hot victim. The threshold of a warm
- // candidate reduces the number of random acceptances to minimize the impact on the hit rate.
- return false;
- }
- //隨機(jī)淘汰
- int random = ThreadLocalRandom.current().nextInt();
- return ((random & 127) == 0);
- }
從frequencySketch取出候選者與被驅(qū)逐者的頻率,如果候選者的頻率高就淘汰被驅(qū)逐者,如果被驅(qū)逐者比候選者的頻率高,并且候選者頻率小于等于5則淘汰者,如果前面兩個條件都不滿足則隨機(jī)淘汰。
整個過程中你是不是發(fā)現(xiàn)protectedDeque并沒有什么作用,那它是怎么作為主存區(qū)來保存大部分?jǐn)?shù)據(jù)的呢?
- //onAccess方法觸發(fā)該方法
- void reorderProbation(Node<K, V> node) {
- if (!accessOrderProbationDeque().contains(node)) {
- // Ignore stale accesses for an entry that is no longer present
- return;
- } else if (node.getPolicyWeight() > mainProtectedMaximum()) {
- return;
- }
- long mainProtectedWeightedSize = mainProtectedWeightedSize() + node.getPolicyWeight();
- //先從probation中移除
- accessOrderProbationDeque().remove(node);
- //加入到protected中
- accessOrderProtectedDeque().add(node);
- node.makeMainProtected();
- long mainProtectedMaximum = mainProtectedMaximum();
- //從protected中移除
- while (mainProtectedWeightedSize > mainProtectedMaximum) {
- Node<K, V> demoted = accessOrderProtectedDeque().pollFirst();
- if (demoted == null) {
- break;
- }
- demoted.makeMainProbation();
- //加入到probation中
- accessOrderProbationDeque().add(demoted);
- mainProtectedWeightedSize -= node.getPolicyWeight();
- }
- lazySetMainProtectedWeightedSize(mainProtectedWeightedSize);
- }
當(dāng)數(shù)據(jù)被訪問時并且該數(shù)據(jù)在probation中,這個數(shù)據(jù)就會移動到protected中去,同時通過lru從protected中淘汰一個數(shù)據(jù)進(jìn)入到probation中。
這樣數(shù)據(jù)流轉(zhuǎn)的邏輯全部通了:新數(shù)據(jù)都會進(jìn)入到eden中,通過lru淘汰到probation,并與probation中通過lru淘汰的數(shù)據(jù)進(jìn)行使用頻率pk,如果勝利了就繼續(xù)留在probation中,如果失敗了就會被直接淘汰,當(dāng)這條數(shù)據(jù)被訪問了,則移動到protected。當(dāng)其它數(shù)據(jù)被訪問了,則它可能會從protected中通過lru淘汰到probation中。
TinyLFU
傳統(tǒng)LFU一般使用key-value形式來記錄每個key的頻率,優(yōu)點是數(shù)據(jù)結(jié)構(gòu)非常簡單,并且能跟緩存本身的數(shù)據(jù)結(jié)構(gòu)復(fù)用,增加一個屬性記錄頻率就行了,它的缺點也比較明顯就是頻率這個屬性會占用很大的空間,但如果改用壓縮方式存儲頻率呢? 頻率占用空間肯定可以減少,但會引出另外一個問題:怎么從壓縮后的數(shù)據(jù)里獲得對應(yīng)key的頻率呢?
TinyLFU的解決方案是類似位圖的方法,將key取hash值獲得它的位下標(biāo),然后用這個下標(biāo)來找頻率,但位圖只有0、1兩個值,那頻率明顯可能會非常大,這要怎么處理呢? 另外使用位圖需要預(yù)占非常大的空間,這個問題怎么解決呢?
TinyLFU根據(jù)最大數(shù)據(jù)量設(shè)置生成一個long數(shù)組,然后將頻率值保存在其中的四個long的4個bit位中(4個bit位不會大于15),取頻率值時則取四個中的最小一個。
Caffeine認(rèn)為頻率大于15已經(jīng)很高了,是屬于熱數(shù)據(jù),所以它只需要4個bit位來保存,long有8個字節(jié)64位,這樣可以保存16個頻率。取hash值的后左移兩位,然后加上hash四次,這樣可以利用到16個中的13個,利用率挺高的,或許有更好的算法能將16個都利用到。
- public void increment(@Nonnull E e) {
- if (isNotInitialized()) {
- return;
- }
- int hash = spread(e.hashCode());
- int start = (hash & 3) << 2;
- // Loop unrolling improves throughput by 5m ops/s
- int index0 = indexOf(hash, 0); //indexOf也是一種hash方法,不過會通過tableMask來限制范圍
- int index1 = indexOf(hash, 1);
- int index2 = indexOf(hash, 2);
- int index3 = indexOf(hash, 3);
- boolean added = incrementAt(index0, start);
- added |= incrementAt(index1, start + 1);
- added |= incrementAt(index2, start + 2);
- added |= incrementAt(index3, start + 3);
- //當(dāng)數(shù)據(jù)寫入次數(shù)達(dá)到數(shù)據(jù)長度時就重置
- if (added && (++size == sampleSize)) {
- reset();
- }
- }
給對應(yīng)位置的bit位四位的Int值加1:
- boolean incrementAt(int i, int j) {
- int offset = j << 2;
- long mask = (0xfL << offset);
- //當(dāng)已達(dá)到15時,次數(shù)不再增加
- if ((table[i] & mask) != mask) {
- table[i] += (1L << offset);
- return true;
- }
- return false;
- }
獲得值的方法也是通過四次hash來獲得,然后取最小值:
- public int frequency(@Nonnull E e) {
- if (isNotInitialized()) {
- return 0;
- }
- int hash = spread(e.hashCode());
- int start = (hash & 3) << 2;
- int frequency = Integer.MAX_VALUE;
- //四次hash
- for (int i = 0; i < 4; i++) {
- int index = indexOf(hash, i);
- //獲得bit位四位的Int值
- int count = (int) ((table[index] >>> ((start + i) << 2)) & 0xfL);
- //取最小值
- frequency = Math.min(frequency, count);
- }
- return frequency;
- }
當(dāng)數(shù)據(jù)寫入次數(shù)達(dá)到數(shù)據(jù)長度時就會將次數(shù)減半,一些冷數(shù)據(jù)在這個過程中將歸0,這樣會使hash沖突降低:
- void reset() {
- int count = 0;
- for (int i = 0; i < table.length; i++) {
- count += Long.bitCount(table[i] & ONE_MASK);
- table[i] = (table[i] >>> 1) & RESET_MASK;
- }
- size = (size >>> 1) - (count >>> 2);
- }