自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

詳解LongAdder實現(xiàn)原理

開發(fā) 前端
AtomicInteger、AtomicLong使用非阻塞的CAS算法原子性地更新某一個變量,比synchronized這些阻塞算法擁有更好的性能,但是在高并發(fā)情況下,大量線程同時去更新一個變量,由于同一時間只有一個線程能夠成功,絕大部分的線程在嘗試更新失敗后,會通過自旋的方式再次進行嘗試,嚴重占用了CPU的時間片。

 前言

AtomicInteger、AtomicLong使用非阻塞的CAS算法原子性地更新某一個變量,比synchronized這些阻塞算法擁有更好的性能,但是在高并發(fā)情況下,大量線程同時去更新一個變量,由于同一時間只有一個線程能夠成功,絕大部分的線程在嘗試更新失敗后,會通過自旋的方式再次進行嘗試,嚴重占用了CPU的時間片。

AtomicLong的實現(xiàn)原理圖:

 

 

LongAdder是JDK8新增的原子操作類,它提供了一種新的思路,既然AtomicLong的性能瓶頸是由于大量線程同時更新一個變量造成的,那么能不能把這個變量拆分出來,變成多個變量,然后讓線程去競爭這些變量,最后合并即可?LongAdder的設計精髓就在這里,通過將變量拆分成多個元素,降低該變量的并發(fā)度,最后進行合并元素,變相的減少了CAS的失敗次數(shù)。

LongAdder的實現(xiàn)原理圖:

LongAdder實現(xiàn)原理

常用方法

 

  1. public class LongAdder extends Striped64 implements Serializable { 
  2.     //構造方法 
  3.     public LongAdder() { 
  4.     } 
  5.     //加1操作 
  6.     public void increment(); 
  7.     //減1操作 
  8.     public void decrement(); 
  9.     //獲取原子變量的值 
  10.     public long longValue(); 
  11.  

 

下面給出一個簡單的例子,模擬50線程同時進行更新

 

  1. package com.xue.testLongAdder; 
  2.  
  3. import java.util.concurrent.ExecutorService; 
  4. import java.util.concurrent.Executors; 
  5. import java.util.concurrent.atomic.LongAdder; 
  6.  
  7. public class Main { 
  8.     public static void main(String[] args) { 
  9.         LongAdder adder = new LongAdder(); 
  10.         ExecutorService threadPool = Executors.newFixedThreadPool(20); 
  11.         for (int i = 0; i < 50; i++) { 
  12.             Runnable r = () -> { 
  13.                 adder.add(1); 
  14.             }; 
  15.             threadPool.execute(r); 
  16.         } 
  17.         threadPool.shutdown(); 
  18.         //若關閉線程池后,所有任務執(zhí)行完畢,則isTerminated()返回true 
  19.         while (!threadPool.isTerminated()) { 
  20.             System.out.println(adder.longValue()); 
  21.             break; 
  22.         } 
  23.     } 

 

輸出結果是50

其中,如果對線程池不熟悉的同學,可以先參考我的另外一篇文章說說線程池

原理解析

類圖

LongAdder實現(xiàn)原理

LongAdder內部維護了一個Cell類型的數(shù)組,其中Cell是Striped64中的一個靜態(tài)內部類。

Cell類

 

  1. abstract class Striped64 extends Number { 
  2.    
  3.     @sun.misc.Contended static final class Cell { 
  4.         volatile long value; 
  5.         Cell(long x) { value = x; } 
  6.         final boolean cas(long cmp, long val) { 
  7.             return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); 
  8.         } 
  9.     } 
  10.  

 

Cell用來封裝被拆分出來的元素,內部用一個value字段保存當前元素的值,等到需要合并時,則累加所有Cell數(shù)組中的value。Cell內部使用CAS操作來更新value值,對CAS操作不熟悉的同學,可以參考我的另外一篇文章淺探CAS實現(xiàn)原理

可以注意到,Cell類被 @sun.misc.Contended注解修飾了,這個注解是為了解決偽共享問題的,什么是偽共享?

  • 一個緩存行可以存儲多個變量(存滿當前緩存行的字節(jié)數(shù));而CPU對緩存的修改又是以緩存行為最小單位的,在多線程情況下,如果需要修改“共享同一個緩存行的變量”,就會無意中影響彼此的性能,這就是偽共享(False Sharing)。

對偽共享還不理解的同學,可以參考這位大佬的文章偽共享(False Sharing)底層原理及其解決方式

而LongAdder采用的是Cell數(shù)組,而數(shù)組元素是連續(xù)的,因此多個Cell對象共享一個緩存行的情況非常普遍,因此這里@sun.misc.Contended注解對單個Cell元素進行字節(jié)填充,確保一個Cell對象占據(jù)一個緩存行,即填充至64字節(jié)。

關于如何確定一個對象的大小,可以參考我的另外一篇文章對象的內存布局,怎樣確定對象的大小,這樣可以算出來,還需要填充多少字節(jié)。

longValue()

longValue()返回累加后的值

 

  1. public long longValue() { 
  2.      return sum(); 
  3.  } 
  4.  
  5.  public long sum() { 
  6.      Cell[] as = cells; Cell a; 
  7.      long sum = base; 
  8.      //當Cell數(shù)組不為null時,進行累加后返回,否則直接返回基準數(shù)base 
  9.      if (as != null) { 
  10.          for (int i = 0; i < as.length; ++i) { 
  11.              if ((a = as[i]) != null
  12.                  sum += a.value; 
  13.          } 
  14.      } 
  15.      return sum
  16.  } 

 

這可能是LongAdder中最簡單的方法了,就不進行贅述了。什么,你要看復雜的?好的,這就來了。

increment()

 

  1. public void increment() { 
  2.        add(1L); 
  3.     } 
  4.  
  5.  public void add(long x) { 
  6.      Cell[] as; long b, v; int m; Cell a; 
  7.      /** 
  8.       * 如果一下兩種條件則繼續(xù)執(zhí)行if內的語句 
  9.       * 1. cells數(shù)組不為null(不存在爭用的時候,cells數(shù)組一定為null,一旦對base的cas操作失敗, 
  10.       * 才會初始化cells數(shù)組) 
  11.       * 2. 如果cells數(shù)組為null,如果casBase執(zhí)行成功,則直接返回,如果casBase方法執(zhí)行失敗 
  12.       * (casBase失敗,說明第一次爭用沖突產生,需要對cells數(shù)組初始化)進入if內; 
  13.       * casBase方法很簡單,就是通過UNSAFE類的cas設置成員變量base的值為base+要累加的值 
  14.       * casBase執(zhí)行成功的前提是無競爭,這時候cells數(shù)組還沒有用到為null,可見在無競爭的情況下是 
  15.       * 類似于AtomticInteger處理方式,使用cas做累加。 
  16.       */ 
  17.      if ((as = cells) != null || !casBase(b = base, b + x)) { 
  18.          //uncontended判斷cells數(shù)組中,當前線程要做cas累加操作的某個元素是否#不#存在爭用, 
  19.          //如果cas失敗則存在爭用;uncontended=false代表存在爭用,uncontended=true代表不存在爭用。 
  20.          boolean uncontended = true
  21.          /** 
  22.           *1. as == null : cells數(shù)組未被初始化,成立則直接進入if執(zhí)行cell初始化 
  23.           *2. (m = as.length - 1) < 0: cells數(shù)組的長度為0 
  24.           *條件1與2都代表cells數(shù)組沒有被初始化成功,初始化成功的cells數(shù)組長度為2; 
  25.           *3. (a = as[getProbe() & m]) == null :如果cells被初始化,且它的長度不為0, 
  26.           * 則通過getProbe方法獲取當前線程Thread的threadLocalRandomProbe變量的值,初始為0, 
  27.           * 然后執(zhí)行threadLocalRandomProbe&(cells.length-1 ),相當于m%cells.length; 
  28.           * 如果cells[threadLocalRandomProbe%cells.length]的位置為null, 
  29.           * 這說明這個位置從來沒有線程做過累加, 
  30.           * 需要進入if繼續(xù)執(zhí)行,在這個位置創(chuàng)建一個新的Cell對象; 
  31.           *4. !(uncontended = a.cas(v = a.value, v + x)): 
  32.           * 嘗試對cells[threadLocalRandomProbe%cells.length]位置的Cell對象中的value值做累加操作, 
  33.           * 并返回操作結果,如果失敗了則進入if,重新計算一個threadLocalRandomProbe; 
  34.           如果進入if語句執(zhí)行l(wèi)ongAccumulate方法,有三種情況 
  35.           1. 前兩個條件代表cells沒有初始化, 
  36.           2. 第三個條件指當前線程hash到的cells數(shù)組中的位置還沒有其它線程做過累加操作, 
  37.           3. 第四個條件代表產生了沖突,uncontended=false 
  38.           **/ 
  39.          if (as == null || (m = as.length - 1) < 0 || 
  40.                  (a = as[getProbe() & m]) == null || 
  41.                  !(uncontended = a.cas(v = a.value, v + x))) 
  42.              longAccumulate(x, null, uncontended); 
  43.      } 
  44.  } 

 

其中l(wèi)ongAccumulate()的解析如下:

 

  1. final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { 
  2.     //獲取當前線程的threadLocalRandomProbe值作為hash值,如果當前線程的threadLocalRandomProbe為0, 
  3.     // 說明當前線程是第一次進入該方法,則強制設置線程的threadLocalRandomProbe為ThreadLocalRandom類的成員 
  4.     // 靜態(tài)私有變量probeGenerator的值,后面會詳細將hash值的生成; 
  5.     //另外需要注意,如果threadLocalRandomProbe=0,代表新的線程開始參與cell爭用的情況 
  6.     //1.當前線程之前還沒有參與過cells爭用(也許cells數(shù)組還沒初始化,進到當前方法來就是為了初始化cells數(shù)組 
  7.     //后爭用的), 
  8.     // 是第一次執(zhí)行base的cas累加操作失?。?nbsp;
  9.     //2.或者是在執(zhí)行add方法時,對cells某個位置的Cell的cas操作第一次失敗,則將wasUncontended設置為false, 
  10.     // 那么這里會將其重新置為true;第一次執(zhí)行操作失?。?nbsp;
  11.     //凡是參與了cell爭用操作的線程threadLocalRandomProbe都不為0; 
  12.     int h; 
  13.     if ((h = getProbe()) == 0) { 
  14.         //初始化ThreadLocalRandom; 
  15.         ThreadLocalRandom.current(); // force initialization 
  16.         //將h設置為0x9e3779b9 
  17.         h = getProbe(); 
  18.         //設置未競爭標記為true 
  19.         wasUncontended = true
  20.     } 
  21.     //cas沖突標志,表示當前線程hash到的Cells數(shù)組的位置,做cas累加操作時與其它線程發(fā)生了沖突,cas失??; 
  22.     // collide=true代表有沖突,collide=false代表無沖突 
  23.     boolean collide = false
  24.     for (;;) { 
  25.         Cell[] as; Cell a; int n; long v; 
  26.         //這個主干if有三個分支 
  27.         //1.主分支一:處理cells數(shù)組已經正常初始化了的情況(這個if分支處理add方法的四個條件中的3和4) 
  28.         //2.主分支二:處理cells數(shù)組沒有初始化或者長度為0的情況;(這個分支處理add方法的四個條件中的1和2) 
  29.         //3.主分支三:處理如果cell數(shù)組沒有初始化,并且其它線程正在執(zhí)行對cells數(shù)組初始化的操作, 
  30.         // 及cellbusy=1; 
  31.         // 則嘗試將累加值通過cas累加到base上 
  32.         //先看主分支一 
  33.         if ((as = cells) != null && (n = as.length) > 0) { 
  34.             /** 
  35.              *內部小分支一:這個是處理add方法內部if分支的條件3:如果被hash到的位置為null, 
  36.              * 說明沒有線程在這個位置設置過值, 
  37.              * 沒有競爭,可以直接使用,則用x值作為初始值創(chuàng)建一個新的Cell對象, 
  38.              * 對cells數(shù)組使用cellsBusy加鎖, 
  39.              * 然后將這個Cell對象放到cells[m%cells.length]位置上 
  40.              */ 
  41.             if ((a = as[(n - 1) & h]) == null) { 
  42.                 //cellsBusy == 0 代表當前沒有線程cells數(shù)組做修改 
  43.                 if (cellsBusy == 0) { 
  44.                     //將要累加的x值作為初始值創(chuàng)建一個新的Cell對象, 
  45.                     Cell r = new Cell(x); 
  46.                     //如果cellsBusy=0無鎖,則通過cas將cellsBusy設置為1加鎖 
  47.                     if (cellsBusy == 0 && casCellsBusy()) { 
  48.                         //標記Cell是否創(chuàng)建成功并放入到cells數(shù)組被hash的位置上 
  49.                         boolean created = false
  50.                         try { 
  51.                             Cell[] rs; int m, j; 
  52.                             //再次檢查cells數(shù)組不為null,且長度不為空,且hash到的位置的Cell為null 
  53.                             if ((rs = cells) != null && 
  54.                                     (m = rs.length) > 0 && 
  55.                                     rs[j = (m - 1) & h] == null) { 
  56.                                 //將新的cell設置到該位置 
  57.                                 rs[j] = r; 
  58.                                 created = true
  59.                             } 
  60.                         } finally { 
  61.                             //去掉鎖 
  62.                             cellsBusy = 0; 
  63.                         } 
  64.                         //生成成功,跳出循環(huán) 
  65.                         if (created) 
  66.                             break; 
  67.                         //如果created為false,說明上面指定的cells數(shù)組的位置cells[m%cells.length] 
  68.                         // 已經有其它線程設置了cell了, 
  69.                         // 繼續(xù)執(zhí)行循環(huán)。 
  70.                         continue
  71.                     } 
  72.                 } 
  73.                 //如果執(zhí)行的當前行,代表cellsBusy=1,有線程正在更改cells數(shù)組,代表產生了沖突,將collide設置為false 
  74.                 collide = false
  75.  
  76.                 /** 
  77.                  *內部小分支二:如果add方法中條件4的通過cas設置cells[m%cells.length]位置的Cell對象中的 
  78.                  * value值設置為v+x失敗, 
  79.                  * 說明已經發(fā)生競爭,將wasUncontended設置為true,跳出內部的if判斷, 
  80.                  * 最后重新計算一個新的probe,然后重新執(zhí)行循環(huán); 
  81.                  */ 
  82.             } else if (!wasUncontended) 
  83.                 //設置未競爭標志位true,繼續(xù)執(zhí)行,后面會算一個新的probe值,然后重新執(zhí)行循環(huán)。 
  84.                 wasUncontended = true
  85.             /** 
  86.              *內部小分支三:新的爭用線程參與爭用的情況:處理剛進入當前方法時threadLocalRandomProbe=0的情況, 
  87.              * 也就是當前線程第一次參與cell爭用的cas失敗,這里會嘗試將x值加到cells[m%cells.length] 
  88.              * 的value ,如果成功直接退出 
  89.              */ 
  90.             else if (a.cas(v = a.value, ((fn == null) ? v + x : 
  91.                     fn.applyAsLong(v, x)))) 
  92.                 break; 
  93.             /** 
  94.              *內部小分支四:分支3處理新的線程爭用執(zhí)行失敗了,這時如果cells數(shù)組的長度已經到了最大值 
  95.              * (大于等于cup數(shù)量), 
  96.              * 或者是當前cells已經做了擴容,則將collide設置為false,后面重新計算prob的值*/ 
  97.             else if (n >= NCPU || cells != as
  98.                 collide = false
  99.             /** 
  100.              *內部小分支五:如果發(fā)生了沖突collide=false,則設置其為true;會在最后重新計算hash值后, 
  101.              * 進入下一次for循環(huán) 
  102.              */ 
  103.             else if (!collide) 
  104.                 //設置沖突標志,表示發(fā)生了沖突,需要再次生成hash,重試。 
  105.                 // 如果下次重試任然走到了改分支此時collide=true,!collide條件不成立,則走后一個分支 
  106.                 collide = true
  107.             /** 
  108.              *內部小分支六:擴容cells數(shù)組,新參與cell爭用的線程兩次均失敗,且符合庫容條件,會執(zhí)行該分支 
  109.              */ 
  110.             else if (cellsBusy == 0 && casCellsBusy()) { 
  111.                 try { 
  112.                     //檢查cells是否已經被擴容 
  113.                     if (cells == as) {      // Expand table unless stale 
  114.                         Cell[] rs = new Cell[n << 1]; 
  115.                         for (int i = 0; i < n; ++i) 
  116.                             rs[i] = as[i]; 
  117.                         cells = rs; 
  118.                     } 
  119.                 } finally { 
  120.                     cellsBusy = 0; 
  121.                 } 
  122.                 collide = false
  123.                 continue;                   // Retry with expanded table 
  124.             } 
  125.             //為當前線程重新計算hash值 
  126.             h = advanceProbe(h); 
  127.  
  128.             //這個大的分支處理add方法中的條件1與條件2成立的情況,如果cell表還未初始化或者長度為0, 
  129.             // 先嘗試獲取cellsBusy鎖。 
  130.         }else if (cellsBusy == 0 && cells == as && casCellsBusy()) { 
  131.             boolean init = false
  132.             try {  // Initialize table 
  133.                 //初始化cells數(shù)組,初始容量為2,并將x值通過hash&1,放到0個或第1個位置上 
  134.                 if (cells == as) { 
  135.                     Cell[] rs = new Cell[2]; 
  136.                     rs[h & 1] = new Cell(x); 
  137.                     cells = rs; 
  138.                     init = true
  139.                 } 
  140.             } finally { 
  141.                 //解鎖 
  142.                 cellsBusy = 0; 
  143.             } 
  144.             //如果init為true說明初始化成功,跳出循環(huán) 
  145.             if (init) 
  146.                 break; 
  147.         } 
  148.         /** 
  149.          *如果以上操作都失敗了,則嘗試將值累加到base上; 
  150.          */ 
  151.         else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))  
  152.             // Fall back on using base 
  153.             break; 
  154.     } 

 

以上2個方法的解析搬自于源碼閱讀:全方位講解LongAdder,此處對代碼做了微調,方便閱讀。

總結

LongAdder在沒有線程競爭的時候,只使用base值,此時的情況就類似與AtomicLong。但LongAdder的高明之處在于,發(fā)生線程競爭時,便會使用到Cell數(shù)組,所以該數(shù)組是惰性加載的。

Cell數(shù)組初始值為2,每次擴容(當線程競爭異常激烈時,發(fā)生擴容)為上次長度的2倍,因此數(shù)組長度一直是2的次冪,但是當數(shù)組長度≥CPU的核心數(shù)時,就不再進行擴容。為什么?我的理解是在一臺電腦中,最多能有CPU核心數(shù)個線程能夠并行,因此同時也就這么多個線程操作Cell數(shù)組,每個線程更新一個位置上的元素,且又因為數(shù)組中每個元素由于字節(jié)填充機制,十分的占據(jù)內存??紤]到這兩個因素,Cell數(shù)組在長度≥CPU核心數(shù)時,停止擴容。

確實,LongAdder花了很多心思提高了高并發(fā)下程序運行的效率,每一步都是在沒有更好的辦法下才會去選擇開銷更大的操作。在低并發(fā)下,LongAdder和AtomicLong效率上差不多,但LongAdder更加耗費內存。不過在高并發(fā)下,LongAdder將更加高效。

 

責任編輯:姜華 來源: 今日頭條
相關推薦

2022-12-19 08:00:00

SpringBootWeb開發(fā)

2015-07-10 12:23:05

JsPatch實現(xiàn)原理

2014-04-22 09:51:24

LongAdderAtomicLong

2021-05-08 07:57:17

ServletFilter設計模式

2023-07-11 08:00:00

2024-12-27 08:24:55

2020-12-21 07:31:23

實現(xiàn)單機JDK

2009-03-25 09:00:11

Group By排序MySQL

2023-07-17 08:02:44

ZuulIO反應式

2024-06-21 09:28:43

2021-10-15 06:43:11

LongAdderStriped64Java7

2021-08-29 07:41:48

數(shù)據(jù)HashMap底層

2021-12-13 10:43:45

HashMapJava集合容器

2011-08-25 10:07:24

Lua 5.0函數(shù)編譯器

2021-06-30 10:32:33

反射多態(tài)Java

2021-10-29 13:26:54

單點登錄SSO

2009-06-11 16:25:44

EJB2.0EJB

2021-08-31 07:36:22

LinkedListAndroid數(shù)據(jù)結構

2024-01-05 09:00:00

SpringMVC軟件

2020-09-13 13:26:10

Kafka消費者控制器
點贊
收藏

51CTO技術棧公眾號