多线程之线程池探索
多線程應用執行時JDK提供了線程的載體,線程池,通過線程池管理線程,優化線程的執行,有效合理利用資源。而JDK提供的線程池有四大類:FixedThreadPool,SingleThreadExecutor,CachedThreadPool,ScheduledThreadPool。這四種池各有特點?,F在一一來看。
第一,FixedThreadPool,來自jdk的解釋是這樣的:創建一個可重用固定線程數的線程池,以共享的無界隊列方式來運行這些線程。在任意點,在大多數 nThreads 線程會處于處理任務的活動狀態。如果在所有線程處于活動狀態時提交附加任務,則在有可用線程之前,附加任務將在隊列中等待。如果在關閉前的執行期間由于失敗而導致任何線程終止,那么一個新線程將代替它執行后續的任務(如果需要)。在某個線程被顯式地關閉之前,池中的線程將一直存在。而通過源碼
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
參數說明:第一個nThreads,線程池的線程數。第二個nThreads是線程池中允許的最大線程數。
0L表示多余最大線程數-線程池的線程數的這些線程在閑置情況下允許的存活時間。TimeUnit時間單位。 LinkedBlockingQueue<Runnable> 一個基于已鏈接節點的、范圍任意的 blocking queue。備注:還有可能有一個參數threadFactory用來創建線程。
可以看到線程池有一個LinkedBlockingQueue<Runnable>隊列來存放過量的任務(也就是JDK中所說的附加任務)。所以,這個固定大小的線程池的好處在于,無論是流量高峰還是沒有訪問裝態,都會最多有nThreads在線程池(如果某一瞬間,某些線程down掉后,沒來的急新建,那就少于這個值了),這樣的話能夠盡量保證消耗的內存空間較少,能夠避免一些線程數猛增帶來的OOM問題。
第二:SingleThreadExecutor。來自jdk的解釋:創建一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。(注意,如果因為在關閉前的執行期間出現失敗而終止了此單個線程,那么如果需要,一個新線程將代替它執行后續的任務)。可保證順序地執行各個任務,并且在任意給定的時間不會有多個線程是活動的。
源碼:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
說明:只是池大小為1了。
分析:此種池的特點在于永遠只有一個有效線程在運行,也就成了多個任務串行執行了,對于前后任務有順序的多任務可能有所幫助。
第三:CachedThreadPool。來自jdk的解釋:創建一個可根據需要創建新線程的線程池,但是在以前構造的線程可用時將重用它們。對于執行很多短期異步任務的程序而言,這些線程池通??商岣叱绦蛐阅?。調用 execute 將重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則創建一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。因此,長時間保持空閑的線程池不會使用任何資源。
參見創建源碼:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
說明:這個池建立參數上比較特殊,它允許最大 Integer.MAX_VALUE個線程共存,并且設置了60秒線程的閑置限制。并采用了奇怪的SynchronousQueue。這個隊列是這樣的,看jdk api文檔:
一種阻塞隊列,其中每個插入操作必須等待另一個線程的對應移除操作 ,反之亦然。同步隊列沒有任何內部容量,甚至連一個隊列的容量都沒有。不能在同步隊列上進行 peek,因為僅在試圖要移除元素時,該元素才存在;除非另一個線程試圖移除某個元素,否則也不能(使用任何方法)插入元素;也不能迭代隊列,因為其中沒有元素可用于迭代。隊列的頭 是嘗試添加到隊列中的首個已排隊插入線程的元素;如果沒有這樣的已排隊線程,則沒有可用于移除的元素并且 poll() 將會返回 null。對于其他 Collection 方法(例如 contains),SynchronousQueue 作為一個空 collection。此隊列不允許 null 元素。
分析:這種池的特點在于能根據任務的數量設置池中線程多少,并能在一段時間后清除閑置線程,但是風險在于允許太多的線程存在,這就會導致線程創建的資源消耗過多。
第四:ScheduledThreadPool。來自jdk的解釋:創建一個線程池,它可安排在給定延遲后運行命令或者定期地執行。
創建源碼:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
該池最大的特點在于允許你定時delay后按照一定的速率隔時執行一些任務。
簡要的理解了這幾種池之后通過幾個demo來進一步認識下哈。 源碼是王道。恩直接看吧:
這個例子主要是理解下四個池的特點,針對runnable和callable任務的一些測試。
package ThreadSPools;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ThreadPoolDemo {
public static void main(String[] args){
Thread main = Thread.currentThread();
final HashMap<Integer, ArrayList<Integer>> map = new HashMap<Integer,ArrayList<Integer>>();
HashMap<Integer, ArrayList<Integer>> resultMap = new HashMap<Integer,ArrayList<Integer>>();
for(int i=0;i<300;i++){
ArrayList<Integer> arrayList= new ArrayList<Integer>();
for(int i1=0;i1<10000;i1++){
int p = (int) (Math.random()*10000);
arrayList.add(p);
}
map.put(i, arrayList);
}
//ExecutorService executor = Executors.newFixedThreadPool(2);
//ExecutorService executor = Executors.newSingleThreadExecutor();
ExecutorService executor = Executors.newCachedThreadPool();
//ExecutorService executor = Executors.newScheduledThreadPool(4);
long start = System.currentTimeMillis();
for(final int key : map.keySet()){
Callable<ArrayList<Integer>> task = new Callable<ArrayList<Integer>>(){
@Override
public ArrayList<Integer>call()
throws Exception {
Collections.sort(map.get(key));
return map.get(key);
}
};
Future<ArrayList<Integer>> future=executor.submit(task);
try {
resultMap.put(key, future.get());
System.out.println(key+" "+resultMap.get(key).subList(0, 10));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
try {
main.sleep(6000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Excuting this totally costs "+(end - start));
executor.shutdown();
}
}
另外看一個自己修正的簡要池,主要是用來測RejectedExecutionHandler策略以及選取的BlockingQueue<Runnable>。
源碼:
package ThreadSPools;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
//一個類似于cachedPool的ThreadPool 主要用來測試任務的RejectedExecutionHandler策略以及選取的BlockingQueue<Runnable>
public class ThreadPoolOwnedByShuofengDemo extends ThreadPoolExecutor{
public ThreadPoolOwnedByShuofengDemo(int nThreads,int maxnThreads,
long keepaliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,
ThreadFactory threadfactory,RejectedExecutionHandler handler){
super(nThreads,maxnThreads,keepaliveTime,
TimeUnit.SECONDS, workQueue,
threadfactory,handler);
}
public static void main(String[]args){
ThreadPoolOwnedByShuofengDemo pool =
new ThreadPoolOwnedByShuofengDemo(5,10,30,TimeUnit.SECONDS,
//使用這個隊列時,大於這個隊列的容量后,基本上直接拋出RejectedExecutionException
//new ArrayBlockingQueue<Runnable>(10)
//new LinkedBlockingQueue<Runnable>()//使用這個隊列時,基本上RejectedExecutionHandler沒用了
//沒有空間冗餘,也就是說當任務數大於當前可執行的數量時,一般都直接拋出RejectedExecutionException
//偶爾比較幸運當期執行的任務完成,那就會執行后提交的一個,不過這個概率貌似很小一樣,除非提交任務的時間點靠後
new SynchronousQueue<Runnable>()
,
Executors.defaultThreadFactory(),
//當多于 maxnThread +ArrayBlockingQueue大小時拋出異常RejectedExecutionException
new AbortPolicy()
//當任務數瞬間多于 maxnThread +ArrayBlockingQueue大小時,將任務退回給調用線程執行
//new CallerRunsPolicy()
//任務數瞬間多于 maxnThread +ArrayBlockingQueue大小時,后來任務被丟掉
//new DiscardPolicy()
//任務數瞬間多于 maxnThread +ArrayBlockingQueue大小時,最早提交還未被執行的任務被丟掉
//new DiscardOldestPolicy()
);
final CopyOnWriteArrayList<String> arraylist = new CopyOnWriteArrayList<String>();
//測試runnable task
// for(int i=0;i<50;i++){
// Runnable task = new Runnable(){
//
// @Override
// public void run() {
//
// System.out.println(Thread.currentThread().getName()+" hello,this is shuofengTask.");
// try {
// Thread.currentThread().sleep(1000);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// }
//
// };
// pool.submit(task);
// }
//測試Callable task
for(int i=0;i<10;i++){
final int id= i;
Callable task = new Callable(){
final String taskname= id+"號task";
@Override
public Object call() throws Exception {
String s =taskname+" hello,this is shuofengTask.";
arraylist.add(s);
try {
Thread.currentThread().sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return arraylist;
}
};
pool.submit(task);
}
try {
Thread.currentThread().sleep(1000);//sleep時間要合理 保證在池關閉前任務已經都執行完了
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(arraylist.size());
System.out.println(arraylist.subList(0, arraylist.size()));
pool.shutdown();//優雅的關閉池資源
}
}
再看一下最奇怪的同步隊列
package SychQueue;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
//特點在于每一個消費者都要等生產者放入字符串之后才能讀取。而且同步隊列沒有容量,如果消費者不取的話,則會被阻塞。
public class SynchronousQueueDemo {
public static void main(String[]args){
final List<String> msg = Arrays.asList("start","one","two","three");
final BlockingQueue<String> queue = new SynchronousQueue<String>();
ExecutorService executor = Executors.newCachedThreadPool();
Runnable producerTask = new Runnable(){
final List<String> waitingRecievingMsg = msg;
@Override
public void run() {
try {
for(String s:waitingRecievingMsg){
queue.put(s);
Thread.currentThread().sleep(1000);
}
queue.put("end");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
executor.submit(producerTask);
Runnable consumerTask = new Runnable(){
String recievedMsg;
@Override
public void run() {
try {
;
while((recievedMsg = queue.take())!=null&& !recievedMsg.equals("end")){
System.out.println(recievedMsg);
Thread.currentThread().sleep(3000);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
executor.submit(consumerTask);
executor.shutdown();
}
}
說明:
線程池其實也是比較簡單的 ,只要記住將任務按照需要組裝成runnale 或者callable形式(兩者的區別在于有無返回值),然后選擇合適的池類型,將任務提交就可以了。只是要注意多個任務的時候 控制任務的到達等,然后要注意的就是合適的時機關閉線程池了。
第一,FixedThreadPool,來自jdk的解釋是這樣的:創建一個可重用固定線程數的線程池,以共享的無界隊列方式來運行這些線程。在任意點,在大多數 nThreads 線程會處于處理任務的活動狀態。如果在所有線程處于活動狀態時提交附加任務,則在有可用線程之前,附加任務將在隊列中等待。如果在關閉前的執行期間由于失敗而導致任何線程終止,那么一個新線程將代替它執行后續的任務(如果需要)。在某個線程被顯式地關閉之前,池中的線程將一直存在。而通過源碼
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
參數說明:第一個nThreads,線程池的線程數。第二個nThreads是線程池中允許的最大線程數。
0L表示多余最大線程數-線程池的線程數的這些線程在閑置情況下允許的存活時間。TimeUnit時間單位。 LinkedBlockingQueue<Runnable> 一個基于已鏈接節點的、范圍任意的 blocking queue。備注:還有可能有一個參數threadFactory用來創建線程。
可以看到線程池有一個LinkedBlockingQueue<Runnable>隊列來存放過量的任務(也就是JDK中所說的附加任務)。所以,這個固定大小的線程池的好處在于,無論是流量高峰還是沒有訪問裝態,都會最多有nThreads在線程池(如果某一瞬間,某些線程down掉后,沒來的急新建,那就少于這個值了),這樣的話能夠盡量保證消耗的內存空間較少,能夠避免一些線程數猛增帶來的OOM問題。
第二:SingleThreadExecutor。來自jdk的解釋:創建一個使用單個 worker 線程的 Executor,以無界隊列方式來運行該線程。(注意,如果因為在關閉前的執行期間出現失敗而終止了此單個線程,那么如果需要,一個新線程將代替它執行后續的任務)。可保證順序地執行各個任務,并且在任意給定的時間不會有多個線程是活動的。
源碼:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
說明:只是池大小為1了。
分析:此種池的特點在于永遠只有一個有效線程在運行,也就成了多個任務串行執行了,對于前后任務有順序的多任務可能有所幫助。
第三:CachedThreadPool。來自jdk的解釋:創建一個可根據需要創建新線程的線程池,但是在以前構造的線程可用時將重用它們。對于執行很多短期異步任務的程序而言,這些線程池通??商岣叱绦蛐阅?。調用 execute 將重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則創建一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。因此,長時間保持空閑的線程池不會使用任何資源。
參見創建源碼:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
說明:這個池建立參數上比較特殊,它允許最大 Integer.MAX_VALUE個線程共存,并且設置了60秒線程的閑置限制。并采用了奇怪的SynchronousQueue。這個隊列是這樣的,看jdk api文檔:
一種阻塞隊列,其中每個插入操作必須等待另一個線程的對應移除操作 ,反之亦然。同步隊列沒有任何內部容量,甚至連一個隊列的容量都沒有。不能在同步隊列上進行 peek,因為僅在試圖要移除元素時,該元素才存在;除非另一個線程試圖移除某個元素,否則也不能(使用任何方法)插入元素;也不能迭代隊列,因為其中沒有元素可用于迭代。隊列的頭 是嘗試添加到隊列中的首個已排隊插入線程的元素;如果沒有這樣的已排隊線程,則沒有可用于移除的元素并且 poll() 將會返回 null。對于其他 Collection 方法(例如 contains),SynchronousQueue 作為一個空 collection。此隊列不允許 null 元素。
分析:這種池的特點在于能根據任務的數量設置池中線程多少,并能在一段時間后清除閑置線程,但是風險在于允許太多的線程存在,這就會導致線程創建的資源消耗過多。
第四:ScheduledThreadPool。來自jdk的解釋:創建一個線程池,它可安排在給定延遲后運行命令或者定期地執行。
創建源碼:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
該池最大的特點在于允許你定時delay后按照一定的速率隔時執行一些任務。
簡要的理解了這幾種池之后通過幾個demo來進一步認識下哈。 源碼是王道。恩直接看吧:
這個例子主要是理解下四個池的特點,針對runnable和callable任務的一些測試。
package ThreadSPools;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ThreadPoolDemo {
public static void main(String[] args){
Thread main = Thread.currentThread();
final HashMap<Integer, ArrayList<Integer>> map = new HashMap<Integer,ArrayList<Integer>>();
HashMap<Integer, ArrayList<Integer>> resultMap = new HashMap<Integer,ArrayList<Integer>>();
for(int i=0;i<300;i++){
ArrayList<Integer> arrayList= new ArrayList<Integer>();
for(int i1=0;i1<10000;i1++){
int p = (int) (Math.random()*10000);
arrayList.add(p);
}
map.put(i, arrayList);
}
//ExecutorService executor = Executors.newFixedThreadPool(2);
//ExecutorService executor = Executors.newSingleThreadExecutor();
ExecutorService executor = Executors.newCachedThreadPool();
//ExecutorService executor = Executors.newScheduledThreadPool(4);
long start = System.currentTimeMillis();
for(final int key : map.keySet()){
Callable<ArrayList<Integer>> task = new Callable<ArrayList<Integer>>(){
@Override
public ArrayList<Integer>call()
throws Exception {
Collections.sort(map.get(key));
return map.get(key);
}
};
Future<ArrayList<Integer>> future=executor.submit(task);
try {
resultMap.put(key, future.get());
System.out.println(key+" "+resultMap.get(key).subList(0, 10));
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
long end = System.currentTimeMillis();
try {
main.sleep(6000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("Excuting this totally costs "+(end - start));
executor.shutdown();
}
}
另外看一個自己修正的簡要池,主要是用來測RejectedExecutionHandler策略以及選取的BlockingQueue<Runnable>。
源碼:
package ThreadSPools;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
//一個類似于cachedPool的ThreadPool 主要用來測試任務的RejectedExecutionHandler策略以及選取的BlockingQueue<Runnable>
public class ThreadPoolOwnedByShuofengDemo extends ThreadPoolExecutor{
public ThreadPoolOwnedByShuofengDemo(int nThreads,int maxnThreads,
long keepaliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,
ThreadFactory threadfactory,RejectedExecutionHandler handler){
super(nThreads,maxnThreads,keepaliveTime,
TimeUnit.SECONDS, workQueue,
threadfactory,handler);
}
public static void main(String[]args){
ThreadPoolOwnedByShuofengDemo pool =
new ThreadPoolOwnedByShuofengDemo(5,10,30,TimeUnit.SECONDS,
//使用這個隊列時,大於這個隊列的容量后,基本上直接拋出RejectedExecutionException
//new ArrayBlockingQueue<Runnable>(10)
//new LinkedBlockingQueue<Runnable>()//使用這個隊列時,基本上RejectedExecutionHandler沒用了
//沒有空間冗餘,也就是說當任務數大於當前可執行的數量時,一般都直接拋出RejectedExecutionException
//偶爾比較幸運當期執行的任務完成,那就會執行后提交的一個,不過這個概率貌似很小一樣,除非提交任務的時間點靠後
new SynchronousQueue<Runnable>()
,
Executors.defaultThreadFactory(),
//當多于 maxnThread +ArrayBlockingQueue大小時拋出異常RejectedExecutionException
new AbortPolicy()
//當任務數瞬間多于 maxnThread +ArrayBlockingQueue大小時,將任務退回給調用線程執行
//new CallerRunsPolicy()
//任務數瞬間多于 maxnThread +ArrayBlockingQueue大小時,后來任務被丟掉
//new DiscardPolicy()
//任務數瞬間多于 maxnThread +ArrayBlockingQueue大小時,最早提交還未被執行的任務被丟掉
//new DiscardOldestPolicy()
);
final CopyOnWriteArrayList<String> arraylist = new CopyOnWriteArrayList<String>();
//測試runnable task
// for(int i=0;i<50;i++){
// Runnable task = new Runnable(){
//
// @Override
// public void run() {
//
// System.out.println(Thread.currentThread().getName()+" hello,this is shuofengTask.");
// try {
// Thread.currentThread().sleep(1000);
// } catch (InterruptedException e) {
// // TODO Auto-generated catch block
// e.printStackTrace();
// }
// }
//
// };
// pool.submit(task);
// }
//測試Callable task
for(int i=0;i<10;i++){
final int id= i;
Callable task = new Callable(){
final String taskname= id+"號task";
@Override
public Object call() throws Exception {
String s =taskname+" hello,this is shuofengTask.";
arraylist.add(s);
try {
Thread.currentThread().sleep(1);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return arraylist;
}
};
pool.submit(task);
}
try {
Thread.currentThread().sleep(1000);//sleep時間要合理 保證在池關閉前任務已經都執行完了
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(arraylist.size());
System.out.println(arraylist.subList(0, arraylist.size()));
pool.shutdown();//優雅的關閉池資源
}
}
再看一下最奇怪的同步隊列
package SychQueue;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
//特點在于每一個消費者都要等生產者放入字符串之后才能讀取。而且同步隊列沒有容量,如果消費者不取的話,則會被阻塞。
public class SynchronousQueueDemo {
public static void main(String[]args){
final List<String> msg = Arrays.asList("start","one","two","three");
final BlockingQueue<String> queue = new SynchronousQueue<String>();
ExecutorService executor = Executors.newCachedThreadPool();
Runnable producerTask = new Runnable(){
final List<String> waitingRecievingMsg = msg;
@Override
public void run() {
try {
for(String s:waitingRecievingMsg){
queue.put(s);
Thread.currentThread().sleep(1000);
}
queue.put("end");
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
executor.submit(producerTask);
Runnable consumerTask = new Runnable(){
String recievedMsg;
@Override
public void run() {
try {
;
while((recievedMsg = queue.take())!=null&& !recievedMsg.equals("end")){
System.out.println(recievedMsg);
Thread.currentThread().sleep(3000);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
executor.submit(consumerTask);
executor.shutdown();
}
}
說明:
線程池其實也是比較簡單的 ,只要記住將任務按照需要組裝成runnale 或者callable形式(兩者的區別在于有無返回值),然后選擇合適的池類型,將任務提交就可以了。只是要注意多個任務的時候 控制任務的到達等,然后要注意的就是合適的時機關閉線程池了。
總結
- 上一篇: 出门没带本子记的单词|10:20~10:
- 下一篇: 每日词根——es/ed(吃)