在面試官面前我是這樣介紹CAS的
如何回答什么是CAS?
CAS是Compare And Swap的簡稱,單從字面理解是比較并替換,實際指的是Unsafe類中的三個方法compareAndSwapObject,compareAndSwapInt,compareAndSwapLong,三個方法分別是以比較并替換的方式對Object類型的數據,對int類型的數據,對long類型的數據保證其操作的原子性。
在CAS比較并替換的邏輯中有三個重要的概念:預估值,內存值,更新值,而比較替換的邏輯為:如果預估值等于內存值,則將內存值更新為更新值,否則就不更新。
比較和替換這兩個動作,無論是在java層面實現還是在jvm層面實現在不加鎖的情況下都是無法完全保證原子性的,因此不得不依賴硬件對于并發(fā)的支持,即操作系統(tǒng)底層提供了cmpxchg指令,這一個指令就可以完成比較和替換兩個動作。那么即便是將兩個動作濃縮到一個匯編指令里面,就能保證原子性嗎?
答案是肯定的,這是因為計算機硬件自身支持一個匯編指令執(zhí)行過程是不允許被中斷的,這兩個動作已經濃縮到了cmpxchg這一個匯編指令里面了,不能中斷意味著這兩個動作必須在同個cpu時間片段內完成,一氣呵成,所以cmpxchg指令本身就具備了原子性。
但是不同的平臺實現cas有不同的方式。這個匯編指令的名字可能也會不同,這個需要注意。
分析:
分析CAS前必須先明白原子性是什么。
什么是原子性
原子性:指事務的不可分割性,一個事務的所有操作要么不間斷地全部被執(zhí)行,要么一個也沒有執(zhí)行。
前置了解
我們都知道所有的程序都是運行在cpu上的,cpu執(zhí)行的是機器碼,如00,01,0A。因為這些指令操作起來太麻煩且不好記憶,所以后來有人發(fā)明了更加方便操作和好懂的匯編語言,匯編是一種低級語言,這里可以理解為機器碼對外所呈現的樣子,也可以干脆理解為cpu執(zhí)行的就是匯編語言?;久糠N平臺都實現了自己的匯編語言,所以不同平臺的匯編語言是不能通用的。
匯編和機器碼的對應關系如下:
ADD reg8/mem8,reg8 對應 00
ADD reg16/mem16,reg16 對應 01
OR reg8,reg8/mem8 對應 0A
OR reg16,reg16/mem16 對應 0B
jvm是一個虛擬機,它有一套自己的字節(jié)碼指令,它有一套內存管理機制和垃圾回收機制,它有自己的運行機制,它有類似與寄存器的操作數棧,它自身就是一臺計算機,它專門運行java語言,java語言依靠jvm運行,需要先編譯成字節(jié)碼指令,就像C++語言運行要先編譯成匯編語言一樣。
jvm是C實現的,歸根結底要運行在cpu上,所以我們可以知道java語言的運行過程為:java編譯成c++,再編譯成匯編,再編譯成機器碼。
通過一個例子來看下java代碼到機器碼的翻譯過程:
java代碼:i++
java代碼編譯成字節(jié)碼反編譯的代碼:getstatic i 獲取i的值 iconst_1 準備常量1 iadd 加1 putstatic 將修改后的值寫回變量i
iadd會被jvm翻譯成類似于下面的匯編代碼:mov (%rsp),%edx add $0x8,%rsp add %edx,%eax
而匯編指令add對應的二進制機器碼為00,01等。
現在來理解下原子性概念中的不可分割和不間斷執(zhí)行
不可分割性就是原子性。而事物的不可分割則是事務不受外界影響。
不管是java還是c++代碼,最終運行的時候都要被編譯成機器碼,而機器碼是cpu運行的基本單元,所以只有一個機器碼指令才是天然的不可分割,而為了方便開發(fā)和理解,每個平臺都有一套對應機器碼的匯編語言,以linux為例,操作系統(tǒng)會保證每個匯編指令的執(zhí)行都是不允許被中斷的,也就是一個匯編指令也是不可分割的。
多個指令就一定是分割的嗎?
不一定,只要多個指令能夠不間斷執(zhí)行就可以認為是沒有被分割的。
那如何才算不間斷執(zhí)行呢?
不間斷就是幾個指令能夠順序執(zhí)行,且執(zhí)行過程中或者線程不可中斷,或者線程中斷后不受其他線程影響。
那什么是中斷呢?
中斷就是線程被掛起。
那什么時候線程才會被掛起呢?
java程序員都知道,線程執(zhí)行過程中遇到鎖的時候,如果沒有獲取到鎖就會阻塞掛起。當調用wait方法,join方法,park方法的時候都會進入阻塞掛起的狀態(tài),但是這些狀態(tài)都是程序員人為的,是可以避免的。
但是有一種掛起是不可避免的,我們知道cpu是串行的運行模式,一個時間點上只能運行一個線程。為了讓所有的線程看起來都是在同時運行,操作系統(tǒng)的機制是將cpu的執(zhí)行時間分成很多個細小的時間片段,由操作系統(tǒng)為每個線程分配時間片段,當輪到某個時間片段執(zhí)行時,綁定此時間片段的線程才會被執(zhí)行,當這個時間片段用完,當前的線程如果還沒有執(zhí)行完的話就會被掛起,等待再次被調度執(zhí)行。這個過程中線程就被中斷了。
中斷后不受其他線程影響怎么理解?
例如synchronized代碼塊,雖然線程在執(zhí)行代碼塊邏輯的時候會被cpu時間片段調度中斷,但是synchronized關鍵字通過加鎖的方式只允許一個線程進入代碼塊邏輯,這就保證了當前線程在運行代碼塊中邏輯的時候雖然會中斷,但是不受其他線程的影響。
不能看出上面的說不可中斷和中斷不受其他線程影響對應的正是cas的方式和加鎖的方式實現原子性。
本篇我們只看CAS方式
CAS實現原理
計算機做了什么?
我們知道了cas最終是靠計算機底層原語cmpxchg支持,下面就是linux_x86底層的匯編實現。
linux_x86底層實現
\hotspot\src\os_cpu\linux_x86\vm\atomic_linux_x86.inline.hpp
inline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {
int mp = os::is_MP(); // 內聯函數,用來判斷當前系統(tǒng)是否為多處理器
__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
: "=a" (exchange_value)
: "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp)
: "cc", "memory");
return exchange_value;
}
在這個方法中最關鍵的是一行代碼是:LOCK_IF_MP(%4) "cmpxchgl %1,(%3),cmpxchgl關鍵字就是匯編指令原語,這個原語在不同的計算機的實現不一樣,在linux_x86就叫做cmpxchgl。
而cmpxchg方法可以理解為linux_x86平臺對外提供cas支持的API。
這段代碼中我們看到LOCK_IF_MP關鍵字,看到lock一般我們會想到加鎖,這里確實是加鎖的意思,或許你會說cas操作為什么還要加鎖,如果這樣,那直接用加鎖的方式實現原子性不就可以了,這段代碼的邏輯其實是先判斷是否為多核處理器,如果是多核就會加鎖,如果單核就不加鎖。而這個加鎖的實現根據系統(tǒng)平臺的不同會有不同的實現方式,大致分為鎖總線和鎖緩存。
我們說了,一條機器指令(或者說匯編指令)一定能在一個cpu時間片段內完成,如果兩個線程占據兩個時間片,這個兩個時間片段都是要執(zhí)行同一個指令,當一個時間片段執(zhí)行完,才能執(zhí)行下一個時間片段,這樣看起來單個指令的執(zhí)行是串行的,不會有問題,但是現在的處理器一般都會存在多核,甚至多cpu,每個核都會有自己的緩存,所以,多核情況下僅僅靠cas是無法保證原子性的,操作系統(tǒng)內部通過鎖來規(guī)避。
這屬于操作系統(tǒng)為實現更高效而不得不付出的復雜性。這里牽扯到緩存一致性協議,介紹操作系統(tǒng)的時候再細說。
JVM做了什么?
既然計算機提供了cas支持,那么應用程序怎么應用呢,java虛擬機為java實現提供了支持。
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
oop p = JNIHandles::resolve(obj); //查找要指定的對象
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset); //獲取要操作的是對象的字段的內存地址
return (jint)(Atomic::cmpxchg(x, addr, e)) == e; //執(zhí)行Atomic類中的cmpxchg
UNSAFE_END
以上源碼中真正實現cas的代碼為:
Atomic::cmpxchg(x, addr, e)
不難看出,x為新值,addr為地址,e為期望值
jvm底層調用了匯編代碼中的方法cmpxchg,這個方法就是上面匯編代碼中方法。
上面這個方法是jvm底層實現,也是jvm對于cas的支持,jvm對外也提供了API,只不過是以本地方法的形式提供給java使用,它的體現就是 Unsafe類下面提供的三個本地方法compareAndSwapObject,compareAndSwapInt,compareAndSwapLong。
圖片
這個幾個方法的具體實現都在jvm內部。上面的jvm代碼對應的就是compareAndSwapInt本地方法的具體實現,其他方法也都大同小異。
Unsafe類是java層面提供的用于內存管理的類,除內存操作方法外,同時還提供線程調度操作,,內存屏障相關操作等,但是它不是應用類加載器加載的,因此程序員是不能直接使用的。
我們來解釋下這幾個參數:
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
var1:要修改的對象起始地址 如:0x00000111
var2:需要修改的具體內存地址 如100 。0x0000011+100 = 0x0000111就是要修改的值的地址
注意沒有var3
var4:期望內存中的值,拿這個值和0x0000111內存中的中值比較,如果為true,則修改,返回ture,否則返回false,等待下次修改。
var5:如果上一步比較為ture,則把var5更新到0x0000111其實的內存中。
原子操作,直接操作內存。
JAVA做了什么?
java提供了java.util.concurrent.atomic包,包下面提供了各種原子類,如下圖。
圖片
這些原子類底層基本都是依賴Unsafe類的三個方法實現。
接下來以AtomicInteger來介紹
先來看例子:
static int i=0;
public static void main(String[] args) throws IOException, InterruptedException {
Thread T1=new Thread(new Runnable(){
@Override
public void run(){
for(int n=0;n<10000;n++){
i++;
}
}
});
Thread T2=new Thread(new Runnable(){
@Override
public void run(){
for(int n=0;n<10000;n++){
i++;
}
}
});
T1.start();
T2.start();
T1.join();
T2.join();
System.out.println(i);
大家都清楚這段代碼最終打印的結果不一定是20000,因為i++這個操作不是原子性的。
上述代碼的邏輯我們替換為AtomicInteger來實現
static AtomicInteger atomic=new AtomicInteger();
public static void main(String[] args) throws IOException, InterruptedException {
Thread T1=new Thread(new Runnable(){
@Override
public void run(){
for(int n=0;n<10000;n++){
atomic.getAndAdd(1);
}
}
});
Thread T2=new Thread(new Runnable(){
@Override
public void run(){
for(int n=0;n<10000;n++){
atomic.getAndAdd(1);
}
}
});
T1.start();
T2.start();
T1.join();
T2.join();
System.out.println(atomic);
這段代碼實現得到的結果是正確的20000。
先來看看AtomicInteger類的源碼(只貼出主要代碼)
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
private static final Unsafe unsafe = Unsafe.getUnsafe();
private volatile int value;
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) {
throw new Error(ex);
}
}
public AtomicInteger(int initialValue) {
value = initialValue;
}
public AtomicInteger() {
}
private volatile int value;
public final int getAndAdd(int delta) {
return unsafe.getAndAddInt(this, valueOffset, delta);
}
}
先來解釋下getAndAddInt方法中的這幾個參數
value是AtomicInteger對象的成員變量,它就是實際要被操作的變量;
this是指當前AtomicInteger對象;
valueOffse是指value這個屬性所代表的具體值位于AtomicInteger對象所分配的內存空間的偏移量。
從上面的代碼中可以看出valueOffse的值是通過靜態(tài)方法確定的,也就是說在創(chuàng)建AtomicInteger對象前這個值就已經確定了且是固定的。之所以能固定下來,是因為java中的類在類加載的時候就已經確定了類中的成員變量所處整個內存空間的地址。所以同個類下的所有對象的這個偏移量值都是一樣的。
Unsafe類中的getAndAddInt方法的源碼如下
public final int getAndAddInt(Object var1, long var2, int var4) {
int var5;
do {
var5 = this.getIntVolatile(var1, var2);
} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));
return var5;
}
這個方法中var1就是AtomicInteger對象本身,var2是偏移量,var4是要加加的值。var5是查詢出的當前AtomicInteger對象內存空間在valueOffse偏移量處的值是多少。
getAndAddInt方法的整體邏輯就是先獲取當前內存值作為預估值,利用compareAndSwapInt方法進行cas,即比較預估值和計算底層的內存值,如果相等就用更新值替換內存值,如果不等就返回false,然后重復上面的步驟,直到替換成功。
借助這個思想,也可以解決一些數據庫層面的問題
int c=0;
while(c=0){
1 select ov from t where id=1;
2 int nv=ov*x;
3 c = update t set ov=nv where id=1 and ov=ov;
}
CAS的問題
不支持高并發(fā)
CAS只能用于并發(fā)不是很高的場景,java中的源碼我們看到了,一般為了保證一定能替換成功需要在cas外套一個循環(huán),如果并發(fā)很高,處于循環(huán)中的線程很多,就會導致cpu飆升,一般并發(fā)很高的場景需要用鎖解決。
ABA問題
使用CAS需要注意ABA問題,所謂ABA問題其實很簡單,先羅列下步驟
獲取:獲取內存值作為預期值
比較:預期值和內存值;
替換:內存值=更新值;
如果在獲取和比較之間內存值由1被改為2,又由2被改回了1,這種情況下是不會影響后面的比較和替換的。這便是ABA問題。可能結果上沒有問題,但是在邏輯上是有問題的。不能確保所有的場景都有問題。
ABA問題的解決
jdk提供 AtomicStampedReference 原子類解決ABA問題。
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return
expectedReference == current.reference &&
expectedStamp == current.stamp &&
((newReference == current.reference &&
newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}
private boolean casPair(Pair<V> cmp, Pair<V> val) {
return UNSAFE.compareAndSwapObject(this, pairOffset, cmp, val);
}
pair是一個靜態(tài)內部類,也是在類加載的時候計算固定的偏移量,只不過這個內部類中有兩個屬性,一個是具體的值,一個是時間戳。
通過compareAndSwapObject進行cas操作。
不難看出,原來是通過比較value的值容易出現ABA問題,現在是通過compareAndSwapObject方法比較value和時間戳來判斷是否要替換,替換的時候,會把value和時間戳都替換。