C/C++协程实现-学习笔记
協程,又稱微線程,纖程。英文名Coroutine。
協程的概念很早就提出來了,但直到最近幾年才在某些語言(如Lua\go\C++20)中得到廣泛應用。
子程序,或者稱為函數,在所有語言中都是層級調用,比如A調用B,B在執行過程中又調用了C,C執行完畢返回,B執行完畢返回,最后是A執行完畢。
所以子程序調用是通過棧實現的,一個線程就是執行一個子程序。
子程序調用總是一個入口,一次返回,調用順序是明確的。而協程的調用和子程序不同。
協程看上去也是子程序,但執行過程中,在子程序內部可中斷,然后轉而執行別的子程序,在適當的時候再返回來接著執行。
注意,在一個子程序中中斷,去執行其他子程序,不是函數調用,有點類似CPU的中斷。比如子程序A、B:
def A():print '1'print '2'print '3'def B():print 'x'print 'y'print 'z'假設由協程執行,在執行A的過程中,可以隨時中斷,去執行B,B也可能在執行過程中中斷再去執行A,結果可能是:
12xy3z但是在A中是沒有調用B的,所以協程的調用比函數調用理解起來要難一些。
看起來A、B的執行有點像多線程,但協程的特點在于是一個線程執行,那和多線程比,
協程有何優勢?
最大的優勢就是協程極高的執行效率。因為子程序切換不是線程切換,而是由程序自身控制,因此,沒有線程切換的開銷,和多線程比,線程數量越多,協程的性能優勢就越明顯。
第二大優勢就是不需要多線程的鎖機制,因為只有一個線程,也不存在同時寫變量沖突,在協程中控制共享資源不加鎖,只需要判斷狀態就好了,所以執行效率比多線程高很多。
因為協程是一個線程執行,那怎么利用多核CPU呢?最簡單的方法是多進程+協程,既充分利用多核,又充分發揮協程的高效率,可獲得極高的性能。
Python對協程的支持還非常有限,用在generator中的yield可以一定程度上實現協程。雖然支持不完全,但已經可以發揮相當大的威力了。
來看例子:
傳統的生產者-消費者模型是一個線程寫消息,一個線程取消息,通過鎖機制控制隊列和等待,但一不小心就可能死鎖。
如果改用協程,生產者生產消息后,直接通過yield跳轉到消費者開始執行,待消費者執行完畢后,切換回生產者繼續生產,效率極高:
import timedef consumer():r = ''while True:n = yield rif not n:returnprint('[CONSUMER] Consuming %s...' % n)time.sleep(1)r = '200 OK'def produce(c):c.next()n = 0while n < 5:n = n + 1print('[PRODUCER] Producing %s...' % n)r = c.send(n)print('[PRODUCER] Consumer return: %s' % r)c.close()if __name__=='__main__':c = consumer()produce(c)執行結果:
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
注意到consumer函數是一個generator(生成器),把一個consumer傳入produce后:
首先調用c.next()啟動生成器;
然后,一旦生產了東西,通過c.send(n)切換到consumer執行;
consumer通過yield拿到消息,處理,又通過yield把結果傳回;
produce拿到consumer處理的結果,繼續生產下一條消息;
produce決定不生產了,通過c.close()關閉consumer,整個過程結束。
整個流程無鎖,由一個線程執行,produce和consumer協作完成任務,所以稱為“協程”,而非線程的搶占式多任務。
?
C/C++ 協程
首先需要聲明的是,這里不打算花時間來介紹什么是協程,以及協程和線程有什么不同。如果對此有任何疑問,可以自行 google。與 Python 不同,C/C++ 語言本身是不能天然支持協程的?,F有的 C++ 協程庫均基于兩種方案:利用匯編代碼控制協程上下文的切換,以及利用操作系統提供的 API 來實現協程上下文切換。典型的例如:
- libco,Boost.context:基于匯編代碼的上下文切換
- phxrpc:基于 ucontext/Boost.context 的上下文切換
- libmill:基于 setjump/longjump 的協程切換
一般而言,基于匯編的上下文切換要比采用系統調用的切換更加高效,這也是為什么 phxrpc 在使用 Boost.context 時要比使用 ucontext 性能更好的原因。關于 phxrpc 和 libmill 具體的協程實現方式,以后有時間再詳細介紹。
libco 協程的創建和切換
在介紹 coroutine 的創建之前,我們先來熟悉一下 libco 中用來表示一個 coroutine 的數據結構,即定義在 co_routine_inner.h 中的?stCoRoutine_t:
struct stCoRoutine_t{stCoRoutineEnv_t *env; // 協程運行環境pfn_co_routine_t pfn; // 協程執行的邏輯函數void *arg; // 函數參數coctx_t ctx; // 保存協程的下文環境...char cEnableSysHook; // 是否運行系統 hook,即非侵入式邏輯char cIsShareStack; // 是否在共享棧模式void *pvEnv;stStackMem_t* stack_mem; // 協程運行時的棧空間char* stack_sp; // 用來保存協程運行時的棧空間unsigned int save_size;char* save_buffer;};我們暫時只需要了解表示協程的最簡單的幾個參數,例如協程運行環境,協程的上下文環境,協程運行的函數以及運行時棧空間。后面的?stack_sp,save_size?和?save_buffer?與 libco 共享棧模式相關,有關共享棧的內容我們后續再說
協程創建和運行
由于多個協程運行于一個線程內部的,因此當創建線程中的第一個協程時,需要初始化該協程所在的環境?stCoRoutineEnv_t,這個環境是線程用來管理協程的,通過該環境,線程可以得知當前一共創建了多少個協程,當前正在運行哪一個協程,當前應當如何調度協程:
struct stCoRoutineEnv_t{stCoRoutine_t *pCallStack[ 128 ]; // 記錄當前創建的協程int iCallStackSize; // 記錄當前一共創建了多少個協程stCoEpoll_t *pEpoll; // 該線程的協程調度器// 在使用共享棧模式拷貝棧內存時記錄相應的 coroutinestCoRoutine_t* pending_co;stCoRoutine_t* occupy_co;};上述代碼表明 libco 允許一個線程內最多創建 128 個協程,其中?pCallStack[iCallStackSize-1]?也就是棧頂的協程表示當前正在運行的協程。當調用函數?co_create?時,首先檢查當前線程中的 coroutine env 結構是否創建。這里 libco 對于每個線程內的 stCoRoutineEnv_t 并沒有使用 thread-local 的方式(例如gcc 內置的?__thread,phxrpc采用這種方式)來管理,而是預先定義了一個大的數組,并通過對應的 PID 來獲取其協程環境。:
static stCoRoutineEnv_t* g_arrCoEnvPerThread[204800]stCoRoutineEnv_t *co_get_curr_thread_env(){return g_arrCoEnvPerThread[ GetPid() ];}初始化?stCoRoutineEnv_t?時主要完成以下幾步:
當初始化完成協程環境之后,調用函數?co_create_env?來創建具體的協程,該函數初始化一個協程結構?stCoRoutine_t,設置該結構中的各項字段,例如運行的函數?pfn,運行時的棧地址等等。需要說明的就是,如果使用了非共享棧模式,則需要為該協程單獨申請??臻g,否則從共享棧中申請空間。棧空間表示如下:
struct stStackMem_t{stCoRoutine_t* occupy_co; // 使用該棧的協程int stack_size; // 棧大小char* stack_bp; // 棧底指針,棧從高地址向低地址增長[棧底在高,棧頂在低]char* stack_buffer; // 棧底};使用?co_create?創建完一個協程之后,將調用?co_resume?來將該協程激活運行:
void co_resume( stCoRoutine_t *co ){stCoRoutineEnv_t *env = co->env;// 獲取當前正在運行的協程的結構stCoRoutine_t *lpCurrRoutine = env->pCallStack[ env->iCallStackSize - 1 ];if( !co->cStart ){// 為將要運行的 co 布置上下文環境coctx_make( &co->ctx,(coctx_pfn_t)CoRoutineFunc,co,0 );co->cStart = 1;}env->pCallStack[ env->iCallStackSize++ ] = co; // 設置co為運行的線程co_swap( lpCurrRoutine, co );}函數?co_swap?的作用類似于 Unix 提供的函數?swapcontext:將當前正在運行的 coroutine 的上下文以及狀態保存到結構?lpCurrRoutine?中,并且將?co?設置成為要運行的協程,從而實現協程的切換。co_swap?具體完成三項工作:
對應于?co_resume?函數,協程主動讓出執行權則調用?co_yield?函數。co_yield?函數調用了?co_yield_env,將當前協程與當前線程中記錄的其他協程進行切換:
void co_yield_env( stCoRoutineEnv_t *env ){stCoRoutine_t *last = env->pCallStack[ env->iCallStackSize - 2 ];stCoRoutine_t *curr = env->pCallStack[ env->iCallStackSize - 1 ];env->iCallStackSize--;co_swap( curr, last);}前面我們已經提到過,pCallStack 棧頂所指向的即為當前正在運行的協程所對應的結構,因此該函數將?curr?取出來,并將當前正運行的協程上下文保存到該結構上,并切換到協程 last 上執行。接下來我們以 32-bit 的系統為例來分析 libco 是如何實現協程運行環境的切換的。
協程上下文的創建和切換
libco 使用結構?struct coctx_t?來表示一個協程的上下文環境:
struct coctx_t{#if defined(__i386__)void *regs[ 8 ];#elsevoid *regs[ 14 ];#endifsize_t ss_size;char *ss_sp;};可以看到,在 i386 的架構下,需要保存 8 個寄存器信息,以及棧指針和棧大小,究竟這 8 個寄存器如何保存,又是如何使用,需要配合后續的?coctx_swap?來理解。我們首先來回顧一下 Unix-like 系統的 stack frame layout,如果不能理解這個,那么剩下的內容就不必看了。
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?? ??
?
結合上圖,我們需要知道關鍵的幾點:
了解了這些,我們就來看一下協程上下文環境的初始化函數?coctx_make:
int coctx_make( coctx_t *ctx, coctx_pfn_t pfn, const void *s, const void *s1 ){char *sp = ctx->ss_sp + ctx->ss_size - sizeof(coctx_param_t);sp = (char*)((unsigned long)sp & -16L);coctx_param_t* param = (coctx_param_t*)sp ;param->s1 = s;param->s2 = s1;memset(ctx->regs, 0, sizeof(ctx->regs));ctx->regs[ kESP ] = (char*)(sp) - sizeof(void*);ctx->regs[ kEIP ] = (char*)pfn;return 0;}這段代碼應該比較好理解,首先為函數?coctx_pfn_t?預留 2 個參數的??臻g并對其到 16 字節,之后將實參設置到預留的棧上空間中。最后在?ctx?結構中填入相應的,其中記錄 reg[kEIP] 返回地址為函數指針?pfn,記錄 reg[kESP] 為獲得的棧頂指針?sp?減去一個指針長度,這個減去的空間是為返回地址 RA 預留的。當調用?coctx_swap?時,reg[kEIP] 會被放到返回地址 RA 的位置,待?coctx_swap?執行結束,自然會跳轉到函數?pfn?處執行。
coctx_swap(ctx1, ctx2)?在 coctx_swap.S 中實現。這里可以看到,該函數并沒有使用?push %ebp; move %ebp, %esp; sub $esp N;?開頭,因此??臻g分布中不會出現?ebp?的位置。coctx_swap?函數主要分為兩段,其首先將當前的上下文環境保存到?ctx1?結構中:
leal 4(%esp), %eax // eax = old_esp + 4movl 4(%esp), %esp // 將 esp 的值設為 &ctx1(即ctx1的地址)leal 32(%esp), %esp // esp = (char*)&ctx1 + 32pushl %eax // ctx1->regs[EAX] = %eaxpushl %ebp // ctx1->regs[EBP] = %ebppushl %esi // ctx1->regs[ESI] = %esipushl %edi // ctx1->regs[EDI] = %edipushl %edx // ctx1->regs[EDX] = %edxpushl %ecx // ctx1->regs[ECX] = %ecxpushl %ebx // ctx1->regs[EBX] = %ebxpushl -4(%eax) // ctx1->regs[EIP] = RA, 注意:%eax-4=%old_esp這里需要注意指令?leal?和?movl?的區別。leal?將 eax 的值設置成為 esp 的值加 4,而?movl?將 esp 的值設為 esp+4 所指向的內存上的值,也就是參數 ctx1 的地址。之后該函數將?ctx2?中記錄的上下文恢復到 CPU 寄存器中,并跳轉到其函數地址處運行:
movl 4(%eax), %esp // 將 esp 的值設為 &ctx2(即ctx2的地址)popl %eax // %eax = ctx1->regs[EIP],也就是 &pfnpopl %ebx // %ebx = ctx1->regs[EBP]popl %ecx // %ecx = ctx1->regs[ECX]popl %edx // %edx = ctx1->regs[EDX]popl %edi // %edi = ctx1->regs[EDI]popl %esi // %esi = ctx1->regs[ESI]popl %ebp // %ebp = ctx1->regs[EBP]popl %esp // %esp = ctx1->regs[ESP],即(char*)(sp) - sizeof(void*)pushl %eax // RA = %eax = &pfn,注意此時esp已經指向了新的espxorl %eax, %eax // reset eaxret上面的代碼看起來可能有些繞:
如何使用 libco
我們首先以 libco 提供的例子 example_echosvr.cpp 來介紹應用程序如何使用 libco 來編寫服務端程序。 在 example_echosvr.cpp 的?main?函數中,主要執行如下幾步:
函數?readwrite_coroutine?在外層循環中將新創建的讀寫協程都加入到隊列?g_readwrite?中,此時這些讀寫協程都沒有具體與某個 socket 連接對應,可以將隊列?g_readwrite?看成一個 coroutine pool。當加入到隊列中之后,調用函數?co_yield_ct?函數讓出 CPU,此時控制權回到主線程。
主線程中的函數?co_eventloop?監聽網絡事件,將來自于客戶端新進的連接交由協程 accept_co 處理,關于?co_eventloop?如何喚醒 accept_co 的細節我們將在后續介紹。accept_co 調用函數?accept_routine?接收新連接,該函數的流程如下:
再次回到函數?readwrite_coroutine?中,該函數會調用?co_poll?將新建立的連接的 fd 加入到 Epoll 監聽中,并將控制流程返回到 main 協程;當有讀或者寫事件發生時,Epoll 會喚醒對應的 coroutine ,繼續執行?read?函數以及?write?函數。
上面的過程大致說明了控制流程是如何在不同的協程中切換,接下來我們介紹具體的實現細節,即如何通過 Epoll 來管理協程,以及如何對系統函數進行改造以滿足 libco 的調用。
通過 Epoll 管理和喚醒協程
Epoll 監聽 FD
上一章節中介紹了協程可以通過函數?co_poll?來將 fd 交由 Epoll 管理,待 Epoll 的相應的事件觸發時,再切換回來執行 read 或者 write 操作,從而實現由 Epoll 管理協程的功能。co_poll?函數原型如下:
int co_poll(stCoEpoll_t *ctx, struct pollfd fds[], nfds_t nfds, int timeout_ms)stCoEpoll_t?是為 libco 定制的 Epoll 相關數據結構,fds?是?pollfd?結構的文件句柄,nfds?為?fds?數組的長度,最后一個參數表示定時器時間,也就是在?timeout?毫秒之后觸發處理這些文件句柄。這里可以看到,co_poll?能夠同時將多個文件句柄同時加入到 Epoll 管理中。我們先看?stCoEpoll_t?結構:
struct stCoEpoll_t{int iEpollFd; // Epoll 主 FDstatic const int _EPOLL_SIZE = 1024 * 10; // Epoll 可以監聽的句柄總數struct stTimeout_t *pTimeout; // 時間輪定時器struct stTimeoutItemLink_t *pstTimeoutList; // 已經超時的時間struct stTimeoutItemLink_t *pstActiveList; // 活躍的事件co_epoll_res *result; // Epoll 返回的事件結果};以?stTimeout_?開頭的數據結構與 libco 的定時器管理有關,我們在后面介紹。co_epoll_res?是對 Epoll 事件數據結構的封裝,也就是每次觸發 Epoll 事件時的返回結果,在 Unix 和 MaxOS 下,libco 將使用 Kqueue 替代 Epoll,因此這里也保留了 kevent 數據結構。
struct co_epoll_res{int size;struct epoll_event *events; // for linux epollstruct kevent *eventlist; // for Unix or MacOs kqueue};co_poll?實際是對函數?co_poll_inner?的封裝。我們將?co_epoll_inner?函數的結構分為上下兩半段。在上半段中,調用?co_poll?的協程?CC?將其需要監聽的句柄數組?fds?都加入到 Epoll 管理中,并通過函數?co_yield_env?讓出 CPU;當 main 協程的事件循環?co_eventloop?中觸發了?CC?對應的監聽事件時,會恢復?CC的執行。此時,CC?將開始執行下半段,即將上半段添加的句柄?fds?從 epoll 中移除,清理殘留的數據結構,下面的流程圖簡要說明了控制流的轉移過程:
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??
有了上面的基本概念,我們來看具體的實現細節。co_poll?首先在內部將傳入的文件句柄數組?fds?轉化為數據結構?stPoll_t,這一步主要是為了方便后續處理。該結構記錄了?iEpollFd,ndfs,fds?數組,以及該協程需要執行的函數和參數。有兩點需要說明的是:
co_poll?的第二步,也是最關鍵的一步,就是將 fd 數組全部加入到 Epoll 中進行監聽。協程?CC?會將每一個 epoll_event 的?data.ptr?域設置為對應的?stPollItem_t?結構。這樣當事件觸發時,可以直接從對應的?ptr中取出?stPollItem_t?結構,然后喚醒指定協程。
如果本次操作提供了 Timeout 參數,co_poll?還會將協程?CC?本次操作對應的?stPoll_t?加入到定時器隊列中。這表明在 Timeout 定時觸發之后,也會喚醒協程?CC?的執行。當整個上半段都完成后,co_poll?立即調用?co_yield_env?讓出 CPU,執行流程跳轉回到 main 協程中。
從上面的流程圖中也可以看出,當執行流程再次跳回時,表明協程?CC?添加的讀寫等監聽事件已經觸發,即可以執行相應的讀寫操作了。此時?CC?首先將其在上半段中添加的監聽事件從 Epoll 中刪除,清理殘留的數據結構,然后調用讀寫邏輯。
定時器實現
協程?CC?在將一組?fds?加入 Epoll 的同時,還能為其設置一個超時時間。在超時時間到期時,也會再次喚醒?CC?來執行。libco 使用 Timing-Wheel 來實現定時器。關于 Timing-Wheel 算法,可以參考,其優勢是 O(1) 的插入和刪除復雜度,缺點是只有有限的長度,在某些場合下不能滿足需求。
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ??
回過去看?stCoEpoll_t?結構,其中?*pTimeout?代表時間輪,通過函數?AllocateTimeout?初始化為一個固定大小(60 * 1000)的數組。根據 Timing-Wheel 的特性可知,libco 只支持最大 60s 的定時事件。而實際上,在添加定時器時,libco 要求定時時間不超過 40s。成員?pstTimeoutList?記錄在?co_eventloop?中發生超時的事件,而?pstActiveList?記錄當前活躍的事件,包括超時事件。這兩個結構都將在?co_eventloop?中進行處理。
下面我們簡要分析一下加入定時器的實現:
int AddTimeout( stTimeout_t *apTimeout, stTimeoutItem_t *apItem, unsigned long long allNow ){if( apTimeout->ullStart == 0 ) // 初始化時間輪的基準時間{apTimeout->ullStart = allNow;apTimeout->llStartIdx = 0; // 當前時間輪指針指向數組0}// 1. 當前時間不可能小于時間輪的基準時間// 2. 加入的定時器的超時時間不能小于當前時間if( allNow < apTimeout->ullStart || apItem->ullExpireTime < allNow ){return __LINE__;}int diff = apItem->ullExpireTime - apTimeout->ullStart;if( diff >= apTimeout->iItemSize ) // 添加的事件不能超過時間輪的大小{return __LINE__;}// 插入到時間輪盤的指定位置AddTail( apTimeout->pItems +(apTimeout->llStartIdx + diff ) % apTimeout->iItemSize, apItem );return 0;}定時器的超時檢查在函數?co_eventloop?中執行。
EPOLL 事件循環
main 協程通過調用函數?co_eventloop?來監聽 Epoll 事件,并在相應的事件觸發時切換到指定的協程執行。有關?co_eventloop?與 應用協程的交互過程在上一節的流程圖中已經比較清楚了,下面我們主要介紹一下?co_eventloop?函數的實現:
上文中也提到,通過?epoll_wait?返回的事件都保存在?stCoEpoll_t?結構的?co_epoll_res?中。因此?co_eventloop?首先為?co_epoll_res?申請空間,之后通過一個無限循環來監聽所有 coroutine 添加的所有事件:
for(;;){int ret = co_epoll_wait( ctx->iEpollFd,result,stCoEpoll_t::_EPOLL_SIZE, 1 );...}對于每一個觸發的事件,co_eventloop?首先通過指針域?data.ptr?取出保存的?stPollItem_t?結構,并將其添加到?pstActiveList?列表中;之后從定時器輪盤中取出所有已經超時的事件,也將其全部添加到?pstActiveList?中,pstActiveList?中的所有事件都作為活躍事件處理。
對于每一個活躍事件,co_eventloop?將通過調用對應的?pfnProcess?也就是上圖中的OnPollProcessEvent?函數來切換到該事件對應的 coroutine,將流程跳轉到該 coroutine 處執行。
最后?co_eventloop?在調用時也提供一個額外的參數來供調用者傳入一個函數指針?pfn。該函數將會在每次循環完成之后執行;當該函數返回 -1 時,將會終止整個事件循環。用戶可以利用該函數來控制 main 協程的終止或者完成一些統計需求。
?
鏈接:https://blog.csdn.net/qq_25424545/article/details/81529717
總結
以上是生活随笔為你收集整理的C/C++协程实现-学习笔记的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: C++协程
- 下一篇: B树,B-树和B+树、B*树的区别