Phaser都不懂,还学什么多线程
前面的文章中我們講到了CyclicBarrier、CountDownLatch的使用,這里再回顧一下CountDownLatch主要用在一個線程等待多個線程執行完畢的情況,而CyclicBarrier用在多個線程互相等待執行完畢的情況。
Phaser是java 7 引入的新的并發API。他引入了新的Phaser的概念,我們可以將其看成一個一個的階段,每個階段都有需要執行的線程任務,任務執行完畢就進入下一個階段。所以Phaser特別適合使用在重復執行或者重用的情況。
基本使用
在CyclicBarrier、CountDownLatch中,我們使用計數器來控制程序的順序執行,同樣的在Phaser中也是通過計數器來控制。在Phaser中計數器叫做parties, 我們可以通過Phaser的構造函數或者register()方法來注冊。
通過調用register()方法,我們可以動態的控制phaser的個數。如果我們需要取消注冊,則可以調用arriveAndDeregister()方法。
我們看下arrive:
public int arrive() {return doArrive(ONE_ARRIVAL);}Phaser中arrive實際上調用了doArrive方法,doArrive接收一個adjust參數,ONE_ARRIVAL表示arrive,ONE_DEREGISTER表示arriveAndDeregister。
Phaser中的arrive()、arriveAndDeregister()方法,這兩個方法不會阻塞,但是會返回相應的phase數字,當此phase中最后一個party也arrive以后,phase數字將會增加,即phase進入下一個周期,同時觸發(onAdvance)那些阻塞在上一phase的線程。這一點類似于CyclicBarrier的barrier到達機制;更靈活的是,我們可以通過重寫onAdvance方法來實現更多的觸發行為。
下面看一個基本的使用:
void runTasks(List<Runnable> tasks) {final Phaser phaser = new Phaser(1); // "1" to register self// create and start threadsfor (final Runnable task : tasks) {phaser.register();new Thread() {public void run() {phaser.arriveAndAwaitAdvance(); // await all creationtask.run();}}.start();}// allow threads to start and deregister selfphaser.arriveAndDeregister();}上面的例子中,我們在執行每個Runnable之前調用register()來注冊, 然后調用arriveAndAwaitAdvance()來等待這一個Phaser周期結束。最后我們調用 phaser.arriveAndDeregister();來取消注冊主線程。
下面來詳細的分析一下運行步驟:
這一步我們初始化了一個Phaser,并且指定其現在party的個數為1。
這一步注冊Runnable task到phaser,同時將party+1。
這一步將會等待直到所有的party都arrive。這里只會將步驟2中注冊的party標記為arrive,而步驟1中初始化的party一直都沒有被arrive。
在主線程中,arrive了步驟1中的party,并且將party的個數減一。
多個Phaser周期
Phaser的值是從0到Integer.MAX_VALUE,每個周期過后該值就會加一,如果到達Integer.MAX_VALUE則會繼續從0開始。
如果我們執行多個Phaser周期,則可以重寫onAdvance方法:
protected boolean onAdvance(int phase, int registeredParties) {return registeredParties == 0;}onAdvance將會在最后一個arrive()調用的時候被調用,如果這個時候registeredParties為0的話,該Phaser將會調用isTerminated方法結束該Phaser。
如果要實現多周期的情況,我們可以重寫這個方法:
protected boolean onAdvance(int phase, int registeredParties) {return phase >= iterations || registeredParties == 0;}上面的例子中,如果phase次數超過了指定的iterations次數則就會自動終止。
我們看下實際的例子:
void startTasks(List<Runnable> tasks, final int iterations) {final Phaser phaser = new Phaser() {protected boolean onAdvance(int phase, int registeredParties) {return phase >= iterations || registeredParties == 0;}};phaser.register();for (final Runnable task : tasks) {phaser.register();new Thread() {public void run() {do {task.run();phaser.arriveAndAwaitAdvance();} while (!phaser.isTerminated());}}.start();}phaser.arriveAndDeregister(); // deregister self, don't wait}上面的例子將會執行iterations次。
本文的例子請參考https://github.com/ddean2009/learn-java-concurrency/tree/master/Phaser
更多精彩內容且看:
- 區塊鏈從入門到放棄系列教程-涵蓋密碼學,超級賬本,以太坊,Libra,比特幣等持續更新
- Spring Boot 2.X系列教程:七天從無到有掌握Spring Boot-持續更新
- Spring 5.X系列教程:滿足你對Spring5的一切想象-持續更新
- java程序員從小工到專家成神之路(2020版)-持續更新中,附詳細文章教程
更多內容請訪問flydean的博客
總結
以上是生活随笔為你收集整理的Phaser都不懂,还学什么多线程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于CompletableFuture的
- 下一篇: java中使用Semaphore构建阻塞