创建线程的三种方法_Netty源码分析系列之NioEventLoop的创建与启动
前言
前三篇文章分別分析了 Netty 服務端 channel 的初始化、注冊以及綁定過程的源碼,理論上這篇文章應該開始分析新連接接入過程的源碼了,但是在看源碼的過程中,發現有一個非常重要的組件:NioEventLoop,出現得非常頻繁,以至于影響到了后面源碼的閱讀,因此決定先分析下NioEventLoop的源碼,再分析新連接接入的源碼。關于NioEventLoop這個組件的源碼分析,將會寫兩篇文章來分享。第一篇文章將主要分析NioEventLoop 的創建與啟動,第二篇將主要分析NioEventLoop 的執行流程。
在開始之前,先來思考一下一下兩個問題。
功能說明
NioEventLoop 從功能上,可以把它當做一個線程來理解,當它啟動以后,它就會不停地循環處理三種任務(從類名上也能體現出循環處理的思想:Loop)。這三種任務分別是哪三種任務呢?
NioEventLoop 類的繼承關系特別復雜,它的 UML 圖如下。
從圖中可以看到,它實現了ScheduledExecutorService接口,因此它可以實現定時任務相關的功能;同時它還繼承了SingleThreadEventExecutor類,從類名看,這是一個單線程的線程執行器。
創建流程
在 netty 中,我們通過NioEventLoopGroup來創建NioEventLoop,入口就是下面這一行代碼。
EventLoopGroup workerGroup = new NioEventLoopGroup()當使用NioEventLoopGroup的無參構造器時,netty 會默認創建2 倍 CPU 核數數量的 NioEventLoop;當使用 NioEventLoopGroup 的有參構造方法時,向構造方法中傳入一個 int 值,就表示創建指定個數的 NioEventLoop。無論是使用 NioEventLoopGroup 有參構造方法,還是無參構造方法,最終都會調用到 NioEventLoopGroup 類中的如下構造方法。
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {// 調用父類super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject()); }這個構造方法有很多參數,此時每個參數的解釋如下。
- nThreads:要創建的線程的數量,如果前面使用的是 NioEventLoopGroup 無參構造器,此時 nThreads 的值為 0,如果使用的是 NioEventLoopGroup 的有參構造方法,nThreads 的值為構造方法中傳入的值。
- executor:線程執行器,默認是 null,這個屬性的值會在后面創建 NioEventLoop 時,進行初始化。用戶可以自定義實現 executor,如果用戶自定義了,那么此時 executor 就不為 null,后面就不會再進行初始化。
- selectorProvider:SelectorProvider 類型,它是通過SelectorProvider.provider() 創建出來的,這是 JDK 中 NIO 相關的 API,會創建出一個 SelectorProvider 對象,這個對象的作用就是創建多路復用器 Selector 和服務端 channel。
- selectStrategyFactory:選擇策略工廠,通過 DefaultSelectStrategyFactory.INSTANCE 創建,INSTANCE這個常量的值又是通過new DefaultSelectStrategyFactory() 來創建的。
- RejectedExecutionHandlers.reject():返回的是一個拒絕策略,當向線程池中添加任務時,如果線程池任務隊列已滿,這個時候任務就會被拒絕,此時線程池就會執行拒絕策略。
接著又會調用父類的構造方法,NioEventLoopGroup直接繼承了MultithreadEventLoopGroup類,此時會調用到MultithreadEventLoopGroup的如下構造方法。
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args); }可以看到,構造方法的參數中,有一個 args 參數,是一個 Object 類型的可變數組。因此當在 NioEventLoopGroup 的構造方法調用到父類中時,selectorProvider、selectStrategyFactory、RejectedExecutionHandlers.reject() 都變成了 args 這個可變數組中的元素了。另外,我們從代碼中可以知道,如果前面傳遞過來的 nThread 為 0,那么就令 nThread 的值等于DEFAULT_EVENT_LOOP_THREADS,而DEFAULT_EVENT_LOOP_THREADS這個常量的值就是 2 倍的 CPU 核數;如果前面傳遞過來的 nThread 不為 0,就使用傳遞過來的 nThread。
接著繼續向上調用父類的構造器,MultithreadEventLoopGroup 繼承了MultithreadEventExecutorGroup類,因此會調用到MultithreadEventExecutorGroup的如下構造方法。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args); }其中nThreads、executor、args這些參數就是前面傳過來,這里就不多說了。然后通過DefaultEventExecutorChooserFactory.INSTANCE創建的是一個事件執行選擇工廠,INSTANCE 常量的值是通過new DefaultEventExecutorChooserFactory() 創建出來的對象。 接著又通過 this 調用了 MultithreadEventExecutorGroup 類中的另一個構造方法,接下來這個構造方法就是核心代碼了。該構造方法的代碼很長,為了方便閱讀,我進行了精簡,精簡后的源碼如下。
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {if (nThreads <= 0) {throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));}if (executor == null) {/*** 創建線程執行器:ThreadPerTaskExecutor* newDefaultThreadFactory()會創建一個線程工廠,該線程工廠的作用就是用來創建線程,同時給線程設置名稱:nioEventLoop-1-XX*/executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());}// 根據傳進來的線程數,來創建指定大小的數組大小,這個數組就是用來存放NioEventLoop對象實例children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {//出現異常標識boolean success = false;try {//創建nThreads個nioEventLoop保存到children數組中children[i] = newChild(executor, args);success = true;} catch (Exception e) {throw new IllegalStateException("failed to create a child event loop", e);} finally {// 異常處理...}}// 通過線程執行器選擇工廠來創建一個線程執行器chooser = chooserFactory.newChooser(children);// 省略部分代碼... }這個方法中,有三處主要的邏輯。第一處邏輯:當 executor 為空時,創建一個ThreadPerTaskExecutor類型的線程執行器;第二處邏輯:通過newChild(executor, args) 來創建NIoEventLoop;第三處:通過chooserFactory.newChooser(children) 來創建一個線程執行器的選擇器。下面將逐步詳細分析這三處邏輯。
創建線程執行器
if (executor == null) {/*** 創建線程執行器:ThreadPerTaskExecutor* newDefaultThreadFactory()會創建一個線程工廠,該線程工廠的作用就是用來創建線程,同時給線程設置名稱:nioEventLoop-1-XX*/executor = new ThreadPerTaskExecutor(newDefaultThreadFactory()); }在第一處核心邏輯處,首先判斷 executor 是否為空,如果用戶沒有自己指定,默認情況下,executor 是 null,因此就會通過 new 關鍵字來創建一個ThreadPerTaskExecutor類型的線程執行器。
在調用ThreadPerTaskExecutor的構造方法之前,先通過new DefaultThreadFactory() 創建了一個線程工廠,該線程工廠是DefaultThreadFactory類型,它實現了 ThreadFactory 接口,它的作用就是:當調用 threadFactory 的 newThread()方法時,就會創建出一個線程,同時給線程取一個有意義的名稱,名稱生成規則為:nioEventLoop-xx-xx。第一個 xx 的含義表示的 NiEventLoopGroup 的組號,在 netty 中可能同時創建 bossGroup 和 workerGroup 兩個線程組,所以第一個 xx 表示線程組的序號。第二個 xx 表示的是線程在線程組中的序號。如:nioEventLoop-1-1 表示的是該線程是第一個 NioEventLoopGroup 線程組的第一個線程。
ThreadPerTaskExecutor類的源碼比較簡單,它實現了Executor接口,重寫了execute() 方法,當每次調用ThreadPerTaskExecutor類的execute() 方法時,會創建一個線程,并啟動線程。這里可能會有一個疑問:每次調用execute() 方法,都會創建一個線程,豈不是意味著會創建很多線程?實際上,在每個NioEventLoop中,只會調用一次ThreadPerTaskExecutor的execute() 方法,因此對于每個NioEventLoop而言,只會創建一個線程,且當線程啟動后,就不會再調用ThreadPerTaskExecutor的execute() 方法了,也就不會造成在系統中創建多個線程。
public final class ThreadPerTaskExecutor implements Executor {private final ThreadFactory threadFactory;public ThreadPerTaskExecutor(ThreadFactory threadFactory) {if (threadFactory == null) {throw new NullPointerException("threadFactory");}this.threadFactory = threadFactory;}@Overridepublic void execute(Runnable command) {// threadFactory就是前面創建的DefaultThreadFactory// 通過線程工廠的newThread()方法來創建一個線程,并啟動線程threadFactory.newThread(command).start();} }創建 NioEventLoop
// 根據傳進來的線程數,來創建指定大小的數組大小,這個數組就是用來存放NioEventLoop對象實例 children = new EventExecutor[nThreads];for (int i = 0; i < nThreads; i ++) {//出現異常標識boolean success = false;try {//創建nThreads個nioEventLoop保存到children數組中children[i] = newChild(executor, args);success = true;} catch (Exception e) {throw new IllegalStateException("failed to create a child event loop", e);} finally {// 異常處理...} }在執行第二處核心邏輯之前,先創建了一個EventExecutor類型的數組,數組的大小就是前面傳進來的線程個數,然后將數組賦值給children屬性,這個屬性是 NioEventLoopGroup 的屬性,NioEventLoopGroup 包含一組 NioEventLoop 線程,children 屬性就是用來存放這一組 NioEventLoop 線程的。此時只是創建出了數組,但是數組中的元素都是 null,所以接下來通過 for 循環來為數組填充元素,通過newChild(executor, args) 創建出一個 NioEventLoop 對象,然后將對象賦值給數組中的元素。
當調用newChild(executor, args) 方法時,第一個參數 executor 就是上一步創建出來的ThreadPerTaskExecutor對象,第二個參數是一個可變數組,它的每一個元素是什么,有什么作用,在前面已經解釋過了。newChild(executor, args) 定義在 NioEventLoopGroup 類中,源碼如下。
protected EventLoop newChild(Executor executor, Object... args) throws Exception {/*** executor: ThreadPerTaskExecutor* args: args是一個可變數組的參數,實際上它包含三個元素,也就是前面傳遞過來的三個參數,如下:* SelectorProvider.provider()是JDK中NIO相關的API,會創建出一個SelectorProvider,它的作用就是在后面創建多路復用器Selector和服務端channel* DefaultSelectStrategyFactory.INSTANCE 是一個默認選擇策略工廠,new DefaultSelectStrategyFactory()* RejectedExecutionHandlers.reject()返回的是一個拒絕策略,當向線程池中添加任務時,如果線程池任務隊列已滿,這個時候任務就會被拒絕,然后執行拒絕策略**/return new NioEventLoop(this, executor, (SelectorProvider) args[0],((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]); }可以看見,在newChild() 中直接調用了 NioEventLoop 的構造方法。NioEventLoop 的構造方法源碼如下。
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {/*** executor: ThreadPerTaskExecutor* args: args是一個可變數組的參數,實際上它包含三個元素,也就是前面傳遞過來的三個參數,如下:* SelectorProvider.provider()是JDK中NIO相關的API,會創建出一個SelectorProvider,它的作用就是在后面創建多路復用器Selector和服務端channel* DefaultSelectStrategyFactory.INSTANCE 是一個默認選擇策略工廠,new DefaultSelectStrategyFactory()* RejectedExecutionHandlers.reject()返回的是一個拒絕策略,當向線程池中添加任務時,如果線程池任務隊列已滿,這個時候任務就會被拒絕,然后執行拒絕策略**/super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);if (selectorProvider == null) {throw new NullPointerException("selectorProvider");}if (strategy == null) {throw new NullPointerException("selectStrategy");}provider = selectorProvider;// openSelector()方法會創建一個多路復用器,但是這個多路復用器的selectedKey的底層數據接口被替換了final SelectorTuple selectorTuple = openSelector();//替換了數據結構selectedKeys publicSelectedKeys的原生selectorselector = selectorTuple.selector;//子類包裝的selector 底層數據結構也是被替換了的unwrappedSelector = selectorTuple.unwrappedSelector;selectStrategy = strategy; }在 NioEventLoop 的構造方法中主要干了兩件事,一是繼續向上調用父類的構造方法,二是調用openSelector() 方法。在父類的構造方法中,會初始化兩個任務隊列:tailTasks 和 taskQueue,最終兩個屬性創建出來的都是MpscQueue類型的隊列,同時還將傳入的 executor 進行了一次包裝,通過 ThreadExecutorMap 將其包裝成了一個匿名類。(MpscQueue 是個什么東西呢?它是many producer single consumer的簡寫,意思就是同一時刻可以有多個生產者往隊列中存東西,但是同一時刻只允許一個線程從隊列中取東西。)
接著是調用openSelector() 來創建多路復用器 Selector,然后將多路復用器保存到 NioEventLoop 當中,在這一步 Netty 對多路復用器進行了優化。原生的 Selector 底層存放 SelectionKey 的數據結構是HashSet,HashSet 在極端情況下,添加操作的時間復雜度是 O(n) ,Netty 則將 HashSet 類型替換成了數組類型,這樣添加操作的時間復雜度始終是 O(1) 。openSelector()方法的源碼很長,下面以圖片的方式貼出其源碼,你也可以直接跳過源碼,看我后面的總結。
openSelector()方法的源碼很長,經過整理后,可以總結為如下幾個步驟:
- 先調用 JDK 的 API 創建多路復用器 Selector:provider.openSelector();
- 通過DISABLE_KEY_SET_OPTIMIZATION屬性判斷是否禁用優化,如果為 true,則表示不進行底層數據結構的替換,即不優化,直接返回原生的 Selector。DISABLE_KEY_SET_OPTIMIZATION常量的含義是是否禁用優化:即是否禁止替換底層數據結構,默認為 false,不禁止優化。可以通過 io.netty.noKeySetOptimization 來配置。
- 通過反射加載 SelectorImpl:Class.forName("sun.nio.ch.SelectorImpl",false,PlatformDependent.getSystemClassLoader()) ;
- 通過反射獲取原生 SelectorImpl 中的selectedKeys、publicSelectedKeys屬性(這兩個屬性的數據類型是HashSet 類型),然后再將這兩個屬性的訪問權限設置為 true,接著再通過反射,將selectedKeys、publicSelectedKeys這連個屬性的類型替換為 Netty 中自定義的數據類型:SelectedSelectionKeySet。該類型的底層數據結構是數組類型;
- 最后將SelectedSelectionKeySet封裝到 netty 自定義的多路復用器SelectedSelectionKeySetSelector中,然后將 JDK 原生的 Selector 和 Netty 自定義的 Selector 封裝到SelectorTuple中,再將SelectorTuple返回。注意:這里原生的 selector 的底層數據結構在返回時已經被替換成了數組。
至此,NioEventLoop 的創建已經完成了,總結一下創建 NioEventLoop 的創建過程干了哪些事。
- 初始化了兩個隊列:taskQueue 和 tailQueue,類型均為 MpscQueue。taskQueue 隊列是用來存放任務的隊列,后面 NioEventLoop 啟動后,就會循環的從這個隊列中取出任務執行;tailQueue 是用來存放一些收尾工作的隊列。
- 將前面傳入的ThreadPerTaskExecutor通過ThreadExecutorMap將其包裝成了一個匿名類,然后保存到 NioEventLoop 的 executor 屬性中,后面就能通過 NioEventLoop 來獲取到線程執行器,然后執行任務了。
- 將拒絕策略:RejectedExecutionHandlers.reject()和選擇策略工廠 DefaultSelectStrategyFactory.INSTANCE 保存到 NioEventLoop 中,方便后面從 NioEventLoop 中獲取。
- 將 JDK 原生的多路復用器 Selector 保存到 NioEventLoop 的unwrappedSelector屬性中,將 Netty 自定義的多路復用器 SelectedSelectionKeySetSelector 保存到 NioEventLoop 的selector屬性中。unwrappedSelector 和 selector 底層的數據類型都是數組類型。
線程執行器選擇工廠
當 NioEventLoop 全部創建完成后,就會接著執行第三處核心邏輯,這一步做的工作是通過一個選擇工廠來創建一個線程執行器的選擇器,即給 chooser 屬性賦值。看到這兒,可能有點懵,什么意思呢?為什么要創建這個選擇器呢?
Netty 的 NioEventLoopGroup 包含了一組線程,即一組 NioEventLoop,當有新的連接接入到服務端后,后面需要對這個新連接來進行 IO 事件的讀寫,那這個時候需要使用一個 NioEventLoop 來和這個新連接綁定,也就是和客戶端 channel 綁定,后續對這個客戶端 channel 的數據讀寫都是基于綁定的這個 NioEventLoop 來進行的。既然有多個 NioEventLoop 線程,那么這個時候應該從線程組中選擇哪一個 NioEventLoop 來和客戶端 channel 綁定呢?
Netty 的做法是:輪詢,第一個客戶端 channel 來了后,取線程組中的第一個線程,即 children 數組中的第一個元素;然后當第二個線程來時,取數組中的第二個元素,以此類推,循環的從 children 數組中取 NioEventLoop。這個算法很簡單,如何實現呢?就是每來一個客戶端 channel,先獲取計數器的值,然后用計數器的值對數組取模,然后再將計數器加一。
由于取模運算相對于位運算而言,是一個相對耗時的過程,因此 netty 對此進行了優化。當線程數是 2 的整數次方時,netty 就采用位運算的方式來進行取模運算;當線程數不是 2 的整數次方時,netty 就還是采用取模的方法去進行計算。這兩種計算方法分別是由兩個類來實現的:PowerOfTwoEventExecutorChooser 和 GenericEventExecutorChooser,這兩個類都是 EventExecutorChooser 類型,翻譯過來就是事件執行器的選擇器。
而 chooser 就是這兩個選擇器的實例,究竟是PowerOfTwoEventExecutorChooser類型的實例還是GenericEventExecutorChooser類型的實例呢,這取決于 nThread 的數量。newChooser(EventExecutor[] executors) 方法的源碼如下。
public EventExecutorChooser newChooser(EventExecutor[] executors) {// executors是 new NioEventLoop() 的對象數組,// executors.length的值就是前面nThread參數的值if (isPowerOfTwo(executors.length)) {return new PowerOfTwoEventExecutorChooser(executors);} else {return new GenericEventExecutorChooser(executors);} }isPowerOfTwo(int val) 方法就是判斷傳入的值是否是 2 的整數次方。如何判斷呢?又是通過位運算。下面代碼可能不太直觀,舉個栗子:比如傳入的參數是 8,那么 8 和-8 用二進制表示就是:
8: 00000000000000000000000000001000 -8: 11111111111111111111111111111000將 8 和-8 進行與運算,結果還是 8,與原數值相等,因此 8 是 2 的整數次方。
private static boolean isPowerOfTwo(int val) {return (val & -val) == val; }至此,NioEventLoopGroup 的創建過程就結束了,那么 NioEventLoop 的創建過程也就跟著結束了。那么問題來了,我們說 NioEventLoop 實際上就是一個線程,既然是線程,它就必須先啟動,才能輪詢地執行任務,而在整個創建過程的源碼中,我們都沒有看到 NioEventLoop 線程啟動相關的代碼,那么 NioEventLoop 是什么時候啟動的呢?
啟動
NioEventLoop 啟動的觸發時機有兩個,一是在服務端啟動的過程中觸發,另一個是在新連接接入的時候。下面以服務端啟動的過程為例子,進行分析。在服務端啟動過程中,會執行如下一行代碼。
public ChannelFuture register(Channel channel) {return next().register(channel); }next() 就是 chooser 的一個方法,chooser 有兩種不同的實現:PowerOfTwoEventExecutorChooser 和 GenericEventExecutorChooser,這兩種不同的實現對next() 方法有不同的實現邏輯,區別就是:是用位運算從 children 數組中取出一個 NioEventLoop,還是通過取模的方式從 children 數組中取出一個 NioEventLoop,但是最終都是返回一個 NioEventLoop。
所以這兒實際上是執行NioEventLoop 的 register(channel)方法,這個方法一直向下執行,最終會執行到如下代碼:
eventLoop.execute(new Runnable() {@Overridepublic void run() {register0(promise);} });在這兒會調用 NioEventLoop 的 execute()方法,NioEventLoop 繼承了 SingleThreadEventExecutor,execute(task)定義在 SingleThreadEventExecutor 類中,刪減后的源碼如下。
public void execute(Runnable task) {if (task == null) {throw new NullPointerException("task");}// 判斷當前線程是否和NioEventLoop中的線程是否相等,返回true表示相等boolean inEventLoop = inEventLoop();// 將任務加入線程隊列addTask(task);if (!inEventLoop) {// 啟動線程startThread();// 省略部分代碼...}// 省略部分代碼 }可以看到,先判斷當前線程是否和 NioEventLoop 中的線程是否相等,此時由于線程是 main 線程,inEventLoop() 會返回 false,所以會進入到 if 邏輯塊中,并調用startThread() 方法來啟動的線程。
private void startThread() {// 處于為啟動狀態,才會去嘗試啟動線程if (state == ST_NOT_STARTED) {// 嘗試將ST_NOT_STARTED設置為ST_STARTEDif (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {boolean success = false;try {doStartThread();success = true;} finally {// 如果執行doStartThread()出現異常 將STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED回滾if (!success) {STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);}}}} }在doStart() 中,會先判斷 NioEventLoop 是否處于未啟動狀態,只有處于未啟動狀態才會去嘗試啟動線程。在啟動線程之前,會先利用 CAS 方法,將狀態標識為啟動狀態,CAS 成功后,然后再調用doStartThread() 方法。doStartThread() 方法精簡后的源碼如下。
private void doStartThread() {assert thread == null;//真正的啟動線程executor.execute(new Runnable() {@Overridepublic void run() {// 將此線程保存起來thread = Thread.currentThread();if (interrupted) {thread.interrupt();}boolean success = false;updateLastExecutionTime();try {// 啟動NioEventLoopSingleThreadEventExecutor.this.run();success = true;} catch (Throwable t) {logger.warn("Unexpected exception from an event executor: ", t);} finally {// 省略部分代碼....}}}); }可以發現,在doStartThread() 方法中,調用的 executor 屬性的execute() 方法,注意,此時 executor 屬性值是什么?在創建 NioEventLoop 時,創建了一個ThreadPerTaskExecutor類型的對象,然后再通過ThreadExecutorMap將其包裝成了一個匿名類,最后將這個匿名類賦值給了 executor 屬性。所以此時會調用匿名類的execute(Runnable task) 方法,而這個匿名類最最終還是調用的是ThreadPerTaskExecutor的execute(Runnable task) 方法。在前面已經簡單分析了ThreadPerTaskExecutor的execute(Runnable task) 方法,現在為了方便閱讀,再次貼出這部分代碼。
public void execute(Runnable command) {// threadFactory就是前面創建的DefaultThreadFactory// 通過線程工廠的newThread()方法來創建一個線程,并啟動線程threadFactory.newThread(command).start(); }可以看到,該execute() 方法,就是調用線程工廠的newThread(command) 方法來創建一個線程,然后調用線程的start() 的方法啟動線程。當線程啟動后,就會回調傳入的 Runnable 任務的 run()方法,所以接著會回調到doStartThread() 方法中傳入的 Runnable 的 run()方法。從doStartThread() 的源碼中可以看到,在 run()方法中先將創建出來的線程保存了起來,然后會調用 SingleThreadEventExecutor.this.run() 。這一行代碼就是啟動 NioEventLoop 線程,該方法的源碼很長,整個 NioEventLoop 的核心都在這個方法上,它實際上就是在一個無限 for 循環中,不停的去處理事件和任務。關于這個方法的源碼會在下一篇文章詳細分析。
至此,NioEventLoop 中的 Thread 線程已經啟動了,同時會連帶著 NioEventLoop 不停的在無限 for 循環中執行,也就是 NioEventLoop 啟動起來了。
總結
- 本文以new NioEventLoopGroup() 為切入點,通過分析NioEventLoopGroup的源碼,從而分析了NioEventLoop的創建過程,同時還介紹了 Netty 對 NIO 的優化。接著以服務端 channel 啟動的流程為入口,分析了 NioEventLoop 是如何啟動的。
- 默認情況下,netty 會創建 2 倍 CPU 核數數量的NioEventLoop線程,如果顯示指定了數量,則創建指定數量的NioEventLoop。
- 最后回答下文章開頭的兩個問題。
- 第一個問題:Netty 中的線程是何時啟動的?啟動時機有兩個,一個是在服務端啟動的過程中觸發,另一個是在新連接接入的時候,但是最終都是調用ThreadPerTaskExecutor類的execute(Runnable command) 方法,通過線程工廠來創建一個線程,然后調用線程的start() 方法啟動線程,當線程啟動后,又會回調傳入的 Runnable 任務的 run()方法,在任務的 run()方法中通過調用SingleThreadEventExecutor.this.run() 來調用 NioEventLoop 的 run()方法,這樣就啟動了 NioEventLoop。
- 第二個問題:Netty 中的線程是如何實現串行無鎖化的?從源碼中我們可以知道,每個 NioEventLoop 中只包含一個線程,而每個 channel 只會綁定在一個 NioEventLoop 上,一但綁定上了,后面這個 channel 的所有 IO 操作都會交由這個 NioEventLoop 線程來處理,因此不會出現多個 NioEventLoop 線程來爭奪處理 channel 的情況,因此說在 NioEventLoop 上,所有的操作都是串行處理的,不存在鎖的競爭,即串行無鎖化。可能有人會問,串行處理任務,豈不是降低了系統的吞吐量?顯然不是的,因為 netty 中有多個 NioEventLoop 線程,多個 NioEventLoop 同時串行處理,這樣服務既是多線程并行運行,各個線程間又不存在鎖的競爭,大大提高了服務性能。
總結
以上是生活随笔為你收集整理的创建线程的三种方法_Netty源码分析系列之NioEventLoop的创建与启动的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 我精心珍藏的Python代码技巧
- 下一篇: 2019年Java初级和高级部分的技术面