多线程小抄集(新编四)
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
 
 歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/java/java-concurrent-note-list-new4/
ConcurrentHashMap
ConcurrentHashMap是線程安全的HashMap,鍵值都不能為null。
JDK7的實(shí)現(xiàn):內(nèi)部采用分段鎖來實(shí)現(xiàn),默認(rèn)初始容量為16(所以理論上這個(gè)時(shí)候最多可以同時(shí)支持 16 個(gè)線程并發(fā)寫,只要它們的操作分別分布在不同的 Segment 上。這個(gè)值可以在初始化的時(shí)候設(shè)置為其他值,但是一旦初始化以后,它是不可以擴(kuò)容的),裝載因子為0.75f,分段16,每個(gè)段的HashEntry<K,V>[]大小為2。每次擴(kuò)容為原來容量的2倍,ConcurrentHashMap不會(huì)對(duì)整個(gè)容器進(jìn)行擴(kuò)容,而只對(duì)某個(gè)segment進(jìn)行擴(kuò)容。在獲取size操作的時(shí)候,先嘗試2(RETRIES_BEFORE_LOCK)次通過不鎖住Segment的方式統(tǒng)計(jì)各個(gè)Segment大小,如果統(tǒng)計(jì)的過程中,容器的count發(fā)生了變化,再采用加鎖的方式來統(tǒng)計(jì)所有的Segment的大小。
 
 JDK8的實(shí)現(xiàn):ConcurrentHashMap中結(jié)構(gòu)為:數(shù)組、鏈表和紅黑樹。當(dāng)某個(gè)槽內(nèi)的元素個(gè)數(shù)增加到8個(gè)且table容量大于或者等于64時(shí),由鏈表轉(zhuǎn)為紅黑樹;當(dāng)某個(gè)槽內(nèi)的元素個(gè)數(shù)減少到6個(gè)時(shí),由紅黑樹重新轉(zhuǎn)回鏈表。鏈表轉(zhuǎn)紅黑樹的過程,就是把給定順序的元素構(gòu)造成一顆紅黑樹的過程。需要注意的是,當(dāng)table容量小于64時(shí),只會(huì)擴(kuò)容,并不會(huì)把鏈表轉(zhuǎn)為紅黑樹。在轉(zhuǎn)化過程中,使用同步塊鎖住當(dāng)前槽的首元素,防止其他進(jìn)程對(duì)當(dāng)前槽進(jìn)行增刪改操作,轉(zhuǎn)化完成后利用CAS替換原有的鏈表。
 
 無論是JDK7還是JDK8,ConcurrentHashMap的size方法都只能返回一個(gè)大概數(shù)量,無法做到100%的精確,因?yàn)橐呀?jīng)統(tǒng)計(jì)過的槽在size返回最終結(jié)果前有可能又出現(xiàn)了變化,從而導(dǎo)致返回大小與實(shí)際大小存在些許差異。
JDK8中size方法的實(shí)現(xiàn)比JDK7要簡單很多:
public int size() {long n = sumCount();return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); }最大值是 Integer 類型的最大值,但是 Map 的 size 可能超過 MAX_VALUE, 所以還有一個(gè)方法 mappingCount(),JDK 的建議使用 mappingCount() 而不是size()。mappingCount() 的代碼如下:
public long mappingCount() {long n = sumCount();return (n < 0L) ? 0L : n; // ignore transient negative values }以上可以看出,無論是 size() 還是 mappingCount(), 計(jì)算大小的核心方法都是 sumCount()。sumCount() 的代碼如下:
final long sumCount() {CounterCell[] as = counterCells; CounterCell a;long sum = baseCount;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum; }CounterCell 這個(gè)類到底是什么?我們會(huì)發(fā)現(xiàn)它使用了 @sun.misc.Contended 標(biāo)記的類,內(nèi)部包含一個(gè) volatile 變量。@sun.misc.Contended 這個(gè)注解標(biāo)識(shí)著這個(gè)類防止需要防止 “偽共享”。
 JDK1.8 size 是通過對(duì) baseCount 和 counterCell 進(jìn)行 CAS 計(jì)算,最終通過 baseCount 和 遍歷 CounterCell 數(shù)組得出 size。
JDK8 ConcurrentHashMap CPU 100%
遞歸調(diào)用computeIfAbsent方法
線程安全的非阻塞隊(duì)列
非阻塞隊(duì)列有ConcurrentLinkedQueue, ConcurrentLinkedDeque。元素不能為null。以ConcurrentLinkedQueue為例,有頭head和尾tail兩個(gè)指針,遵循FIFO的原則進(jìn)行入隊(duì)和出隊(duì),方法有add(E e), peek()取出不刪除, poll()取出刪除, remove(Object o),size(), contains(Object o), addAll(Collection c), isEmpty()。ConcurrentLinkedDeque是雙向隊(duì)列,可以在頭和尾兩個(gè)方向進(jìn)行相應(yīng)的操作。
阻塞隊(duì)列
阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。這兩個(gè)附加的操作支持阻塞的插入和移除方法。 支持阻塞的插入方法:意思是當(dāng)隊(duì)列滿時(shí),隊(duì)列會(huì)阻塞插入元素的線程,直到隊(duì)列不滿。 支持阻塞的移除方法:意思是隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡?任何阻塞隊(duì)列中的元素都不能為null.
阻塞隊(duì)列的插入和移除操作處理方式:
| 入隊(duì) | add(e) | offer(e) | put(e) | offer(e,timeout,unit) | 
| 出隊(duì) | remove() | poll() | take() | poll(timeout,unit) | 
| 查看 | element() | peek() | 無 | 無 | 
如果是無界隊(duì)列,隊(duì)列不可能出現(xiàn)滿的情況,所以使用put或offer方法永遠(yuǎn)不會(huì)被阻塞,而且使用offer方法時(shí),該方法永遠(yuǎn)返回true.
Java里的阻塞隊(duì)列:ArrayBlockingQueue:一個(gè)由數(shù)組結(jié)構(gòu)組成的有界阻塞隊(duì)列。 LinkedeBlockingQueue:一個(gè)有鏈表結(jié)構(gòu)組成的(有界/無界)阻塞隊(duì)列。 PriorityBlockingQueue:一個(gè)支持優(yōu)先級(jí)排序的無界阻塞隊(duì)列 。DelayQueue:一個(gè)使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無界阻塞隊(duì)列。 SynchronousQueue:一個(gè)不存儲(chǔ)元素的阻塞隊(duì)列。 LinkedTransferQueue:一個(gè)由鏈表結(jié)構(gòu)組成的無界阻塞隊(duì)列。 LinkedBlockingDeque:一個(gè)由鏈表結(jié)構(gòu)組成的雙向阻塞隊(duì)列。
ArrayBlockingQueue
此隊(duì)列按照FIFO的原則對(duì)元素進(jìn)行排序,支持公平和非公平策略,默認(rèn)為不公平。初始化時(shí)必須設(shè)定容量大小。
public ArrayBlockingQueue(int capacity) public ArrayBlockingQueue(int capacity, boolean fair) public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c)LinkedBlockingQueue
與ArrayBlockingQueue一樣,按照FIFO原則進(jìn)行排序,內(nèi)部采用鏈表結(jié)構(gòu),且不能設(shè)置為公平的。默認(rèn)隊(duì)列長度為Integer.MAX_VALUE。
public LinkedBlockingQueue() {this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) public LinkedBlockingQueue(Collection<? extends E> c)PriorityBlockingQueue
是一個(gè)支持優(yōu)先級(jí)的無界阻塞隊(duì)列,默認(rèn)初始容量為11,默認(rèn)情況下采用自然順序升序排列,不能保證同優(yōu)先級(jí)元素的順序。底層采用二叉堆實(shí)現(xiàn)。內(nèi)部元素要么實(shí)現(xiàn)Comparable接口,要么在初始化的時(shí)候指定構(gòu)造函數(shù)的Comparator來對(duì)元素進(jìn)行排序。
public PriorityBlockingQueue() public PriorityBlockingQueue(int initialCapacity) public PriorityBlockingQueue(int initialCapacity,Comparator<? super E> comparator) public PriorityBlockingQueue(Collection<? extends E> c)DelayQueue
支持延時(shí)獲取元素的無界阻塞隊(duì)列。內(nèi)部包含一個(gè)PriorityQueue來實(shí)現(xiàn),隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口,在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素。只有在延遲期滿時(shí)才能從隊(duì)列中提取元素。 DelayQueue非常有用,可以將DelayQueue運(yùn)用在下面應(yīng)用場景。
- 緩存系統(tǒng)的設(shè)計(jì):可以用DelayQueue保存緩存元素的有效期,使用一個(gè)線程循環(huán)查詢DelayQueue,一旦能從DelayQueue中獲取元素時(shí),表示緩存有效期到了。
- 定時(shí)任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天將會(huì)執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從DelayQueue中獲取到任務(wù)就開始執(zhí)行。
SynchronousQueue
不存儲(chǔ)元素的阻塞隊(duì)列,支持公平和非公平策略。每一個(gè)put操作必須等待一個(gè)take操作,否則不能繼續(xù)添加元素。非常適合傳遞性場景。
CopyOnWriteArrayList
在每次修改時(shí),都會(huì)創(chuàng)建并重新發(fā)布一個(gè)新的容器副本,從而實(shí)現(xiàn)可變性。CopyOnWriteArrayList的迭代器保留一個(gè)指向底層基礎(chǔ)數(shù)組的引用,這個(gè)數(shù)組當(dāng)前位于迭代器的起始位置,由于它不會(huì)被修改,因此在對(duì)其進(jìn)行同步時(shí)只需確保數(shù)組內(nèi)容的可見性。因此,多個(gè)線程可以同時(shí)對(duì)這個(gè)容器進(jìn)行迭代,而不會(huì)彼此干擾或者與修改容器的線程相互干擾?!皩憰r(shí)復(fù)制”容器返回的迭代器不會(huì)拋出ConcurrentModificationException并且返回的元素與迭代器創(chuàng)建時(shí)的元素完全一致,而不必考慮之后修改操作所帶來的影響。顯然,每當(dāng)修改容器時(shí)都會(huì)復(fù)制底層數(shù)組,這需要一定的開銷,特別是當(dāng)容器的規(guī)模較大時(shí),僅當(dāng)?shù)僮鬟h(yuǎn)遠(yuǎn)多于修改操作時(shí),才應(yīng)該使用“寫入時(shí)賦值”容器。
原子類
Java中Atomic包里一共提供了12個(gè)類,屬于4種類型的原子更新方式,分別是原子更新基本類型、原子更新數(shù)組、原子更新引用、原子更新屬性(字段)。Atomic包里的類基本都是使用Unsafe實(shí)現(xiàn)的包裝類。
 1)原子更新基本類型:AtomicBoolean,AtomicInteger, AtomicLong.
 2)原子更新數(shù)組:AtomicIntegerArray,AtomicLongArray, AtomicReferenceArray.
 3)原子更新引用類型:AtomicReference, AtomicStampedReference, AtomicMarkableReference.
 4 ) 原子更新字段類型:AtomicReferenceFieldUpdater, AtomicIntegerFieldUpdater, AtomicLongFieldUpdater.
原子更新基本類型
AtomicBoolean,AtomicInteger, AtomicLong三個(gè)類提供的方法類似,以AtomicInteger為例:有int addAndGet(int delta), boolean compareAndSet(int expect, int update), int getAndIncrement(), void lazySet(int newValue),int getAndSet(int newValue)。
// setup to use Unsafe.compareAndSwapInt for updates private static final Unsafe unsafe = Unsafe.getUnsafe(); private static final long valueOffset; static {try {valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));} catch (Exception ex) { throw new Error(ex); } } private volatile int value;public final boolean compareAndSet(int expect, int update) {return unsafe.compareAndSwapInt(this, valueOffset, expect, update); }sun.misc.Unsafe只提供三種CAS方法:compareAndSwapObject, compareAndSwapInt和compareAndSwapLong。對(duì)于AtomicBoolean,它是先把Boolean轉(zhuǎn)換成整形,再使用compareAndSwapInt進(jìn)行CAS,原子更新char,float,double變量也可以用類似的思路來實(shí)現(xiàn)。
原子更新數(shù)組
以AtomicIntegerArray為例,此類主要提供原子的方式更新數(shù)組里的整形,常用方法如下:
- int addAndGet(int i, int delta):以原子的方式將輸入值與數(shù)組中索引i的元素相加。
- boolean compareAndSet(int i, int expect, int update):如果當(dāng)前值等于預(yù)期值,則以原子方式將數(shù)組位置i的元素設(shè)置成update值。
- AtomicIntegerArray的兩個(gè)構(gòu)造方法: - AtomicIntegerArray(int length):指定數(shù)組的大小,并初始化為0
- AtomicIntegerArray(int [] array):對(duì)給定的數(shù)組進(jìn)行拷貝。
 
案例:
int value[] = new int[]{1,2,3};AtomicIntegerArray aia = new AtomicIntegerArray(value);System.out.println(aia.getAndSet(1, 9));System.out.println(aia.get(1));System.out.println(value[1]);運(yùn)行結(jié)果:2 9 2
原子更新引用示例
public class AtomicReferenceDemo {public static void main(String[] args) {User user = new User("conan", 15);AtomicReference<User> userRef = new AtomicReference<>(user);User userChg = new User("Shinichi", 17);userRef.compareAndSet(user, userChg);User newUser = userRef.get();System.out.println(user);System.out.println(userChg);System.out.println(userRef.get());AtomicIntegerFieldUpdater<User> fieldUpdater = AtomicIntegerFieldUpdater.newUpdater(User.class, "age");fieldUpdater.getAndIncrement(newUser);System.out.println(newUser);System.out.println(fieldUpdater.get(newUser));}@Data@AllArgsConstructorstatic class User{private String name;volatile int age;} }輸出:
AtomicReferenceDemo.User(name=conan, age=15) AtomicReferenceDemo.User(name=Shinichi, age=17) AtomicReferenceDemo.User(name=Shinichi, age=17) AtomicReferenceDemo.User(name=Shinichi, age=18) 18CAS
全稱CompareAndSwap。假設(shè)有三個(gè)操作數(shù):內(nèi)存值V,舊的預(yù)期值A(chǔ),要修改的值B,當(dāng)且僅當(dāng)預(yù)期值A(chǔ)和內(nèi)存值V相同時(shí),才會(huì)將內(nèi)存值修改為B并返回true,否則什么都不做并返回false。當(dāng)然CAS一定要配合volatile變量,這樣才能保證每次拿到的遍歷是主內(nèi)存中最新的那個(gè)值,否則舊的預(yù)期值A(chǔ)對(duì)某條線程來說,永遠(yuǎn)是一個(gè)不會(huì)變的值A(chǔ),只要某次CAS操作失敗,永遠(yuǎn)都不可能成功。cmpxchg指令
AQS
全稱AbstractQueuedSynchronizer。如果說JUC(java.util.concurrent)的基礎(chǔ)是CAS的話,那么AQS就是整個(gè)JAVA并發(fā)包的核心了,ReentrantLock, ReentrantReadWriteLock, CountDownLatch, Semaphore等都用到了它。
CAS的問題
AtomicStampedReference
循環(huán)時(shí)間長開銷大
 自旋CAS如果長時(shí)間不成功,會(huì)給CPU帶來非常大的執(zhí)行開銷。
只能保證一個(gè)共享變量的原子操作
 當(dāng)對(duì)一個(gè)共享變量執(zhí)行操作時(shí),我們可以使用循環(huán)CAS的方式來保證原子操作,但是對(duì)多個(gè)共享變量操作時(shí),循環(huán)CAS就無法保證操作的原子性。AtomicReference類來保證引用對(duì)象之間的原子性,就可以把多個(gè)變量放在一個(gè)對(duì)象里來執(zhí)行CAS操作。
如何避免死鎖
死鎖是指兩個(gè)或兩個(gè)以上的進(jìn)程在執(zhí)行過程中,因爭奪資源而造成的一種互相等待的現(xiàn)象,若無外力作用,他們都將無法推進(jìn)下去。這是一個(gè)嚴(yán)重的問題,因?yàn)樗梨i會(huì)讓你的程序掛起無法完成任務(wù),死鎖的發(fā)生必須滿足以下4個(gè)條件:
避免死鎖最簡單的方法就是阻止循環(huán)等待條件,將系統(tǒng)中所有的資源設(shè)置標(biāo)志位、排序,規(guī)定所有的進(jìn)程申請資源必須以一定的順序做操作來避免死鎖。
支持定時(shí)的鎖 boolean tryLock(long timeout, TimeUnit unit)
 通過ThreadDump來分析找出死鎖
怎么檢測一個(gè)線程是否擁有鎖
java.lang.Thread中有一個(gè)方法:public static native boolean holdsLock(Object obj). 當(dāng)且僅當(dāng)當(dāng)前線程擁有某個(gè)具體對(duì)象的鎖時(shí)返回true
JAVA中的線程調(diào)度算法
搶占式。一個(gè)線程用完CPU之后,操作系統(tǒng)會(huì)根據(jù)現(xiàn)場優(yōu)先級(jí)、線程饑餓情況等數(shù)據(jù)算出一個(gè)總的優(yōu)先級(jí)并分配下一個(gè)時(shí)間片給某個(gè)線程執(zhí)行。
無鎖
要保證現(xiàn)場安全,并不是一定就要進(jìn)行同步,兩者沒有因果關(guān)系。同步只是保證共享數(shù)據(jù)爭用時(shí)的正確性的手段,如果一個(gè)方法本來就不涉及共享數(shù)據(jù),那它自然就無須任何同步措施去保證正確性,因此會(huì)有一些代碼天生就是線程安全的。
歡迎跳轉(zhuǎn)到本文的原文鏈接:https://honeypps.com/java/java-concurrent-note-list-new4/
歡迎支持筆者新作:《深入理解Kafka:核心設(shè)計(jì)與實(shí)踐原理》和《RabbitMQ實(shí)戰(zhàn)指南》,同時(shí)歡迎關(guān)注筆者的微信公眾號(hào):朱小廝的博客。
 
總結(jié)
以上是生活随笔為你收集整理的多线程小抄集(新编四)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 多线程小抄集(新编三)
- 下一篇: Kafka最全面试题整理|划重点要考
