Java 7之多线程- Semaphore--转载
Semaphore用于保存當前可用許可的數量。是通過共享鎖實現的。根據共享鎖的獲取原則,Semaphore分為"公平信號量"和"非公平信號量"。
"公平信號量"和"非公平信號量"的釋放信號量的機制是一樣的!不同的是它們獲取信號量的機制:線程在嘗試獲取信號量許可時,對于公平信號量而言,如果當前線程不在隊列的頭部,則排隊等候;而對于非公平信號量而言,無論當前線程是不是在隊列的頭部,它都會直接獲取信號量。該差異具體的體現在,它們的tryAcquireShared()函數的實現不同。
如果要使用Semaphore對象時,首先通過構造函數取得對象,如下:
public Semaphore(int permits) { // 構造函數默認使用非公平的方式獲取sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { // 構造函數中指定獲取的方式sync = fair ? new FairSync(permits) : new NonfairSync(permits); }然后就可以調用Semaphore對象進行信號量的獲取了,如下:
public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1); }public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits); }1、公平信號量的獲取
?
首先來看公平信號量的獲取,方法如下:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException {// 如果線程是中斷狀態,則拋出異常if (Thread.interrupted())throw new InterruptedException();// 嘗試獲取共享鎖;獲取成功則直接返回,獲取失敗,則通過doAcquireSharedInterruptibly()獲取。if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg); }如果tryAcquireShared()方法獲取失敗,通常會返回一個小于0的數量。Semaphore中公平鎖對應的tryAcquireShared()實現如下:
protected int tryAcquireShared(int acquires) {for (;;) {// 判斷當前線程是不是隊列中的第一個線程。若是的話,則返回-1,跳出死循環if (hasQueuedPredecessors())return -1;int available = getState(); // 獲取當前可用的信號量的許可數// 設置獲得acquires個信號量許可之后,剩余的信號量許可數int remaining = available - acquires;// 如果剩余的信號量許可數>=0”,則設置可以獲得的信號量許可數為remaining。if (remaining < 0 || compareAndSetState(available, remaining))return remaining;}}返回的是remaining,如果為-1,表示獲取失敗。如果為>=0,則預示了其他共享獲取操作能否成功。
如上方法獲取失敗后,調用doAcquireSharedInterruptibly()方法,源碼如下:
private void doAcquireSharedInterruptibly(long arg) throws InterruptedException {// 創建當前線程的Node節點,且Node中記錄的鎖是共享鎖類型;并將該節點添加到隊列末尾。final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {// 獲取上一個節點,如果上一節點是隊列的表頭,則嘗試獲取共享鎖final Node p = node.predecessor();if (p == head) {long r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// 當前線程一直等待,直到獲取到共享鎖。// 如果線程在等待過程中被中斷過,則再次中斷該線程(還原之前的中斷狀態)。if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);} }2、非公平信號量的獲取
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires); }在這個方法里調用了如下方法:
final int nonfairTryAcquireShared(int acquires) { // 非公平方式獲取共享鎖的一定量許可for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 || compareAndSetState(available, remaining))// 如果remaining>=0,則通過CAS方式更新當前許可的數量return remaining;}}判斷當前剩余的信號量許可數,返回小于0的數表示獲取失敗,大于等于0表示成功。
?
非公平和公平主要體現在tryAcquireShared()方法的實現上。
(1)非公平獲取 如果當前可用的信號量許可大于等于請求數,則通過CAS修改剩余許可量并返回,如果小于的話,返回小于0的數,表示獲取失敗。
(2)公平獲取 在獲取時還會判斷。如果當前線程不在隊列的頭部,則返回-1,排隊等候;然后再去判斷信號量許可。
?
3、公平信號量的釋放
?
調用如下方法來釋放信號量許可,如下:
public void release() {sync.releaseShared(1); }public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits); }調用無參數的release()方法默認只釋放一個信號量許可,而下面的可以指定:
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}首先調用tryReleaseShared()方法去釋放,源代碼如下:
protected final boolean tryReleaseShared(int releases) {for (;;) {// 獲取可以獲得的信號量的許可數int current = getState();// 獲取釋放releases個信號量許可之后,剩余的信號量許可數int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");// 設置可以獲得的信號量的許可數為nextif (compareAndSetState(current, next))return true;} }如果tryReleaseShared()嘗試釋放共享鎖失敗,則會調用doReleaseShared()去釋放共享鎖。doReleaseShared()的源碼如下
private void doReleaseShared() {for (;;) {// 獲取CLH隊列的頭節點Node h = head;// 如果頭節點不為null,并且頭節點不等于tail節點。if (h != null && h != tail) {// 獲取頭節點對應的線程的狀態int ws = h.waitStatus;// 如果頭節點對應的線程是SIGNAL狀態,則意味著“頭節點的下一個節點所對應的線程”需要被unpark喚醒。if (ws == Node.SIGNAL) {// 設置“頭節點對應的線程狀態”為空狀態。失敗的話,則繼續循環。if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;// 喚醒“頭節點的下一個節點所對應的線程”。 unparkSuccessor(h);}// 如果頭節點對應的線程是空狀態,則設置“文件點對應的線程所擁有的共享鎖”為其它線程獲取鎖的空狀態。else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue; // loop on failed CAS }// 如果頭節點發生變化,則繼續循環。否則,退出循環。if (h == head) // loop if head changedbreak;} }4、非公平信號量的釋放
protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires); } final int nonfairTryAcquireShared(int acquires) {for (;;) {// 設置可以獲得的信號量的許可數int available = getState();// 設置獲得acquires個信號量許可之后,剩余的信號量許可數int remaining = available - acquires;// 如果剩余的信號量許可數>=0,則設置可以獲得的信號量許可數為remainingif (remaining < 0 ||compareAndSetState(available, remaining))return remaining;} }舉個例子,如下:
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class SemaphoreTest1 { private static final int SEM_MAX = 10;public static void main(String[] args) { Semaphore sem = new Semaphore(SEM_MAX);//創建線程池ExecutorService threadPool = Executors.newFixedThreadPool(3);//在線程池中執行任務threadPool.execute(new MyThread(sem, 5));threadPool.execute(new MyThread(sem, 4));threadPool.execute(new MyThread(sem, 7));//關閉池 threadPool.shutdown();} }class MyThread extends Thread {private volatile Semaphore sem; // 信號量private int count; // 申請信號量的大小 MyThread(Semaphore sem, int count) {this.sem = sem;this.count = count;}public void run() {try {// 從信號量中獲取count個許可 sem.acquire(count);Thread.sleep(2000);System.out.println(Thread.currentThread().getName() + " acquire count="+count);} catch (InterruptedException e) {e.printStackTrace();} finally {// 釋放給定數目的許可,將其返回到信號量。 sem.release(count);System.out.println(Thread.currentThread().getName() + " release " + count + "");}} }某一次運行后的結果如下:
pool-1-thread-1 acquire count=5 pool-1-thread-2 acquire count=4 pool-1-thread-1 release 5 pool-1-thread-2 release 4 pool-1-thread-3 acquire count=7 pool-1-thread-3 release 7原文地址:http://www.2cto.com/kf/201402/278471.html
轉載于:https://www.cnblogs.com/davidwang456/p/4086361.html
總結
以上是生活随笔為你收集整理的Java 7之多线程- Semaphore--转载的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: IO流--转载
- 下一篇: java 并发编程第七章:取消和关闭