UDT 最新源码分析(五) -- 网络数据收发
UDT 最新源碼分析 -- 網絡數據收發(fā)
- 從接口實現看 UDT 網絡收發(fā)
- UDT 發(fā)送 send / sendmsg / sendfile
- UDT 接收 recv /recvmsg /recvfile
- 從內部實現看 UDT 網絡收發(fā)
- UDT 發(fā)送工作線程
- UDT 接收工作線程
從接口實現看 UDT 網絡收發(fā)
從對外的接口實現方法來看,網絡收發(fā)過程實際上是對 m_pSndBuffer 和 m_pRcvBuffer 進行操作,而實際的網絡收發(fā)涉及到系統調度,算法實現等問題。簡單來看看代碼。
UDT 發(fā)送 send / sendmsg / sendfile
以 send 為例,外部接口調用send 其實并不是直接發(fā)送到網絡,而是將數據加入發(fā)送的 buffer 中,后續(xù)再通過調度將數據發(fā)送到網絡中去。send 僅僅針對流傳輸模式而言,其他模式不可調用此函數。對于數據包模式,應該調用 sendmsg。
CUDT::send(UDTSOCKET u, const char* buf, int len, int)
-> CUDT::send(const char* data, int len)
sendmsg 與 send 函數有非常多代碼一致,核心代碼基本上沒有變化。sendfile 中 addbuffer 變成 addBufferFromFile,其余基本沒變化。
UDT 接收 recv /recvmsg /recvfile
從接口調用 recv 實際上只是從接收緩沖中取出數據,在獲取數據會檢查當前是否流模式,如果沒數據,或啟動條件喚醒和定時等待等,也會檢查網絡連接是否正常。
int CUDT::recv(char* data, int len) {... if (0 == m_pRcvBuffer->getRcvDataSize()) // buffer 為空{... //等待條件滿足或者超時}int res = m_pRcvBuffer->readBuffer(data, len);if (m_pRcvBuffer->getRcvDataSize() <= 0){// read is not available any mores_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false); //刪除}if ((res <= 0) && (m_iRcvTimeOut >= 0))throw CUDTException(6, 3, 0);return res; }從內部實現看 UDT 網絡收發(fā)
從接口上可以看到,發(fā)送接收僅僅是將數據與buffer進行交互,看不到數據真正進行發(fā)送接收的地方。那么在內部究竟如何實現的呢?在以前的文章分析中已經提到過發(fā)送接收工作線程的概念,在這里再次看看,代碼參考 queue.cpp。
初始化的地方如下,通過調用 m_pSndQueue 和 m_pRcvQueue 調用 init 實現 worker 線程創(chuàng)建:
void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock) {...CMultiplexer m;m.m_iID = s->m_SocketID;m.m_pChannel = new CChannel(s->m_pUDT->m_iIPversion);m.m_pChannel->setSndBufSize(s->m_pUDT->m_iUDPSndBufSize);m.m_pChannel->setRcvBufSize(s->m_pUDT->m_iUDPRcvBufSize);try{if (NULL != udpsock)m.m_pChannel->open(*udpsock);elsem.m_pChannel->open(addr);}catch (CUDTException& e){m.m_pChannel->close();delete m.m_pChannel;throw e;}m.m_pTimer = new CTimer;m.m_pSndQueue = new CSndQueue;m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);m.m_pRcvQueue = new CRcvQueue;m.m_pRcvQueue->init(32, s->m_pUDT->m_iPayloadSize, m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);m_mMultiplexer[m.m_iID] = m; }UDT 發(fā)送工作線程
發(fā)送線程中主要的變量有 m_pSndUList, m_pChannel, m_pTimer。線程的工作就是不停的檢查 m_pSndUList 中的UDT 實例,取出包,通過 m_pChannel 發(fā)送出去。如果取出的包時發(fā)現未到發(fā)送時間,則通過 m_pTimer sleep 剩余的時間再發(fā)送。
創(chuàng)建線程如下所示:
void CSndQueue::init(CChannel* c, CTimer* t) {m_pChannel = c;m_pTimer = t;m_pSndUList = new CSndUList;m_pSndUList->m_pWindowLock = &m_WindowLock;m_pSndUList->m_pWindowCond = &m_WindowCond;m_pSndUList->m_pTimer = m_pTimer;#ifndef WIN32if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this)){m_WorkerThread = 0;throw CUDTException(3, 1);}#elseDWORD threadID;m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &threadID);if (NULL == m_WorkerThread)throw CUDTException(3, 1);#endif }根據前面的描述,接下來理解發(fā)送工作線程運行過程。getNextProcTime 實際上就是獲取 m_pHeap[0] 的 m_llTimeStamp。這個時間就是即將要發(fā)送的數據的時間。sleepto 等待時間到達。pop 則是初始化 CPacket,然后再發(fā)送。如果 ts <= 0,代表當前并無數據需要發(fā)送,需要繼續(xù)等待。
#ifndef WIN32void* CSndQueue::worker(void* param) #elseDWORD WINAPI CSndQueue::worker(LPVOID param) #endif {CSndQueue* self = (CSndQueue*)param;while (!self->m_bClosing){uint64_t ts = self->m_pSndUList->getNextProcTime(); //獲取下一次發(fā)送時間if (ts > 0){// wait until next processing time of the first socket on the listuint64_t currtime;CTimer::rdtsc(currtime);if (currtime < ts) //時間未到self->m_pTimer->sleepto(ts); //sleep, 控制包與包之間的發(fā)送間隔// it is time to send the next pktsockaddr* addr;CPacket pkt;if (self->m_pSndUList->pop(addr, pkt) < 0)continue;self->m_pChannel->sendto(addr, pkt);}else{// wait here if there is no sockets with data to be sent#ifndef WIN32pthread_mutex_lock(&self->m_WindowLock);if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock);pthread_mutex_unlock(&self->m_WindowLock);#elseWaitForSingleObject(self->m_WindowCond, INFINITE);#endif}}#ifndef WIN32return NULL;#elseSetEvent(self->m_ExitCond);return 0;#endif }Retrieve the next packet and peer address from the first entry, and reschedule it in the queue.
在線程循環(huán)塊內,出現了 pop 方法。這個方法取出 m_pHeap 中的根節(jié)點,檢查時間戳,若時間已到,在堆中刪除該節(jié)點,進入 packData。
m_pHeap 是一個以節(jié)點時間為參考建立的最小堆。所有的插入與刪除操作均為堆的操作,需要注意的是,孩子節(jié)點與根節(jié)點的對應關系。對于根節(jié)點 q 來說,左孩子序號為 2 * q + 1, 右孩子為 2 * q + 2,這也是代碼中的 p 節(jié)點值。
首先看刪除某節(jié)點的操作:
void CSndUList::remove_(const CUDT* u) {CSNode* n = u->m_pSNode;if (n->m_iHeapLoc >= 0){// remove the node from heap 最后節(jié)點與被刪節(jié)點交換m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];m_iLastEntry --;m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc;int q = n->m_iHeapLoc; //被刪位置上新節(jié)點int p = q * 2 + 1; //左孩子序號while (p <= m_iLastEntry) // 存在左孩子節(jié)點{// 存在右孩子,且左孩子時間戳大于右孩子時間戳,則修改當前孩子為右孩子if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p + 1]->m_llTimeStamp))p ++;// 如果根節(jié)點時間戳大于孩子中最小時間戳節(jié)點,則交換,并置當前節(jié)點為新的根節(jié)點的左孩子if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp){CSNode* t = m_pHeap[p];m_pHeap[p] = m_pHeap[q];m_pHeap[p]->m_iHeapLoc = p;m_pHeap[q] = t;m_pHeap[q]->m_iHeapLoc = q;q = p;p = q * 2 + 1;}elsebreak;}n->m_iHeapLoc = -1;}// the only event has been deleted, wake up immediatelyif (0 == m_iLastEntry)m_pTimer->interrupt(); }對于插入操作,只要記住節(jié)點序號關系,就很容易看明白了。父節(jié)點 p 為孩子節(jié)點 (q-1)/2。 如果還不明白,只能去復習一下堆的數據結構相關知識。
void CSndUList::insert_(int64_t ts, const CUDT* u) {CSNode* n = u->m_pSNode;// do not insert repeated nodeif (n->m_iHeapLoc >= 0) return;//插入增加到最后節(jié)點m_iLastEntry ++; m_pHeap[m_iLastEntry] = n;n->m_llTimeStamp = ts;//開始調整int q = m_iLastEntry;int p = q;while (p != 0){p = (q - 1) >> 1; //父節(jié)點if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp){CSNode* t = m_pHeap[p];m_pHeap[p] = m_pHeap[q];m_pHeap[q] = t;t->m_iHeapLoc = q;q = p;}elsebreak;}n->m_iHeapLoc = q;// an earlier event has been inserted, wake up sending workerif (n->m_iHeapLoc == 0)m_pTimer->interrupt();// first entry, activate the sending queueif (0 == m_iLastEntry){#ifndef WIN32pthread_mutex_lock(m_pWindowLock);pthread_cond_signal(m_pWindowCond); //喚醒線程pthread_mutex_unlock(m_pWindowLock);#elseSetEvent(*m_pWindowCond);#endif} }在發(fā)送線程中還有一個 packData 方法,處理了兩類 packet 的讀取,一是丟失的 packet,二是正常的順序傳輸的包。處理過程:
獲取 entertime, 更新 m_ullTimeDiff, 即記錄當前發(fā)包對應目標時間的差值,會影響到下一次發(fā)包的目標時間。UDT 以此使得發(fā)包的時間間隔始終控制在算法之中。
在 UDT 中,在開始的時候會初始化一個發(fā)包時間間隔 m_ullInterval ,這個值表示期望的發(fā)送時間間隔。初始化如下所示:
m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
m_ullInterval 并不是一個固定的值,而是根據網絡狀態(tài)進行調整。比如在 processCtrl 中 收到包類型為 4 時,就會改變。但是查找代碼可以發(fā)現,當前udt 版本不再執(zhí)行 sendCtrl(4),代碼詳見包類型為6 時,代碼已經被注釋。但是無用代碼并未刪除,如下所示。
// One way packet delay is increasing, so decrease the sending rate m_ullInterval = (uint64_t)ceil(m_ullInterval * 1.125);在擁塞控制中 CCUpdate 改變 m_ullInterva 值:
m_ullInterval = (uint64_t) (m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
m_dCongestionWindow = m_pCC->m_dCWndSize;
if (m_llMaxBW <= 0)
??return;
const double minSP = 1000000.0 / (double(m_llMaxBW) / m_iMSS) * m_ullCPUFrequency;
if (m_ullInterval < minSP)
??m_ullInterval = minSP;
在UDT中,包發(fā)送會有一個隨著網絡狀況調整的一個發(fā)送周期,也就是 m_ullInterva 值。在每一次發(fā)送包時,都會根據 m_ullInterval值計算下一次包發(fā)送的理想時間間隔,并修改 m_ullTargetTime 值。
檢查是否丟包。
-
如果丟包,就將 packet.m_iSeqNo 賦值為丟包的序號值。然后計算 offset。m_iSndLastDataAck 是在接收到最后一個 ack 時更新的序號,之前的所有包都被確認。如果 offset < 0, 表示上次確認序號大于丟包序號,即有包未收到但是被確認,可能出現錯誤。讀取數據如果失敗,就會發(fā)送丟棄請求,并更新 m_iSndCurrSeqNo。
-
如果沒有丟包,則發(fā)送一個新包。根據流窗口與擁塞窗口更新 cwnd 值。若發(fā)送包序號在窗口范圍內,則 readData 并且更新本地和 ccc中 m_iSndCurrSeqNo,更新 m_iSeqNo,檢查是否需要發(fā)送包對探測。
-
更新 packet 與 cc,更新 ts, m_ullTargetTime。包將在 worker 中被發(fā)送
UDT 接收工作線程
接收工作線程的主要工作同樣在 while 循環(huán)中完成。首先檢查是否有新的 socket 到來,如果有,則不斷加入 m_pRcvUList,同時添加到 m_pHash 中。然后再 m_UnitQueue 中查找是否存在可用的存儲塊,在此過程中如果發(fā)現已經數量太多會自動擴容。不斷的通過 recvfrom 接收包。
如果是連接請求, 將被送給 listening socket 或者 rendezvous sockets,對應將進入 listen 或者 connect 操作。否則, 根據 getFlag 判斷,進入 processData 或者 processCtrl。這也是接收數據被處理的核心函數。最后將這個 UDT實例 放入 m_pRcvUList 最后。
#ifndef WIN32void* CRcvQueue::worker(void* param) #elseDWORD WINAPI CRcvQueue::worker(LPVOID param) #endif {CRcvQueue* self = (CRcvQueue*)param;sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;CUDT* u = NULL;int32_t id;while (!self->m_bClosing){#ifdef NO_BUSY_WAITINGself->m_pTimer->tick();#endif// check waiting list, if new socket, insert it to the listwhile (self->ifNewEntry()){CUDT* ne = self->getNewEntry();if (NULL != ne){self->m_pRcvUList->insert(ne);self->m_pHash->insert(ne->m_SocketID, ne);}}// find next available slot for incoming packetCUnit* unit = self->m_UnitQueue.getNextAvailUnit();if (NULL == unit){// no space, skip this packetCPacket temp;temp.m_pcData = new char[self->m_iPayloadSize];temp.setLength(self->m_iPayloadSize);self->m_pChannel->recvfrom(addr, temp);delete [] temp.m_pcData;goto TIMER_CHECK;}unit->m_Packet.setLength(self->m_iPayloadSize);// reading next incoming packet, recvfrom returns -1 is nothing has been receivedif (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)goto TIMER_CHECK;id = unit->m_Packet.m_iID;// ID 0 is for connection request, which should be passed to the listening socket or rendezvous socketsif (0 == id){if (NULL != self->m_pListener)self->m_pListener->listen(addr, unit->m_Packet);else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id))){// asynchronous connect: call connect here// otherwise wait for the UDT socket to retrieve this packetif (!u->m_bSynRecving)u->connect(unit->m_Packet);elseself->storePkt(id, unit->m_Packet.clone());}}else if (id > 0){if (NULL != (u = self->m_pHash->lookup(id))){if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion)){if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing){if (0 == unit->m_Packet.getFlag())u->processData(unit);elseu->processCtrl(unit->m_Packet);u->checkTimers();self->m_pRcvUList->update(u);}}}else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id))){if (!u->m_bSynRecving)u->connect(unit->m_Packet);elseself->storePkt(id, unit->m_Packet.clone());}}TIMER_CHECK:// take care of the timing event for all UDT socketsuint64_t currtime;CTimer::rdtsc(currtime);CRNode* ul = self->m_pRcvUList->m_pUList;uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();while ((NULL != ul) && (ul->m_llTimeStamp < ctime)){CUDT* u = ul->m_pUDT;if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing){u->checkTimers();self->m_pRcvUList->update(u);}else{// the socket must be removed from Hash table first, then RcvUListself->m_pHash->remove(u->m_SocketID);self->m_pRcvUList->remove(u);u->m_pRNode->m_bOnList = false;}ul = self->m_pRcvUList->m_pUList;}// Check connection requests status for all sockets in the RendezvousQueue.self->m_pRendezvousQueue->updateConnStatus();}if (AF_INET == self->m_UnitQueue.m_iIPversion)delete (sockaddr_in*)addr;elsedelete (sockaddr_in6*)addr;#ifndef WIN32return NULL;#elseSetEvent(self->m_ExitCond);return 0;#endif }checkTimers 會更新 cc 參數,并發(fā)送 ack 包,檢查連接是否中斷。在代碼中,NAK 定時器不再生效,僅僅依靠發(fā)送方的超時機制。檢測如果16個超時 且 總時間達到閾值才會認為連接掛掉。超時也會導致擁塞控制算法進行調整。
void CUDT::checkTimers() {// update CC parametersCCUpdate(); //更新發(fā)包時間間隔和擁塞窗口uint64_t currtime;CTimer::rdtsc(currtime);if ((currtime > m_ullNextACKTime) || ((m_pCC->m_iACKInterval > 0) && (m_pCC->m_iACKInterval <= m_iPktCount))){// ACK timer expired or ACK interval is reachedsendCtrl(2); //ackCTimer::rdtsc(currtime);if (m_pCC->m_iACKPeriod > 0) //更新 m_ullNextACKTimem_ullNextACKTime = currtime + m_pCC->m_iACKPeriod * m_ullCPUFrequency;elsem_ullNextACKTime = currtime + m_ullACKInt;m_iPktCount = 0;m_iLightACKCount = 1;}else if (m_iSelfClockInterval * m_iLightACKCount <= m_iPktCount){//send a "light" ACKsendCtrl(2, NULL, NULL, 4);++ m_iLightACKCount;}// we are not sending back repeated NAK anymore and rely on the sender's EXP for retransmission//if ((m_pRcvLossList->getLossLength() > 0) && (currtime > m_ullNextNAKTime))//{ // // NAK timer expired, and there is loss to be reported.// sendCtrl(3);//// CTimer::rdtsc(currtime);// m_ullNextNAKTime = currtime + m_ullNAKInt;//} //不再觸發(fā) NAK 定時器,僅僅依靠發(fā)送方的重傳超時,應該是為了減少誤丟包識別。uint64_t next_exp_time;if (m_pCC->m_bUserDefinedRTO)next_exp_time = m_ullLastRspTime + m_pCC->m_iRTO * m_ullCPUFrequency;else{uint64_t exp_int = (m_iEXPCount * (m_iRTT + 4 * m_iRTTVar) + m_iSYNInterval) * m_ullCPUFrequency;if (exp_int < m_iEXPCount * m_ullMinExpInt)exp_int = m_iEXPCount * m_ullMinExpInt;next_exp_time = m_ullLastRspTime + exp_int;}if (currtime > next_exp_time){// Haven't receive any information from the peer, is it dead?!// timeout: at least 16 expirations and must be greater than 10 secondsif ((m_iEXPCount > 16) && (currtime - m_ullLastRspTime > 5000000 * m_ullCPUFrequency)){// Connection is broken. // UDT does not signal any information about this instead of to stop quietly.// Application will detect this when it calls any UDT methods next time.m_bClosing = true;m_bBroken = true;m_iBrokenCounter = 30;// update snd U list to remove this socketm_pSndQueue->m_pSndUList->update(this);releaseSynch();// app can call any UDT API to learn the connection_broken errors_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN | UDT_EPOLL_OUT | UDT_EPOLL_ERR, true);CTimer::triggerEvent();return;}// sender: Insert all the packets sent after last received acknowledgement into the sender loss list.// recver: Send out a keep-alive packetif (m_pSndBuffer->getCurrBufSize() > 0){if ((CSeqNo::incseq(m_iSndCurrSeqNo) != m_iSndLastAck) && (m_pSndLossList->getLossLength() == 0)){// resend all unacknowledged packets on timeout, but only if there is no packet in the loss listint32_t csn = m_iSndCurrSeqNo;int num = m_pSndLossList->insert(m_iSndLastAck, csn); m_iTraceSndLoss += num;m_iSndLossTotal += num;}m_pCC->onTimeout();CCUpdate();// immediately restart transmissionm_pSndQueue->m_pSndUList->update(this);}else{sendCtrl(1); //keep-live 包}++ m_iEXPCount; //增加,如果到達16 次,進入超時處理,如果收到確認,則重置為0。// Reset last response time since we just sent a heart-beat.m_ullLastRspTime = currtime;} }再回頭看數據處理部分 processData 。
int CUDT::processData(CUnit* unit) {CPacket& packet = unit->m_Packet;// Just heard from the peer, reset the expiration count.m_iEXPCount = 1; //有收到數據,重置 EXPuint64_t currtime;CTimer::rdtsc(currtime);m_ullLastRspTime = currtime; //更新 m_ullLastRspTimem_pCC->onPktReceived(&packet); //未找到函數的實現++ m_iPktCount;// update time information, 記錄包到達的時間以及上一包時間m_pRcvTimeWindow->onPktArrival(); //記錄的目的用于計算包的到達速率,然后將計算的速率通過ACK反饋回去// check if it is probing packet pair, 用于估計鏈路容量,將計算的容量通過ACK反饋回去if (0 == (packet.m_iSeqNo & 0xF))//檢查是否為包對m_pRcvTimeWindow->probe1Arrival(); //記錄包對中第一個包的到達時間else if (1 == (packet.m_iSeqNo & 0xF))m_pRcvTimeWindow->probe2Arrival(); // 記錄探測包對的時間間隔++ m_llTraceRecv;++ m_llRecvTotal;int32_t offset = CSeqNo::seqoff(m_iRcvLastAck, packet.m_iSeqNo);if ((offset < 0) || (offset >= m_pRcvBuffer->getAvailBufSize()))return -1;if (m_pRcvBuffer->addData(unit, offset) < 0)//將數據包加入到 m_pRcvBufferreturn -1;// Loss detection.if (CSeqNo::seqcmp(packet.m_iSeqNo, CSeqNo::incseq(m_iRcvCurrSeqNo)) > 0){// If loss found, insert them to the receiver loss listm_pRcvLossList->insert(CSeqNo::incseq(m_iRcvCurrSeqNo), CSeqNo::decseq(packet.m_iSeqNo));// pack loss list for NAKint32_t lossdata[2];lossdata[0] = CSeqNo::incseq(m_iRcvCurrSeqNo) | 0x80000000;lossdata[1] = CSeqNo::decseq(packet.m_iSeqNo);// Generate loss report immediately.sendCtrl(3, NULL, lossdata, (CSeqNo::incseq(m_iRcvCurrSeqNo) == CSeqNo::decseq(packet.m_iSeqNo)) ? 1 : 2);int loss = CSeqNo::seqlen(m_iRcvCurrSeqNo, packet.m_iSeqNo) - 2;m_iTraceRcvLoss += loss;m_iRcvLossTotal += loss;}// This is not a regular fixed size packet... //an irregular sized packet usually indicates the end of a message, so send an ACK immediately if (packet.getLength() != m_iPayloadSize)CTimer::rdtsc(m_ullNextACKTime); // Update the current largest sequence number that has been received.// Or it is a retransmitted packet, remove it from receiver loss list.if (CSeqNo::seqcmp(packet.m_iSeqNo, m_iRcvCurrSeqNo) > 0)m_iRcvCurrSeqNo = packet.m_iSeqNo;elsem_pRcvLossList->remove(packet.m_iSeqNo);return 0; }接下來看控制消息的處理。這部分的內容可以參考 UDT 最新協議分析.
- 如果是一個輕量級 ACK,更新 m_iFlowWindowSize 和 m_iSndLastAck, 終止處理。
- 否則:
- 使用相同的 ACK 序號返回一個 ACK2 作為確認的確認。更新 m_ullSndLastAck2Time, m_iFlowWindowSize, m_iSndLastDataAck 和 m_iSndLastAck。\
- 更新發(fā)送丟失鏈表,移除已經被確認的所有包序號。\
- 更新RTT與RTTVar。更新ACK和NAK周期為 4 * RTT + RTTVar + SYN。\
- 更新發(fā)送端緩沖,釋放已經被確認的緩沖。\
- 更新包到達速率為:A = (A * 7 + a) / 8,其中a為ACK中攜帶的相應值。更新鏈路容量估計值:B = (B * 7 + b) / 8,其中b為ACK中攜帶的相應值。
- 更新發(fā)包間隔 m_ullInterval。
- acknowledge 根據ACK2中的ACK序號,在ACK歷史窗口中找到關聯的ACK,根據ACK2到達時間和ACK離開時間,計算rtt。
- 計算新的 RTT = (RTT * 7 + rtt) / 8,更新RTTVar = (RTTVar * 3 + abs(RTT - rtt)) / 4,更新cc中 rtt。
- 更新被確認的最大ACK序號。
- 將 NAK 中攜帶的所有序號添加到發(fā)送丟失鏈表中。通過碼率控制更新 SND 周期。重置 EXP 時間變量。更新 m_pSndUList,等待重傳。
- 在接收緩沖中標記所有屬于同一個消息的包,使得不再可讀。 在接收丟失鏈表中移除所有對應的包。
總結
以上是生活随笔為你收集整理的UDT 最新源码分析(五) -- 网络数据收发的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: SocksCap64应用程序通过SOCK
- 下一篇: Java调用ffmepg+mencode