高并发编程-CountDownLatch深入解析
要點解說
CountDownLatch允許一個或者多個線程一直等待,直到一組其它操作執行完成。在使用CountDownLatch時,需要指定一個整數值,此值是線程將要等待的操作數。當某個線程為了要執行這些操作而等待時,需要調用await方法。await方法讓線程進入休眠狀態直到所有等待的操作完成為止。當等待的某個操作執行完成,它使用countDown方法來減少CountDownLatch類的內部計數器。當內部計數器遞減為0時,CountDownLatch會喚醒所有調用await方法而休眠的線程們。
實例演示
下面代碼演示了CountDownLatch簡單使用。演示的場景是5位運動員參加跑步比賽,發令槍打響后,5個計時器開始分別計時,直到所有運動員都到達終點。
public class CountDownLatchDemo {
public static void main(String[] args) {
Timer timer = new Timer(5);
new Thread(timer).start();
for (int athleteNo = 0; athleteNo < 5; athleteNo++) {
new Thread(new Athlete(timer, "athlete" + athleteNo)).start();
}
}
}
class Timer implements Runnable {
CountDownLatch timerController;
public Timer(int numOfAthlete) {
this.timerController = new CountDownLatch(numOfAthlete);
}
public void recordResult(String athleteName) {
System.out.println(athleteName + " has arrived");
timerController.countDown();
System.out.println("There are " + timerController.getCount() + " athletes did not reach the end");
}
@Override
public void run() {
try {
System.out.println("Start...");
timerController.await();
System.out.println("All the athletes have arrived");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
class Athlete implements Runnable {
Timer timer;
String athleteName;
public Athlete(Timer timer, String athleteName) {
this.timer = timer;
this.athleteName = athleteName;
}
@Override
public void run() {
try {
System.out.println(athleteName + " start running");
long duration = (long) (Math.random() * 10);
Thread.sleep(duration * 1000);
timer.recordResult(athleteName);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
輸出結果如下所示:
Start...
athlete0 start running
athlete1 start running
athlete2 start running
athlete3 start running
athlete4 start running
athlete0 has arrived
There are 4 athletes did not reach the end
athlete3 has arrived
There are 3 athletes did not reach the end
athlete2 has arrived
athlete1 has arrived
There are 1 athletes did not reach the end
There are 2 athletes did not reach the end
athlete4 has arrived
There are 0 athletes did not reach the end
All the athletes have arrived
方法解析
1.構造方法
CountDownLatch(int count)構造一個指定計數的CountDownLatch,count為線程將要等待的操作數。
2.await()
調用await方法后,使當前線程在鎖存器(內部計數器)倒計數至零之前一直等待,進入休眠狀態,除非線程被中斷。如果當前計數遞減為零,則此方法立即返回,繼續執行。
3.await(long timeout, TimeUnit unit)
調用await方法后,使當前線程在鎖存器(內部計數器)倒計數至零之前一直等待,進入休眠狀態,除非線程被 中斷或超出了指定的等待時間。如果當前計數為零,則此方法立刻返回true值。
3.acountDown()
acountDown方法遞減鎖存器的計數,如果計數到達零,則釋放所有等待的線程。如果當前計數大于零,則將計數減少。如果新的計數為零,出于線程調度目的,將重新啟用所有的等待線程。
4.getCount()
調用此方法后,返回當前計數,即還未完成的操作數,此方法通常用于調試和測試。
源碼解析
進入源碼分析之前先看一下CountDownLatch的類圖,
Sync是CountDownLatch的一個內部類,它繼承了AbstractQueuedSynchronizer。
CountDownLatch(int count)、await()和countDown()三個方法是CountDownLatch的核心方法,本篇將深入分析這三個方法的具體實現原理。
1.CountDownLatch(int count)
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
該構造方法根據給定count參數構造一個CountDownLatch,內部創建了一個Sync實例。Sync是CountDownLatch的一個內部類,其構造方法代碼如下:
Sync(int count) {
setState(count);
}
setState方法繼承自AQS,給Sync實例的state屬性賦值。
protected final void setState(int newState) {
state = newState;
}
這個state就是CountDownLatch的內部計數器。
2.await()
當await()方法被調用時,當前線程會阻塞,直到內部計數器的值等于零或當前線程被中斷,下面深入代碼分析。
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果當前線程中斷,則拋出InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
//嘗試獲取共享鎖,如果可以獲取到鎖直接返回;
//如果獲取不到鎖,執行doAcquireSharedInterruptibly
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//如果當前內部計數器等于零返回1,否則返回-1;
//內部計數器等于零表示可以獲取共享鎖,否則不可以;
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
//返回內部計數器當前值
protected final int getState() {
return state;
}
//該方法使當前線程一直等待,直到當前線程獲取到共享鎖或被中斷才返回
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//根據當前線程創建一個共享模式的Node節點
//并把這個節點添加到等待隊列的尾部
//AQS等待隊列不熟悉的可以查看AQS深入解析的內容
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
//獲取新建節點的前驅節點
final Node p = node.predecessor();
//如果前驅節點是頭結點
if (p == head) {
//嘗試獲取共享鎖
int r = tryAcquireShared(arg);
//獲取到共享鎖
if (r >= 0) {
//將前驅節點從等待隊列中釋放
//同時使用LockSupport.unpark方法喚醒前驅節點的后繼節點中的線程
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//當前節點的前驅節點不是頭結點,或不可以獲取到鎖
//shouldParkAfterFailedAcquire方法檢查當前節點在獲取鎖失敗后是否要被阻塞
//如果shouldParkAfterFailedAcquire方法執行結果是當前節點線程需要被阻塞,則執行parkAndCheckInterrupt方法阻塞當前線程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
private Node addWaiter(Node mode) {
//根據當前線程創建一個共享模式的Node節點
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
//如果尾節點不為空(等待隊列不為空),則新節點的前驅節點指向這個尾節點
//同時尾節點指向新節點
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//如果尾節點為空(等待隊列是空的)
//執行enq方法將節點插入到等待隊列尾部
enq(node);
return node;
}
//這里如果不熟悉的可以查看AQS深入解析的內容
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
private Node enq(final Node node) {
//使用循環插入尾節點,確保成功插入
for (;;) {
Node t = tail;
//尾節點為空(等待隊列是空的)
//新建節點并設置為頭結點
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
//否則,將節點插入到等待隊列尾部
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//獲取當前節點的前驅節點
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
//判斷當前節點里的線程是否需要被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前驅節點線程的狀態
int ws = pred.waitStatus;
//如果前驅節點線程的狀態是SIGNAL,返回true,需要阻塞線程
if (ws == Node.SIGNAL)
return true;
//如果前驅節點線程的狀態是CANCELLED,則設置當前節點的前去節點為"原前驅節點的前驅節點"
//因為當前節點的前驅節點線程已經被取消了
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//其它狀態的都設置前驅節點為SIGNAL狀態
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//通過使用LockSupport.park阻塞當前線程
//同時返回當前線程是否中斷
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
3.countDown()
內部計數器減一,如果計數達到零,喚醒所有等待的線程。
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
//如果內部計數器狀態值遞減后等于零
if (tryReleaseShared(arg)) {
//喚醒等待隊列節點中的線程
doReleaseShared();
return true;
}
return false;
}
//嘗試釋放共享鎖,即將內部計數器值減一
protected boolean tryReleaseShared(int releases) {
for (;;) {
//獲取內部計數器狀態值
int c = getState();
if (c == 0)
return false;
//計數器減一
int nextc = c-1;
//使用CAS修改state值
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
private void doReleaseShared() {
for (;;) {
//從頭結點開始
Node h = head;
//頭結點不為空,并且不是尾節點
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
//喚醒阻塞的線程
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
if (h == head)
break;
}
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
//通過使用LockSupport.unpark喚醒線程
LockSupport.unpark(s.thread);
}
原理總結
使用CountDownLatch(int count)構建CountDownLatch實例,將count參數賦值給內部計數器state,調用await()方法阻塞當前線程,并將當前線程封裝加入到等待隊列中,直到state等于零或當前線程被中斷;調用countDown()方法使state值減一,如果state等于零則喚醒等待隊列中的線程。
實戰經驗
實際工作中,CountDownLatch適用于如下使用場景:
客戶端的一個同步請求查詢用戶的風險等級,服務端收到請求后會請求多個子系統獲取數據,然后使用風險評估規則模型進行風險評估。如果使用單線程去完成這些操作,這個同步請求超時的可能性會很大,因為服務端請求多個子系統是依次排隊的,請求子系統獲取數據的時間是線性累加的。此時可以使用CountDownLatch,讓多個線程并發請求多個子系統,當獲取到多個子系統數據之后,再進行風險評估,這樣請求子系統獲取數據的時間就等于最耗時的那個請求的時間,可以大大減少處理時間。
面試考點
CountDownLatch和CyclicBarrier的異同?
相同點:都可以實現線程間的等待。
不同點:
1.側重點不同,CountDownLatch一般用于一個線程等待一組其它線程;而CyclicBarrier一般是一組線程間的相互等待至某同步點;
2.CyclicBarrier的計數器是可以重用的,而CountDownLatch不可以。
總結
以上是生活随笔為你收集整理的高并发编程-CountDownLatch深入解析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: web公选课第三节2020.5.18
- 下一篇: 数学建模第七节5.22-26补