Java 原子操作類之18羅漢增強類
Java開發(fā)手冊
17.【參考】volatile 解決多線程內(nèi)存不可見問題對于一寫多讀,是可以解決變量同步問題,但是如果多
寫,同樣無法解決線程安全問題。
說明:如果是 count++操作,使用如下類實現(xiàn):
AtomicInteger count = new AtomicInteger();
count.addAndGet(1);
如果是 JDK8,推薦使用 LongAdder 對象,比 AtomicLong 性能更好(減少樂觀鎖的重試次數(shù))
基本類型原子類
- AtomicInteger
- AtomicBoolean
- AtomicLong
常用API簡介
- public final int get() //獲取當前的值
- public final int getAndSet(int newValue)//獲取當前的值,并設置新的值
- public final int getAndIncrement()//獲取當前的值,并自增
- public final int getAndDecrement() //獲取當前的值,并自減
- public final int getAndAdd(int delta) //獲取當前的值,并加上預期的值
- boolean compareAndSet(int expect, int update) //如果輸入的數(shù)值等于預期值,則以原子方式將該值設置為輸入值(update)
舉個栗子
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
class MyNumber {
AtomicInteger atomicInteger = new AtomicInteger();
public void addPlusPlus(){
atomicInteger.incrementAndGet();
}
}
public class AtomicIntegerDemo {
public static final int SIZE = 50;
public static void main(String[] args) throws InterruptedException {
MyNumber myNumber = new MyNumber();
CountDownLatch countDownLatch = new CountDownLatch(SIZE);
for (int i = 1; i <= SIZE; i++) {
new Thread(() -> {
try {
for (int j = 1 ;j <=1000; j++) {
myNumber.addPlusPlus();
}
}catch (Exception e){
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
},String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"\t"+"---result : "+myNumber.atomicInteger.get());
}
}
上述案例使用的AtomicInteger進行的類似累加的操作,底層使用的volatile來實現(xiàn)的,已經(jīng)保障了可見性,所以數(shù)據(jù)一定是正確的。
之所以是CountDownLatch 是為了保障main線程輸出結(jié)果的時候,所有的線程都已經(jīng)完成了計算。
數(shù)組類型原子類
- AtomicIntegerArray
- AtomicLongArray
- AtomicReferenceArray
demo
import java.util.concurrent.atomic.AtomicIntegerArray;
public class AtomicIntegerArrayDemo {
public static void main(String[] args) {
AtomicIntegerArray atomicIntegerArray = new AtomicIntegerArray(5);
for (int i = 0; i <atomicIntegerArray.length(); i++) {
System.out.println(atomicIntegerArray.get(i));
}
System.out.println();
System.out.println();
System.out.println();
int tmpInt = 0;
tmpInt = atomicIntegerArray.getAndSet(0,1122);
System.out.println(tmpInt+"\t"+atomicIntegerArray.get(0));
atomicIntegerArray.getAndIncrement(1);
atomicIntegerArray.getAndIncrement(1);
tmpInt = atomicIntegerArray.getAndIncrement(1);
System.out.println(tmpInt+"\t"+atomicIntegerArray.get(1));
}
}
引用類型原子類
- AtomicReference
- AtomicStampedReference
- AtomicMarkableReference
AtomicReference
使用場景
解決并發(fā)修改多個屬性
AtomicInteger、AtomicBoolean等java.util.concurrent包下面的類,但是這個只能并發(fā)修改一個屬性,如果我需要對多個屬性同時進行并發(fā)修改,并且保證原子性呢?
AtomicReference和AtomicInteger非常類似,不同之處就在于AtomicInteger是對整數(shù)的封裝,而AtomicReference則對應普通的對象引用,是操控多個屬性的原子性的并發(fā)類。
舉個栗子
public class AtomicReferenceDemo {
public static void main(String[] args) {
User z3 = new User("z3",24);
User li4 = new User("li4",26);
AtomicReference<User> atomicReferenceUser = new AtomicReference<>();
atomicReferenceUser.set(z3);
System.out.println(atomicReferenceUser.compareAndSet(z3,li4)+"\t"+atomicReferenceUser.get().toString());
System.out.println(atomicReferenceUser.compareAndSet(z3,li4)+"\t"+atomicReferenceUser.get().toString());
}
}
使用AtomicReference實現(xiàn)CAS
/**
* 題目:實現(xiàn)一個自旋鎖
* 自旋鎖好處:循環(huán)比較獲取沒有類似wait的阻塞。
*
* 通過CAS操作完成自旋鎖,A線程先進來調(diào)用myLock方法自己持有鎖5秒鐘,B隨后進來后發(fā)現(xiàn)
* 當前有線程持有鎖,不是null,所以只能通過自旋等待,直到A釋放鎖后B隨后搶到。
*/
public class SpinLockDemo {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void MyLock() {
System.out.println(Thread.currentThread().getName()+"\t"+"---come in");
while(!atomicReference.compareAndSet(null,Thread.currentThread())) {
}
System.out.println(Thread.currentThread().getName()+"\t"+"---持有鎖成功");
}
public void MyUnLock() {
atomicReference.compareAndSet(Thread.currentThread(),null);
System.out.println(Thread.currentThread().getName()+"\t"+"---釋放鎖成功");
}
public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();
new Thread(() -> {
spinLockDemo.MyLock();
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
spinLockDemo.MyUnLock();
},"t1").start();
new Thread(() -> {
spinLockDemo.MyLock();
spinLockDemo.MyUnLock();
},"t2").start();
}
}
AtomicStampedReference
攜帶版本號的引用類型原子類,可以解決ABA問題
demo
public class ABADemo {
static AtomicInteger atomicInteger = new AtomicInteger(100);
static AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100,1);
public static void main(String[] args) {
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName()+"\t"+"---默認版本號: "+stamp);
//讓后面的t4獲得和t3一樣的版本號,都是1,好比較
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
atomicStampedReference.compareAndSet(100,101,stamp,stamp+1);
System.out.println(Thread.currentThread().getName()+"\t"+"---1次版本號: "+atomicStampedReference.getStamp());
atomicStampedReference.compareAndSet(101,100,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println(Thread.currentThread().getName()+"\t"+"---2次版本號: "+atomicStampedReference.getStamp());
},"t3").start();
new Thread(() -> {
int stamp = atomicStampedReference.getStamp();
System.out.println(Thread.currentThread().getName()+"\t"+"---默認版本號: "+stamp);
//上前面的t3完成ABA問題
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
boolean result = atomicStampedReference.compareAndSet(100, 20210308, stamp, stamp + 1);
System.out.println(Thread.currentThread().getName()+"\t"+"---操作成功否:"+result+"\t"+atomicStampedReference.getStamp()+"\t"+atomicStampedReference.getReference());
},"t4").start();
}
public static void abaProblem() {
new Thread(() -> {
atomicInteger.compareAndSet(100,101);
atomicInteger.compareAndSet(101,100);
},"t1").start();
//暫停毫秒
try { TimeUnit.MILLISECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); }
new Thread(() -> {
boolean b = atomicInteger.compareAndSet(100, 20210308);
System.out.println(Thread.currentThread().getName()+"\t"+"修改成功否:"+b+"\t"+atomicInteger.get());
},"t2").start();
}
}
AtomicMarkableReference
原子更新帶有標記位的引用類型對象,它的定義就是將狀態(tài)戳簡化為true|false,
可以理解為上面AtomicStampedReference的簡化版,就是不關(guān)心修改過幾次,僅僅關(guān)心是否修改過。因此變量mark是boolean類型,僅記錄值是否有過修改。不建議使用。
demo
public class ABADemo {
static AtomicMarkableReference markableReference = new AtomicMarkableReference(100,false);
public static void main(String[] args) {
System.out.println("============AtomicMarkableReference不關(guān)心引用變量更改過幾次,只關(guān)心是否更改過======================");
new Thread(() -> {
boolean marked = markableReference.isMarked();
System.out.println(Thread.currentThread().getName()+"\t 1次版本號"+marked);
try { TimeUnit.MILLISECONDS.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); }
markableReference.compareAndSet(100,101,marked,!marked);
System.out.println(Thread.currentThread().getName()+"\t 2次版本號"+markableReference.isMarked());
markableReference.compareAndSet(101,100,markableReference.isMarked(),!markableReference.isMarked());
System.out.println(Thread.currentThread().getName()+"\t 3次版本號"+markableReference.isMarked());
},"t5").start();
new Thread(() -> {
boolean marked = markableReference.isMarked();
System.out.println(Thread.currentThread().getName()+"\t 1次版本號"+marked);
//暫停幾秒鐘線程
try { TimeUnit.MILLISECONDS.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); }
markableReference.compareAndSet(100,2020, marked, !marked);
System.out.println(Thread.currentThread().getName()+"\t"+markableReference.getReference()+"\t"+markableReference.isMarked());
},"t6").start();
}
}
對象的屬性修改原子類
- AtomicIntegerFieldUpdater:原子更新對象中int類型字段的值
- AtomicLongFieldUpdater:原子更新對象中Long類型字段的值
- AtomicReferenceFieldUpdater:原子更新引用類型字段的值
為什么有這些東西?
使用目的:以一種線程安全的方式操作非線程安全對象內(nèi)的某些字段。
使用要求
- 更新的對象屬性必須使用 public volatile 修飾符。
- 因為對象的屬性修改類型原子類都是抽象類,所以每次使用都必須使用靜態(tài)方法newUpdater()創(chuàng)建一個更新器,并且需要設置想要更新的類和屬性。
demo
AtomicIntegerFieldUpdaterDemo
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
class BankAccount {
String bankName = "ccb";
//以一種線程安全的方式操作非線程安全對象內(nèi)的某些字段
//1 更新的對象屬性必須使用 public volatile 修飾符。
public volatile int money = 0;
//2 因為對象的屬性修改類型原子類都是抽象類,所以每次使用都必須
// 使用靜態(tài)方法newUpdater()創(chuàng)建一個更新器,并且需要設置想要更新的類和屬性。
private static final AtomicIntegerFieldUpdater<BankAccount> FieldUpdater = AtomicIntegerFieldUpdater.newUpdater(BankAccount.class,"money");
public void transfer(BankAccount bankAccount) {
FieldUpdater.incrementAndGet(bankAccount);
}
}
public class AtomicIntegerFieldUpdaterDemo {
public static void main(String[] args) throws InterruptedException {
BankAccount bankAccount = new BankAccount();
for (int i = 1; i <=1000; i++) {
new Thread(() -> {
bankAccount.transfer(bankAccount);
},String.valueOf(i)).start();
}
//暫停幾秒鐘線程
try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t"+"---bankAccount: "+bankAccount.money);
}
}
AtomicReferenceFieldUpdater
package com.atguigu.juc.atomics;
import lombok.Data;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@Data
class MyVar {
public volatile String isInit = "111";
private static final AtomicReferenceFieldUpdater<MyVar,String> FieldUpdater = AtomicReferenceFieldUpdater.newUpdater(MyVar.class,String.class,"isInit");
public void init(MyVar myVar) {
if(FieldUpdater.compareAndSet(myVar,"111", "222")) {
System.out.println(Thread.currentThread().getName()+"\t"+"---start init");
try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); }
System.out.println(Thread.currentThread().getName()+"\t"+"---end init -- " + myVar.getIsInit());
}else{
System.out.println(Thread.currentThread().getName()+"\t"+"---搶奪失敗,已經(jīng)有線程在修改中 --" + myVar.getIsInit());
}
}
}
/**
*
* 多線程并發(fā)調(diào)用一個類的初始化方法,如果未被初始化過,將執(zhí)行初始化工作,要求只能初始化一次
*/
public class AtomicReferenceFieldUpdaterDemo {
public static void main(String[] args) {
MyVar myVar = new MyVar();
for (int i = 1; i <=5; i++) {
new Thread(() -> {
myVar.init(myVar);
},String.valueOf(i)).start();
}
}
}
關(guān)聯(lián)面試題
- 面試官問你:你在哪里用了volatile :AtomicReferenceFieldUpdater :
- 既然已經(jīng)有了AtomicInteger,為什么又多此一舉弄出個AtomicIntegerFieldUpdater來呢?其實主要有兩方面原因:
- 要使用AtomicInteger需要修改代碼,將原來int類型改造成AtomicInteger,使用該對象的地方都要進行調(diào)整(多進行一次get()操作獲取值),但是有時候代碼不是我們想改就能改動的。
- 也是比較重要的一個特性,AtomicIntegerFieldUpdater可以節(jié)省內(nèi)存消耗。
- 引用:https://juejin.cn/post/6907610980428021773#comment
原子操作增強類
分類
- DoubleAccumulator
- DoubleAdder
- LongAccumulator
- LongAdder
前言
- 熱點商品點贊計算器,點贊數(shù)加加統(tǒng)計,不要求實時精確
- 一個很大的List,里面都是int類型,如何實現(xiàn)加加,說說思路
入門講解
- LongAdder只能用來計算加法,且從零開始計算
- LongAccumulator 提供了自定義的函數(shù)操作
demo
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongBinaryOperator;
public class LongAdderAPIDemo {
public static void main(String[] args) {
LongAdder longAdder = new LongAdder();//只能做加法
longAdder.increment();
longAdder.increment();
longAdder.increment();
System.out.println(longAdder.longValue());
LongAccumulator longAccumulator = new LongAccumulator((left, right) -> left - right, 100);
longAccumulator.accumulate(1);//1
longAccumulator.accumulate(2);//3
longAccumulator.accumulate(3);//6
System.out.println(longAccumulator.longValue());
}
}
demo--LongAdder高性能對比Code
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
class ClickNumber {
int number = 0;
public synchronized void add_Synchronized() {
number++;
}
AtomicInteger atomicInteger = new AtomicInteger();
public void add_AtomicInteger() {
atomicInteger.incrementAndGet();
}
AtomicLong atomicLong = new AtomicLong();
public void add_AtomicLong() {
atomicLong.incrementAndGet();
}
LongAdder longAdder = new LongAdder();
public void add_LongAdder() {
longAdder.increment();
//longAdder.sum();
}
LongAccumulator longAccumulator = new LongAccumulator(Long::sum,0);
public void add_LongAccumulator() {
longAccumulator.accumulate(1);
}
}
/**
*
* 50個線程,每個線程100W次,總點贊數(shù)出來
*/
public class LongAdderCalcDemo {
public static final int SIZE_THREAD = 50;
public static final int _1W = 10000;
public static void main(String[] args) throws InterruptedException {
ClickNumber clickNumber = new ClickNumber();
long startTime;
long endTime;
CountDownLatch countDownLatch1 = new CountDownLatch(SIZE_THREAD);
CountDownLatch countDownLatch2 = new CountDownLatch(SIZE_THREAD);
CountDownLatch countDownLatch3 = new CountDownLatch(SIZE_THREAD);
CountDownLatch countDownLatch4 = new CountDownLatch(SIZE_THREAD);
CountDownLatch countDownLatch5 = new CountDownLatch(SIZE_THREAD);
//========================
startTime = System.currentTimeMillis();
for (int i = 1; i <=SIZE_THREAD; i++) {
new Thread(() -> {
try {
for (int j = 1; j <=100 * _1W; j++) {
clickNumber.add_Synchronized();
}
}catch (Exception e){
e.printStackTrace();
}finally {
countDownLatch1.countDown();
}
},String.valueOf(i)).start();
}
countDownLatch1.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_Synchronized"+"\t"+clickNumber.number);
startTime = System.currentTimeMillis();
for (int i = 1; i <=SIZE_THREAD; i++) {
new Thread(() -> {
try {
for (int j = 1; j <=100 * _1W; j++) {
clickNumber.add_AtomicInteger();
}
}catch (Exception e){
e.printStackTrace();
}finally {
countDownLatch2.countDown();
}
},String.valueOf(i)).start();
}
countDownLatch2.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_AtomicInteger"+"\t"+clickNumber.atomicInteger.get());
startTime = System.currentTimeMillis();
for (int i = 1; i <=SIZE_THREAD; i++) {
new Thread(() -> {
try {
for (int j = 1; j <=100 * _1W; j++) {
clickNumber.add_AtomicLong();
}
}catch (Exception e){
e.printStackTrace();
}finally {
countDownLatch3.countDown();
}
},String.valueOf(i)).start();
}
countDownLatch3.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_AtomicLong"+"\t"+clickNumber.atomicLong.get());
startTime = System.currentTimeMillis();
for (int i = 1; i <=SIZE_THREAD; i++) {
new Thread(() -> {
try {
for (int j = 1; j <=100 * _1W; j++) {
clickNumber.add_LongAdder();
}
}catch (Exception e){
e.printStackTrace();
}finally {
countDownLatch4.countDown();
}
},String.valueOf(i)).start();
}
countDownLatch4.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_LongAdder"+"\t"+clickNumber.longAdder.longValue());
startTime = System.currentTimeMillis();
for (int i = 1; i <=SIZE_THREAD; i++) {
new Thread(() -> {
try {
for (int j = 1; j <=100 * _1W; j++) {
clickNumber.add_LongAccumulator();
}
}catch (Exception e){
e.printStackTrace();
}finally {
countDownLatch5.countDown();
}
},String.valueOf(i)).start();
}
countDownLatch5.await();
endTime = System.currentTimeMillis();
System.out.println("----costTime: "+(endTime - startTime) +" 毫秒"+"\t add_LongAccumulator"+"\t"+clickNumber.longAccumulator.longValue());
}
}
結(jié)果
源碼、原理分析
官方api
這個類是通常優(yōu)選AtomicLong當多個線程更新時使用,用于諸如收集統(tǒng)計信息,不用于細粒度同步控制的共同總和。 在低更新爭議下,這兩類具有相似的特征。 但是,在高度爭議的情況下,這一類的預期吞吐量明顯高于犧牲更高的空間消耗。
LongAdder是Striped64的子類,Striped64有幾個比較重要的成員函數(shù)
- base變量:非競爭狀態(tài)條件下,直接累加到該變量上
- Cell[ ]數(shù)組:競爭條件下(高并發(fā)下),累加各個線程自己的槽Cell[i]中
/** Number of CPUS, to place bound on table sizeCPU數(shù)量,即cells數(shù)組的最大長度 */
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
cells數(shù)組,為2的冪,2,4,8,16.....,方便以后位運算
*/
transient volatile Cell[] cells;
/**基礎(chǔ)value值,當并發(fā)較低時,只累加該值主要用于沒有競爭的情況,通過CAS更新。
* Base value, used mainly when there is no contention, but also as
* a fallback during table initialization races. Updated via CAS.
*/
transient volatile long base;
/**創(chuàng)建或者擴容Cells數(shù)組時使用的自旋鎖變量調(diào)整單元格大?。〝U容),創(chuàng)建單元格時使用的鎖。
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
transient volatile int cellsBusy;
cell 是java.util.concurrent.atomic 下 Striped64 的一個內(nèi)部類
LongAdder為什么這么快-分散熱點
LongAdder在無競爭的情況,跟AtomicLong一樣,對同一個base進行操作。
當出現(xiàn)競爭關(guān)系時則采用化整為零的做法,用空間換時間,用一個數(shù)組cells將一個value拆分進這個數(shù)組cells。多個線程需要同時對value進行操作時候,
對線程id進行hash,再根據(jù)hash值映射到這個數(shù)組cells的某個下標,再對該下標所對應的值進行自增操作。當所有線程操作完畢,將數(shù)組cells的所有值和無競爭值base都加起來作為最終結(jié)果。
sum( )會將所有cell數(shù)組中的value和base累加作為返回值,核心的思想就是將之前AtomicLong一個value的更新壓力分散到多個value中去,從而降級更新熱點。