3 v4 中心节点固定_死磕以太坊源码分析之p2p节点发现
死磕以太坊源碼分析之p2p節(jié)點(diǎn)發(fā)現(xiàn)
在閱讀節(jié)點(diǎn)發(fā)現(xiàn)源碼之前必須要理解kadmilia算法,可以參考:KAD算法詳解。
節(jié)點(diǎn)發(fā)現(xiàn)概述
節(jié)點(diǎn)發(fā)現(xiàn),使本地節(jié)點(diǎn)得知其他節(jié)點(diǎn)的信息,進(jìn)而加入到p2p網(wǎng)絡(luò)中。
以太坊的節(jié)點(diǎn)發(fā)現(xiàn)基于類似的kademlia算法,源碼中有兩個(gè)版本,v4和v5。v4適用于全節(jié)點(diǎn),通過(guò)discover.ListenUDP使用,v5適用于輕節(jié)點(diǎn)通過(guò)discv5.ListenUDP使用,本文介紹的是v4版本。
節(jié)點(diǎn)發(fā)現(xiàn)功能主要涉及 Server Table udp 這幾個(gè)數(shù)據(jù)結(jié)構(gòu),它們有獨(dú)自的事件響應(yīng)循環(huán),節(jié)點(diǎn)發(fā)現(xiàn)功能便是它們互相協(xié)作完成的。其中,每個(gè)以太坊客戶端啟動(dòng)后都會(huì)在本地運(yùn)行一個(gè)Server,并將網(wǎng)絡(luò)拓?fù)渲邢噜彽墓?jié)點(diǎn)視為Node,而Table是Node的容器,udp則是負(fù)責(zé)維持底層的連接。這些結(jié)構(gòu)的關(guān)系如下圖:
image-20201123210628944p2p服務(wù)開(kāi)啟節(jié)點(diǎn)發(fā)現(xiàn)
在P2p的server.go 的start方法中:
if?err?:=?srv.setupDiscovery();?err?!=?nil?{??return?err
?}
進(jìn)入到setupDiscovery中:
//?Discovery?V4?var?unhandled?chan?discover.ReadPacket
?var?sconn?*sharedUDPConn
?if?!srv.NoDiscovery?{
??...
??ntab,?err?:=?discover.ListenUDP(conn,?srv.localnode,?cfg)
??....
?}
discover.ListenUDP方法即開(kāi)啟了節(jié)點(diǎn)發(fā)現(xiàn)的功能.
首先解析出監(jiān)聽(tīng)地址的UDP端口,根據(jù)端口返回與之相連的UDP連接,之后返回連接的本地網(wǎng)絡(luò)地址,接著設(shè)置最后一個(gè)UDP-on-IPv4端口。到此為止節(jié)點(diǎn)發(fā)現(xiàn)的一些準(zhǔn)備工作做好,接下下來(lái)開(kāi)始UDP的監(jiān)聽(tīng):
ntab,?err?:=?discover.ListenUDP(conn,?srv.localnode,?cfg)然后進(jìn)行UDP 的監(jiān)聽(tīng),下面是監(jiān)聽(tīng)的過(guò)程:
監(jiān)聽(tīng)UDP
//?監(jiān)聽(tīng)給定的socket?上的發(fā)現(xiàn)的包func?ListenUDP(c?UDPConn,?ln?*enode.LocalNode,?cfg?Config)?(*UDPv4,?error)?{
?return?ListenV4(c,?ln,?cfg)
}
func?ListenV4(c?UDPConn,?ln?*enode.LocalNode,?cfg?Config)?(*UDPv4,?error)?{
?closeCtx,?cancel?:=?context.WithCancel(context.Background())
?t?:=?&UDPv4{
??conn:????????????c,
??priv:????????????cfg.PrivateKey,
??netrestrict:?????cfg.NetRestrict,
??localNode:???????ln,
??db:??????????????ln.Database(),
??gotreply:????????make(chan?reply),
??addReplyMatcher:?make(chan?*replyMatcher),
??closeCtx:????????closeCtx,
??cancelCloseCtx:??cancel,
??log:?????????????cfg.Log,
?}
?if?t.log?==?nil?{
??t.log?=?log.Root()
?}
?tab,?err?:=?newTable(t,?ln.Database(),?cfg.Bootnodes,?t.log)?//?
?if?err?!=?nil?{
??return?nil,?err
?}
?t.tab?=?tab
?go?tab.loop()?//
?t.wg.Add(2)
?go?t.loop()?//
?go?t.readLoop(cfg.Unhandled)?//
?return?t,?nil
}
主要做了以下幾件事:
1.新建路由表
tab,?err?:=?newTable(t,?ln.Database(),?cfg.Bootnodes,?t.log)?新建路由表做了以下幾件事:
- 初始化table對(duì)象
- 設(shè)置bootnode(setFallbackNodes)
- 節(jié)點(diǎn)第一次啟動(dòng)的時(shí)候,節(jié)點(diǎn)會(huì)與硬編碼在以太坊源碼中的bootnode進(jìn)行連接,所有的節(jié)點(diǎn)加入幾乎都先連接了它。連接上bootnode后,獲取bootnode部分的鄰居節(jié)點(diǎn),然后進(jìn)行節(jié)點(diǎn)發(fā)現(xiàn),獲取更多的活躍的鄰居節(jié)點(diǎn)
- nursery 是在 Table 為空并且數(shù)據(jù)庫(kù)中沒(méi)有存儲(chǔ)節(jié)點(diǎn)時(shí)的初始連接節(jié)點(diǎn)(上文中的 6 個(gè)節(jié)點(diǎn)),通過(guò) bootnode 可以發(fā)現(xiàn)新的鄰居
- tab.seedRand:使用提供的種子值將生成器初始化為確定性狀態(tài)
- loadSeedNodes:加載種子節(jié)點(diǎn);從保留已知節(jié)點(diǎn)的數(shù)據(jù)庫(kù)中隨機(jī)的抽取30個(gè)節(jié)點(diǎn),再加上引導(dǎo)節(jié)點(diǎn)列表中的節(jié)點(diǎn),放置入k桶中,如果K桶沒(méi)有空間,則假如到替換列表中。
2.測(cè)試鄰居節(jié)點(diǎn)連通性
首先知道UDP協(xié)議是沒(méi)有連接的概念的,所以需要不斷的ping 來(lái)測(cè)試對(duì)端節(jié)點(diǎn)是否正常,在新建路由表之后,就來(lái)到下面的循環(huán),不斷的去做上面的事。
go?tab.loop()定時(shí)運(yùn)行doRefresh、doRevalidate、copyLiveNodes進(jìn)行刷新K桶。
以太坊的k桶設(shè)置:
const?(?alpha???????????=?3??//?Kademlia并發(fā)參數(shù),?是系統(tǒng)內(nèi)一個(gè)優(yōu)化參數(shù),控制每次從K桶最多取出節(jié)點(diǎn)個(gè)數(shù),ethereum取值3
??
?bucketSize??????=?16?//?K桶大小(可容納節(jié)點(diǎn)數(shù))
??
?maxReplacements?=?10?//?每桶更換列表的大小
?hashBits??????????=?len(common.Hash{})?*?8?//每個(gè)節(jié)點(diǎn)ID長(zhǎng)度,32*8=256,?32位16進(jìn)制
?nBuckets??????????=?hashBits?/?15???????//??K桶個(gè)數(shù)
??)
首先搞清楚這三個(gè)定時(shí)器運(yùn)行的時(shí)間:
refreshInterval????=?30?*?time.MinuterevalidateInterval?=?10?*?time.Second
copyNodesInterval??=?30?*?time.Second
doRefresh
doRefresh對(duì)隨機(jī)目標(biāo)執(zhí)行查找以保持K桶已滿。如果表為空(初始引導(dǎo)程序或丟棄的有故障),則插入種子節(jié)點(diǎn)。
主要以下幾步:
從數(shù)據(jù)庫(kù)加載隨機(jī)節(jié)點(diǎn)和引導(dǎo)節(jié)點(diǎn)。這應(yīng)該會(huì)產(chǎn)生一些以前見(jiàn)過(guò)的節(jié)點(diǎn)
tab.loadSeedNodes()將本地節(jié)點(diǎn)ID作為目標(biāo)節(jié)點(diǎn)進(jìn)行查找最近的鄰居節(jié)點(diǎn)
tab.net.lookupSelf()func?(t?*UDPv4)?lookupSelf()?[]*enode.Node?{
?return?t.newLookup(t.closeCtx,?encodePubkey(&t.priv.PublicKey)).run()
}
func?(t?*UDPv4)?newLookup(ctx?context.Context,?targetKey?encPubkey)?*lookup?{
?...
??return?t.findnode(n.ID(),?n.addr(),?targetKey)
?})
?return?it
}
向這些節(jié)點(diǎn)發(fā)起findnode操作查詢離target節(jié)點(diǎn)最近的節(jié)點(diǎn)列表,將查詢得到的節(jié)點(diǎn)進(jìn)行ping-pong測(cè)試,將測(cè)試通過(guò)的節(jié)點(diǎn)落庫(kù)保存
經(jīng)過(guò)這個(gè)流程后,節(jié)點(diǎn)的K桶就能夠比較均勻地將不同網(wǎng)絡(luò)節(jié)點(diǎn)更新到本地K桶中。
unc?(t?*UDPv4)?findnode(toid?enode.ID,?toaddr?*net.UDPAddr,?target?encPubkey)?([]*node,?error)?{?t.ensureBond(toid,?toaddr)
?nodes?:=?make([]*node,?0,?bucketSize)
?nreceived?:=?0
??//?設(shè)置回應(yīng)回調(diào)函數(shù),等待類型為neighborsPacket的鄰近節(jié)點(diǎn)包,如果類型對(duì),就執(zhí)行回調(diào)請(qǐng)求
?rm?:=?t.pending(toid,?toaddr.IP,?p_neighborsV4,?func(r?interface{})?(matched?bool,?requestDone?bool)?{
??reply?:=?r.(*neighborsV4)
??for?_,?rn?:=?range?reply.Nodes?{
???nreceived++
??????//?得到一個(gè)簡(jiǎn)單的node結(jié)構(gòu)
???n,?err?:=?t.nodeFromRPC(toaddr,?rn)
???if?err?!=?nil?{
????t.log.Trace("Invalid?neighbor?node?received",?"ip",?rn.IP,?"addr",?toaddr,?"err",?err)
????continue
???}
???nodes?=?append(nodes,?n)
??}
??return?true,?nreceived?>=?bucketSize
?})
??//上面了一個(gè)管道事件,下面開(kāi)始發(fā)送真正的findnode報(bào)文,然后進(jìn)行等待了
?t.send(toaddr,?toid,?&findnodeV4{
??Target:?????target,
??Expiration:?uint64(time.Now().Add(expiration).Unix()),
?})
?return?nodes,?}
查找3個(gè)隨機(jī)的目標(biāo)節(jié)點(diǎn)
for?i?:=?0;?i?3;?i++?{??tab.net.lookupRandom()
?}
doRevalidate
doRevalidate檢查隨機(jī)存儲(chǔ)桶中的最后一個(gè)節(jié)點(diǎn)是否仍然存在,如果不是,則替換或刪除該節(jié)點(diǎn)。
主要以下幾步:
返回隨機(jī)的非空K桶中的最后一個(gè)節(jié)點(diǎn)
last,?bi?:=?tab.nodeToRevalidate()對(duì)最后的節(jié)點(diǎn)執(zhí)行Ping操作,然后等待Pong
remoteSeq,?err?:=?tab.net.ping(unwrapNode(last))如果節(jié)點(diǎn)ping通了的話,將節(jié)點(diǎn)移動(dòng)到最前面
tab.bumpInBucket(b,?last)沒(méi)有收到回復(fù),選擇一個(gè)替換節(jié)點(diǎn),或者如果沒(méi)有任何替換節(jié)點(diǎn),則刪除該節(jié)點(diǎn)
tab.replace(b,?last)copyLiveNodes
copyLiveNodes將表中的節(jié)點(diǎn)添加到數(shù)據(jù)庫(kù),如果節(jié)點(diǎn)在表中的時(shí)間超過(guò)了5分鐘。
這部分代碼比較簡(jiǎn)單,就伸展闡述。
if?n.livenessChecks?>?0?&&?now.Sub(n.addedAt)?>=?seedMinTableTime?{????tab.db.UpdateNode(unwrapNode(n))
???}
3.檢測(cè)各類信息
go?t.loop()loop循環(huán)主要監(jiān)聽(tīng)以下幾類消息:
- case
- p :=
- r :=
4. 處理UDP數(shù)據(jù)包
go?t.readLoop(cfg.Unhandled)主要有以下兩件事:
循環(huán)接收其他節(jié)點(diǎn)發(fā)來(lái)的udp消息
nbytes,?from,?err?:=?t.conn.ReadFromUDP(buf)處理接收到的UDP消息
t.handlePacket(from,?buf[:nbytes])接下來(lái)對(duì)這兩個(gè)函數(shù)進(jìn)行進(jìn)一步的解析。
接收UDP消息
接收UDP消息比較的簡(jiǎn)單,就是不斷的從連接中讀取Packet數(shù)據(jù),它有以下幾種消息:
ping:用于判斷遠(yuǎn)程節(jié)點(diǎn)是否在線。
pong:用于回復(fù)ping消息的響應(yīng)。
findnode:查找與給定的目標(biāo)節(jié)點(diǎn)相近的節(jié)點(diǎn)。
neighbors:用于回復(fù)findnode的響應(yīng),與給定的目標(biāo)節(jié)點(diǎn)相近的節(jié)點(diǎn)列表
處理UDP消息
主要做了以下幾件事:
數(shù)據(jù)包解碼
packet,?fromKey,?hash,?err?:=?decodeV4(buf)檢查數(shù)據(jù)包是否有效,是否可以處理
?packet.preverify(t,?from,?fromID,?fromKey)在校驗(yàn)這一塊,涉及不同的消息類型不同的校驗(yàn),我們來(lái)分別對(duì)各種消息進(jìn)行分析。
①:ping
②:pong
③:findNodes
④:neighbors
- 校驗(yàn)消息是否過(guò)期
- 用于回復(fù)findnode的響應(yīng),校驗(yàn)回復(fù)是否正確
- 校驗(yàn)消息是否過(guò)期
- 校驗(yàn)節(jié)點(diǎn)是否是最近的節(jié)點(diǎn)
- 校驗(yàn)消息是否過(guò)期
- 校驗(yàn)回復(fù)是否正確
- 校驗(yàn)消息是否過(guò)期
- 校驗(yàn)公鑰是否有效
處理packet數(shù)據(jù)
packet.handle(t,?from,?fromID,?hash)相同的,也會(huì)有4種消息,但是我們這邊重點(diǎn)講處理findNodes的消息:
func?(req?*findnodeV4)?handle(t?*UDPv4,?from?*net.UDPAddr,?fromID?enode.ID,?mac?[]byte)?{...
}
我們這里就稍微介紹下如何處理findnode的消息:
func?(req?*findnodeV4)?handle(t?*UDPv4,?from?*net.UDPAddr,?fromID?enode.ID,?mac?[]byte)?{?//?確定最近的節(jié)點(diǎn)
?target?:=?enode.ID(crypto.Keccak256Hash(req.Target[:]))
?t.tab.mutex.Lock()
?//最接近的返回表中最接近給定id的n個(gè)節(jié)點(diǎn)
?closest?:=?t.tab.closest(target,?bucketSize,?true).entries
?t.tab.mutex.Unlock()
?//?以每個(gè)數(shù)據(jù)包最多maxNeighbors的塊的形式發(fā)送鄰居,以保持在數(shù)據(jù)包大小限制以下。
?p?:=?neighborsV4{Expiration:?uint64(time.Now().Add(expiration).Unix())}
?var?sent?bool
?for?_,?n?:=?range?closest?{?//掃描這些最近的節(jié)點(diǎn)列表,然后一個(gè)包一個(gè)包的發(fā)送給對(duì)方
??if?netutil.CheckRelayIP(from.IP,?n.IP())?==?nil?{
???p.Nodes?=?append(p.Nodes,?nodeToRPC(n))
??}
??if?len(p.Nodes)?==?maxNeighbors?{
???t.send(from,?fromID,?&p)//給對(duì)方發(fā)送?neighborsPacket?包,里面包含節(jié)點(diǎn)列表
???p.Nodes?=?p.Nodes[:0]
???sent?=?true
??}
?}
?if?len(p.Nodes)?>?0?||?!sent?{
??t.send(from,?fromID,?&p)
?}
}
首先先確定最近的節(jié)點(diǎn),再一個(gè)包一個(gè)包的發(fā)給對(duì)方,并校驗(yàn)節(jié)點(diǎn)的IP,最后把有效的節(jié)點(diǎn)發(fā)送給請(qǐng)求方。
涉及的結(jié)構(gòu)體:
UDP
- conn :接口,包括了從UDP中讀取和寫(xiě)入,關(guān)閉UDP連接以及獲取本地地址。
- netrestrict:IP網(wǎng)絡(luò)列表
- localNode:本地節(jié)點(diǎn)
- tab:路由表
Table
buckets:所有節(jié)點(diǎn)都加到這個(gè)里面,按照距離
nursery:啟動(dòng)節(jié)點(diǎn)
rand:隨機(jī)來(lái)源
ips:跟蹤IP,確保IP中最多N個(gè)屬于同一網(wǎng)絡(luò)范圍
net: UDP 傳輸?shù)慕涌?/p>
- 返回本地節(jié)點(diǎn)
- 將enrRequest發(fā)送到給定的節(jié)點(diǎn)并等待響應(yīng)
- findnode向給定節(jié)點(diǎn)發(fā)送一個(gè)findnode請(qǐng)求,并等待該節(jié)點(diǎn)最多發(fā)送了k個(gè)鄰居
- 返回查找最近的節(jié)點(diǎn)
- 將ping消息發(fā)送到給定的節(jié)點(diǎn),然后等待答復(fù)
以下是table的結(jié)構(gòu)圖:
image-20201112104254003思維導(dǎo)圖
思維導(dǎo)圖獲取地址
image-20201123211034861關(guān)于我
微信號(hào):mindcarver ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? 加我探討問(wèn)題或商務(wù)合作
個(gè)人博客網(wǎng)站:https://mindcarver.cn/ ? ? ? ? ? ? ? ?最新技術(shù)博客
github開(kāi)源項(xiàng)目:https://github.com/blockchainGuide ? ? ? ? ? ?持續(xù)會(huì)更新區(qū)塊鏈最新技術(shù)
QQ群:1057497867
- END -總結(jié)
以上是生活随笔為你收集整理的3 v4 中心节点固定_死磕以太坊源码分析之p2p节点发现的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: vc 只有顶级窗口可以弹出窗口_如何在M
- 下一篇: python init方法是不是私有方法