高并发系列——CAS操作及CPU底层操作解析
CAS(Compare-and-Swap),即比較并替換,是一種實現并發算法時常用到的技術,Java并發包中的很多類都使用了CAS技術。CAS也是現在面試經常問的問題,本文將深入的介紹CAS的原理。
一、簡單實例
一個方法實現i++時,如下面的getAndIncrement1()方法,多線程調用時,會出現并發安全問題。這時,可以通過synchronized關鍵字顯式加鎖保證并發安全。
在JAVA并發包中,提供了很多的Atomic原子類,也可以保證線程安全,比如getAndIncrement2()中的AtomicInteger.getAndIncrement()方法。
package com.wuxiaolong.concurrent;import java.util.concurrent.atomic.AtomicInteger;/*** Description:** @author 諸葛小猿* @date 2020-09-14*/ public class Test1 {public static int i = 0;/*** 加鎖保證多線程調用的并發安全* @return*/public synchronized static int getAndIncrement1(){i++;return i;}/*** AtomicInteger是線程安全的 * @return*/public static int getAndIncrement2(){AtomicInteger ai = new AtomicInteger();int in = ai.getAndIncrement();return in;} }二、什么是CAS
CAS:compare and swap或compare and exchange,比較和交互。作用是保證在沒有鎖的情況下,多個線程對一個值的更新是線程安全的。
CAS過程:它包含三個參數CAS(V,E,N),V表示要更新的變量,E表示預期值,N表示新值,僅當V值等于E值時,才會將V值設置為N值,如果V值和E值不同,說明已經有其他線程做了更新,則當前線程什么都不做。
如果使用CAS實現i++這個操作,可以有如下幾步:
1.讀取當前i的值(比如i=0),記為E
2.計算i++的值為1,記為V
3.再次讀取i的值,記為N。
4.如果E==N,則將i的值更新為1,否則不更新。
CAS的過程很像樂觀鎖,樂觀鎖認為發生線程安全問題的概率比較小,所以不用直接加鎖,只是更新數據時比較一下原來的數據有沒有發生變化。
上面第四步中,如果E==N,并不能說明i的值沒有改變過,可能一個線程執行第四步的時候,另一個線程將i改變后又變回來了,對于第一個線程來說,并不知道這個中間過程的存在。這個現象就是ABA問題。
ABA問題如何解決?其實也很簡單,給i加一個版本號字段,每次i有變化都對版本號加1,每次更新i的時候除了比較E值,還比較版本號是否一致。這樣就解決了ABA問題。在實際開發過程中,如果ABA問題對業務沒有影響,就不用考慮這個問題。
三、CAS在AtomicInteger中的使用
CAS的底層實現、synchronized的底層實現、volatile的底層實現都是一樣的。我們以上面提到的AtomicInteger類說明。
AtomicInteger是線程安全的,通常說AtomicInteger是無鎖或自旋鎖。這就是CAS在JDK中的應用。
AtomicInteger.getAndIncrement()方法的源碼:
/*** Atomically increments by one the current value.** @return the previous value*/public final int getAndIncrement() {return unsafe.getAndAddInt(this, valueOffset, 1);}Unsafe.getAndAddInt()源碼:
public final int getAndAddInt(Object var1, long var2, int var4) {int var5;do {var5 = this.getIntVolatile(var1, var2);} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));return var5;}Unsafe.compareAndSwapInt()源碼:
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);這個方法是native修飾的,在JDK中就看不到源碼實現了。由于java代碼是在JVM中執行的,Oracle的JVM是Hotspot, 如果要看native方法的實現,可以找Hotspot的源碼,這個源碼是C和C++寫的。Unsafe.java這個類的源碼對應Hotspot源碼中unsafe.cpp,這個是C++寫的。
四、CAS的底層實現
CAS的底層實現,就必須了解Hotspot的源碼。可以在查看OpenJdk的代碼,這里就可以找到各種版本的源碼。我們以jdk8u為中的unsafe.cpp為例,繼續分析compareAndSwapInt方法。
// 地址:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/a0eb08e2db5a/src/share/vm/prims/unsafe.cppUNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))UnsafeWrapper("Unsafe_CompareAndSwapInt");oop p = JNIHandles::resolve(obj);jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);return (jint)(Atomic::cmpxchg(x, addr, e)) == e; UNSAFE_END可以看到調用到了Atomic::cmpxchg方法,繼續分析找到這個方法:
// 地址:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/a0eb08e2db5a/src/share/vm/runtime/atomic.cppjbyte Atomic::cmpxchg(jbyte exchange_value, volatile jbyte* dest, jbyte compare_value) {assert(sizeof(jbyte) == 1, "assumption.");uintptr_t dest_addr = (uintptr_t)dest;uintptr_t offset = dest_addr % sizeof(jint);volatile jint* dest_int = (volatile jint*)(dest_addr - offset);jint cur = *dest_int;jbyte* cur_as_bytes = (jbyte*)(&cur);jint new_val = cur;jbyte* new_val_as_bytes = (jbyte*)(&new_val);new_val_as_bytes[offset] = exchange_value;while (cur_as_bytes[offset] == compare_value) {//關鍵方法jint res = cmpxchg(new_val, dest_int, cur); if (res == cur) break;cur = res;new_val = cur;new_val_as_bytes[offset] = exchange_value;}return cur_as_bytes[offset]; }各種系統下各種cpu架構,都有相關的實現方法,具體的文件名稱如下:
// 地址:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/a0eb08e2db5a/src/share/vm/runtime/atomic.inline.hpp#ifndef SHARE_VM_RUNTIME_ATOMIC_INLINE_HPP #define SHARE_VM_RUNTIME_ATOMIC_INLINE_HPP#include "runtime/atomic.hpp"// Linux #ifdef TARGET_OS_ARCH_linux_x86 # include "atomic_linux_x86.inline.hpp" #endif #ifdef TARGET_OS_ARCH_linux_sparc # include "atomic_linux_sparc.inline.hpp" #endif #ifdef TARGET_OS_ARCH_linux_zero # include "atomic_linux_zero.inline.hpp" #endif #ifdef TARGET_OS_ARCH_linux_arm # include "atomic_linux_arm.inline.hpp" #endif #ifdef TARGET_OS_ARCH_linux_ppc # include "atomic_linux_ppc.inline.hpp" #endif// Solaris #ifdef TARGET_OS_ARCH_solaris_x86 # include "atomic_solaris_x86.inline.hpp" #endif #ifdef TARGET_OS_ARCH_solaris_sparc # include "atomic_solaris_sparc.inline.hpp" #endif// Windows #ifdef TARGET_OS_ARCH_windows_x86 # include "atomic_windows_x86.inline.hpp" #endif// AIX #ifdef TARGET_OS_ARCH_aix_ppc # include "atomic_aix_ppc.inline.hpp" #endif// BSD #ifdef TARGET_OS_ARCH_bsd_x86 # include "atomic_bsd_x86.inline.hpp" #endif #ifdef TARGET_OS_ARCH_bsd_zero # include "atomic_bsd_zero.inline.hpp" #endif#endif // SHARE_VM_RUNTIME_ATOMIC_INLINE_HPP在src/os_cpu/目錄下面有各種系統下各種cpu架構的代碼實現,其中src/os_cpu/linux_x86/vm下是基于linux下x86架構的代碼,cmpxchg方法的最終實現:
// 地址:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/a0eb08e2db5a/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hppinline jint Atomic::cmpxchg (jint exchange_value, volatile jint* dest, jint compare_value) {int mp = os::is_MP();__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)": "=a" (exchange_value): "r" (exchange_value), "a" (compare_value), "r" (dest), "r" (mp): "cc", "memory");return exchange_value; }其中__asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"一段代碼是核心,asm指匯編語言,是機器語言,直接和cpu交互。
LOCK_IF_MP指“如果是多個CPU就加鎖”,MP指Multi-Processors。程序會根據當前處理器的數量來決定是否為cmpxchg指令添加lock前綴。如果程序是在多處理器上運行,就為cmpxchg指令加上lock前綴(lock cmpxchg)。反之,如果程序是在單處理器上運行,就省略lock前綴(單個處理器自身會維護單處理器內的順序一致性,不需要lock前綴提供的內存屏障效果)。
// 地址:http://hg.openjdk.java.net/jdk8u/jdk8u/hotspot/file/a0eb08e2db5a/src/os_cpu/linux_x86/vm/atomic_linux_x86.inline.hpp// Adding a lock prefix to an instruction on MP machine #define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "總結
從上面可以看出,CAS的本質就是:
lock cmpxchg 指令
但是cmpxchg這條cpu指令本身并不是原子性的,還是依賴了前面的lock指令。
關注公眾號,輸入“java-summary”即可獲得源碼。
完成,收工!
【傳播知識,共享價值】,感謝小伙伴們的關注和支持,我是【諸葛小猿】,一個彷徨中奮斗的互聯網民工。
總結
以上是生活随笔為你收集整理的高并发系列——CAS操作及CPU底层操作解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 类和对象——初入江湖
- 下一篇: 如何引爆社群互动