生活随笔
收集整理的這篇文章主要介紹了
转: Executor类
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
Executor框架是指java 5中引入的一系列并發庫中與executor相關的一些功能類,其中包括線程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。
他們的關系為:
?
并發編程的一種編程方式是把任務拆分為一些列的小任務,即Runnable,然后在提交給一個Executor執行,Executor.execute(Runnalbe)?。Executor在執行時使用內部的線程池完成操作。
一、創建線程池
Executors類,提供了一系列工廠方法用于創先線程池,返回的線程池都實現了ExecutorService接口。
public static ExecutorService newFixedThreadPool(int nThreads)
創建固定數目線程的線程池。
public static ExecutorService newCachedThreadPool()
創建一個可緩存的線程池,調用execute?將重用以前構造的線程(如果線程可用)。如果現有線程沒有可用的,則創建一個新線程并添加到池中。終止并從緩存中移除那些已有 60 秒鐘未被使用的線程。
public static ExecutorService newSingleThreadExecutor()
創建一個單線程化的Executor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
創建一個支持定時及周期性的任務執行的線程池,多數情況下可用來替代Timer類。
Java代碼??
Executor?executor?=?Executors.newFixedThreadPool(10);??Runnable?task?=?new?Runnable()?{??????@Override??????public?void?run()?{??????????System.out.println("task?over");??????}??};??executor.execute(task);????executor?=?Executors.newScheduledThreadPool(10);??ScheduledExecutorService?scheduler?=?(ScheduledExecutorService)?executor;??scheduler.scheduleAtFixedRate(task,?10,?10,?TimeUnit.SECONDS);?? ?二、ExecutorService與生命周期
ExecutorService擴展了Executor并添加了一些生命周期管理的方法。一個Executor的生命周期有三種狀態,運行?,關閉?,終止。Executor創建時處于運行狀態。當調用ExecutorService.shutdown()后,處于關閉狀態,isShutdown()方法返回true。這時,不應該再想Executor中添加任務,所有已添加的任務執行完畢后,Executor處于終止狀態,isTerminated()返回true。
如果Executor處于關閉狀態,往Executor提交任務會拋出unchecked exception RejectedExecutionException。
Java代碼??
ExecutorService?executorService?=?(ExecutorService)?executor;??while?(!executorService.isShutdown())?{??????try?{??????????executorService.execute(task);??????}?catch?(RejectedExecutionException?ignored)?{????????????????}??}??executorService.shutdown();?? ?三、使用Callable,Future返回結果
Future<V>代表一個異步執行的操作,通過get()方法可以獲得操作的結果,如果異步操作還沒有完成,則,get()會使當前線程阻塞。FutureTask<V>實現了Future<V>和Runable<V>。Callable代表一個有返回值得操作。
Java代碼??
Callable<Integer>?func?=?new?Callable<Integer>(){??????public?Integer?call()?throws?Exception?{??????????System.out.println("inside?callable");??????????Thread.sleep(1000);??????????return?new?Integer(8);??????}?????????};????????FutureTask<Integer>?futureTask??=?new?FutureTask<Integer>(func);??Thread?newThread?=?new?Thread(futureTask);??newThread.start();????try?{??????System.out.println("blocking?here");??????Integer?result?=?futureTask.get();??????System.out.println(result);??}?catch?(InterruptedException?ignored)?{??}?catch?(ExecutionException?ignored)?{??}?? ?ExecutoreService提供了submit()方法,傳遞一個Callable,或Runnable,返回Future。如果Executor后臺線程池還沒有完成Callable的計算,這調用返回Future對象的get()方法,會阻塞直到計算完成。
例子:并行計算數組的和。
Java代碼??
package?executorservice;????import?java.util.ArrayList;??import?java.util.List;??import?java.util.concurrent.Callable;??import?java.util.concurrent.ExecutionException;??import?java.util.concurrent.ExecutorService;??import?java.util.concurrent.Executors;??import?java.util.concurrent.Future;??import?java.util.concurrent.FutureTask;????public?class?ConcurrentCalculator?{????????private?ExecutorService?exec;??????private?int?cpuCoreNumber;??????private?List<Future<Long>>?tasks?=?new?ArrayList<Future<Long>>();????????????class?SumCalculator?implements?Callable<Long>?{??????????private?int[]?numbers;??????????private?int?start;??????????private?int?end;????????????public?SumCalculator(final?int[]?numbers,?int?start,?int?end)?{??????????????this.numbers?=?numbers;??????????????this.start?=?start;??????????????this.end?=?end;??????????}????????????public?Long?call()?throws?Exception?{??????????????Long?sum?=?0l;??????????????for?(int?i?=?start;?i?<?end;?i++)?{??????????????????sum?+=?numbers[i];??????????????}??????????????return?sum;??????????}??????}????????public?ConcurrentCalculator()?{??????????cpuCoreNumber?=?Runtime.getRuntime().availableProcessors();??????????exec?=?Executors.newFixedThreadPool(cpuCoreNumber);??????}????????public?Long?sum(final?int[]?numbers)?{??????????????????for?(int?i?=?0;?i?<?cpuCoreNumber;?i++)?{??????????????int?increment?=?numbers.length?/?cpuCoreNumber?+?1;??????????????int?start?=?increment?*?i;??????????????int?end?=?increment?*?i?+?increment;??????????????if?(end?>?numbers.length)??????????????????end?=?numbers.length;??????????????SumCalculator?subCalc?=?new?SumCalculator(numbers,?start,?end);??????????????FutureTask<Long>?task?=?new?FutureTask<Long>(subCalc);??????????????tasks.add(task);??????????????if?(!exec.isShutdown())?{??????????????????exec.submit(task);??????????????}??????????}??????????return?getResult();??????}????????????public?Long?getResult()?{??????????Long?result?=?0l;??????????for?(Future<Long>?task?:?tasks)?{??????????????try?{??????????????????????????????????Long?subSum?=?task.get();??????????????????result?+=?subSum;??????????????}?catch?(InterruptedException?e)?{??????????????????e.printStackTrace();??????????????}?catch?(ExecutionException?e)?{??????????????????e.printStackTrace();??????????????}??????????}??????????return?result;??????}????????public?void?close()?{??????????exec.shutdown();??????}??}?? ?Main
Java代碼??
int[]?numbers?=?new?int[]?{?1,?2,?3,?4,?5,?6,?7,?8,?10,?11?};??ConcurrentCalculator?calc?=?new?ConcurrentCalculator();??Long?sum?=?calc.sum(numbers);??System.out.println(sum);??calc.close();?? ?四、CompletionService
在剛在的例子中,getResult()方法的實現過程中,迭代了FutureTask的數組,如果任務還沒有完成則當前線程會阻塞,如果我們希望任意字任務完成后就把其結果加到result中,而不用依次等待每個任務完成,可以使CompletionService。生產者submit()執行的任務。使用者take()已完成的任務,并按照完成這些任務的順序處理它們的結果?。也就是調用CompletionService的take方法是,會返回按完成順序放回任務的結果,CompletionService內部維護了一個阻塞隊列BlockingQueue,如果沒有任務完成,take()方法也會阻塞。修改剛才的例子使用CompletionService:
Java代碼??
public?class?ConcurrentCalculator2?{????????private?ExecutorService?exec;??????private?CompletionService<Long>?completionService;??????????private?int?cpuCoreNumber;????????????class?SumCalculator?implements?Callable<Long>?{??????????......??????}????????public?ConcurrentCalculator2()?{??????????cpuCoreNumber?=?Runtime.getRuntime().availableProcessors();??????????exec?=?Executors.newFixedThreadPool(cpuCoreNumber);??????????completionService?=?new?ExecutorCompletionService<Long>(exec);??????????}????????public?Long?sum(final?int[]?numbers)?{??????????????????for?(int?i?=?0;?i?<?cpuCoreNumber;?i++)?{??????????????int?increment?=?numbers.length?/?cpuCoreNumber?+?1;??????????????int?start?=?increment?*?i;??????????????int?end?=?increment?*?i?+?increment;??????????????if?(end?>?numbers.length)??????????????????end?=?numbers.length;??????????????SumCalculator?subCalc?=?new?SumCalculator(numbers,?start,?end);???????????????if?(!exec.isShutdown())?{??????????????????completionService.submit(subCalc);??????????????????}????????????????????????}??????????return?getResult();??????}????????????public?Long?getResult()?{??????????Long?result?=?0l;??????????for?(int?i?=?0;?i?<?cpuCoreNumber;?i++)?{??????????????????????????try?{??????????????????Long?subSum?=?completionService.take().get();??????????????????result?+=?subSum;?????????????????????????}?catch?(InterruptedException?e)?{??????????????????e.printStackTrace();??????????????}?catch?(ExecutionException?e)?{??????????????????e.printStackTrace();??????????????}??????????}??????????return?result;??????}????????public?void?close()?{??????????exec.shutdown();??????}??} ?
轉載于:https://www.cnblogs.com/codershuai/p/4413988.html
總結
以上是生活随笔為你收集整理的转: Executor类的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。