[并发编程] - Executor框架#ThreadPoolExecutor源码解读03
文章目錄
- Pre
- execute源碼分析
- addWorker()解讀
- Worker解讀
Pre
[并發編程] - Executor框架#ThreadPoolExecutor源碼解讀02
說了一堆結論性的東西,作為開發人員著實是不過癮,那這里我們就來剖根問底來看下線程池是如何工作的。
execute源碼分析
ThreadPoolExecutor te = new ThreadPoolExecutor(5,10,500,TimeUnit.SECONDS,new ArrayBlockingQueue(5));for (int i = 0; i < 6; i++) {te.submit(()->{System.out.println("i m task :"+Thread.currentThread().getName());});}使用ThreadPoolExecutor 自定義了一個線程池
參數對應如下
int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue調用了 AbstractExecutorService#submit
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}最核心的方法 execute ,由子類ThredPoolExecutor實現
主要的流程,注釋中也寫的很清楚了
/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/簡單來說,在執行execute()方法時如果狀態一直是RUNNING時,的執行過程如下:
Note : 這里要注意一下addWorker(null, false);,也就是創建一個線程,但并沒有傳入任務,因為任務已經被添加到workQueue中了,所以worker在執行的時候,會直接從workQueue中獲取任務。所以,在workerCountOf(recheck) == 0時執行addWorker(null, false);也是為了保證線程池在RUNNING狀態下必須要有一個線程來執行任務。
addWorker()解讀
private boolean addWorker(Runnable firstTask, boolean core) {}addWorker方法的主要工作是在線程池中創建一個新的線程并執行,
- firstTask參數 用于指定新增的線程執行的第一個任務,
- core參數為true表示在新增線程時會判斷當前活動線程數是否少于corePoolSize,false表示新增線程前需要判斷當前活動線程數是否少于maximumPoolSize
Worker解讀
addWorker中多次提到了這個Work這個類, 其實就是 線程池中的每一個線程被封裝成一個Worker對象,ThreadPool維護的其實就是一組Worker對象
ThreadPoolExector中內部類 Worker
繼承了AQS,并實現了Runnable接口 ,
兩個比較重要的屬性
在調用構造方法時,需要把任務傳入,這里通過getThreadFactory().newThread(this);來新建一個線程,newThread方法傳入的參數是this,因為Worker本身繼承了Runnable接口,也就是一個線程,所以一個Worker對象在啟動的時候會調用Worker類中的run方法。
Worker繼承了AQS,使用AQS來實現獨占鎖的功能。為什么不使用ReentrantLock來實現呢?可以看到tryAcquire方法,它是不允許重入的,而ReentrantLock是允許重入的
總結
以上是生活随笔為你收集整理的[并发编程] - Executor框架#ThreadPoolExecutor源码解读03的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring5源码 - 构建源码环境
- 下一篇: Spring5源码 - 01 BeanD