高并发编程-线程生产者消费者的综合示例
生活随笔
收集整理的這篇文章主要介紹了
高并发编程-线程生产者消费者的综合示例
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
文章目錄
- 需求
- 實(shí)現(xiàn)
需求
需求: 假設(shè)有10個(gè)線程,最多同時(shí)運(yùn)行5個(gè)
要求: 不使用線程池,使用synchronized-wait¬ifyAll機(jī)制
實(shí)現(xiàn)
詳見注釋
package com.artisan.test;import java.time.LocalTime; import java.util.*;/*** 需求: 假設(shè)有10個(gè)線程,最多同時(shí)運(yùn)行5個(gè)* 要求: 不使用線程池,使用synchronized-wait¬ifyAll機(jī)制*/ public class ExerciseDemo {// 鎖 Monitorprivate static final LinkedList<Control> CONTROLLIST = new LinkedList();// 同時(shí)運(yùn)行的最大線程數(shù)private static final int MAX_THREADS = 5;/*** 創(chuàng)建線程* @param threadName 線程名稱* @return*/public static Thread createWorkThread(String threadName){return new Thread(() ->{// 加鎖synchronized (CONTROLLIST){Optional.of(Thread.currentThread().getName() + " GOT LOCK ,BEGIN..." + LocalTime.now().withNano(0)).ifPresent(System.out::println);// 使用while// 當(dāng)集合中運(yùn)行的線程數(shù)量大于5時(shí),wait,放棄鎖,不執(zhí)行while (CONTROLLIST.size() >= MAX_THREADS){try {Optional.of(Thread.currentThread().getName() + " WAIT").ifPresent(System.out::println);CONTROLLIST.wait();} catch (InterruptedException e) {e.printStackTrace();}}// 加入到LinkedList最后CONTROLLIST.addLast(new Control());}//模擬每個(gè)線程的業(yè)務(wù),假設(shè)需要10秒才能結(jié)束Optional.of(Thread.currentThread().getName() + " working..." + LocalTime.now().withNano(0)).ifPresent(System.out::println);try {Thread.sleep(10_000);} catch (InterruptedException e) {e.printStackTrace();}// 輸出業(yè)務(wù)完成Optional.of(Thread.currentThread().getName() + " END..." + LocalTime.now().withNano(0)).ifPresent(System.out::println);// 加鎖synchronized (CONTROLLIST){// 移除最上面的線程CONTROLLIST.removeFirst();// 喚醒其他所有等待的線程CONTROLLIST.notifyAll();}},threadName);}/*** 主流程* @param args*/public static void main(String[] args) {List<Thread> workers = new ArrayList();Arrays.asList("T1","T2","T3","T4","T5","T6","T7","T8","T9","T10").stream().map(ExerciseDemo::createWorkThread).forEach(t->{// 啟動(dòng)線程t.start();// 加入到集合列表,待后續(xù)一起joinworkers.add(t);});// 比那里保存線程的集合,10個(gè)線程 joinworkers.stream().forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});// 全部完成后,輸出結(jié)束標(biāo)識(shí)Optional.of("DONE").ifPresent(System.out::println);}/*** 沒啥實(shí)質(zhì)做用,僅僅是個(gè)控制標(biāo)識(shí)*/static class Control{}}運(yùn)行日志
"E:\Program Files\Java\jdk1.8.0_161\bin\java" "-javaagent:E:\Program Files\JetBrains\IntelliJ IDEA 2017.2.4\lib\idea_rt.jar=60076:E:\Program Files\JetBrains\IntelliJ IDEA 2017.2.4\bin" -Dfile.encoding=UTF-8 -classpath "E:\Program Files\Java\jdk1.8.0_161\jre\lib\charsets.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\deploy.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\access-bridge-64.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\cldrdata.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\dnsns.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jaccess.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\jfxrt.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\localedata.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\nashorn.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunec.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunjce_provider.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunmscapi.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\sunpkcs11.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\ext\zipfs.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\javaws.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jce.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jfr.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jfxswt.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\jsse.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\management-agent.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\plugin.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\resources.jar;E:\Program Files\Java\jdk1.8.0_161\jre\lib\rt.jar;D:\IdeaProjects\mvc\target\classes" com.artisan.test.ExerciseDemo T1 GOT LOCK ,BEGIN...00:03:41 T10 GOT LOCK ,BEGIN...00:03:41 T9 GOT LOCK ,BEGIN...00:03:41 T8 GOT LOCK ,BEGIN...00:03:41 T1 working...00:03:41 T7 GOT LOCK ,BEGIN...00:03:41 T8 working...00:03:41 T7 working...00:03:41 T10 working...00:03:41 T9 working...00:03:41 T6 GOT LOCK ,BEGIN...00:03:41 T6 WAIT T5 GOT LOCK ,BEGIN...00:03:41 T5 WAIT T4 GOT LOCK ,BEGIN...00:03:41 T4 WAIT T3 GOT LOCK ,BEGIN...00:03:41 T3 WAIT T2 GOT LOCK ,BEGIN...00:03:41 T2 WAIT T10 END...00:03:51 T8 END...00:03:51 T2 working...00:03:51 T4 WAIT T3 working...00:03:51 T5 WAIT T6 WAIT T1 END...00:03:51 T6 working...00:03:51 T5 WAIT T4 WAIT T9 END...00:03:51 T4 working...00:03:51 T5 WAIT T7 END...00:03:51 T5 working...00:03:51 T2 END...00:04:01 T3 END...00:04:01 T6 END...00:04:01 T4 END...00:04:01 T5 END...00:04:01 DONEProcess finished with exit code 0首先主線程中 初始化10個(gè)線程,分別命名為T1 … T10,先把這10個(gè)線程臨時(shí)存放到集合
遍歷集合,分別join . 不能在上一步的地方j(luò)oin , 這樣的話就只能一個(gè)線程 一個(gè)線程的執(zhí)行了(join會(huì)阻塞當(dāng)前線程)
10個(gè)線程全部完成后,打印DONE
完成主要部分的編碼后,就需要關(guān)注thread具體的業(yè)務(wù)邏輯了 : 定義一個(gè)鎖 LinkedList<Control> ,當(dāng)線程獲取到鎖,就將Control添加到Monitor中,如果大于規(guī)定的線程數(shù),則wait
業(yè)務(wù)部分并行執(zhí)行,當(dāng)一個(gè)線程完成后,獲取鎖,從Monitor中移除一個(gè)Control, 然后notifyAll所有正在等待的線程
符合需求 ,OK
總結(jié)
以上是生活随笔為你收集整理的高并发编程-线程生产者消费者的综合示例的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 高并发编程-深入分析wait和sleep
- 下一篇: 高并发编程-自定义带有超时功能的锁