Java Review - 并发编程_原子操作类LongAdder LongAccumulator剖析
文章目錄
- 概述
- 小Demo
- 源碼分析
- 重要的方法
- long sum()
- reset
- sumThenReset
- longValue()
- add(long x)
- longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)
- LongAdder 小結
- LongAccumulator
- 概述
- LongAdder#add vs LongAccumulator#accumulate
- 小Demo
- 原子類總結
概述
Java Review - 并發編程_原子操作類原理剖析中提到了 AtomicLong通過CAS提供了非阻塞的原子性操作,相比使用阻塞算法的同步器來說它的性能已經很好了,但是JDK開發組并不滿足于此。使用AtomicLong時,在高并發下大量線程會同時去競爭更新同一個原子變量,但是由于同時只有一個線程的CAS操作會成功,這就造成了大量線程競爭失敗后,會通過無限循環不斷進行自旋嘗試CAS的操作,而這會白白浪費CPU資源。
因此JDK 8新增了一個原子性遞增或者遞減類LongAdder用來克服在高并發下使用AtomicLong的缺點。
既然AtomicLong的性能瓶頸是由于過多線程同時去競爭一個變量的更新而產生的,那么如果把一個變量分解為多個變量,讓同樣多的線程去競爭多個資源,是不是就解決了性能問題?是的,LongAdder就是這個思路。
下面通過圖來理解兩者設計的不同之處
-
使用LongAdder時,則是在內部維護多個Cell變量,每個Cell里面有一個初始值為0的long型變量,這樣,在同等并發量的情況下,爭奪單個變量更新操作的線程量會減少,這變相地減少了爭奪共享資源的并發量。
-
另外,多個線程在爭奪同一個Cell原子變量時如果失敗了,它并不是在當前Cell變量上一直自旋CAS重試,而是嘗試在其他Cell的變量上進行CAS嘗試,這個改變增加了當前線程重試CAS成功的可能性。
-
最后,在獲取LongAdder當前值時,是把所有Cell變量的value值累加后再加上base返回的。
LongAdder維護了一個延遲初始化的原子性更新數組(默認情況下Cell數組是null)和一個基值變量base。由于Cells占用的內存是相對比較大的,所以一開始并不創建它,而是在需要時創建,也就是惰性加載。
當一開始判斷Cell數組是null并且并發線程較少時,所有的累加操作都是對base變量進行的。保持Cell數組的大小為2的N次方,在初始化時Cell數組中的Cell元素個數為2,數組里面的變量實體是Cell類型。Cell類型是AtomicLong的一個改進,用來減少緩存的爭用,也就是解決偽共享問題。
對于大多數孤立的多個原子操作進行字節填充是浪費的,因為原子性操作都是無規律地分散在內存中的(也就是說多個原子性變量的內存地址是不連續的),多個原子變量被放入同一個緩存行的可能性很小。但是原子性數組元素的內存地址是連續的,所以數組內的多個元素能經常共享緩存行,因此這里使用@sun.misc.Contended注解對Cell類進行字節填充,這防止了數組中多個元素共享一個緩存行,在性能上是一個提升。
小Demo
import java.util.concurrent.atomic.LongAdder;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/11/30 22:52* @mark: show me the code , change the world*/ public class AtomicLongTest {//(10)創建Long型原子計數器// private static AtomicLong atomicLong = new AtomicLong();private static LongAdder longAdder = new LongAdder();//(11)創建數據源private static Integer[] arrayOne = new Integer[]{0, 1, 2, 3, 0, 5, 6, 0, 56, 0};private static Integer[] arrayTwo = new Integer[]{10, 1, 2, 3, 0, 5, 6, 0, 56, 0};public static void main(String[] args) throws InterruptedException {//(12)線程one統計數組arrayOne中0的個數Thread threadOne = new Thread(() -> {int size = arrayOne.length;for (int i = 0; i < size; ++i) {if (arrayOne[i].intValue() == 0) {longAdder.increment();}}});//(13)線程two統計數組arrayTwo中0的個數Thread threadTwo = new Thread(() -> {int size = arrayTwo.length;for (int i = 0; i < size; ++i) {if (arrayTwo[i].intValue() == 0) {longAdder.increment();}}});//(14)啟動子線程threadOne.start();threadTwo.start();//(15)等待線程執行完畢threadOne.join();threadTwo.join();System.out.println("count 0:" + longAdder.sum());}}源碼分析
為了解決高并發下多線程對一個變量CAS爭奪失敗后進行自旋而造成的降低并發性能問題,LongAdder在內部維護多個Cell元素(一個動態的Cell數組)來分擔對單個變量進行爭奪的開銷。
先來思考幾個問題
(1)LongAdder的結構是怎樣的?
(2)當前線程應該訪問Cell數組里面的哪一個Cell元素?
(3)如何初始化Cell數組?
(4)Cell數組如何擴容?
(5)線程訪問分配的Cell元素有沖突后如何處理?
(6)如何保證線程操作被分配的Cell元素的原子性?
LongAdder類繼承自Striped64類,在Striped64內部維護著三個變量。
LongAdder的真實值其實是base的值與Cell數組里面所有Cell元素中的value值的累加,base是個基礎值,默認為0。
cellsBusy用來實現自旋鎖,狀態值只有0和1,當創建Cell元素,擴容Cell數組或者初始化Cell數組時,使用CAS操作該變量來保證同時只有一個線程可以進行其中之一的操作。
/*** Padded variant of AtomicLong supporting only raw accesses plus CAS.** JVM intrinsics note: It would be possible to use a release-only* form of CAS here, if it were provided.*/@sun.misc.Contended static final class Cell {volatile long value;Cell(long x) { value = x; }final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long valueOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class<?> ak = Cell.class;valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));} catch (Exception e) {throw new Error(e);}}}-
Cell的構造很簡單,其內部維護一個被聲明為volatile的變量,這里聲明為volatile是因為線程操作value變量時沒有使用鎖,為了保證變量的內存可見性這里將其聲明為volatile的。
-
cas函數通過CAS操作,保證了當前線程更新時被分配的Cell元素中value值的原子性。
-
Cell類使用@sun.misc.Contended修飾是為了避免偽共享。
到這里我們回答了問題1和問題6。
重要的方法
long sum()
返回當前的值,內部操作是累加所有Cell內部的value值后再累加base。
/*** Returns the current sum. The returned value is <em>NOT</em> an* atomic snapshot; invocation in the absence of concurrent* updates returns an accurate result, but concurrent updates that* occur while the sum is being calculated might not be* incorporated.** @return the sum*/public long sum() {Cell[] as = cells; Cell a;long sum = base;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;}由于計算總和時沒有對Cell數組進行加鎖,所以在累加過程中可能有其他線程對Cell中的值進行了修改,也有可能對數組進行了擴容,所以sum返回的值并不是非常精確的,其返回值并不是一個調用sum方法時的原子快照值。
reset
重置操作
/*** Resets variables maintaining the sum to zero. This method may* be a useful alternative to creating a new adder, but is only* effective if there are no concurrent updates. Because this* method is intrinsically racy, it should only be used when it is* known that no threads are concurrently updating.*/public void reset() {Cell[] as = cells; Cell a;base = 0L;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)a.value = 0L;}}}base置為0,如果Cell數組有元素,則元素值被重置為0
sumThenReset
sum的改造版本
/*** Equivalent in effect to {@link #sum} followed by {@link* #reset}. This method may apply for example during quiescent* points between multithreaded computations. If there are* updates concurrent with this method, the returned value is* <em>not</em> guaranteed to be the final value occurring before* the reset.** @return the sum*/public long sumThenReset() {Cell[] as = cells; Cell a;long sum = base;base = 0L;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null) {sum += a.value;a.value = 0L;}}}return sum;}在使用sum累加對應的Cell值后,把當前Cell的值重置為0, base重置為0。這樣,當多線程調用該方法時會有問題,比如考慮第一個調用線程清空Cell的值,則后一個線程調用時累加的都是0值。
longValue()
等價于sum
/*** Equivalent to {@link #sum}.** @return the sum*/public long longValue() {return sum();}add(long x)
/*** Adds the given value.** @param x the value to add*/public void add(long x) {Cell[] as; long b, v; int m; Cell a;if ((as = cells) != null || !casBase(b = base, b + x)) { // 1 boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 || // 2 (a = as[getProbe() & m]) == null || // 3!(uncontended = a.cas(v = a.value, v + x))) // 4 longAccumulate(x, null, uncontended); // 5}}/*** CASes the base field.*/final boolean casBase(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);}-
代碼(1)首先看cells是否為null,如果為null則當前在基礎變量base上進行累加,這時候就類似AtomicLong的操作。 如果cells不為null或者線程執行代碼(1)的CAS操作失敗了,則會去執行代碼(2)。
-
代碼(2)(3)決定當前線程應該訪問cells數組里面的哪一個Cell元素,如果當前線程映射的元素存在則執行代碼(4),使用CAS操作去更新分配的Cell元素的value值,如果當前線程映射的元素不存在或者存在但是CAS操作失敗則執行代碼(5)。
其實將代碼(2)(3)(4)合起來看就是獲取當前線程應該訪問的cells數組的Cell元素,然后進行CAS更新操作,只是在獲取期間如果有些條件不滿足則會跳轉到代碼(5)執行。
另外當前線程應該訪問cells數組的哪一個Cell元素是通過getProbe() & m進行計算的,其中m是當前cells數組元素個數-1,getProbe()則用于獲取當前線程中變量threadLocalRandomProbe的值,這個值一開始為0,在代碼(5)里面會對其進行初始化。并且當前線程通過分配的Cell元素的cas函數來保證對Cell元素value值更新的原子性,到這里我們回答了問題2和問題6。
longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)
cells數組被初始化和擴容的地方
final void longAccumulate(long x, LongBinaryOperator fn,boolean wasUncontended) {//(6) 初始化當前線程的變量threadLocalRandomProbe的值int h;if ((h = getProbe()) == 0) {ThreadLocalRandom.current(); //h = getProbe();wasUncontended = true;}boolean collide = false;for (; ; ) {Cell[] as; Cell a; int n; long v;if ((as = cells) ! = null && (n = as.length) > 0) {//(7)if ((a = as[(n -1) & h]) == null) {//(8)if (cellsBusy == 0) { // Try to attach new CellCell r = new Cell(x); // Optimistically createif (cellsBusy == 0 && casCellsBusy()) {boolean created = false;try { // Recheck under lockCell[] rs; int m, j;if ((rs = cells) ! = null &&(m = rs.length) > 0 &&rs[j = (m -1) & h] == null) {rs[j] = r;created = true;}} finally {cellsBusy = 0;}if (created)break;continue; // Slot is now non-empty}}collide = false;}else if (! wasUncontended) // CAS already known to failwasUncontended = true;//當前Cell存在,則執行CAS設置(9)else if (a.cas(v = a.value, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break;//當前Cell數組元素個數大于CPU個數(10)else if (n >= NCPU || cells ! = as)collide = false; // At max size or stale//是否有沖突(11)else if (! collide)collide = true;//如果當前元素個數沒有達到CPU個數并且有沖突則擴容(12)else if (cellsBusy == 0 && casCellsBusy()) {try {if (cells == as) { // Expand table unless stale//12.1Cell[] rs = new Cell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];cells = rs;}} finally {//12.2cellsBusy = 0;}//12.3collide = false;continue; // Retry with expanded table}//(13)為了能夠找到一個空閑的Cell,重新計算hash值,xorshift算法生成隨機數h = advanceProbe(h);}//初始化Cell數組(14)else if (cellsBusy == 0 && cells == as && casCellsBusy()) {boolean init = false;try {if (cells == as) {//14.1Cell[] rs = new Cell[2];//14.2rs[h & 1] = new Cell(x);cells = rs;init = true;}} finally {//14.3cellsBusy = 0;}if (init)break;}else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))break; // Fall back on using base}}上面代碼比較復雜,這里我們主要關注問題3、問題4和問題5。
當每個線程第一次執行到代碼(6)時,會初始化當前線程變量threadLocalRandomProbe的值,上面也說了,這個變量在計算當前線程應該被分配到cells數組的哪一個Cell元素時會用到。
cells數組的初始化是在代碼(14)中進行的,其中cellsBusy是一個標示,為0說明當前cells數組沒有在被初始化或者擴容,也沒有在新建Cell元素,為1則說明cells數組在被初始化或者擴容,或者當前在創建新的Cell元素、通過CAS操作來進行0或1狀態的切換,這里使用casCellsBusy函數。假設當前線程通過CAS設置cellsBusy為1,則當前線程開始初始化操作,那么這時候其他線程就不能進行擴容了。
如代碼(14.1)初始化cells數組元素個數為2,然后使用h&1計算當前線程應該訪問celll數組的哪個位置,也就是使用當前線程的threadLocalRandomProbe變量值&(cells數組元素個數-1),然后標示cells數組已經被初始化,最后代碼(14.3)重置了cellsBusy標記。
顯然這里沒有使用CAS操作,卻是線程安全的,原因是cellsBusy是volatile類型的,這保證了變量的內存可見性,另外此時其他地方的代碼沒有機會修改cellsBusy的值。在這里初始化的cells數組里面的兩個元素的值目前還是null。這里回答了問題3,知道了cells數組如何被初始化。
cells數組的擴容是在代碼(12)中進行的,對cells擴容是有條件的,也就是代碼(10)(11)的條件都不滿足的時候。具體就是當前cells的元素個數小于當前機器CPU個數并且當前多個線程訪問了cells中同一個元素,從而導致沖突使其中一個線程CAS失敗時才會進行擴容操作。
這里為何要涉及CPU個數呢?只有當每個CPU都運行一個線程時才會使多線程的效果最佳,也就是當cells數組元素個數與CPU個數一致時,每個Cell都使用一個CPU進行處理,這時性能才是最佳的。
代碼(12)中的擴容操作也是先通過CAS設置cellsBusy為1,然后才能進行擴容。假設CAS成功則執行代碼(12.1)將容量擴充為之前的2倍,并復制Cell元素到擴容后數組。
另外,擴容后cells數組里面除了包含復制過來的元素外,還包含其他新元素,這些元素的值目前還是null。這里回答了問題4。
在代碼(7)(8)中,當前線程調用add方法并根據當前線程的隨機數threadLocalRandomProbe和cells元素個數計算要訪問的Cell元素下標,然后如果發現對應下標元素的值為null,則新增一個Cell元素到cells數組,并且在將其添加到cells數組之前要競爭設置cellsBusy為1。
代碼(13)對CAS失敗的線程重新計算當前線程的隨機值threadLocalRandomProbe,以減少下次訪問cells元素時的沖突機會。這里回答了問題5。
LongAdder 小結
JDK 8中新增的LongAdder原子性操作類,該類通過內部cells數組分擔了高并發下多線程同時對一個原子變量進行更新時的競爭量,讓多個線程可以同時對cells數組里面的元素進行并行的更新操作。
另外,數組元素Cell使用@sun.misc.Contended注解進行修飾,這避免了cells數組內多個原子變量被放入同一個緩存行,也就是避免了偽共享,這對性能也是一個提升。
LongAccumulator
概述
LongAdder類是LongAccumulator的一個特例, LongAccumulator比LongAdder的功能更強大。
例如下面的構造函數,其中accumulatorFunction是一個雙目運算器接口,其根據輸入的兩個參數返回一個計算值,identity則是LongAccumulator累加器的初始值。
/*** Creates a new instance using the given accumulator function* and identity element.* @param accumulatorFunction a side-effect-free function of two arguments* @param identity identity (initial value) for the accumulator function*/public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity) {this.function = accumulatorFunction;base = this.identity = identity;}看看 LongBinaryOperator
LongAccumulator相比于LongAdder,可以為累加器提供非0的初始值,后者只能提供默認的0值。另外,前者還可以指定累加規則,比如不進行累加而進行相乘,只需要在構造LongAccumulator時傳入自定義的雙目運算器即可,后者則內置累加的規則。
LongAdder#add vs LongAccumulator#accumulate
LongAdder#add方法
LongAccumulator#accumulate方法
LongAccumulator相比于LongAdder的不同在于,在調用casBase時后者傳遞的是b+x,前者則使用了r = function.applyAsLong(b = base, x)來計算。
另外,前者在調用longAccumulate時傳遞的是function,而后者是null。從下面的代碼可知,當fn為null時就使用v+x加法運算,這時候就等價于LongAdder,當fn不為null時則使用傳遞的fn函數計算。
LongAdder類是LongAccumulator的一個特例,只是后者提供了更加強大的功能,可以讓用戶自定義累加規則。
小Demo
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.LongAccumulator; import java.util.function.LongBinaryOperator; import java.util.stream.IntStream;/*** @author 小工匠* @version 1.0* @description: TODO* @date 2021/12/1 0:02* @mark: show me the code , change the world*/ public class LongAccumulator1 {public static void main(String[] args) {testAccumulate();}private static void testAccumulate() {LongBinaryOperator op = (x, y) -> 2 * x + y;LongAccumulator accumulator = new LongAccumulator(op, 1L);ExecutorService executor = Executors.newFixedThreadPool(2);IntStream.range(0, 100).forEach(i -> executor.submit(() -> accumulator.accumulate(i)));System.out.format("Add: %d\n", accumulator.getThenReset());executor.shutdown();} }原子類總結
并發包中的原子性操作類都是使用非阻塞算法CAS實現的,這相比使用鎖實現原子性操作在性能上有很大提高。
梳理了AtomicLong類的實現原理,然后JDK 8中新增的LongAdder類和LongAccumulator類的原理。
總結
以上是生活随笔為你收集整理的Java Review - 并发编程_原子操作类LongAdder LongAccumulator剖析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Java Review - 并发编程_原
- 下一篇: Java Review - 并发编程_并