Fabric源码分析-共识模块
正好這些天要有一個需求要幫客戶魔改Fabric-v0.6,把一些hyperchain的高級特性移植過去,借此機會把之前看過的源碼在梳理一下。
下面就是對Fabric共識模塊的源碼分析和梳理,代碼都是以Fabric-v0.6-preview為例,在1.0及后續版本中都移除了PBFT部分,用了更好的SBFT,目前這一部分還在開發中。
目錄結構
可以看到共識模塊目錄如下。
consensus ├── controller ├── executor ├── helper │ └── persist ├── noops ├── pbft └── util└── events目錄含義如下
- controller 用來控制Fabric選擇什么樣的共識算法,默認是noops。
 - executor 封裝了消息隊列中對交易的處理。
 - helper 對外提供接口調用和數據持久化接口。
 - noops 提供了如何編寫Fabric共識算法的Demo。
 - pbft PBFT算法的具體實現。
 - util 實現了一個peer節點到共識算法的一個消息通道,和一個消息隊列。
 
流程概覽
Fabric網絡通過一個EventLoop和共識算法進行交互,所有的操作都通過對事件循環中的事件監聽進行推進。
整體流程如下圖所示。
Consensus模塊接口
fabric/consensus/consensus.go對外提供共識模塊的方法調用。
其中最核心也是每個算法必須實現的接口是Consenter。
type ExecutionConsumer interface {Executed(tag interface{}) Committed(tag interface{}, target *pb.BlockchainInfo) RolledBack(tag interface{}) StateUpdated(tag interface{}, target *pb.BlockchainInfo) }type Consenter interface {RecvMsg(msg *pb.Message, senderHandle *pb.PeerID) errorExecutionConsumer }接口的具體實現在fabric/consensus/pbft/external.go。
因為對交易的操作都是異步的,所以必須手動實現Executed,Committed,RolledBack,StateUpdated方法來監聽對應動作的完成。
RecvMsg方法用來從不用的peer節點接收消息。
初始化共識模塊
共識算法引擎在peer啟動的時候初始化,初始化的具體函數如下所示。
// consensus/helper/engine.go func GetEngine(coord peer.MessageHandlerCoordinator) (peer.Engine, error) {var err errorengineOnce.Do(func() {engine = new(EngineImpl)engine.helper = NewHelper(coord)engine.consenter = controller.NewConsenter(engine.helper)engine.helper.setConsenter(engine.consenter)engine.peerEndpoint, err = coord.GetPeerEndpoint()engine.consensusFan = util.NewMessageFan()go func() {logger.Debug("Starting up message thread for consenter")for msg := range engine.consensusFan.GetOutChannel() {engine.consenter.RecvMsg(msg.Msg, msg.Sender)}}()})return engine, err }GetEngine的作用是進行共識模塊的初始化,同時啟動一個goroutine等待消息進入。
具體的engine.consenter是在consensus/controller/controller.go里選擇。
// consensus/controller/controller.go func NewConsenter(stack consensus.Stack) consensus.Consenter {plugin := strings.ToLower(viper.GetString("peer.validator.consensus.plugin"))if plugin == "pbft" {logger.Infof("Creating consensus plugin %s", plugin)return pbft.GetPlugin(stack)}logger.Info("Creating default consensus plugin (noops)")return noops.GetNoops(stack)}默認選擇的是noops,如果需要添加自己編寫的共識模塊需要在這里自行添加判斷。
noops 只是演示如何編寫Fabric共識模塊,不要用在生產環境。如果選擇了PBFT則會調用consensus/pbft/pbft.go進行初始化。
使用PBFT的batch模式啟動時會調用newObcBatch進行PBFT算法初始化。
PBFT只有batch一種模式。 // consensus/pbft/batch.go func newObcBatch(id uint64, config *viper.Viper, stack consensus.Stack) *obcBatch {var err error...op.manager = events.NewManagerImpl() op.manager.SetReceiver(op)etf := events.NewTimerFactoryImpl(op.manager)op.pbft = newPbftCore(id, config, op, etf)op.manager.Start()blockchainInfoBlob := stack.GetBlockchainInfoBlob()op.externalEventReceiver.manager = op.manager...return op }newObcBatch主要做了這幾項工作
- 初始化了eventLoop的消息隊列。
 - 設置了消息的接收者,用來處理對應的消息。
 - 創建監聽消息超時的定時器。
 - 初始化pbft算法。
 - 啟動消息隊列,不斷監聽事件的到來并且分發給接收者處理。
 
消息處理
Fabric的共識消息是通過eventLoop注射給對應處理函數的。
// consensus/util/events/events.go func SendEvent(receiver Receiver, event Event) {next := eventfor {next = receiver.ProcessEvent(next)if next == nil {break}} }func (em *managerImpl) Inject(event Event) {if em.receiver != nil {SendEvent(em.receiver, event)} }func (em *managerImpl) eventLoop() {for {select {case next := <-em.events:em.Inject(next)case <-em.exit:logger.Debug("eventLoop told to exit")return}} }eventLoop函數不斷的從em.events里取出事件,通過Inject注射給對應的接收者,注意,通過SendEvent注射給接收者的ProcessEvent方法。
SendEvent函數實現非常有意思,如果receiver.ProcessEvent的返回不為nil則不斷的調用receiver.ProcessEvent直到找到對應的消息處理函數,在ProcessEvent函數中,其余case均為事件處理函數,唯獨pbftMessage依賴SendEvent發送消息給其余函數處理。
// consensus/pbft/pbft-core.go func (instance *pbftCore) ProcessEvent(e events.Event) events.Event {...case *pbftMessage:return pbftMessageEvent(*et)case pbftMessageEvent:msg := etlogger.Debugf("Replica %d received incoming message from %v", instance.id, msg.sender)next, err := instance.recvMsg(msg.msg, msg.sender)if err != nil {break}return nextcase *RequestBatch:err = instance.recvRequestBatch(et)case *PrePrepare:err = instance.recvPrePrepare(et)... }可以看到*pbftMessage和pbftMessageEvent這兩個case通過recvMsg的返回值又把消息分發給其余case,非常巧妙。
PBFT算法的不同階段都會按著上面的流程映射到不同的處理函數往前推進,本質上是一個狀態機。
至此Fabric的Consensus模塊主要流程已經梳理清楚,熟悉了這個流程以后再結合PBFT算法的過程就可以很容易在此基礎上添加新的功能了。
https://zhuanlan.zhihu.com/p/35255567
總結
以上是生活随笔為你收集整理的Fabric源码分析-共识模块的全部內容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: 第三篇 - EOS DAWN-V3.0.
 - 下一篇: Fabric学习笔记-PBFT算法