实现生产者消费者的三种方式
文章目錄
- wait/notify的消息通知機制
- 預備知識
- wait/notify消息通知潛在的一些問題
- notify過早通知
- 等待wait的條件發生變化
- 假死狀態
- wait/notifyAll實現生產者-消費者
- 使用Lock中Condition的await/signalAll實現生產者-消費者
- 使用BlockingQueue實現生產者-消費者
生產者-消費者模式是一個十分經典的多線程并發協作的模式,弄懂生產者-消費者問題能夠讓我們對并發編程的理解加深。所謂生產者-消費者問題,實際上主要是包含了兩類線程,一種是生產者線程用于生產數據,另一種是消費者線程用于消費數據,為了解耦生產者和消費者的關系,通常會有一個共享的數據區域,就像是一個倉庫,生產者生產數據之后直接放置在共享數據區中,并不需要關心消費者的行為;而消費者只需要從共享數據區中去獲取數據,就不再需要關心生產者的行為。但是,這個共享數據區域中應該具備這樣的線程間并發協作的功能:
在實現生產者消費者問題時,可以采用三種方式:
-
使用Object的wait/notify的消息通知機制;
-
使用Lock的Condition的await/signal的消息通知機制;
-
使用BlockingQueue實現。
本文主要將這三種實現方式進行總結歸納。
wait/notify的消息通知機制
預備知識
Java 中,可以通過配合調用 Object 對象的 wait() 方法和 notify()方法或 notifyAll() 方法來實現線程間的通信。在線程中調用 wait() 方法,將阻塞當前線程,直至等到其他線程調用了 notify() 方法或 notifyAll() 方法進行通知之后,當前線程才能從wait()方法出返回,繼續執行下面的操作。
wait
該方法用來將當前線程置入休眠狀態,直到接到通知或被中斷為止。在調用 wait()之前,線程必須要獲得該對象的對象監視器鎖,即只能在同步方法或同步塊中調用 wait()方法。調用wait()方法之后,當前線程會釋放鎖。如果調用wait()方法時,線程并未獲取到鎖的話,則會拋出IllegalMonitorStateException異常,這是以個RuntimeException。如果再次獲取到鎖的話,當前線程才能從wait()方法處成功返回。
notify
該方法也要在同步方法或同步塊中調用,即在調用前,線程也必須要獲得該對象的對象級別鎖,如果調用 notify()時沒有持有適當的鎖,也會拋出 IllegalMonitorStateException。
該方法任意從WAITTING狀態的線程中挑選一個進行通知,使得調用wait()方法的線程從等待隊列移入到同步隊列中,等待有機會再一次獲取到鎖,從而使得調用wait()方法的線程能夠從wait()方法處退出。調用notify后,當前線程不會馬上釋放該對象鎖,要等到程序退出同步塊后,當前線程才會釋放鎖。
notifyAll
該方法與 notify ()方法的工作方式相同,重要的一點差異是:
notifyAll 使所有原來在該對象上 wait 的線程統統退出WAITTING狀態,使得他們全部從等待隊列中移入到同步隊列中去,等待下一次能夠有機會獲取到對象監視器鎖。
wait/notify消息通知潛在的一些問題
notify過早通知
notify 通知的遺漏很容易理解,即 threadA 還沒開始 wait 的時候,threadB 已經 notify 了,這樣,threadB 通知是沒有任何響應的,當 threadB 退出 synchronized 代碼塊后,threadA 再開始 wait,便會一直阻塞等待,直到被別的線程打斷。比如在下面的示例代碼中,就模擬出notify早期通知帶來的問題:
public class EarlyNotifyDemo1 {private static String lockObject = "";public static void main(String[] args) {WaitThread waitThread = new WaitThread(lockObject);NotifyThread notifyThread = new NotifyThread(lockObject);notifyThread.start();try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}waitThread.start();}static class WaitThread extends Thread {private String lock;public WaitThread(String lock) {this.lock = lock;}@Overridepublic void run() {synchronized (lock) {try {System.out.println(Thread.currentThread().getName() + " 進去代碼塊");System.out.println(Thread.currentThread().getName() + " 開始wait");lock.wait();System.out.println(Thread.currentThread().getName() + " 結束wait");} catch (InterruptedException e) {e.printStackTrace();}}}}static class NotifyThread extends Thread {private String lock;public NotifyThread(String lock) {this.lock = lock;}@Overridepublic void run() {synchronized (lock) {System.out.println(Thread.currentThread().getName() + " 進去代碼塊");System.out.println(Thread.currentThread().getName() + " 開始notify");lock.notify();System.out.println(Thread.currentThread().getName() + " 結束開始notify");}}} }輸出結果
Thread-1 進去代碼塊 Thread-1 開始notify Thread-1 結束開始notify Thread-0 進去代碼塊 Thread-0 開始wait示例中開啟了兩個線程,一個是WaitThread,另一個是NotifyThread。NotifyThread會先啟動,先調用notify方法。然后WaitThread線程才啟動,調用wait方法,但是由于通知過了,wait方法就無法再獲取到相應的通知,因此WaitThread會一直在wait方法出阻塞,這種現象就是通知過早的現象。針對這種現象,解決方法,一般是添加一個狀態標志,讓waitThread調用wait方法前先判斷狀態是否已經改變了沒,如果通知早已發出的話,WaitThread就不再去wait。對上面的代碼進行更正:
public class EarlyNotifyDemo2 {private static String lockObject = "";private static boolean isWait = true;public static void main(String[] args) {WaitThread waitThread = new WaitThread(lockObject);NotifyThread notifyThread = new NotifyThread(lockObject);notifyThread.start();try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}waitThread.start();}static class WaitThread extends Thread {private String lock;public WaitThread(String lock) {this.lock = lock;}@Overridepublic void run() {synchronized (lock) {try {while (isWait) {System.out.println(Thread.currentThread().getName() + " 進去代碼塊");System.out.println(Thread.currentThread().getName() + " 開始wait");lock.wait();System.out.println(Thread.currentThread().getName() + " 結束wait");}} catch (InterruptedException e) {e.printStackTrace();}}}}static class NotifyThread extends Thread {private String lock;public NotifyThread(String lock) {this.lock = lock;}@Overridepublic void run() {synchronized (lock) {System.out.println(Thread.currentThread().getName() + " 進去代碼塊");System.out.println(Thread.currentThread().getName() + " 開始notify");lock.notifyAll();isWait = false;System.out.println(Thread.currentThread().getName() + " 結束開始notify");}}} }這段代碼只是增加了一個isWait狀態變量,NotifyThread調用notify方法后會對狀態變量進行更新,在WaitThread中調用wait方法之前會先對狀態變量進行判斷,在該示例中,調用notify后將狀態變量isWait改變為false,因此,在WaitThread中while對isWait判斷后就不會執行wait方法,從而避免了Notify過早通知造成遺漏的情況。
總結:在使用線程的等待/通知機制時,一般都要配合一個 boolean 變量值(或者其他能夠判斷真假的條件),在 notify 之前改變該 boolean 變量的值,讓 wait 返回后能夠退出 while 循環(一般都要在 wait 方法外圍加一層 while 循環,以防止早期通知),或在通知被遺漏后,不會被阻塞在 wait 方法處。這樣便保證了程序的正確性
等待wait的條件發生變化
如果線程在等待時接受到了通知,但是之后等待的條件發生了變化,并沒有再次對等待條件進行判斷,也會導致程序出現錯誤。
下面用一個例子來說明這種情況
public class ConditionChangeDemo1 {private static List<String> lockObject = new ArrayList();public static void main(String[] args) {Consumer consumer1 = new Consumer(lockObject);Consumer consumer2 = new Consumer(lockObject);Productor productor = new Productor(lockObject);consumer1.start();consumer2.start();productor.start();}static class Consumer extends Thread {private List<String> lock;public Consumer(List lock) {this.lock = lock;}@Overridepublic void run() {synchronized (lock) {try {//這里使用if的話,就會存在wait條件變化造成程序錯誤的問題if (lock.isEmpty()) {System.out.println(Thread.currentThread().getName() + " list為空");System.out.println(Thread.currentThread().getName() + " 調用wait方法");lock.wait();System.out.println(Thread.currentThread().getName() + " wait方法結束");}String element = lock.remove(0);System.out.println(Thread.currentThread().getName() + " 取出第一個元素為:" + element);} catch (InterruptedException e) {e.printStackTrace();}}}}static class Productor extends Thread {private List<String> lock;public Productor(List lock) {this.lock = lock;}@Overridepublic void run() {synchronized (lock) {System.out.println(Thread.currentThread().getName() + " 開始添加元素");lock.add(Thread.currentThread().getName());lock.notifyAll();}}}}輸出結果
Thread-0 list為空 Thread-0 調用wait方法 Thread-2 開始添加元素 Thread-1 取出第一個元素為:Thread-2 Thread-0 wait方法結束 Exception in thread "Thread-0" java.lang.IndexOutOfBoundsException: Index: 0, Size: 0異常原因分析:在這個例子中一共開啟了3個線程,Consumer1,Consumer2以及Productor。首先Consumer1調用了wait方法后,線程處于了WAITTING狀態,并且將對象鎖釋放出來。因此,Consumer2能夠獲取對象鎖,從而進入到同步代塊中,當執行到wait方法時,同樣的也會釋放對象鎖。因此,productor能夠獲取到對象鎖,進入到同步代碼塊中,向list中插入數據后,通過notifyAll方法通知處于WAITING狀態的Consumer1和Consumer2線程。consumer1得到對象鎖后,從wait方法出退出,刪除了一個元素讓List為空,方法執行結束,退出同步塊,釋放掉對象鎖。這個時候Consumer2獲取到對象鎖后,從wait方法退出,繼續往下執行,這個時候Consumer2再執行lock.remove(0);就會出錯,因為List由于Consumer1刪除一個元素之后已經為空了。
解決方案:通過上面的分析,可以看出Consumer2報異常是因為線程從wait方法退出之后沒有再次對wait條件進行判斷,因此,此時的wait條件已經發生了變化。解決辦法就是,在wait退出之后再對條件進行判斷即可。
public class ConditionChangeDemo2 {private static List<String> lockObject = new ArrayList();public static void main(String[] args) {Consumer consumer1 = new Consumer(lockObject);Consumer consumer2 = new Consumer(lockObject);Productor productor = new Productor(lockObject);consumer1.start();consumer2.start();productor.start();}static class Consumer extends Thread {private List<String> lock;public Consumer(List lock) {this.lock = lock;}@Overridepublic void run() {synchronized (lock) {try {//這里使用if的話,就會存在wait條件變化造成程序錯誤的問題while (lock.isEmpty()) {System.out.println(Thread.currentThread().getName() + " list為空");System.out.println(Thread.currentThread().getName() + " 調用wait方法");lock.wait();System.out.println(Thread.currentThread().getName() + " wait方法結束");}String element = lock.remove(0);System.out.println(Thread.currentThread().getName() + " 取出第一個元素為:" + element);} catch (InterruptedException e) {e.printStackTrace();}}}}static class Productor extends Thread {private List<String> lock;public Productor(List lock) {this.lock = lock;}@Overridepublic void run() {synchronized (lock) {System.out.println(Thread.currentThread().getName() + " 開始添加元素");lock.add(Thread.currentThread().getName());lock.notifyAll();}}}}輸出結果
Thread-0 list為空 Thread-0 調用wait方法 Thread-2 開始添加元素 Thread-1 取出第一個元素為:Thread-2 Thread-0 wait方法結束 Thread-0 list為空 Thread-0 調用wait方法上面的代碼與之前的代碼僅僅只是將 wait 外圍的 if 語句改為 while 循環即可,這樣當 list 為空時,線程便會繼續等待,而不會繼續去執行刪除 list 中元素的代碼。
總結:在使用線程的等待/通知機制時,一般都要在 while 循環中調用 wait()方法,因此配合使用一個 boolean 變量(或其他能判斷真假的條件,如本文中的 list.isEmpty()),滿足 while 循環的條件時,進入 while 循環,執行 wait()方法,不滿足 while 循環的條件時,跳出循環,執行后面的代碼。
假死狀態
現象:如果是多消費者和多生產者情況,如果使用notify方法可能會出現“假死”的情況,即喚醒的是同類線程。
原因分析:假設當前多個生產者線程會調用wait方法阻塞等待,當其中的生產者線程獲取到對象鎖之后使用notify通知處于WAITTING狀態的線程,如果喚醒的仍然是生產者線程,就會造成所有的生產者線程都處于等待狀態。
解決辦法:將notify方法替換成notifyAll方法,如果使用的是lock的話,就將signal方法替換成signalAll方法。
總結
在Object提供的消息通知機制應該遵循如下這些條件:
基本的使用范式如下:
// The standard idiom for calling the wait method in Java synchronized (sharedObject) { while (condition) { sharedObject.wait(); // (Releases lock, and reacquires on wakeup) } // do action based upon condition e.g. take or put into queue }wait/notifyAll實現生產者-消費者
利用wait/notifyAll實現生產者和消費者代碼如下:
public class ProductorConsumerDemo1 {public static void main(String[] args) {LinkedList linkedList = new LinkedList();ExecutorService service = Executors.newFixedThreadPool(15);for (int i = 0; i < 5; i++) {service.submit(new Productor(linkedList, 8));}for (int i = 0; i < 10; i++) {service.submit(new Consumer(linkedList));}}static class Productor implements Runnable {private List<Integer> list;private int maxLength;public Productor(List list, int maxLength) {this.list = list;this.maxLength = maxLength;}@Overridepublic void run() {while (true) {synchronized (list) {try {while (list.size() == maxLength) {System.out.println("生產者" + Thread.currentThread().getName() + " list以達到最大容量,進行wait");list.wait();System.out.println("生產者" + Thread.currentThread().getName() + " 退出wait");}Random random = new Random();int i = random.nextInt();System.out.println("生產者" + Thread.currentThread().getName() + " 生產數據" + i);list.add(i);list.notifyAll();Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}}}}static class Consumer implements Runnable {private List<Integer> list;public Consumer(List list) {this.list = list;}@Overridepublic void run() {while (true) {synchronized (list) {try {while (list.isEmpty()) {System.out.println("消費者" + Thread.currentThread().getName() + " list為空,進行wait");list.wait();System.out.println("消費者" + Thread.currentThread().getName() + " 退出wait");}Integer element = list.remove(0);System.out.println("消費者" + Thread.currentThread().getName() + " 消費數據:" + element);list.notifyAll();Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}}}}}輸出結果
生產者pool-1-thread-2 生產數據-703210513 生產者pool-1-thread-2 生產數據-1025434820 生產者pool-1-thread-2 生產數據70070412 生產者pool-1-thread-2 生產數據-598504371 生產者pool-1-thread-2 生產數據-716978999 生產者pool-1-thread-2 生產數據-1175198461 生產者pool-1-thread-2 生產數據-1212912406 生產者pool-1-thread-2 生產數據-332467186 生產者pool-1-thread-2 list以達到最大容量,進行wait 消費者pool-1-thread-15 消費數據:-703210513 消費者pool-1-thread-15 消費數據:-1025434820 消費者pool-1-thread-15 消費數據:70070412 消費者pool-1-thread-15 消費數據:-598504371 消費者pool-1-thread-15 消費數據:-716978999 消費者pool-1-thread-15 消費數據:-1175198461 消費者pool-1-thread-15 消費數據:-1212912406 消費者pool-1-thread-15 消費數據:-332467186 消費者pool-1-thread-15 list為空,進行wait 消費者pool-1-thread-14 list為空,進行wait 消費者pool-1-thread-13 list為空,進行wait 消費者pool-1-thread-11 list為空,進行wait 消費者pool-1-thread-12 list為空,進行wait 消費者pool-1-thread-10 list為空,進行wait 消費者pool-1-thread-9 list為空,進行wait 消費者pool-1-thread-8 list為空,進行wait 消費者pool-1-thread-7 list為空,進行wait 消費者pool-1-thread-6 list為空,進行wait 生產者pool-1-thread-5 生產數據84590545 生產者pool-1-thread-5 生產數據-1631754695使用Lock中Condition的await/signalAll實現生產者-消費者
參照Object的wait和notify/notifyAll方法,Condition也提供了同樣的方法:
針對wait方法
void await() throws InterruptedException:當前線程進入等待狀態,如果其他線程調用condition的signal或者signalAll方法并且當前線程獲取Lock從await方法返回,如果在等待狀態中被中斷會拋出被中斷異常;
long awaitNanos(long nanosTimeout):當前線程進入等待狀態直到被通知,中斷或者超時;
boolean await(long time, TimeUnit unit)throws InterruptedException:同第二種,支持自定義時間單位
boolean awaitUntil(Date deadline) throws InterruptedException:當前線程進入等待狀態直到被通知,中斷或者到了某個時間
針對notify方法
void signal():喚醒一個等待在condition上的線程,將該線程從等待隊列中轉移到同步隊列中,如果在同步隊列中能夠競爭到Lock則可以從等待方法中返回。
void signalAll():與signal的區別在于能夠喚醒所有等待在condition上的線程
也就是說wait—>await,notify---->Signal。另外,關于lock中condition消息通知的原理解析可以看這篇文章。
如果采用lock中Conditon的消息通知原理來實現生產者-消費者問題,原理同使用wait/notifyAll一樣。直接上代碼:
public class ProductorConsumerDemo2 {private static ReentrantLock lock = new ReentrantLock();private static Condition full = lock.newCondition();private static Condition empty = lock.newCondition();public static void main(String[] args) {LinkedList linkedList = new LinkedList();ExecutorService service = Executors.newFixedThreadPool(15);for (int i = 0; i < 5; i++) {service.submit(new Productor(linkedList, 8, lock));}for (int i = 0; i < 10; i++) {service.submit(new Consumer(linkedList, lock));}}static class Productor implements Runnable {private List<Integer> list;private int maxLength;private Lock lock;public Productor(List list, int maxLength, Lock lock) {this.list = list;this.maxLength = maxLength;this.lock = lock;}@Overridepublic void run() {while (true) {lock.lock();try {while (list.size() == maxLength) {System.out.println("生產者" + Thread.currentThread().getName() + " list以達到最大容量,進行wait");full.await();System.out.println("生產者" + Thread.currentThread().getName() + " 退出wait");}Random random = new Random();int i = random.nextInt();System.out.println("生產者" + Thread.currentThread().getName() + " 生產數據" + i);list.add(i);empty.signalAll();Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}}}static class Consumer implements Runnable {private List<Integer> list;private Lock lock;public Consumer(List list, Lock lock) {this.list = list;this.lock = lock;}@Overridepublic void run() {while (true) {lock.lock();try {while (list.isEmpty()) {System.out.println("消費者" + Thread.currentThread().getName() + " list為空,進行wait");empty.await();System.out.println("消費者" + Thread.currentThread().getName() + " 退出wait");}Integer element = list.remove(0);System.out.println("消費者" + Thread.currentThread().getName() + " 消費數據:" + element);full.signalAll();Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}}}}輸出結果
生產者pool-1-thread-1 生產數據-1633842993 生產者pool-1-thread-1 生產數據1337251950 生產者pool-1-thread-1 生產數據1310879631 生產者pool-1-thread-1 生產數據-214297115 生產者pool-1-thread-1 生產數據738937512 生產者pool-1-thread-1 生產數據13060041 生產者pool-1-thread-1 生產數據-957049554 生產者pool-1-thread-1 生產數據-1062017880 生產者pool-1-thread-1 list以達到最大容量,進行wait 生產者pool-1-thread-2 list以達到最大容量,進行wait 生產者pool-1-thread-3 list以達到最大容量,進行wait 生產者pool-1-thread-4 list以達到最大容量,進行wait 生產者pool-1-thread-5 list以達到最大容量,進行wait 消費者pool-1-thread-6 消費數據:-1633842993 消費者pool-1-thread-6 消費數據:1337251950 消費者pool-1-thread-6 消費數據:1310879631 消費者pool-1-thread-6 消費數據:-214297115 消費者pool-1-thread-6 消費數據:738937512 消費者pool-1-thread-6 消費數據:13060041 消費者pool-1-thread-6 消費數據:-957049554 消費者pool-1-thread-6 消費數據:-1062017880 消費者pool-1-thread-6 list為空,進行wait 消費者pool-1-thread-7 list為空,進行wait 消費者pool-1-thread-8 list為空,進行wait 消費者pool-1-thread-9 list為空,進行wait 消費者pool-1-thread-10 list為空,進行wait 消費者pool-1-thread-11 list為空,進行wait 消費者pool-1-thread-12 list為空,進行wait 消費者pool-1-thread-13 list為空,進行wait 消費者pool-1-thread-14 list為空,進行wait 消費者pool-1-thread-15 list為空,進行wait 生產者pool-1-thread-1 退出wait 生產者pool-1-thread-1 生產數據1949864858 生產者pool-1-thread-1 生產數據-1693880970使用BlockingQueue實現生產者-消費者
由于BlockingQueue內部實現就附加了兩個阻塞操作。即當隊列已滿時,阻塞向隊列中插入數據的線程,直至隊列中未滿;當隊列為空時,阻塞從隊列中獲取數據的線程,直至隊列非空時為止。關于BlockingQueue更多細節可以看這篇文章??梢岳肂lockingQueue實現生產者-消費者為題,阻塞隊列完全可以充當共享數據區域,就可以很好的完成生產者和消費者線程之間的協作。
public class ProductorConsumerDmoe3 {private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>();public static void main(String[] args) {ExecutorService service = Executors.newFixedThreadPool(15);for (int i = 0; i < 5; i++) {service.submit(new Productor(queue));}for (int i = 0; i < 10; i++) {service.submit(new Consumer(queue));}}static class Productor implements Runnable {private BlockingQueue queue;public Productor(BlockingQueue queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) {Random random = new Random();int i = random.nextInt();System.out.println("生產者" + Thread.currentThread().getName() + "生產數據" + i);queue.put(i);Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}}}static class Consumer implements Runnable {private BlockingQueue queue;public Consumer(BlockingQueue queue) {this.queue = queue;}@Overridepublic void run() {try {while (true) {Integer element = (Integer) queue.take();System.out.println("消費者" + Thread.currentThread().getName() + "正在消費數據" + element);Thread.sleep(1000);}} catch (InterruptedException e) {e.printStackTrace();}}}}輸出結果
生產者pool-1-thread-2生產數據-1056722868 生產者pool-1-thread-1生產數據-1217947426 生產者pool-1-thread-3生產數據590686437 生產者pool-1-thread-4生產數據1782376429 生產者pool-1-thread-5生產數據1558897279 消費者pool-1-thread-6正在消費數據-1056722868 消費者pool-1-thread-7正在消費數據-1217947426 消費者pool-1-thread-8正在消費數據590686437 消費者pool-1-thread-9正在消費數據1782376429 消費者pool-1-thread-10正在消費數據1558897279 生產者pool-1-thread-4生產數據1977644261 生產者pool-1-thread-3生產數據182370155 消費者pool-1-thread-11正在消費數據1977644261 生產者pool-1-thread-2生產數據949821636 生產者pool-1-thread-5生產數據1931032717 消費者pool-1-thread-13正在消費數據949821636 生產者pool-1-thread-1生產數據873417555 消費者pool-1-thread-14正在消費數據1931032717 消費者pool-1-thread-12正在消費數據182370155 消費者pool-1-thread-15正在消費數據873417555可以看出,使用BlockingQueue來實現生產者-消費者很簡潔,這正是利用了BlockingQueue插入和獲取數據附加阻塞操作的特性。
關于生產者-消費者實現的三中方式,到這里就全部總結出來,如果覺得不錯的話,請點贊,也算是給我的鼓勵,在此表示感謝!
總結
以上是生活随笔為你收集整理的实现生产者消费者的三种方式的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: threejs指南针【控制中心计算角度】
- 下一篇: 【机器学习】--神经网络(NN)