twitter storm源码走读(五)
TridentTopology創(chuàng)建過程詳解
從用戶層面來看TridentTopology,有兩個重要的概念一是Stream,另一個是作用于Stream上的各種Operation。在實(shí)現(xiàn)層面來看,無論是stream,還是后續(xù)的operation都會轉(zhuǎn)變成為各個Node,這些Node之間的關(guān)系通過重要的數(shù)據(jù)結(jié)構(gòu)圖來維護(hù)。具體到TridentTopology,實(shí)現(xiàn)圖的各種操作的組件是jgrapht。
說到圖,兩個基本的概念會閃現(xiàn)出來,一是結(jié)點(diǎn),二是描述結(jié)點(diǎn)之間關(guān)系的邊。要想很好的理解TridentTopology就需要緊盯圖中結(jié)點(diǎn)和邊的變化。
TridentTopology在轉(zhuǎn)換成為普通的StormTopology時,需要將原始的圖分成各個group,每個group將運(yùn)行于一個獨(dú)立的bolt中。TridentTopology又是如何知道哪些node應(yīng)該在同一個group,哪些應(yīng)該處在另一個group中的呢;如何來確定每個group的并發(fā)度(parallismHint)的呢。這些問題的解決都與jgrapht分不開。
關(guān)于jgrapht的更多信息,請參考其官方網(wǎng)站?http://jgrapht.org
概要
在TridentTopology中向圖中添加結(jié)點(diǎn)的api有三種:
其中addNode在創(chuàng)建stream是使用,addSourcedStateNode在partitionPersist時使用到,其它的operation使用到的是addSourcedNode.
addNode與其它兩個方法的一個重要區(qū)別還在于,addNode是不需要添加邊(Edge),而其它兩個API需要往圖中添加edge,以確定該node的源是哪個。
TridentTopology
| 1 2 3 4 | public?TridentTopology() { ????????_graph =?new?DefaultDirectedGraph(new?ErrorEdgeFactory()); ????????_gen =?new?UniqueIdGen(); ????} |
?在TridentTopology的構(gòu)造函數(shù)中,創(chuàng)建了DAG(有向無環(huán)圖)。利用這個_graph來作為容器以存儲后續(xù)過程中創(chuàng)建的各個node及它們之間的關(guān)系。
newStream
?newStream會為DAG(有向無環(huán)圖)中創(chuàng)建源結(jié)點(diǎn),其調(diào)用關(guān)系如下所示。
- newStream
- addNode
- registerNode
- addNode
?
each
作用于stream上的Operation有很多,以each為例來看新的operation是如何轉(zhuǎn)換成為node添加到_graph中的。
//Stream.javapublic Stream each(Fields inputFields, Function function, Fields functionFields) {projectionValidation(inputFields);return _topology.addSourcedNode(this,new ProcessorNode(_topology.getUniqueStreamId(),_name,TridentUtils.fieldsConcat(getOutputFields(), functionFields),functionFields,new EachProcessor(inputFields, function)));}
調(diào)用關(guān)系描述如下
- Stream::each
- TridentTopology::addSourcedNode
- TridentTopology::registerSourcedNode
registerSourcedNode的實(shí)現(xiàn)如下
protected void registerSourcedNode(List<Stream> sources, Node newNode) {registerNode(newNode);int streamIndex = 0;for(Stream s: sources) {_graph.addEdge(s._node, newNode, new IndexedEdge(s._node, newNode, streamIndex));streamIndex++;} }注意此處添加edge是,是有索引的,這樣可以區(qū)別處理的先后順序。
在Stream中含有成員變量_node,表示stream最近停泊的node,有了該變量添加edge才成為了可能。
?
partitionPersist
public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater, Fields functionFields) {projectionValidation(inputFields);String id = _topology.getUniqueStateId();ProcessorNode n = new ProcessorNode(_topology.getUniqueStreamId(),_name,functionFields,functionFields,new PartitionPersistProcessor(id, inputFields, updater));n.committer = true;n.stateInfo = new NodeStateInfo(id, stateSpec);return _topology.addSourcedStateNode(this, n);}調(diào)用關(guān)系
- Stream::partitionPersist
- TridentTopology::addSourcedStateNode
- TridentTopology::registerSourcedNode
與addNode及addSourcedNode不同的是,addSourcedStateNode返回的是TridentState而非Stream。
既然談到了TridentState就不得不談到其另一面Stream::stateQuery,
public Stream stateQuery(TridentState state, Fields inputFields, QueryFunction function, Fields functionFields) {projectionValidation(inputFields);String stateId = state._node.stateInfo.id;Node n = new ProcessorNode(_topology.getUniqueStreamId(),_name,TridentUtils.fieldsConcat(getOutputFields(), functionFields),functionFields,new StateQueryProcessor(stateId, inputFields, function));_topology._colocate.get(stateId).add(n);return _topology.addSourcedNode(this, n);}從此處可以看出stateQueryNode最起碼有兩個inputStream,一是從TridentState而來表示狀態(tài)已經(jīng)改變,另一個是處于drpcStream這個方面的上一跳結(jié)點(diǎn)。
build
TridentTopology::build是將TridentTopology轉(zhuǎn)變?yōu)镾tormTopology的過程,這一過程中最重要的一環(huán)就是將_graph中含有的node進(jìn)行分組。
grouping
算法邏輯概述
- 將boltNodes中的每個boltNode作為一個group加入全部加入initialGroups
- 以graph和initialGroups作為入?yún)?chuàng)建GraphGrouper
- 分組的過程其實(shí)就是進(jìn)行合并的過程,詳見GraphGrouper::mergeFully()
- 如果從當(dāng)前group1的輸出目的地都是屬于group2,則將group1,group2合并
- 如果當(dāng)前group1的所有輸入源都是來自于group2,則將group1,group2合并
- 將需要合并的group1,group2作為入?yún)?chuàng)建新的group,同時將group1,group2從已有的集合出移除
GraphGrouper::merge()
private void merge(Group g1, Group g2) {Group newGroup = new Group(g1, g2);currGroups.remove(g1);currGroups.remove(g2);currGroups.add(newGroup);for(Node n: newGroup.nodes) {groupIndex.put(n, newGroup);}}在group之間添加partitionNode
// add identity partitions between groupsfor(IndexedEdge<Node> e: new HashSet<IndexedEdge>(graph.edgeSet())) {if(!(e.source instanceof PartitionNode) && !(e.target instanceof PartitionNode)) { Group g1 = grouper.nodeGroup(e.source);Group g2 = grouper.nodeGroup(e.target);// g1 being null means the source is a spout nodeif(g1==null && !(e.source instanceof SpoutNode))throw new RuntimeException("Planner exception: Null source group must indicate a spout node at this phase of planning");if(g1==null || !g1.equals(g2)) {graph.removeEdge(e);PartitionNode pNode = makeIdentityPartition(e.source);graph.addVertex(pNode);graph.addEdge(e.source, pNode, new IndexedEdge(e.source, pNode, 0));graph.addEdge(pNode, e.target, new IndexedEdge(pNode, e.target, e.index)); }}}
_graph中所有的node在變換過后,變成兩組元素,一是spoutNodes,另一個是合并后的mergedGroup.
spoutNodes中的每個元素作為spout添加到TridentTopologyBuilder的_spouts數(shù)組中,mergedGroup中的每個group添加到TridentTopologyBuilder的_bolt數(shù)組中。在TridentTopologyBuilder::build()中最主要的事情是為每個_spouts和_bolts數(shù)組中的成員添加grouping關(guān)系。
小結(jié)
到目前為止,通過兩篇文章分析了TridentTopology的創(chuàng)建過程及其運(yùn)行時在每個TridentBoltExecutor中的消息傳遞情況。接下來會分析TridentTopology提供的API實(shí)現(xiàn)及其作用場景。
總結(jié)
以上是生活随笔為你收集整理的twitter storm源码走读(五)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 转:java的各个拓展类库的推荐方案
- 下一篇: .net core入门之web应用