LongAdder ,這哥們勁兒大
我們在之前的文章中介紹到了 AtomicLong ,如果你還不了解,我建議你閱讀一下這篇文章
一場 Atomic XXX 的魔幻之旅
為什么我要先說 AtomicLong 呢?因為 LongAdder 的設(shè)計是根據(jù) AtomicLong 的缺陷來設(shè)計的。
為什么引入 LongAdder
我們知道,AtomicLong 是利用底層的 CAS 操作來提供并發(fā)性的,比如 addAndGet 方法
- public final long addAndGet(long delta) {
- return unsafe.getAndAddLong(this, valueOffset, delta) + delta;
- }
我們還知道,CAS 是一種輕量級的自旋方法,它的邏輯是采用自旋的方式不斷更新目標值,直到更新成功。(也即樂觀鎖的實現(xiàn)模式)
在并發(fā)數(shù)量比較低的場景中,線程沖突的概率比較小,自旋的次數(shù)不會很多。但是,在并發(fā)數(shù)量激增的情況下,會出現(xiàn)大量失敗并不斷自旋的場景,此時 AtomicLong 的自旋次數(shù)很容易會成為瓶頸。
為了解決這種缺陷,引入了本篇文章的主角 --- LongAdder,它主要解決高并發(fā)環(huán)境下 AtomictLong 的自旋瓶頸問題。
認識 LongAdder
我們先看一下 JDK 源碼中關(guān)于 LongAdder 的討論
這段話很清晰的表明,在多線程環(huán)境下,如果業(yè)務場景更側(cè)重于統(tǒng)計數(shù)據(jù)或者收集信息的話,LongAdder 要比 AtomicLong 有更好的性能,吞吐量更高,但是會占用更多的內(nèi)存。在并發(fā)數(shù)量較低或者單線程場景中,AtomicLong 和 LongAdder 具有相同的特征,也就是說,在上述場景中,AtomicLong 和 LongAdder 可以互換。
首先我們先來看一下 LongAdder類的定義:
可以看到,LongAdder 繼承于 Striped64類,并實現(xiàn)了 Serializable 接口。
Striped64 又是繼承于 Number 類,現(xiàn)在你可能不太清楚 Striped64 是做什么的,但是你別急。
我們知道,Number 類是基本數(shù)據(jù)類型的包裝類以及原子性包裝類的父類,繼承 Number 類表示它可以當作數(shù)值進行處理,然而這樣并不能說明什么,它對我們來說還是一頭霧水,隨后我翻了翻 Striped64 的繼承體系,發(fā)現(xiàn)這個類有幾個實現(xiàn),而這幾個實現(xiàn)能夠很好的幫助我們理解什么是 Striped64。
在其中我們看到了 Accumulator,它的中文概念就是累加器,所以我們猜測 Striped64 也能實現(xiàn)累加的功能。果不其然,在我求助了部分帖子之后,我們可以得出來一個結(jié)論:Striped64 就是一個支持 64 位的累加器。
但是那兩個 Accumulator 的實現(xiàn)可以實現(xiàn)累加,這個好理解,但是這兩個 Adder 呢?它們也能實現(xiàn)累加的功能嗎?
我們再次細致的查看一下 LongAdder 和 DoubleAdder 的源碼(因為這兩個類非常相似)。
(下面的敘事風格主要以 LongAdder 為準,DoubleAdder 會兼帶著聊一下,但不是主要敘事對象,畢竟這兩個類的基類非常相似。)
在翻閱一波源碼過后,我們發(fā)現(xiàn)了一些方法,比如 increment、decrement、add、reset、sum等,這些方法看起來好像是一個累加器所擁有的功能。
在詳細翻閱源碼后,我認證了我的想法,因為 increment 、decrement 都是調(diào)用的 add 方法,而 add 方法底層則使用了 longAccmulate,我給你畫個圖你就知道了。
所以,LongAdder 的第一個特性就是可以處理數(shù)值,而實現(xiàn)了 Serializable 這個接口表示 LongAdder 可以序列化,以便存儲在文件中或在網(wǎng)絡上傳輸。
然后我們繼續(xù)往下走,現(xiàn)在時間的羅盤指向了 LongAdder 的 add 方法,我們腦子里面已經(jīng)有印象,這個 LongAdder 方法是來做并發(fā)累加的,所以這個 add 方法如果不出意外就是并發(fā)的累加方法,那么很奇怪了,這個方法為什么沒有使用 synchronized 呢?
我們繼續(xù)觀察 add 方法都做了哪些操作。
開始時就是分配了一些變量而已啊,這也沒什么特殊的,繼續(xù)看到第二行的時候,我們發(fā)現(xiàn)有一個 cells 對象,這個 cells 對象是個數(shù)組,好像是個全局的。。。。。。那么在哪里定義的呢?
一想到還有個 Striped64 這個類,果斷點進去看到了這個全局變量,另外,Striped64 這個類很重要,我們可不能把它忘了。
在 Striped64 類中,赫然出現(xiàn)了下面這三個重要的全局變量。
由 transient 修飾的這三個全局變量,會保證不會被序列化。
我們先不解釋這三個變量是什么,有什么用,因為我們剛剛擼到了 add 方法,所以還是回到 add 方法上來:
繼續(xù)向下走,我們看到了 caseBase 方法,這個方法又是干什么的?點進去,發(fā)現(xiàn)還是 Striped64 中的方法,現(xiàn)在,我們陷入了沉思。
我們上面只是知道了 Striped64 是一個累加器,但是我現(xiàn)在要不要擼一遍 Striped64 的源碼??????
認識 Striped64
果然出來混都是要還的,上面沒有介紹的 Striped64 的三個變量,現(xiàn)在就要好好介紹一波了。
- cells 它是一個數(shù)組,如果不是 null ,它的大小就是 2 的次方。
- base 是 Striped64 的基本數(shù)值,主要在沒有爭用時使用,同時也作為 cells 初始化時使用,使用 CAS 進行更新。
- cellsBusy 它是一個 SpinLock(自旋鎖),在初始化 cells 時使用。
除了這三個變量之外,還有一個變量,我竟然把它忽視了,罪過罪過。
它表示的是 CPU 的核數(shù)。
好像并沒有看出來什么玄機,接著往下走
這里出現(xiàn)了一個 Cell 元素的內(nèi)部類,里面出現(xiàn)了 long 型的 value 值,cas 方法,UNSAFE,valueOffSet 元素,如果你看過我的這篇文章 Atomic 原子工具類詳解 或者你了解 AtomicLong 的設(shè)計的話,你就知道,Cell 的設(shè)計是和 AtomicLong 完全一樣的,都是使用了 volatile 變量、Unsafe 加上字段的偏移量,再用 CAS 進行修改。
而這個 Cell 元素,就是 cells 數(shù)組中的每個對象。
這里比較特殊的是 @sun.misc.Contended 注解,它是 Java 8 中新增的注解,用來避免緩存的偽共享,減少 CPU 緩存級別的競爭。
害,就這?
先不要著急,Striped64 最核心的功能是分別為 LongAdder 和 DoubleAdder 提供并發(fā)累加的功能,所以 Striped64 中的 longAccumulate 和 doubleAccumulate 才是關(guān)鍵,我們主要介紹 longAccumulate 方法,方法比較長,我們慢慢進入節(jié)奏。
最關(guān)鍵的 longAccumulate
先貼出來 longAccumulate 的完整代碼,然后我們再進行分析:
- final void longAccumulate(long x, LongBinaryOperator fn,
- boolean wasUncontended) {
- // 獲取線程的哈希值
- int h;
- if ((h = getProbe()) == 0) {
- ThreadLocalRandom.current(); // force initialization
- h = getProbe();
- wasUncontended = true;
- }
- boolean collide = false; // True if last slot nonempty
- for (;;) {
- Cell[] as; Cell a; int n; long v;
- if ((as = cells) != null && (n = as.length) > 0) { // cells 已經(jīng)初始化了
- if ((a = as[(n - 1) & h]) == null) { // 對應的 cell 不存在,需要新建
- if (cellsBusy == 0) { // 只有在 cells 沒上鎖時才嘗試新建
- Cell r = new Cell(x);
- if (cellsBusy == 0 && casCellsBusy()) { // 上鎖
- boolean created = false;
- try { // 上鎖后判斷 cells 對應元素是否被占用
- Cell[] rs; int m, j;
- if ((rs = cells) != null &&
- (m = rs.length) > 0 &&
- rs[j = (m - 1) & h] == null) {
- rs[j] = r;
- created = true;
- }
- } finally {
- cellsBusy = 0;
- }
- if (created) // cell 創(chuàng)建完畢,可以退出
- break;
- continue; // 加鎖后發(fā)現(xiàn) cell 元素已經(jīng)不再為空,輪詢重試
- }
- }
- collide = false;
- }
- // 下面這些 else 在嘗試檢測當前競爭度大不大,如果大則嘗試擴容,如
- // 果擴容已經(jīng)沒用了,則嘗試 rehash 來分散并發(fā)到不同的 cell 中
- else if (!wasUncontended) // 已知 CAS 失敗,說明并發(fā)度大
- wasUncontended = true; // rehash 后重試
- else if (a.cas(v = a.value, ((fn == null) ? v + x : // 嘗試 CAS 將值更新到 cell 中
- fn.applyAsLong(v, x))))
- break;
- else if (n >= NCPU || cells != as) // cells 數(shù)組已經(jīng)夠大,rehash
- collide = false; // At max size or stale
- else if (!collide) // 到此說明其它競爭已經(jīng)很大,rehash
- collide = true;
- else if (cellsBusy == 0 && casCellsBusy()) { // rehash 都沒用,嘗試擴容
- try {
- if (cells == as) { // 加鎖過程中可能有其它線程在擴容,需要排除該情形
- Cell[] rs = new Cell[n << 1];
- for (int i = 0; i < n; ++i)
- rs[i] = as[i];
- cells = rs;
- }
- } finally {
- cellsBusy = 0;
- }
- collide = false;
- continue; // Retry with expanded table
- }
- h = advanceProbe(h); // rehash
- }
- else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // cells 未初始化
- boolean init = false;
- try { // Initialize table
- if (cells == as) {
- Cell[] rs = new Cell[2];
- rs[h & 1] = new Cell(x);
- cells = rs;
- init = true;
- }
- } finally {
- cellsBusy = 0;
- }
- if (init)
- break;
- }
- else if (casBase(v = base, ((fn == null) ? v + x :
- fn.applyAsLong(v, x))))
- break; // 其它線程在初始化 cells 或在擴容,嘗試更新 base
- }
- }
先別忙著驚訝,整理好心情慢慢看。
首先,在 Striped64 中,會先計算哈希,哈希值用于分發(fā)線程到 cells 數(shù)組。Striped64 中利用了 Thread 類中用來做偽隨機數(shù)的 threadLocalRandomProbe:
- public class Thread implements Runnable {
- @sun.misc.Contended("tlr")
- int threadLocalRandomProbe;
- }
Striped64 中復制(直接拿來用)了 ThreadLocalRandom 中的一些方法,使用 unsafe 來獲取和修改字段值。
可以理解為 getProbe 用來獲取哈希值,advanceProbe 用來更新哈希值。
而其中的 PROBE 常量是在類加載的時候從類加載器提取的 threadLocalRandomProbe 的常量值。
然后是一系列的循環(huán)判斷向 cell 數(shù)組映射的操作,因為 Cells 類占用比較多的空間,所以它的初始化按需進行的,開始時為空,需要時先創(chuàng)建兩個元素,不夠用時再擴展成兩倍大小。在修改 cells 數(shù)組(如擴展)時需要加鎖,這也就是 cellsBusy 的作用。
釋放鎖只需要將 cellsBusy 從 0 -> 1 即可。
- cellsBusy = 0;
另外,這個方法雖然代碼行很多,使用了很多 if else ,但其實代碼設(shè)計使用了雙重檢查鎖,也就是下面這種模式
- if (condition_met) { // 只在必要時進入
- lock(); // 加鎖
- done = false; // 因為外層有輪詢,需要記錄任務是否需要繼續(xù)
- try {
- if (condition_met) { // 前面的 if 到加鎖間狀態(tài)可能變化,需要重新判斷
- // ...
- done = true; // 任務完成
- }
- } finally {
- unlock(); // 確保鎖釋放
- }
- if (done) // 任務完成,可以退出輪詢
- break;
- }
而 doubleAccumulate 的整體邏輯與 longAccumulate 幾乎一樣,區(qū)別在于將 double 存儲成 long 時需要轉(zhuǎn)換。例如在創(chuàng)建 cell 時,需要
- Cell r = new Cell(Double.doubleToRawLongBits(x));
doubleToRawLongBits 是一個 native 方法,將 double 轉(zhuǎn)成 long。在累加時需要再轉(zhuǎn)來回:
- else if (a.cas(v = a.value,
- ((fn == null) ?
- Double.doubleToRawLongBits
- (Double.longBitsToDouble(v) + x) : // 轉(zhuǎn)回 double 做累加
- Double.doubleToRawLongBits
- (fn.applyAsDouble
- (Double.longBitsToDouble(v), x)))))
上面的流程我們只是高度概括了下,實際的分支要遠比我們概括的更多,longAccumulate 會根據(jù)不同的狀態(tài)來執(zhí)行不同的分支,比如在線程競爭非常激烈的情況下,還會通過對 cells 進行擴容或者重新計算哈希值來重新分散線程,這些做法的目的是將多個線程的計數(shù)請求分散到不同的 cell 中的 index 上,這其實和 ConcurrentHashMap 的設(shè)計思路一樣,只不過 Java7 中的 ConcurrentHashMap 實現(xiàn) segment 加鎖使用了比較重的 synchronized 鎖,而 Striped64 使用了輕量級的 unsafe CAS 來進行并發(fā)操作。
一口氣終于講完一個段落了,累屁我了,歇會繼續(xù)肝下面。
下面再說回 LongAdder 這個類。
LongAdder 的再認識
所以,LongAdder 的原理就是,在最初無競爭時,只更新 base 值,當有多線程競爭時通過分段的思想,讓不同的線程更新不同的段,最后把這些段相加就得到了完整的 LongAdder 存儲的值,下面我畫個圖幫助你理解一下。
如果你理解了上面 Striped64 的描述和上面這幅圖之后,LongAdder 你就理解的差不多了,最后還有一個 LongAdder 中的 sum 方法需要強調(diào)下:
sum 方法會返回當前總和,在沒有并發(fā)的情況下會返回一個準確的結(jié)果,也就是把所有的 base 值相加求和之后的結(jié)果,那么,現(xiàn)在有一個問題,如果前面已經(jīng)累加到 sum 上的 Cell 的 value 值有修改,不就沒法計算了嗎?
這里的結(jié)論就是,LongAdder 不是強一致性的,它是最終一致性。
后記
這篇我和你聊了一下為什么引入 LongAdder 以及 AtomicLong 有哪些缺陷,然后帶你了解了一下 LongAdder 的源碼和它的底層實現(xiàn),如果這篇文章對你有幫助的話,可以給我個三連,你的支持是我更新最大的動力!