多线程相关-ThreadPoolExecutor
多線程相關-ThreadPoolExecutor
應用層面:
ThreadPoolExecutor:
創建多線程池執行器:new ThreadPoolExecutor(),創建方法最終都是走的以下這個構造方法:
/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters.** @param corePoolSize the number of threads to keep in the pool, even* if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the* pool* @param keepAliveTime when the number of threads is greater than* the core, this is the maximum time that excess idle threads* will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are* executed. This queue will hold only the {@code Runnable}* tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor* creates a new thread* @param handler the handler to use when execution is blocked* because the thread bounds and queue capacities are reached* @throws IllegalArgumentException if one of the following holds:<br>* {@code corePoolSize < 0}<br>* {@code keepAliveTime < 0}<br>* {@code maximumPoolSize <= 0}<br>* {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}* or {@code threadFactory} or {@code handler} is null*/public ThreadPoolExecutor(int corePoolSize,//核心線程數int maximumPoolSize,//核心線程最大數量long keepAliveTime,//超出核心線程數的其他空閑線程保留時間TimeUnit unit,//空閑時間單位BlockingQueue<Runnable> workQueue,//對列,當線程數量大于等于核心線程數時,將任務works保存進對列ThreadFactory threadFactory,//創建線程的工廠RejectedExecutionHandler handler) {//超出最大核心線程數的拒絕策略if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}創建線程池的其他方式:(返回的實際對象仍然是ThreadPoolExecutor,只不過是對構造函數的參數進行的特殊規定)
1、Executors.newFixedThreadPool(int nThreads)
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}?
Executors.newFixedThreadPool(int nThreads, ThreadFactory threadFactory)//自動以創建線程的工廠
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}2、Executors.newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}3、Executor.newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());} public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}?
源碼:
ThreadPoolExecutor
構造方法:
ThreadPoolExecutor(int corePoolSize,//核心線程數int maximumPoolSize,//核心線程最大數量long keepAliveTime,//超出核心線程數的其他空閑線程保留時間TimeUnit unit,//空閑時間單位BlockingQueue<Runnable> workQueue,//對列,當線程數量大于等于核心線程數時,將任務works保存進對列ThreadFactory threadFactory,//創建線程的工廠RejectedExecutionHandler handler) {//超出最大核心線程數的拒絕策略? ? corePoolSize:線程池的核心線程數,當線程池中的工作線程數小于核心線程數的時候,只要向線程池指派任務,線程池就會創建工作線程。
? ??maximumPoolSize:線程池最大工作線程數,當線程池中的工作線程達到最大數的時候,即使再向線程池指派任務,線程池不會創建工作線程,回執行對應的拒絕策略。
? ??keepAliveTime:當線程池的工作線程數大于核心線程數的時候,多余的核心線程數的部分線程(空閑的)可以保持keepAliveTime的空閑時間,當keepAliveTime時間內還沒有獲取到任務,這些線程后就會被回收。
? ??unit:保持空閑時間的時間單位。
? ??workQueue:任務隊列,當線程池里面核心線程都在工作的時候,再向線程池指派任務,線程池會將任務放入任務隊列里,工作線程在執行完任務后會再向任務隊列里取出任務來執行。
? ??threadFactory:創建執行任務的工作線程的線程工廠。
? ??handler:拒絕任務加入線程池的策越,當線程池里的線程已經達到最大數后,再向線程池里加派任務時,線程池會決絕執行這些任務,handler就是具體執行拒絕的對象。
線程池的大體工作思路 ?
1.當線程池小于corePoolSize時,新提交任務將創建一個新線程執行任務,即使此時線程池中存在空閑線程。?
2.當線程池達到corePoolSize時,新提交任務將被放入workQueue中,等待線程池中任務調度執行?
3.當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會創建新線程執行任務?
4.當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理?
5.當線程池中超過corePoolSize數的線程,空閑時間達到keepAliveTime時,關閉空閑線程?
6.當設置allowCoreThreadTimeOut(true)時,線程池中核心線程空閑時間達到keepAliveTime也將關閉
/*** The main pool control state, ctl, is an atomic integer packing* two conceptual fields* workerCount, indicating the effective number of threads* runState, indicating whether running, shutting down etc** In order to pack them into one int, we limit workerCount to* (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2* billion) otherwise representable. If this is ever an issue in* the future, the variable can be changed to be an AtomicLong,* and the shift/mask constants below adjusted. But until the need* arises, this code is a bit faster and simpler using an int.** The workerCount is the number of workers that have been* permitted to start and not permitted to stop. The value may be* transiently different from the actual number of live threads,* for example when a ThreadFactory fails to create a thread when* asked, and when exiting threads are still performing* bookkeeping before terminating. The user-visible pool size is* reported as the current size of the workers set.** The runState provides the main lifecycle control, taking on values:** RUNNING: Accept new tasks and process queued tasksrunning狀態是可以接受和處理任務* SHUTDOWN: Don't accept new tasks, but process queued tasksshutdown狀態時不能接受新的任務,但是仍可以處理對列中的任務* STOP: Don't accept new tasks, don't process queued tasks,stop狀態,不接受新任務,也不執行對列中的任務,同事中斷正在執行的任務* and interrupt in-progress tasks* TIDYING: All tasks have terminated, workerCount is zero,* the thread transitioning to state TIDYING* will run the terminated() hook methodtidying狀態,所有的工作線程全部停止,并工作線程數量為0,將調用terminated方法,進入到terninated狀態* TERMINATED: terminated() has completed終止狀態** The numerical order among these values matters, to allow* ordered comparisons. The runState monotonically increases over* time, but need not hit each state. The transitions are:*各種狀態的轉換-----* RUNNING -> SHUTDOWN* On invocation of shutdown(), perhaps implicitly in finalize()* (RUNNING or SHUTDOWN) -> STOP* On invocation of shutdownNow()* SHUTDOWN -> TIDYING* When both queue and pool are empty* STOP -> TIDYING* When pool is empty* TIDYING -> TERMINATED* When the terminated() hook method has completed** Threads waiting in awaitTermination() will return when the* state reaches TERMINATED.** Detecting the transition from SHUTDOWN to TIDYING is less* straightforward than you'd like because the queue may become* empty after non-empty and vice versa during SHUTDOWN state, but* we can only terminate if, after seeing that it is empty, we see* that workerCount is 0 (which sometimes entails a recheck -- see* below).*/private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY = (1 << COUNT_BITS) - 1;//默認的容量2^29 -1// runState is stored in the high-order bitsprivate static final int RUNNING = -1 << COUNT_BITS;private static final int SHUTDOWN = 0 << COUNT_BITS;private static final int STOP = 1 << COUNT_BITS;private static final int TIDYING = 2 << COUNT_BITS;private static final int TERMINATED = 3 << COUNT_BITS;// Packing and unpacking ctlprivate static int runStateOf(int c) { return c & ~CAPACITY; }private static int workerCountOf(int c) { return c & CAPACITY; }private static int ctlOf(int rs, int wc) { return rs | wc; }//rs:狀態 ws:數量 轉: 為什么線程池的狀態簡單的定義為 -1,0,1,2,3不就得了,為什么還要用移位操作呢? 原來這樣的,ThreadPool ctl的這個變量的設計哲學是用int的高3位 + 29個0代表狀態,,用高位000+低29位來表示線程池中工作線程的數量,太佩服了。 首先CAPACITY的值為workCount的最大容量,該值為 000 11111 11111111 11111111 11111111,29個1(默認的出事容量536870911) 我們來看一下 private static int runStateOf(int c) { return c & ~CAPACITY; } 用ctl里面的值與容量取反的方式獲取狀態值。由于CAPACITY的值為000 11111 11111111 11111111 11111111, 那取反后為111 00000 00000000 00000000 00000000, 用 c 與 該值進行與運算,這樣就直接保留了c的高三位, 然后將c的低29位設置為0,這不就是線程池狀態的存放規則嗎,絕。 根據此方法,不難得出計算workCount的方法。 private static int ctlOf(int rs, int wc) { return rs | wc; } 該方法,主要是用來更新運行狀態的。確保工作線程數量不丟失。--------->
理解:ctl初始化:1110 0000 0000 0000 0000 0000 0000 0000 (該值也就是running狀態值)-536870912capacity: 0001 1111 1111 1111 1111 1111 1111 1111 536870911當addworker()添加任務是,ctl中的value(也就是通過ctl.get()取到的值)就會加1,即: 1110 0000 0000 0000 0000 0000 0000 0001該值 & 初始容量capacity,即workerCountOf(c)方法:結果就是0000 0000 0000 0000 0000 0000 0000 0001(1),也就是線程數量為1個,同理getTask()的時候回進行-1操作?
線程池設計原理: 1)線程池的工作線程為ThreadPoolExecutors的Worker線程,無論是submit還是executor方法中傳入的Callable task,Runable參數,只是實現了Runnable接口,在線程池的調用過程,不會調用其start方法,只會調用Worker線程的start方法,然后在Worker線程的run方法中會調用入參的run方法。 2)線程的生命周期在run方法運行結束后(包括異常退出)就結束。要想重復利用線程,就要確保工作線程Worker的run方法運行在一個無限循環中,然后從任務隊列中一個一個獲取對象,如果任務隊列為空,則阻塞,當然需要提供一些控制,結束無限循環,來銷毀線程。在源碼 runWorker方法與getTask來實現。? 大概的實現思路是 如果getTask返回null,則該worker線程將被銷毀。 那getTask在什么情況下會返回false呢? 1、如果線程池的狀態為SHUTDOWN并且隊列不為空 2、如果線程池的狀態大于STOP 3、如果當前運行的線程數大于核心線程數,會返回null,已銷毀該worker線程 對keepAliveTime的理解,如果allowCoreThreadTimeOut為真,那么keepAliveTime其實就是從任務隊列獲取任務等待的超時時間,也就是workerQueue.poll(keepALiveTime, TimeUnit.NANOSECONDS) /*** Executes the given task sometime in the future. The task* may execute in a new thread or in an existing pooled thread.** If the task cannot be submitted for execution, either because this* executor has been shutdown or because its capacity has been reached,* the task is handled by the current {@code RejectedExecutionHandler}.** @param command the task to execute* @throws RejectedExecutionException at discretion of* {@code RejectedExecutionHandler}, if the task* cannot be accepted for execution* @throws NullPointerException if {@code command} is null*/public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();//從ctl中取值,該值包含狀態和數量if (workerCountOf(c) < corePoolSize) {//調用workCountOf方法得到當前的線程數量,和核心線程數比較if (addWorker(command, true))//符合,則調用addworker直接創建線程來執行(這里就是表示,當小于核心線程數時,不管有無空閑線程,都會創建新的線程)return;//創建成功直接returnc = ctl.get();}//沒有創建成功則會進行拒絕策略方面的方法判斷if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);}addWorder():
/*** Checks if a new worker can be added with respect to current* pool state and the given bound (either core or maximum). If so,* the worker count is adjusted accordingly, and, if possible, a* new worker is created and started, running firstTask as its* first task. This method returns false if the pool is stopped or* eligible to shut down. It also returns false if the thread* factory fails to create a thread when asked. If the thread* creation fails, either due to the thread factory returning* null, or due to an exception (typically OutOfMemoryError in* Thread.start()), we roll back cleanly.** @param firstTask the task the new thread should run first (or* null if none). Workers are created with an initial first task* (in method execute()) to bypass queuing when there are fewer* than corePoolSize threads (in which case we always start one),* or when the queue is full (in which case we must bypass queue).* Initially idle threads are usually created via* prestartCoreThread or to replace other dying workers.** @param core if true use corePoolSize as bound, else* maximumPoolSize. (A boolean indicator is used here rather than a* value to ensure reads of fresh values after checking other pool* state).* @return true if successful*/private boolean addWorker(Runnable firstTask, boolean core) {retry://重復執行的標記,下邊代碼有break retry(結束)和continue retry(返回周之前標記為重新執行)for (;;) {int c = ctl.get();//取碼int rs = runStateOf(c);//狀態碼// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {int wc = workerCountOf(c);if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))//進行ctl.value加1操作,成功則結束retrybreak retry;c = ctl.get(); // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);//new worker的時候,內部類中會調用工廠來新建一個線程final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;//重復鎖mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);//workers,set集合,保存著所有的workerint s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}if (workerAdded) {t.start();//workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;}getTask():
/*** Performs blocking or timed wait for a task, depending on* current configuration settings, or returns null if this worker* must exit because of any of:* 1. There are more than maximumPoolSize workers (due to* a call to setMaximumPoolSize).* 2. The pool is stopped.* 3. The pool is shutdown and the queue is empty.* 4. This worker timed out waiting for a task, and timed-out* workers are subject to termination (that is,* {@code allowCoreThreadTimeOut || workerCount > corePoolSize})* both before and after the timed wait, and if the queue is* non-empty, this worker is not the last thread in the pool.** @return task, or null if the worker must exit, in which case* workerCount is decremented*/private Runnable getTask() {boolean timedOut = false; // Did the last poll() time out?for (;;) {int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();//處于stop、tidying、terminate狀態時,循環減線程數量,回去返回對象return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;//if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {//下邊這一塊代碼控制著線程超時時間Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}}?
ThreadPoolExecutor的執行:
當第一次submit或者execute添加任務的時候,如果添加成功會調Thread.start()方法,想線程得到CPU的使用位置的時候,就會走Worker的
run()方法,該run方法會走ThreadPoolExecutor中的runWorker()方法,在這個方法中會走Runnable的run()方法。
?
關于多線程的blog
http://ifeve.com/java-threadpool/
https://blog.csdn.net/hounanjsj/article/details/73822998
https://blog.csdn.net/wangbiao007/article/details/78196413
https://blog.csdn.net/prestigeding/article/details/53929713
https://blog.csdn.net/wangbiao007/article/details/78196413
posted @ 2018-06-26 20:31 犇犇丶 閱讀(...) 評論(...) 編輯 收藏總結
以上是生活随笔為你收集整理的多线程相关-ThreadPoolExecutor的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mybatis-查询过程
- 下一篇: springboot集成mybatis源