SequoiaDB 系列之六 :源码分析之coord节点
好久不見。
?
在上一篇SequoiaDB 系列之五?? :源碼分析之main函數(shù),有講述進(jìn)程開始運(yùn)行時(shí),會(huì)根據(jù)自身的角色,來初始化不同的CB(控制塊,control block)。
在之前的一篇SequoiaDB 系列之四?? :架構(gòu)簡(jiǎn)析中,我們簡(jiǎn)單過了一遍SequoiaDB的架構(gòu)和各個(gè)節(jié)點(diǎn)的角色。
今天來看看SequoiaDB的coord角色。
首先,需要有個(gè)大致的輪廓:
coord節(jié)點(diǎn)主要承擔(dān)代理的角色。作為SequoiaDB集群對(duì)外的接頭人,它轉(zhuǎn)發(fā)消息給其它節(jié)點(diǎn),組合(combine)不同節(jié)點(diǎn)返回的數(shù)據(jù),把結(jié)果返回給client。
catalog節(jié)點(diǎn)主要存儲(chǔ)meta數(shù)據(jù),比如集群中有哪些組,每個(gè)組的狀態(tài);每個(gè)組上有哪些節(jié)點(diǎn),有哪些集合(Collection),哪些集合是主子表等等。
data節(jié)點(diǎn)主要是管理存儲(chǔ)的數(shù)據(jù),它接受coord轉(zhuǎn)發(fā)過來的CRUD等操作,并記錄同步日志(最終一致性)。
?
在注冊(cè)CB的函數(shù)中:
void _pmdController::registerCB( SDB_ROLE dbrole ) {if ( SDB_ROLE_DATA == dbrole ){...}else if ( SDB_ROLE_COORD == dbrole ){PMD_REGISTER_CB( sdbGetTransCB() ) ; // TRANSPMD_REGISTER_CB( sdbGetCoordCB() ) ; // COORDPMD_REGISTER_CB( sdbGetFMPCB () ) ; // FMP}...// 每個(gè)節(jié)點(diǎn)都會(huì)注冊(cè)的控制塊PMD_REGISTER_CB( sdbGetDMSCB() ) ; // DMSPMD_REGISTER_CB( sdbGetRTNCB() ) ; // RTNPMD_REGISTER_CB( sdbGetSQLCB() ) ; // SQLPMD_REGISTER_CB( sdbGetAggrCB() ) ; // AGGRPMD_REGISTER_CB( sdbGetPMDController() ) ; // CONTROLLER }?coord注冊(cè)這幾個(gè)CB之后,就開始注冊(cè)和啟動(dòng)服務(wù):
具體函數(shù)在_KRCB::init()中,不再表述。_KRCB::init()會(huì)根據(jù)節(jié)點(diǎn)的角色,啟動(dòng)不同的服務(wù)。
客戶端連接到coord,coord便會(huì)啟動(dòng)一個(gè)線程,為該連接服務(wù)。
1 INT32 pmdTcpListenerEntryPoint ( pmdEDUCB *cb, void *pData ) 2 { 3 ... 4 5 while ( !cb->isDisconnected() && !pListerner->isClosed() ) 6 { 7 SOCKET s ; 8 rc = pListerner->accept ( &s, NULL, NULL ) ; 9 if ( SDB_TIMEOUT == rc || SDB_TOO_MANY_OPEN_FD == rc ) 10 { 11 rc = SDB_OK ; 12 continue ; 13 } 14 if ( rc && PMD_IS_DB_DOWN ) 15 { 16 rc = SDB_OK ; 17 goto done ; 18 } 19 else if ( rc ) 20 { 21 PD_LOG ( PDERROR, "Failed to accept socket in TcpListener(rc=%d)", 22 rc ) ; 23 if ( pListerner->isClosed() ) 24 { 25 break ; 26 } 27 else 28 { 29 continue ; 30 } 31 } 32 33 cb->incEventCount() ; 34 ++mondbcb->numConnects ; 35 void *pData = NULL ; 36 *((SOCKET *) &pData) = s ; 37 if ( !krcb->isActive() ) 38 { 39 ossSocket newsock ( &s ) ; 40 newsock.close () ; 41 continue ; 42 } 43 44 rc = eduMgr->startEDU ( EDU_TYPE_AGENT, pData, &agentEDU ) ; 45 if ( rc ) 46 { 47 PD_LOG( ( rc == SDB_QUIESCED ? PDWARNING : PDERROR ), 48 "Failed to start edu, rc: %d", rc ) ; 49 ossSocket newsock ( &s ) ; 50 newsock.close () ; 51 continue ; 52 } 53 } //while ( ! cb->isDisconnected() ) 54 55 ... 56 }服務(wù)線程監(jiān)聽到client的連接,啟動(dòng)一個(gè)EDU_TYPE_AGENT類型的線程,單獨(dú)為client服務(wù)。
?
下面講述coord節(jié)點(diǎn)的最主要的功能——消息轉(zhuǎn)發(fā)
coord的啟動(dòng)初,會(huì)初始化一些必要的全局變量。在SequoiaDB中,會(huì)初始化很多command,拿創(chuàng)建集合空間來說,在文件SequoiaDB/engine/rtn/rtnCoord.cpp 中
1 RTN_COORD_CMD_BEGIN 2 ... 3 4 RTN_COORD_CMD_ADD( COORD_CMD_LISTCOLLECTIONSPACES, rtnCoordCMDListCollectionSpace ) 5 6 ... 7 RTN_COORD_OP_END?嗯,上面的代碼有點(diǎn)MFC中消息映射的感覺。
來看看 RTN_COORD_CMD_ADD 宏:
1 #define RTN_COORD_CMD_ADD( cmdName, cmdClass ) {\ 2 rtnCoordCommand *pObj = SDB_OSS_NEW cmdClass();\ 3 _cmdMap.insert ( COORD_CMD_MAP::value_type (cmdName, pObj ));}宏主要是new一個(gè)對(duì)象,再把對(duì)象插入到_cmdMap中,這樣在程序初始化時(shí)候,便會(huì)有一系列的command對(duì)象存儲(chǔ)在_cmdMap中。
另外,對(duì)SequoiaDB而言,所有的command操作,都是在查詢操作的基礎(chǔ)上做的,服務(wù)端用一些方法區(qū)別開是真正的查詢,還是command。SequoiaDB的命令,是以$開頭的字符串。
前提簡(jiǎn)述完畢,現(xiàn)在假設(shè)client連接上了coord,coord也創(chuàng)建了一個(gè)線程,為這個(gè)client服務(wù)。
1 INT32 _pmdLocalSession::run() 2 { 3 INT32 rc = SDB_OK ; 4 UINT32 msgSize = 0 ; 5 CHAR *pBuff = NULL ; 6 INT32 buffSize = 0 ; 7 pmdEDUMgr *pmdEDUMgr = NULL ; 8 9 if ( !_pEDUCB ) 10 { 11 rc = SDB_SYS ; 12 goto error ; 13 } 14 15 pmdEDUMgr = _pEDUCB->getEDUMgr() ; 16 17 while ( !_pEDUCB->isDisconnected() && !_socket.isClosed() ) 18 { 19 _pEDUCB->resetInterrupt() ; 20 _pEDUCB->resetInfo( EDU_INFO_ERROR ) ; 21 _pEDUCB->resetLsn() ; 22 23 rc = recvData( (CHAR*)&msgSize, sizeof(UINT32) ) ; // 收取數(shù)據(jù)包的前四個(gè)字節(jié),代表該數(shù)據(jù)包有多大 24 if ( rc ) 25 { 26 if ( SDB_APP_FORCED != rc ) 27 { 28 PD_LOG( PDERROR, "Session[%s] failed to recv msg size, " 29 "rc: %d", sessionName(), rc ) ; 30 } 31 break ; 32 } 33 34 if ( msgSize == (UINT32)MSG_SYSTEM_INFO_LEN ) // 如果包長(zhǎng)度是 MSG_SYSTEM_INFO_LEN (-1),則這是一個(gè)請(qǐng)求系統(tǒng)信息包,coord會(huì)返回本機(jī)的字節(jié)序列給client 35 { // 每個(gè)連接的第一個(gè)包,一定是長(zhǎng)度標(biāo)記為 MSG_SYSTEM_INFO_LEN 的包,否則字節(jié)序不正確,所有的數(shù)據(jù)都不能保證能正確解析(萬一數(shù)據(jù)庫(kù)運(yùn)行在大端機(jī)上呢) 36 rc = _recvSysInfoMsg( msgSize, &pBuff, buffSize ) ; 37 if ( rc ) 38 { 39 break ; 40 } 41 rc = _processSysInfoRequest( pBuff ) ; 42 if ( rc ) 43 { 44 break ; 45 } 46 47 _setHandshakeReceived() ; 48 } 49 else if ( msgSize < sizeof(MsgHeader) || msgSize > SDB_MAX_MSG_LENGTH ) // 對(duì)包的大小做出了限制,包長(zhǎng)超過某值或者小于某值的包,都會(huì)導(dǎo)致連接中斷 50 { 51 PD_LOG( PDERROR, "Session[%s] recv msg size[%d] is less than " 52 "MsgHeader size[%d] or more than max msg size[%d]", 53 sessionName(), msgSize, sizeof(MsgHeader), 54 SDB_MAX_MSG_LENGTH ) ; 55 rc = SDB_INVALIDARG ; 56 break ; 57 } 58 else 59 { 60 pBuff = getBuff( msgSize + 1 ) ; 61 if ( !pBuff ) 62 { 63 rc = SDB_OOM ; 64 break ; 65 } 66 buffSize = getBuffLen() ; 67 *(UINT32*)pBuff = msgSize ; 68 rc = recvData( pBuff + sizeof(UINT32), 69 msgSize - sizeof(UINT32), 70 PMD_RECV_DATA_AFTER_LENGTH_TIMEOUT ) ; // 到此處,說明程序可以愉快的接受client的發(fā)送的數(shù)據(jù)包了 71 if ( rc ) 72 { 73 if ( SDB_APP_FORCED != rc ) 74 { 75 PD_LOG( PDERROR, "Session[%s] failed to recv msg[len: %u], " 76 "rc: %d", sessionName(), msgSize - sizeof(UINT32), 77 rc ) ; 78 } 79 break ; 80 } 81 82 _pEDUCB->incEventCount() ; 83 pBuff[ msgSize ] = 0 ; 84 if ( SDB_OK != ( rc = pmdEDUMgr->activateEDU( _pEDUCB ) ) ) 85 { 86 PD_LOG( PDERROR, "Session[%s] activate edu failed, rc: %d", 87 sessionName(), rc ) ; 88 break ; 89 } 90 rc = _processMsg( (MsgHeader*)pBuff ) ; // 收到數(shù)據(jù)包,開始處理,該函數(shù)在結(jié)合代碼講解 91 if ( rc ) 92 { 93 break ; 94 } 95 if ( SDB_OK != ( rc = pmdEDUMgr->waitEDU( _pEDUCB ) ) ) 96 { 97 PD_LOG( PDERROR, "Session[%s] wait edu failed, rc: %d", 98 sessionName(), rc ) ; 99 break ; 100 } 101 } 102 } // end while 103 104 done: 105 disconnect() ; 106 return rc ; 107 error: 108 goto done ; 109 }?_processMsg方法:
1 INT32 _pmdLocalSession::_processMsg( MsgHeader * msg ) 2 { 3 INT32 rc = SDB_OK ; 4 const CHAR *pBody = NULL ; 5 INT32 bodyLen = 0 ; 6 rtnContextBuf contextBuff ; 7 INT32 opCode = msg->opCode ; 8 9 rc = _onMsgBegin( msg ) ; // 對(duì)數(shù)據(jù)包做前期處理,例如改數(shù)據(jù)包是不是需要返回,(若出錯(cuò))需不需要回滾,并初始化好回復(fù)的數(shù)據(jù)包頭部 10 if ( SDB_OK == rc ) 11 { 12 rc = _processor->processMsg( msg, contextBuff, // 我是項(xiàng)目經(jīng)理,這個(gè)包就交給processor處理去吧,我要的是結(jié)果。 13 _replyHeader.contextID, // processor在不同的節(jié)點(diǎn)中,指向不同的對(duì)象(咦,這不是多態(tài)么?),因此也有不同的處理方式 14 _needReply ) ; 15 pBody = contextBuff.data() ; // pBody指向要返回的數(shù)據(jù),避免拷貝(提高執(zhí)行效率) 16 bodyLen = contextBuff.size() ; // 數(shù)據(jù)長(zhǎng)度,不表 17 _replyHeader.numReturned = contextBuff.recordNum() ; // 返回的數(shù)據(jù)共有多少條記錄 18 _replyHeader.startFrom = (INT32)contextBuff.getStartFrom() ; // 應(yīng)該從哪一條開始讀 19 if ( SDB_OK != rc ) 20 { 21 if ( _needRollback ) // 當(dāng)執(zhí)行過程中例如(insert, delete等),出錯(cuò)了,需要把數(shù)據(jù)復(fù)原 22 { 23 INT32 rcTmp = rtnTransRollback( eduCB(), getDPSCB() ) ; 24 if ( rcTmp ) 25 { 26 PD_LOG( PDERROR, "Session[%s] failed to rollback trans " 27 "info, rc: %d", sessionName(), rcTmp ) ; 28 } 29 _needRollback = FALSE ; 30 } 31 } 32 } 33 34 if ( _needReply ) // 需要回復(fù),那就再處理一下把 35 { 36 if ( rc && bodyLen == 0 ) // 執(zhí)行過程出錯(cuò),那就返回出錯(cuò)信息 37 { 38 _errorInfo = utilGetErrorBson( rc, _pEDUCB->getInfo( 39 EDU_INFO_ERROR ) ) ; 40 pBody = _errorInfo.objdata() ; 41 bodyLen = (INT32)_errorInfo.objsize() ; 42 _replyHeader.numReturned = 1 ; 43 } 44 _replyHeader.header.opCode = MAKE_REPLY_TYPE(opCode) ; // 填充回復(fù)數(shù)據(jù)包中的字段 45 _replyHeader.flags = rc ; 46 _replyHeader.header.messageLength = sizeof( _replyHeader ) + 47 bodyLen ; 48 49 INT32 rcTmp = _reply( &_replyHeader, pBody, bodyLen ) ; // 把包發(fā)送給client 50 if ( rcTmp ) 51 { 52 PD_LOG( PDERROR, "Session[%s] failed to send response, rc: %d", 53 sessionName(), rcTmp ) ; 54 disconnect() ; 55 } 56 } 57 58 _onMsgEnd( rc, msg ) ; 59 rc = SDB_OK ; 60 61 return rc ; 62 }?coord節(jié)點(diǎn)上的processor,是pmdCoordProcessor的一個(gè)實(shí)例,是用來做數(shù)據(jù)轉(zhuǎn)發(fā)的,不同于真正做數(shù)據(jù)處理的pmdDataProcessor。
1 INT32 _pmdCoordProcessor::processMsg( MsgHeader *msg, 2 rtnContextBuf &contextBuff, 3 INT64 &contextID, 4 BOOLEAN &needReply ) 5 { 6 ... 7 8 rc = _processCoordMsg( msg, _replyHeader, contextBuff ) ; // 轉(zhuǎn)給另一個(gè)函數(shù)(_processCoordMsg)處理,下面講述 9 if ( SDB_COORD_UNKNOWN_OP_REQ == rc ) 10 { 11 contextBuff.release() ; 12 rc = _pmdDataProcessor::processMsg( msg, contextBuff, // 如果上一個(gè)函數(shù)處理后,返回的錯(cuò)誤是一個(gè) SDB_COORD_UNKNOWN_OP_REQ類型,則交給pmdDataProcessor處理 13 contextID, needReply ) ; 14 } 15 ... 16 }?pmdCoordProcessor的處理過程
1 INT32 _pmdCoordProcessor::_processCoordMsg( MsgHeader *msg, 2 MsgOpReply &replyHeader, 3 rtnContextBuf &contextBuff ) 4 { 5 INT32 rc = SDB_OK ; 6 if ( NULL != _pErrorObj ) 7 { 8 SDB_OSS_DEL _pErrorObj ; 9 _pErrorObj = NULL ; 10 } 11 if ( NULL != _pResultBuff ) 12 { 13 _pResultBuff = NULL ; 14 } 15 CoordCB *pCoordcb = _pKrcb->getCoordCB(); 16 rtnCoordProcesserFactory *pProcesserFactory 17 = pCoordcb->getProcesserFactory(); 18 19 if ( MSG_AUTH_VERIFY_REQ == msg->opCode ) 20 { 21 rc = SDB_COORD_UNKNOWN_OP_REQ ; 22 goto done ; 23 } 24 else if ( MSG_BS_INTERRUPTE == msg->opCode || 25 MSG_BS_INTERRUPTE_SELF == msg->opCode || 26 MSG_BS_DISCONNECT == msg->opCode ) 27 { 28 } 29 else if ( !getClient()->isAuthed() ) // 沒有用用戶和密碼登錄,就收到了數(shù)據(jù)包的,就先嘗試用默認(rèn)的用戶名和密碼,先取得數(shù)據(jù)庫(kù)的授權(quán),否則無法做操作 30 { 31 rc = getClient()->authenticate( "", "" ) ; 32 if ( rc ) 33 { 34 goto done ; 35 } 36 } 37 38 switch ( msg->opCode ) // 開始檢查client要做什么樣的操作了 39 { 40 case MSG_BS_GETMORE_REQ : // get more操作,coord不做處理,先標(biāo)記成 SDB_COORD_UNKNOWN_OP_REQ,交給其它地方處理 41 rc = SDB_COORD_UNKNOWN_OP_REQ ; 42 break ; 43 case MSG_BS_QUERY_REQ: // 查詢操作,這個(gè)是重點(diǎn)。所有的command 44 { 45 MsgOpQuery *pQueryMsg = ( MsgOpQuery * )msg ; 46 CHAR *pQueryName = pQueryMsg->name ; 47 SINT32 queryNameLen = pQueryMsg->nameLength ; 48 if ( queryNameLen > 0 && '$' == pQueryName[0] ) // 如果查詢的name字段,是用$開頭的字符串,則認(rèn)為這個(gè)是command,要走command處理 49 { 50 rtnCoordCommand *pCmdProcesser = 51 pProcesserFactory->getCommandProcesser( pQueryMsg ) ; // 找到command的對(duì)象,上文中有描述所有的command都在初始化的時(shí)候,存入_cmdMap中 52 if ( NULL != pCmdProcesser ) 53 { 54 rc = pCmdProcesser->execute( ( CHAR *)msg, // 找到了,就開始command處理了 55 msg->messageLength, 56 eduCB(), 57 replyHeader, 58 &contextBuff ) ; 59 break ; 60 } 61 } 62 // 如果沒有找到,則走入 default代碼塊 63 } 64 default: 65 { 66 rtnContextBase *pContext = NULL ; 67 rtnCoordOperator *pOperator = 68 pProcesserFactory->getOperator( msg->opCode ) ; // 交給operator處理,operator是類似于command的幾個(gè)特殊的處理對(duì)象,數(shù)量比較少,此處不表 69 rc = pOperator->execute( ( CHAR* )msg, // 轉(zhuǎn)發(fā)給對(duì)應(yīng)的operator類實(shí)例 70 msg->messageLength, 71 eduCB(), 72 replyHeader, 73 &contextBuff ) ; 74 ... 75 } 76 }?以創(chuàng)建集合空間的command為例,看看 rtnCoordCMDListCollectionSpace 的 execute做了什么:
INT32 rtnCoordCMDCreateCollectionSpace::execute( CHAR *pReceiveBuffer,SINT32 packSize,pmdEDUCB *cb,MsgOpReply &replyHeader,rtnContextBuf *buf ){...MsgOpQuery *pCreateReq = (MsgOpQuery *)pReceiveBuffer; // 構(gòu)造一個(gè) MSG_CAT_CREATE_COLLECTION_SPACE_REQ 的數(shù)據(jù)包pCreateReq->header.routeID.value = 0;pCreateReq->header.TID = cb->getTID();pCreateReq->header.opCode = MSG_CAT_CREATE_COLLECTION_SPACE_REQ; // 數(shù)據(jù)包的類型 rc = executeOnCataGroup ( (CHAR*)pCreateReq, pRouteAgent,cb, NULL, NULL ) ;if ( rc ){PD_LOG ( PDERROR, "create collectionspace failed, rc = %d", rc ) ;goto error ;}done :replyHeader.flags = rc ;PD_TRACE_EXITRC ( SDB_RTNCOCMDCRCS_EXE, rc ) ;return rc;error :goto done ;}?該函數(shù)的主體,構(gòu)造了另外一個(gè)數(shù)據(jù)包,然后執(zhí)行 executeOnCataGroup ( (CHAR*)pCreateReq, pRouteAgent, cb, NULL, NULL ) ;這一句上。跟進(jìn)這一函數(shù):
1 INT32 rtnCoordCommand::executeOnCataGroup ( CHAR *pBuffer, 2 netMultiRouteAgent *pRouteAgent, 3 pmdEDUCB *cb, 4 rtnContextCoord *pContext, 5 CoordGroupList *pGroupList, 6 std::vector<BSONObj> *pReplyObjs ) 7 { 8 INT32 rc = SDB_OK; 9 ... 10 retry : 11 rc = rtnCoordGetCatGroupInfo( cb, isNeedRefresh, catGroupInfo ); // 查詢catalog的信息,主要是獲取到catalog組的主節(jié)點(diǎn)的服務(wù)地址 12 if ( rc ) 13 { 14 probe = 100 ; 15 goto error ; 16 PD_LOG ( PDERROR, "Execute on catalogue node failed, failed to get " 17 "catalogue group info(rc=%d)", rc ); 18 } 19 rc = rtnCoordSendRequestToPrimary( pBuffer, catGroupInfo, sendNodes, // 跟了這么久,做了那么多的準(zhǔn)備,這一句才是真開始了,有興趣可以自己看一下 :) 20 pRouteAgent, MSG_ROUTE_CAT_SERVICE, 21 cb ); 22 if ( rc ) 23 { 24 probe = 200 ; 25 goto error ; 26 } 27 rc = rtnCoordGetReply( cb, sendNodes, replyQue, // 等待并收取遠(yuǎn)程節(jié)點(diǎn)處理的返回信息 28 MAKE_REPLY_TYPE(((MsgHeader*)pBuffer)->opCode) ) ; 29 ... 30 }?
?rtnCoordSendRequestToPrimary就不再詳細(xì)跟進(jìn)描述了,根據(jù)函數(shù)名,大致就可以了解一個(gè)大概,是把數(shù)據(jù)發(fā)送到指定組(此處是catalog組)的主節(jié)點(diǎn)。
coord上的其它c(diǎn)ommand或者operator也是采用類似的方法來轉(zhuǎn)發(fā)消息給其它節(jié)點(diǎn),就不再一一贅述了。
?
綜合全文的講述,coord處理client請(qǐng)求的流程
發(fā)送請(qǐng)求給coord節(jié)點(diǎn)
?? coord先揪出這個(gè)請(qǐng)求是做什么
????? 交給對(duì)應(yīng)的command處理
??????? ?查詢(本地緩存或者遠(yuǎn)程獲取的)catalog信息
???????? 把消息轉(zhuǎn)成節(jié)點(diǎn)間的內(nèi)部消息
???????? 轉(zhuǎn)發(fā)給目標(biāo)節(jié)點(diǎn)
???????? 然后等待返回?cái)?shù)據(jù)
???? 再把返回?cái)?shù)據(jù)交給處理線程
線程把返回結(jié)果發(fā)送給client
?
=====>THE END<=====?
?
轉(zhuǎn)載于:https://www.cnblogs.com/tynia/p/coord.html
總結(jié)
以上是生活随笔為你收集整理的SequoiaDB 系列之六 :源码分析之coord节点的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: linux模拟tcp测试工具,TCP测试
- 下一篇: TCP/UDP测试工具大全