Java:ThreadPoolExecutor解析续--Executors
簡介
Eexecutor作為靈活且強大的異步執(zhí)行框架,其支持多種不同類型的任務執(zhí)行策略,提供了一種標準的方法將任務的提交過程和執(zhí)行過程解耦開發(fā),基于生產者-消費者模式,其提交任務的線程相當于生產者,執(zhí)行任務的線程相當于消費者,并用Runnable來表示任務,Executor的實現還提供了對生命周期的支持,以及統(tǒng)計信息收集,應用程序管理機制和性能監(jiān)視等機制。
Executors:提供了一系列靜態(tài)工廠方法用于創(chuàng)建各種線程池
Executors:通過ThreadFactory創(chuàng)建工作線程
ThreadFactory
接口ThreadFactory為產生線程的工廠
public interface ThreadFactory {Thread newThread(Runnable r); }具體實現類
DefaultThreadFactory
static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();//線程Name前綴namePrefix = "pool-" +poolNumber.getAndIncrement() +"-thread-";}public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);//設置為非守護線程if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}}PrivilegedThreadFactory
//創(chuàng)建的新線程與當前線程具有相同的權限static class PrivilegedThreadFactory extends DefaultThreadFactory {private final AccessControlContext acc;private final ClassLoader ccl;PrivilegedThreadFactory() {super();SecurityManager sm = System.getSecurityManager();if (sm != null) {// Calls to getContextClassLoader from this class// never trigger a security check, but we check// whether our callers have this permission anyways.sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);// Fail fastsm.checkPermission(new RuntimePermission("setContextClassLoader"));}this.acc = AccessController.getContext();this.ccl = Thread.currentThread().getContextClassLoader();}public Thread newThread(final Runnable r) {return super.newThread(new Runnable() {public void run() {AccessController.doPrivileged(new PrivilegedAction<Void>() {public Void run() {Thread.currentThread().setContextClassLoader(ccl);r.run();return null;}}, acc);}});}}RejectedExecutionHandler
在使用線程池并且使用有界隊列的時候,如果隊列滿了,任務添加到線程池的時候就會有問題,可以指定處理策略。
public interface RejectedExecutionHandler {void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }針對這些問題java線程池提供了以下幾種策略:
- AbortPolicy
該策略是線程池的默認策略。使用該策略時,如果線程池隊列滿了丟掉這個任務并且拋出RejectedExecutionException異常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {//不做任何處理,直接拋出異常throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}- DiscardPolicy
這個策略和AbortPolicy的slient版本,如果線程池隊列滿了,會直接丟掉這個任務并且不會有任何異常。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {//就是一個空的方法}- DiscardOldestPolicy
這個策略從字面上也很好理解,丟棄最老的。也就是說如果隊列滿了,會將最早進入隊列的任務刪掉騰出空間,再嘗試加入隊列。
因為隊列是隊尾進,隊頭出,所以隊頭元素是最老的,因此每次都是移除對頭元素后再嘗試入隊。
- CallerRunsPolicy
使用此策略,如果添加到線程池失敗,那么主線程會自己去執(zhí)行該任務,不會等待線程池中的線程去執(zhí)行。就像是個急脾氣的人,我等不到別人來做這件事就干脆自己干。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {//直接執(zhí)行run方法r.run();}}- 自定義
如果以上策略都不符合業(yè)務場景,那么可以自己定義一個拒絕策略,只要實現RejectedExecutionHandler接口,并且實現rejectedExecution方法就可以了。具體的邏輯就在rejectedExecution方法里去定義就OK了。
線程池創(chuàng)建
newFixedThreadPool
創(chuàng)建可重用且固定線程數的線程池,如果線程池中的所有線程都處于活動狀態(tài),此時再提交任務就在隊列中等待,直到有可用線程;如果線程池中的某個線程由于異常而結束時,線程池就會再補充一條新線程。
//corePoolSize==maximumPoolSize public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads,nThreads,0L, TimeUnit.MILLISECONDS,//使用一個基于FIFO排序的阻塞隊列,在所有corePoolSize線程都忙時新任務將在隊列中等待new LinkedBlockingQueue<Runnable>() ); }
newSingleThreadExecutor
創(chuàng)建一個單線程的Executor,如果該線程因為異常而結束就新建一條線程來繼續(xù)執(zhí)行后續(xù)的任務
//corePoolSize和maximumPoolSize都等于,表示固定線程池大小為1 public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService//corePoolSize和maximumPoolSize都等于,表示固定線程池大小為1(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
newScheduledThreadPool
創(chuàng)建一個可延遲執(zhí)行或定期執(zhí)行的線程池
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);}public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());}
newCachedThreadPool
創(chuàng)建可緩存的線程池,如果線程池中的線程在60秒未被使用就將被移除,在執(zhí)行新的任務時,當線程池中有之前創(chuàng)建的可用線程就重 ? ? ?用可用線程,否則就新建一條線程
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,//使用同步隊列,將任務直接提交給線程new SynchronousQueue<Runnable>()); }擴展ExecutorService
DelegatedExecutorService
代理模式,封裝其他ExecutorService,僅封裝需要的方法
static class DelegatedExecutorService extends AbstractExecutorService {private final ExecutorService e;DelegatedExecutorService(ExecutorService executor) { e = executor; }public void execute(Runnable command) { e.execute(command); }public void shutdown() { e.shutdown(); }public List<Runnable> shutdownNow() { return e.shutdownNow(); }public boolean isShutdown() { return e.isShutdown(); }public boolean isTerminated() { return e.isTerminated(); }public boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException {return e.awaitTermination(timeout, unit);}public Future<?> submit(Runnable task) {return e.submit(task);}public <T> Future<T> submit(Callable<T> task) {return e.submit(task);}public <T> Future<T> submit(Runnable task, T result) {return e.submit(task, result);}public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {return e.invokeAll(tasks);}public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {return e.invokeAll(tasks, timeout, unit);}public <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException {return e.invokeAny(tasks);}public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {return e.invokeAny(tasks, timeout, unit);}}FinalizableDelegatedExecutorService
DelegatedExecutorService的子類,實現finalize方法。
static class FinalizableDelegatedExecutorServiceextends DelegatedExecutorService {FinalizableDelegatedExecutorService(ExecutorService executor) {super(executor);}protected void finalize() {super.shutdown();}}DelegatedScheduledExecutorService
DelegatedExecutorService的子類,實現schedule方法。
static class DelegatedScheduledExecutorServiceextends DelegatedExecutorServiceimplements ScheduledExecutorService {private final ScheduledExecutorService e;DelegatedScheduledExecutorService(ScheduledExecutorService executor) {super(executor);e = executor;}public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {return e.schedule(command, delay, unit);}public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {return e.schedule(callable, delay, unit);}public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {return e.scheduleAtFixedRate(command, initialDelay, period, unit);}public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);}}?
總結
以上是生活随笔為你收集整理的Java:ThreadPoolExecutor解析续--Executors的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: zookeeper分布式锁避免羊群效应(
- 下一篇: 非阻塞同步算法与CAS(Compare