高并发编程-线程通信_使用wait和notify进行线程间的通信
文章目錄
- 概述
- 場景
- 引子
- synchronized wait/notify機制
- synchronized wait/notify 改造
- 問題
概述
Java中線程通信協作的最常見的兩種方式:
- syncrhoized加鎖的線程的Object類的wait()/notify()/notifyAll()
- ReentrantLock類加鎖的線程的Condition類的await()/signal()/signalAll()
線程間直接的數據交換:
- 通過管道進行線程間通信:1)字節流;2)字符流
可參考: Java多線程編程核心技術
場景
場景假設:
一個工作臺,兩個工人: Worker A 和 Workder B .
約定,Worker A 生產貨物到工作臺上, Workder B 從工作臺 取走(消費)貨物。
- 當 工作臺上沒有貨物時,Worker A 才生產貨物,否則等待Worker B 取走(消費)貨物。
- 當 工作臺上有貨物時, Woker B 才從工作臺取走(消費)貨物,否則等待Worker A 生產貨物
引子
我們先來看下線程之間不通信的情況 (錯誤示例)
package com.artisan.test;public class ProduceConsumeWrongDemo {// 鎖private final Object LOCK = new Object();// 模擬多線程間需要通信的數據 iprivate int i = 0 ;public void produce() throws InterruptedException {// 加鎖synchronized (LOCK){System.out.println("produce:" + i++);Thread.sleep(1_000);}}public void consume() throws InterruptedException{// 加鎖synchronized (LOCK){System.out.println("consume:" + i);Thread.sleep(1_000);}}public static void main(String[] args) throws InterruptedException{ProduceConsumeWrongDemo pc = new ProduceConsumeWrongDemo();// 生產線程new Thread(()->{while (true){try {pc.produce();} catch (InterruptedException e) {e.printStackTrace();}}}).start();// 消費線程new Thread(()->{while (true){try {pc.consume();} catch (InterruptedException e) {e.printStackTrace();}}}).start();} }運行結果:
"E:\Program Files\Java\jdk1.8.0_161\bin\java" "-javaagent:E:\Program Files\JetBrains\IntelliJ IDEA 2017.2.4\lib\idea_rt.jar=52137:E:\Program Files\JetBrains\IntelliJ IDEA 2017.2.4\bin" -Dfile.encoding=UTF-8 -classpath "E:\Program Files\Java\jdk1.8.0_161\jre\lib\charsets.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\deploy.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\access-bridge-64.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\cldrdata.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\dnsns.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jaccess.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jfxrt.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\localedata.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\nashorn.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunec.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunjce_provider.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunmscapi.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunpkcs11.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\zipfs.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\javaws.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jce.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jfr.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jfxswt.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jsse.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\management-agent.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\plugin.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\resources.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\rt.jar;D:\IdeaProjects\mvc\target\classes" com.artisan.test.ProduceConsumeWrongDemo produce:0 produce:1 consume:2 consume:2 consume:2 produce:2 consume:3 consume:3 consume:3 produce:3 produce:4 produce:5 consume:6 .... .... .... .... .... .... ....很明顯的可以看到,數據都是錯亂的,因為沒有線程間的通信,全憑CPU調度,生產線程和消費線程都很隨意,數據一團糟糕,那該如何改進呢?
synchronized wait/notify機制
- wait()——讓當前線程 (Thread.concurrentThread()
方法所返回的線程) 釋放對象鎖并進入等待(阻塞)狀態。 - notify()——喚醒一個正在等待相應對象鎖的線程,使其進入就緒隊列,以便在當前線程釋放鎖后競爭鎖,進而得到CPU的執行。
- notifyAll()——喚醒所有正在等待相應對象鎖的線程,使它們進入就緒隊列,以便在當前線程釋放鎖后競爭鎖,進而得到CPU的執行。
為了解決上面的問題,我們先來了解下synchronized wait/notify .
-
wait()、notify()和notifyAll()方法是本地方法,并且為final方法,無法被重寫。
-
調用某個對象的wait()方法能讓當前線程阻塞,并且當前線程必須擁有此對象的monitor(即鎖). 因此調用wait()方法必須在同步塊或者同步方法中進行(synchronized塊或者synchronized方法)。如果當前線程沒有這個對象的鎖就調用wait()方法,則會拋出IllegalMonitorStateException.
-
調用某個對象的wait()方法,相當于讓當前線程交出(釋放)此對象的monitor,然后進入等待狀態,等待后續再次獲得此對象的鎖
-
調用某個對象的notify()方法能夠喚醒一個正在等待這個對象的monitor的線程,如果有多個線程都在等待這個對象的monitor,則只能喚醒其中一個線程. 同樣的,調用某個對象的notify()方法,當前線程也必須擁有這個對象的monitor,因此調用notify()方法必須在同步塊或者同步方法中進行(synchronized塊或者synchronized方法)。
-
調用notifyAll()方法能夠喚醒所有正在等待這個對象的monitor的線程
-
notify()和notifyAll()方法只是喚醒等待該對象的monitor的線程,并不決定哪個線程能夠獲取到monitor。
舉個例子: 假如有三個線程Thread1、Thread2和Thread3都在等待對象objectA的monitor,此時Thread4擁有對象objectA的monitor,當在Thread4中調用objectA.notify()方法之后,Thread1、Thread2和Thread3只有一個能被喚醒。
注意,被喚醒不等于立刻就獲取了objectA的monitor。
假若在Thread4中調用objectA.notifyAll()方法,則Thread1、Thread2和Thread3三個線程都會被喚醒,至于哪個線程接下來能夠獲取到objectA的monitor就具體依賴于操作系統的調度了。
一個線程被喚醒不代表立即獲取了對象的monitor,只有等調用完notify()或者notifyAll()并退出synchronized塊,釋放對象鎖后,其余線程才可獲得鎖執行。
synchronized wait/notify 改造
package com.artisan.test;public class ProduceConsumerDemo {// 對象監視器-鎖private final Object LOCK = new Object();// 是否生產出數據的標識private boolean isProduced = false;// volatile 確保可見性, 假設 i 就是生產者生產的數據private volatile int i = 0 ;public void produce(){// 加鎖synchronized (LOCK){if (isProduced){try {// 讓當前線程 (Thread.concurrentThread() 方法所返回的線程) 釋放對象鎖并進入等待(阻塞)狀態// 如果已經生產,則等待LOCK.wait();} catch (InterruptedException e) {e.printStackTrace();}}else{// 生產數據i++;System.out.println("Produce:" + i);// 喚醒一個正在等待相應對象鎖的線程,使其進入就緒隊列,以便在當前線程釋放鎖后競爭鎖,進而得到CPU的執行// 通知等待的Worker B 來消費數據LOCK.notify();// 將生產標識置為trueisProduced = true;}}}public void consume(){// 加鎖synchronized (LOCK){if (isProduced){// 消費數據System.out.println("Consume:" + i);// 喚醒一個正在等待相應對象鎖的線程,使其進入就緒隊列,以便在當前線程釋放鎖后競爭鎖,進而得到CPU的執行// 通知 等待的Wokrer A 生產數據LOCK.notify();// 已經消費完了,將生產標識置為falseisProduced = false;}else{try {// 讓當前線程 (Thread.concurrentThread() 方法所返回的線程) 釋放對象鎖并進入等待(阻塞)狀態// 未生產,Worker B等待LOCK.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}public static void main(String[] args) {ProduceConsumerDemo produceConsumerDemo = new ProduceConsumerDemo();new Thread(){@Overridepublic void run() {while(true) produceConsumerDemo.produce();}}.start();new Thread(){@Overridepublic void run() {while(true) produceConsumerDemo.consume();}}.start();} }
當然了并不是絕對的上面的對應關系(這里只是為了演示),因為notify喚醒后,線程只是進入Runnable狀態,至于哪個線程能進入到running狀態,就看哪個線程能搶到CPU的資源了。 JVM規范并沒有規定哪個線程優先得到執行權,每個JVM的實現都是不同的
單個生產者 單個消費者,運行OK
..... ..... .....Produce:1171 Consume:1171 Produce:1172 Consume:1172 Produce:1173 Consume:1173 Produce:1174 Consume:1174 Produce:1175 Consume:1175 Produce:1176 Consume:1176..... ..... .....問題
單個生產者 單個消費者 上面的代碼是沒有問題的,加入有多個生產者 和多個消費者呢?
我們來復用上面的代碼來演示下 ,其他代碼保持不變,僅在main方法中改造下,兩個生產者,兩個消費者
Stream.of("P1","P2").forEach(n-> new Thread(){@Overridepublic void run() {while(true) produceConsumerDemo.produce();}}.start());Stream.of("C1","C2").forEach(n->new Thread(){@Overridepublic void run() {while(true) produceConsumerDemo.consume();}}.start());下篇博客,我們來分析下原因,并給出解決辦法
總結
以上是生活随笔為你收集整理的高并发编程-线程通信_使用wait和notify进行线程间的通信的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 高并发编程-Thread_正确关闭线程的
- 下一篇: 高并发编程-线程通信_使用wait和no