netty worker线程数量_Dubbo线程模型
dubbo線程模型包括線程模型策略和dubbo線程池策略兩個方面,下面就依次進行分析。
dubbo線程模型
Dubbo默認的底層網絡通信使用的是Netty,服務提供方NettyServer使用兩級線程池,其中EventLoopGroup(boss)主要用來接收客戶端的鏈接請求,并把完成TCP三次握手的連接分發給EventLoopGroup(worker)來處理,注意把boss和worker線程組稱為I/O線程,前者處理IO連接事件,后者處理IO讀寫事件。
設想下,dubbo provider端的netty IO線程是如何處理業務邏輯呢?如果處理邏輯較為簡單,并且不會發起新的I/O請求,那么直接在I/O線程上處理會更快,因為這樣減少了線程池調度與上下文切換的開銷,畢竟線程切換還是有一定成本的。如果邏輯較為復雜,或者需要發起網絡通信,比如查詢數據庫,則I/O線程必須派發請求到新的線程池進行處理,否則I/O線程會被阻塞,導致處理IO請求效率降低。
那Dubbo是如何做的呢?Dubbo中根據請求的消息類是直接被I/O線程處理還是被業務線程池處理,Dubbo提供了下面幾種線程模型:
- all(AllDispatcher類):所有消息都派發到業務線程池,這些消息包括請求、響應、連接事件、斷開事件等,響應消息會優先使用對于請求所使用的線程池。
- direct(DirectDispatcher類):所有消息都不派發到業務線程池,全部在IO線程上直接執行。
- message(MessageOnlyDispatcher類):只有請求響應消息派發到業務線程池,其他消息如連接事件、斷開事件、心跳事件等,直接在I/O線程上執行。
- execution(ExecutionDispatcher類):只把請求類消息派發到業務線程池處理,但是響應、連接事件、斷開事件、心跳事件等消息直接在I/O線程上執行。
- connection(ConnectionOrderedDispatcher類):在I/O線程上將連接事件、斷開事件放入隊列,有序地逐個執行,其他消息派發到業務線程池處理。
dubbo線程池可選模型較多,下面以DirectDispatcher類進行分析,其他流程類似就不在贅述。
public class DirectChannelHandler extends WrappedChannelHandler {@Overridepublic void received(Channel channel, Object message) throws RemotingException {ExecutorService executor = getPreferredExecutorService(message);if (executor instanceof ThreadlessExecutor) {try {executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));} catch (Throwable t) {throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);}} else {handler.received(channel, message);}} }DirectDispatcher類重寫了received方法,注意 ThreadlessExecutor 被應用在調用 future.get() 之前,先調用 ThreadlessExecutor.wait(),wait 會使業務線程在一個阻塞隊列上等待,直到隊列中被加入元素。很明顯,provider側調用getPreferredExecutorService(message)返回的不是ThreadlessExecutor,所以會在當前IO線程執行執行。
其他事件,比如連接、異常、斷開等,都是在WrappedChannelHandler中默認實現:執行在當前IO線程中執行的,代碼如下:@Override public void connected(Channel channel) throws RemotingException {handler.connected(channel); } @Override public void disconnected(Channel channel) throws RemotingException {handler.disconnected(channel); } @Override public void sent(Channel channel, Object message) throws RemotingException {handler.sent(channel, message); } @Override public void caught(Channel channel, Throwable exception) throws RemotingException {handler.caught(channel, exception); }dubbo線程模型策略
了解了dubbo線程模型之后,小伙伴是不是該問:
既然有那么多的線程模型策略,dubbo線程模型具體使用的是什么策略呢?從netty啟動流程來看,初始化NettyServer時會進行加載具體的線程模型,代碼如下:
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url)); } public static ChannelHandler wrap(ChannelHandler handler, URL url) {return ChannelHandlers.getInstance().wrapInternal(handler, url); } protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url))); }這里根據URL里的線程模型來選擇具體的Dispatcher實現類。在此,我們再提一下Dubbo提供的Dispatcher實現類,其默認的實現類是all,也就是AllDispatcher類。既然Dispatcher是通過SPI方式加載的,也就是用戶可以自定義自己的線程模型,只需實現Dispatcher類然后配置選擇使用自定義的Dispatcher類即可。
dubbo線程池策略
dubbo處理流程,為了盡量早地釋放Netty的I/O線程,某些線程模型會把請求投遞到線程池進行異步處理,那么這里所謂的線程池是什么樣的線程池呢?
其實這里的線程池ThreadPool也是一個擴展接口SPI,Dubbo提供了該擴展接口的一些實現,具體如下:- FixedThreadPool:創建一個具有固定個數線程的線程池。
- LimitedThreadPool:創建一個線程池,這個線程池中的線程個數隨著需要量動態增加,但是數量不超過配置的閾值。另外,空閑線程不會被回收,會一直存在。
- EagerThreadPool:創建一個線程池,在這個線程池中,當所有核心線程都處于忙碌狀態時,將創建新的線程來執行新任務,而不是把任務放入線程池阻塞隊列。
- CachedThreadPool:創建一個自適應線程池,當線程空閑1分鐘時,線程會被回收;當有新請求到來時,會創建新線程。
知道了這些線程池之后,那么是什么時候進行SPI加載對應的線程池實現呢?具體是在dubbo 線程模型獲取對應線程池時進行SPI加載的,具體邏輯在方法 org.apache.dubbo.common.threadpool.manager.DefaultExecutorRepository#createExecutor中:
private ExecutorService createExecutor(URL url) {return (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); } @SPI("fixed") public interface ThreadPool {@Adaptive({THREADPOOL_KEY})Executor getExecutor(URL url); }從代碼來看,默認的線程池策略是fixed模式的線程池,其coreSize默認為200,隊列大小為0,其代碼如下:
public class FixedThreadPool implements ThreadPool {@Overridepublic Executor getExecutor(URL url) {String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);int threads = url.getParameter(THREADS_KEY, DEFAULT_THREADS);int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));} }注:其他線程池策略和FixedThreadPool類似,只不過線程池參數不同而已,這里不再贅述。小結
從dubbo提供的幾種線程模型和線程池策略來看,基本上能滿足絕大多數場景的需求了,由于dubbo線程模型和線程池策略都是通過SPI的方式進行加載的,因此如果業務上需要,我們完全可以自定義對應的線程模型和線程池策略,只需要配置下即可。
推薦閱讀
Dubbo RPC在consumer端是如何跑起來的?mp.weixin.qq.comdubbo版的"明朝那些事兒"?mp.weixin.qq.com責任鏈的2種實現方式,你更pick哪一種?mp.weixin.qq.com網絡數據是如何傳遞給進程的?mp.weixin.qq.comLinux管道那些事兒?mp.weixin.qq.com從socket api看網絡通信流程?mp.weixin.qq.com總結
以上是生活随笔為你收集整理的netty worker线程数量_Dubbo线程模型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 工商银行怎么更新身份证信息 怎样更新工商
- 下一篇: 造呼吸机的上市公司