Java并发编程模拟管程(霍尔Hoare管程、汉森Hansan管程、MESA管程)
? 在前面個兩篇博文中,我們使用Java模擬了AND型信號量和信號量集,本文將使用Java來模擬管程,關于管程的一些理論知識,可以參考另一篇博客。
? 對于管程,在這里我們不做具體的討論了。不過對于Java和管程之間的一些事,還是很有意思的。Java中,每個對象其實都一個Monitor(java中翻譯為監視器),Java中提供的synchronized關鍵字及wait()、notify()、notifyAll()方法,都是Monitor的一部分,或者說,在Jdk1.5之前也就是JUC沒有出現之前,Java都是通過Monitor來實現并發的。Monitor在OS中或者別處的翻譯是管程,我也更傾向于翻譯為管程,Java中使用的是MESA管程,本文呢,我們就來模擬實現霍爾管和漢森管程,來加強我們對并發編程的能力和增加對線程同步問題的理解。
? 在實現之前,我們先對幾種管程的區別來簡單的說一下,這里還是使用我在進程同步機制中說到的問題:如果進程P1因x條件處于阻塞狀態,那么當進程P2執行了x.signal操作喚醒P1后,進程P1和P2此時同時處于管程中了,這是不被允許的,那么如何確定哪個執行哪個等待?這個問題也很簡單,可采用下面的兩種方式之一進行處理:
P2等待,直至P1離開管程或者等待另一個條件;
P1等待,直至P2離開管程或者等待另一個條件。
? Hoare管程采用了第一種處理方式;MESA管程采用第二種方式;Hansan管程采用了兩者的折中,它規定管程中的所有過程執行的signal操作是過程體的最后一個操作,于是,進程P2執行完signal操作后立即退出管程,因此進程P1馬上被恢復執行。
? 在下面不同管程的具體實現中,我們還是通過解決經典(一直都是他)的生產者–消費者問題來具體的解釋。并且下面所有的實現,都是使用JUC中的ReentrantLock(可重入鎖)+Condition(條件變量)來實現的。
1.霍爾管程
? 首先是霍爾管程,也是因為我們對其理論介紹的最多,并且在理論篇中給出了其wait()、signal()操作的偽代碼,因此我們首先來實現霍爾管程,我們使用內部類來實現上一文中cond和interf的數據定義,其代碼如下:
package XXX.util;import lombok.extern.slf4j.Slf4j;import java.util.ArrayList; import java.util.List; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock;/*** 霍爾管程*/ @Slf4j public class MonitorsUtil {//緩沖池中緩沖區static final Integer N = 50;//緩沖池static List<Integer> buffer = new ArrayList<>(N);//互斥鎖,用以實現緩沖的互斥訪問static Lock lock = new ReentrantLock();static cond notFull = new cond(lock, "notFull");static cond notEmpty = new cond(lock, "notEmpty");//用與存放因無法進入管程的阻塞隊列&&因為調用signal阻塞自身的線程(Hoare)static interf IM = new interf(lock);public static void wait(String id, cond declar, interf IM) throws InterruptedException {//獲取鎖,需要在獲得鎖的情況下才可以操作conditiondeclar.count++;//log.info("當前condition中阻塞的線程數:【{},{},{},{}】", IM.enterCount, IM.nextCount, notFull.count, notEmpty.count);//判斷是否有進程在高優先級隊列中if (IM.nextCount > 0) {//喚醒因調用signal操作的線程IM.next.release();}log.info("線程【{}】調用wait被掛起到條件變量【{}】。", id, declar.name);//掛起時自動釋放鎖,等待進入管程的隊列可以獲得鎖并進入管程declar.condition.await();log.info("被掛起的線程【{}】被喚醒執行。", id);declar.count--;}public static void signal(String id, cond declar, interf IM) throws InterruptedException {log.info("線程【{}】執行了釋放資源", id);if (declar.count > 0) {//掛起自己后,因為調用signal掛起自己的進程數量加1IM.nextCount++;//喚醒因為條件變量而阻塞的線程declar.condition.signal();log.info("喚醒的條件變量為:【{}】", declar.name);//log.info("釋放后所有condition中阻塞的線程數:【mutex:{},nextCount:{},notFull:{},notEmpty:{}】", IM.enterCount, IM.nextCount, notFull.count, notEmpty.count);//釋放資源后,立即把自己掛起,進入高優先級隊列-------Hoare的處理方式log.info("線程【{}】調用signal被掛起。", id);//釋放lock,不然別的線程無法進入管程lock.unlock();//將當前線程插入到next的阻塞隊列中IM.next.acquire();//再次獲取鎖-->進入管程lock.lock();log.info("被掛起的線程【{}】被喚醒執行。", id);//恢復執行后,等待調用的管程的線程數量減1IM.nextCount--;}}static class interf {//等待著進入管程的隊列Condition enter;//等待著進入管程的阻塞隊列中線程的數量int enterCount;//發出signal的進程掛起自己的信號量,信號量中記錄著等待調用管程的進程Semaphore next;//在next上等待的線程數int nextCount;interf(Lock lock) {enter = lock.newCondition();enterCount = 0;next = new Semaphore(0);nextCount = 0;}}static class cond {String name;Condition condition;int count;cond(Lock lock, String id) {condition = lock.newCondition();count = 0;name = id;}}//往緩沖區中投放消息public static void putMessage(String id, Integer item) throws InterruptedException {lock.lock();//如果緩沖池滿了,就掛起到notFull的阻塞隊列中log.info("執行了投放消息,緩沖池的消息的數量:【{}】", buffer.size());while (buffer.size() >= N) {log.info("緩沖池滿,線程【{}】阻塞", id);wait(id, notFull, IM);}//保證互斥訪問//IM.mutex.acquire();buffer.add(item);//IM.mutex.release();signal(id, notEmpty, IM);//... 一些別的操作lock.unlock();}//從緩沖區中取消息消費public static void getMessage(String id, Integer item) throws InterruptedException {//保證互斥訪問lock.lock();//如果緩沖池滿了,就掛起到notFull的阻塞隊列中log.info("執行了消費消息,緩沖池的消息的數量:【{}】", buffer.size());while (buffer.size() <= 0) {wait(id, notEmpty, IM);}item = buffer.remove(0);log.info("消費了一條消息:【{}】", item);//IM.mutex.release();signal(id, notFull, IM);//... 一些別的操作lock.unlock();}}? 上面的代碼,我們通過ReenTrantLock來控制線程互斥的訪問管程,管程提供的過程putMessage()、getMessage()通過先獲取lock,保證可以進入管程線程只有一個,對于ReenTrantLock的強大功能來說,在這里成了我模擬霍爾管程的一大“阻礙”,因為ReenTrantLock+Condition,即使線程因調用了condition的wait而阻塞,當被喚醒再次執行時,需要重新去獲取lock,如果獲取不到就要被插入到阻塞隊列中,只能等待lock被釋放才有可能執行。因此在上面的signal中,為了保證霍爾管程的規定,我們在阻塞當前線程時,需要先釋放lock鎖,再次被喚醒時再次重新獲得鎖,這也是signal中重復的進行釋放和獲取的原因。
? 另外,在我們的整個實現中,interf中設計被用來作為互斥進入管程的條件變量沒有用到,lock自身的強大幫我們把這部分工作做了,這里保留是為了和理論篇的偽代碼保持一致。
2.MESA管程
? 因為Java實現并發參考的就是MESA模型的管程,因此其實現的ReenTrantLock+Condition就可以很完美的實現MESA管程,我們將霍爾管程中的signal操作進行修改,代碼如下:
public static void signal(String id, cond declar, interf IM) throws InterruptedException {log.info("線程【{}】執行了釋放資源", id);if (declar.count > 0) {//掛起自己后,因為調用signal掛起自己的進程數量加1IM.nextCount++;//喚醒因為條件變量而阻塞的線程declar.condition.signal();log.info("喚醒的條件變量為:【{}】", declar.name);//log.info("釋放后所有condition中阻塞的線程數:【mutex:{},nextCount:{},notFull:{},notEmpty:{}】", IM.enterCount, IM.nextCount, notFull.count, notEmpty.count);//釋放資源,繼續執行,直至線程退出管程后,別的線程才可進入-------MESA的處理方式log.info("被掛起的線程【{}】被喚醒執行。", id);//恢復執行后,等待調用的管程的線程數量減1IM.nextCount--;} }? 我們可以看到,我們只需將Hoare上面那段為了讓線程將自己掛起并釋放鎖的代碼去掉就可以實現MESA管程了。
3.漢森管程
? 對于漢森管程,其實其實現和MESA是相同的,但是,根據其規定,signal應當是線程的最后一個操作,執行完signal操作后要立即退出管程,也就是說也釋放lock,讓別的線程可以進入管程,也就是其對應的操作代碼要如下:
//從緩沖區中取消息消費 public static void getMessage(String id, Integer item) throws InterruptedException {//保證互斥訪問lock.lock();//如果緩沖池滿了,就掛起到notFull的阻塞隊列中log.info("執行了消費消息,緩沖池的消息的數量:【{}】", buffer.size());while (buffer.size() <= 0) {wait(id, notEmpty, IM);}item = buffer.remove(0);log.info("消費了一條消息:【{}】", item);//IM.mutex.release();signal(id, notFull, IM);//signal操作應當是最后一個操作,此處不再允許有別的操作,應當立即退出管程,lock.unlock(); }? 也就是說,在每個過程執行完sginal后,應當立即退出管程,而不允許再執行別的操作。
4.執行結果分析
? 這里呢,只對比霍爾管程和MESA管程(因為MESA管程在執行順序上適合漢森管程一致的,只不過漢森管程在設計上變得更加嚴格,規定死了signal執行的位置),這里呢,我們先給生產者、消費者的代碼:
/*** 管程測試*/ @Slf4j public class MonitorsTest {static Integer count = 0;static class Producer extends Thread {Producer(String name) {super.setName(name);}@Overridepublic void run() {do {try {log.info("生產了一條消息:【{}】", count);MonitorsUtil.putMessage(this.getName(), count++);//Thread.sleep(1000);} catch (InterruptedException e) {log.error("生產消息時產生異常!");}} while (true);}}static class Consumer extends Thread {Consumer(String name) {super.setName(name);}@Overridepublic void run() {do {try {Integer item = -1;MonitorsUtil.getMessage(this.getName(), item);//Thread.sleep(1000);} catch (InterruptedException e) {log.error("消費消息時產生異常!");}} while (true);}}public static void main(String[] args) {Producer p1 = new Producer("p1");Producer p2 = new Producer("p2");Producer p3 = new Producer("p3");Producer p4 = new Producer("p4");Consumer c1 = new Consumer("c1");Consumer c2 = new Consumer("c2");Consumer c3 = new Consumer("c3");Consumer c4 = new Consumer("c4");p1.start();p2.start();p3.start();p4.start();c1.start();c2.start();c3.start();c4.start();} }? 這里因為為了簡單模擬消息,知識使用了整形的變量代替消息,首先我們來看下霍爾管程的執行結果,如下圖所示,我們可以看到,c4執行signal操作,在釋放完走遠后,立刻將自身阻塞。
? 下面我們來看下MESA管程的執行結果,如下圖所示,對于執行結果來說,兩種管程都可以保證并發的正確執行,但是MESA讓線程的執行更順暢,不會被頻繁的阻塞,從結果中也能體現。
? 又到了分隔線以下,本文到此就結束了,本文內容和代碼都是博主自己本人整理和編寫,如有錯誤,還請批評指正。
? 本文的所有java代碼都已通過測試,對其中有什么疑惑的,可以評論區留言,歡迎你的留言與討論;另外原創不易,如果本文對你有所幫助,還請留下個贊,以表支持。
? 如有興趣,還可以查看我的其他幾篇博客,都是OS的干貨(目錄),喜歡的話還請點贊、評論加關注_。
總結
以上是生活随笔為你收集整理的Java并发编程模拟管程(霍尔Hoare管程、汉森Hansan管程、MESA管程)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 今日头条信息流投放:今日头条怎么开户?多
- 下一篇: 一周XX思考(第12期)