Semaphore 源码分析
需要提前了解的知識點: AbstractQueuedSynchronizer 實現(xiàn)原理
類介紹
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調各個線程,以保證合理的使用公共資源。比如控制用戶的訪問量,同一時刻只允許1000個用戶同時使用系統(tǒng),如果超過1000個并發(fā),則需要等待。
使用場景
比如模擬一個停車場停車信號,假設停車場只有兩個車位,一開始兩個車位都是空的。這時如果同時來了兩輛車,看門人允許它們進入停車場,然后放下車攔。以后來的車必須在入口等待,直到停車場中有車輛離開。這時,如果有一輛車離開停車場,看門人得知后,打開車攔,放入一輛,如果又離開一輛,則又可以放入一輛,如此往復。
public class SemaphoreDemo {private static Semaphore s = new Semaphore(2);public static void main(String[] args) {ExecutorService pool = Executors.newCachedThreadPool();pool.submit(new ParkTask("1"));pool.submit(new ParkTask("2"));pool.submit(new ParkTask("3"));pool.submit(new ParkTask("4"));pool.submit(new ParkTask("5"));pool.submit(new ParkTask("6"));pool.shutdown();}static class ParkTask implements Runnable {private String name;public ParkTask(String name) {this.name = name;}@Overridepublic void run() {try {s.acquire();System.out.println("Thread "+this.name+" start...");TimeUnit.SECONDS.sleep(new Random().nextInt(10));} catch (InterruptedException e) {e.printStackTrace();} finally {s.release();}}} }Semaphore 源碼分析
Semaphore 通過使用內部類Syn繼承AQS來實現(xiàn)。
支持公平鎖和非公平鎖。內部使用的AQS的共享鎖。
具體實現(xiàn)可參考 AbstractQueuedSynchronizer 源碼分析
Semaphore 的結構如下:
Semaphore構造
public Semaphore(int permits) {sync = new NonfairSync(permits); }public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits); }構造方法指定信號量的許可數(shù)量,默認采用的是非公平鎖,也只可以指定為公平鎖。
permits賦值給AQS中的state變量。
acquire:可響應中斷的獲得信號量
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1); }public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits); }獲得信號量方法,這兩個方法支持 Interrupt中斷機制,可使用acquire() 方法每次獲取一個信號量,也可以使用acquire(int permits) 方法獲取指定數(shù)量的信號量 。
acquire:不可響應中斷的獲取信號量
public void acquireUninterruptibly() {sync.acquireShared(1); }public void acquireUninterruptibly(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.acquireShared(permits); }這兩個方法不響應Interrupt中斷機制,其它功能同acquire方法機制。
tryAcquire 方法,嘗試獲得信號量
public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0; }public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }public boolean tryAcquire(int permits, long timeout, TimeUnit unit)throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }嘗試獲得信號量有三個方法。
1. 嘗試獲取信號量,如果獲取成功則返回true,否則馬上返回false,不會阻塞當前線程。
2. 嘗試獲取信號量,如果在指定的時間內獲得信號量,則返回true,否則返回false
3. 嘗試獲取指定數(shù)量的信號量,如果在指定的時間內獲得信號量,則返回true,否則返回false。
release 釋放信號量
public void release() {sync.releaseShared(1); }調用AQS中的releaseShared方法,使得state每次減一來控制信號量。
availablePermits方法,獲取當前剩余的信號量數(shù)量
public int availablePermits() {return sync.getPermits(); }//=========Sync類======== final int getPermits() {return getState();}該方法返回AQS中state變量的值,當前剩余的信號量個數(shù)
drainPermits方法
public int drainPermits() {return sync.drainPermits(); }//=========Sync類======== final int drainPermits() {for (;;) {int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;} }獲取并返回立即可用的所有許可。Sync類的drainPermits方法,獲取1個信號量后將可用的信號量個數(shù)置為0。例如總共有10個信號量,已經使用了5個,再調用drainPermits方法后,可以獲得一個信號量,剩余4個信號量就消失了,總共可用的信號量就變成6個了。
reducePermits 方法
protected void reducePermits(int reduction) {if (reduction < 0) throw new IllegalArgumentException();sync.reducePermits(reduction); }//=========Sync類======== final void reducePermits(int reductions) {for (;;) {int current = getState();int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");if (compareAndSetState(current, next))return;} }該方法是protected 方法,減少信號量個數(shù)
判斷AQS等待隊列中是否還有Node
public final boolean hasQueuedThreads() {return sync.hasQueuedThreads(); }//=========AbstractQueuedSynchronizer類======== public final boolean hasQueuedThreads() {//頭結點不等于尾節(jié)點就說明鏈表中還有元素return head != tail; }getQueuedThreads方法
protected Collection<Thread> getQueuedThreads() {return sync.getQueuedThreads(); }//=========AbstractQueuedSynchronizer類======== public final Collection<Thread> getQueuedThreads() {ArrayList<Thread> list = new ArrayList<Thread>();for (Node p = tail; p != null; p = p.prev) {Thread t = p.thread;if (t != null)list.add(t);}return list; }該方法獲取AQS中等待隊列中所有未獲取信號量的線程相關的信息(等待獲取信號量的線程相關信息)。
本人簡書blog地址:http://www.jianshu.com/u/1f0067e24ff8????
點擊這里快速進入簡書
總結
以上是生活随笔為你收集整理的Semaphore 源码分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: CountDownLatch 源码分析
- 下一篇: 深入理解 Synchronized