BeetleX.FastHttpApi之控制器调度设计
? ? 為了可以更靈活地在Webapi應(yīng)用服務(wù)中分配線程資源,BeetleX.FastHttpApi在線程調(diào)度上直接細(xì)化到Action級別;組件不僅可以精準(zhǔn)控制每個(gè)Action的最大RPS限制,還能精細(xì)到控制使用多少線程資源來處理這些API的請求。接下來詳細(xì)講解組件針對這一塊的實(shí)現(xiàn)結(jié)構(gòu)和代碼。
需求
? ? ? ? 為什么要做到這么精細(xì)的控制呢?如果有足夠資源那是不用考慮這方面的問題;但實(shí)際應(yīng)用中資源不足是經(jīng)常需要面對的問題。在整個(gè)服務(wù)中往往有些API非常占用資源,這個(gè)時(shí)候就希望通過簡單配置來控制API使用的線程數(shù)達(dá)到一個(gè)理想的資源分配結(jié)果。在控制上最直接的辦法是控制對應(yīng)的RPS數(shù)量,但有時(shí)候希望以線程資源的方式來分配。
使用
????????組件可以在控制器的Action上根據(jù)需求標(biāo)記對應(yīng)的限制屬性
設(shè)計(jì)實(shí)現(xiàn)
????????接下來看一下BeetleX.FastHttpApi組件代碼是如何進(jìn)行工作的。由于需要線程控制,那自然就需要一個(gè)隊(duì)列;組件提供一個(gè)NextQueue的隊(duì)列來完成這方面的工作,每個(gè)NextQueue會(huì)分配一個(gè)線程來處理。
public class NextQueue : IDisposable{public NextQueue(){mQueue = new System.Collections.Concurrent.ConcurrentQueue<IEventWork>();ID = System.Threading.Interlocked.Increment(ref mID);}public long ID { get; set; }private static long mID;private readonly object _workSync = new object();private bool _doingWork;private int mCount;private System.Collections.Concurrent.ConcurrentQueue<IEventWork> mQueue;public int Count => mCount;//添加任務(wù)到隊(duì)列中public void Enqueue(IEventWork item){mQueue.Enqueue(item);System.Threading.Interlocked.Increment(ref mCount);lock (_workSync){//當(dāng)前隊(duì)列是否工作中if (!_doingWork){//獲取一個(gè)線程進(jìn)行工作System.Threading.ThreadPool.QueueUserWorkItem(OnStart);_doingWork = true;}}}private void OnError(Exception e, IEventWork work){try{Error?.Invoke(e, work);}catch{}}public static Action<Exception, IEventWork> Error { get; set; }private async void OnStart(object state){while (true){//獲取隊(duì)列任務(wù)并執(zhí)行while (mQueue.TryDequeue(out IEventWork item)){System.Threading.Interlocked.Decrement(ref mCount);using (item){try{//等待任務(wù)執(zhí)行await item.Execute();}catch (Exception e_){OnError(e_, item);}}}lock (_workSync){//隊(duì)列為空跑出線程if (mQueue.IsEmpty){try{Unused?.Invoke();}catch { }_doingWork = false;return;}}}}public Action Unused { get; set; }public void Dispose(){while (mQueue.TryDequeue(out IEventWork work)){try{work.Dispose();}catch{}}}}NextQueue是一個(gè)支持異步任務(wù)的處理隊(duì)列,它確保添加進(jìn)來的任務(wù)都是有序執(zhí)行,即使任務(wù)內(nèi)部處理的任務(wù)是異步。
ActionContext
????????該對象是用于執(zhí)行控制器方法,包括webapi控制器和Websocket控制器。在這里只講述控制怎樣調(diào)度執(zhí)行的,更詳細(xì)了解可以查看
https://github.com/beetlex-io/FastHttpApi/blob/master/src/ActionContext.cs
主要講解一下Execute方法是怎樣調(diào)用控制器方法的
internal async Task Execute(IActionResultHandler resultHandler){//驗(yàn)證RPSif (Handler.ValidateRPS()){Handler.IncrementRequest();//是否存在隊(duì)列控制配置if (Handler.ThreadQueue == null || Handler.ThreadQueue.Type == ThreadQueueType.None){if?(Handler.Async)//異步方法{await OnAsyncExecute(resultHandler);}else{//同步方法OnExecute(resultHandler);}}else{//配置了隊(duì)列控制r妊ActionTask actionTask = new ActionTask(this, resultHandler,new TaskCompletionSource<object>());//獲取異步隊(duì)列var queue = Handler.ThreadQueue.GetQueue(this.HttpContext);//階列是否有效,為了安全隊(duì)列都有最大等待數(shù)限制,超過就拒絕處理if (Handler.ThreadQueue.Enabled(queue)){this.HttpContext.Queue = queue;//把當(dāng)前任務(wù)插入隊(duì)列queue.Enqueue(actionTask);//等待隊(duì)執(zhí)行結(jié)果通知await actionTask.CompletionSource.Task;}else{Handler.IncrementError();resultHandler.Error(new Exception($"{Handler.SourceUrl} process error,out of queue limit!"), EventArgs.LogType.Warring, 500);}}}else{Handler.IncrementError();resultHandler.Error(new Exception($"{Handler.SourceUrl} process error,out of max rps!"), EventArgs.LogType.Warring, 509);}}GetQueue
????????應(yīng)該方法根據(jù)當(dāng)前請示信息和配置來獲取對應(yīng)的異步隊(duì)列
ActionTask
????????方法異步任務(wù)對象,隊(duì)列會(huì)有序地執(zhí)行相關(guān)對象,這對象的實(shí)現(xiàn)非常簡單。
總結(jié)
????????到這里整個(gè)線程調(diào)度的核心就介紹完成了,如果不了解一些基礎(chǔ)知識(shí)會(huì)感覺完成這些功能很復(fù)雜,其實(shí)都是一些基礎(chǔ)功能的應(yīng)用;?完成這些功能主要涉及幾個(gè)基礎(chǔ)知識(shí)分別是:隊(duì)列,線程池和用于處理異步回調(diào)的TaskCompletionSource對象。
BeetleX開源跨平臺(tái)通訊框架(支持TLS)
提供高性能服務(wù)和大數(shù)據(jù)處理解決方案
https://beetlex.io
總結(jié)
以上是生活随笔為你收集整理的BeetleX.FastHttpApi之控制器调度设计的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: sql server和mysql的区别是
- 下一篇: ASP.Net服务性能优化原则