storm的消息格式分析
storm的消息格式分析
@(STORM)[storm]
- storm的消息格式分析
- 一ITuple接口
- 二core-storm的消息格式
- 三trident的消息格式- 一trident中tuple的基本用法調用全流程- 1用戶代碼
- 2TridentTupleView的源碼
 
- 二trident的tuple基本架構- 1主要涉及的個類
- 2數據定位基本流程
- 3其它說明
 
- 三TridentTuple接口
- 四TridentTupleView- 1成員變量
- 2主要方法
- 3內部類
- 4ProjectionFactory
- 5FreshOutputFactory
- 6OperationOutputFactory
- 7RootFactory
 
- 五ValuePointer- 1成員變量構造函數
- 22個方法
 
 
- 一trident中tuple的基本用法調用全流程
Tuple是storm中的消息,本文從用戶調用到最終源碼介紹了一個tuple的獲取與創建過程。
一、ITuple接口
storm的消息稱為一個Tuple,所有的消息格式都必須實現Ituple接口。它主要定義了如何獲取消息中的內容的各種方法,它的完整定義如下:
public interface ITuple {public int size();public boolean contains(String field);public Fields getFields();public int fieldIndex(String field);public List<Object> select(Fields selector);public Object getValue(int i);public String getString(int i);public Integer getInteger(int i);public Long getLong(int i);public Boolean getBoolean(int i);public Short getShort(int i);public Byte getByte(int i);public Double getDouble(int i);public Float getFloat(int i);public byte[] getBinary(int i);public Object getValueByField(String field);public String getStringByField(String field);public Integer getIntegerByField(String field);public Long getLongByField(String field);public Boolean getBooleanByField(String field);public Short getShortByField(String field);public Byte getByteByField(String field);public Double getDoubleByField(String field);public Float getFloatByField(String field);public byte[] getBinaryByField(String field);public List<Object> getValues(); }ITuple接口的實現類主要有2個: 
 * Tuple:用于core-storm 
 * TridentTuple:用于trident
下面2部分分別介紹這2種實現。
二、core-storm的消息格式
三、trident的消息格式
在Trident中,一個Bolt節點中可能含有多個操作,各個操作之間需要進行消息傳遞。通常,操作或者產生新的域或者對原來的域進行過濾,若每次對輸入的消息進行復制,則效率不高。
Trident利用TridentTupleView對象對消息進行封裝。例如,新產生的消息由2部分組成,一部分來自輸入,另一部分則由計算得到。TridentTupleView并不會創建一個新的消息,而是將這2部分合并,通過更新內部索引使得從外部看到如同一個消息一樣。這樣便節省了消息的拷貝和新對象創建等方面的負擔,從而提高了效率。
(一)trident中tuple的基本用法&調用全流程
1、用戶代碼
String sentence = tuple.getString(0); String sentence = tuple.getStringByField("sentence");用戶可以通過上面2種方式中的一種來取得TridentTuple中的某一個field的值。 
 下面我們以第1種方法為例繼續分析。
2、TridentTupleView的源碼
(1)其實就是調用getValue(i)方法,只是做了個類型轉換。 @Override public String getString(int i) {return (String) getValue(i); } //取得ValuePointer對象,然后調用getValueByPointer()方法 @Override public Object getValue(int i) {return getValueByPointer(_index[i]); } //真正通過索引來找到數據的方法。 private Object getValueByPointer(ValuePointer ptr) {return ((List<Object>)_delegates.nth(ptr.delegateIndex)).get(ptr.index); }(二)trident的tuple基本架構
1、主要涉及的個類
- TridentTuple接口:繼承自ITuple接口及List接口。
- TridentTupleView類:實現了TridentTuple類,并繼承自AbstractList。
- ValuePointer:TridentTupleView的索引數據結構,用于指定哪個位置或者哪個field對應哪個實際的數據(IPersistentVector數據)。
- IPersistentVector:實際的數據對象。注意這是一個集合,位于clojure自帶的clojure.lang.PersistentVector包中。
2、數據定位基本流程
(1)用戶代碼通過get(i)或者getValueByField("fieldName")來取得TridentTupleView中的數據。如果確定類型的話,還可以使用getString(i), getStringByField(“fieldName”)等方法。 
 (2)對于前者,從ValuePointer[] _index來取得ValuePointer對象。對于后者,通過Map<String, ValuePointer> _fieldIndex來取得ValuePointer對象。 
 (3)ValuePointer就是實際數據的索引,它先根據public int delegateIndex取得這是在_哪個IPersistentVector對象中,然后通過protected int index,或者protected String field來定位到IPersistentVector中的某個元素,這就是實際的數據。
3、其它說明
(1)上面的流程簡單的說就是就是通過delegagteIndex,以及index/field來定位到一個ValuePointer對象,而ValuePointer對象是實際數據的索引,所以可以通過這個索引找到實際的數據。 
 (2)一個TridentTupleView可能有多個IPersistentVector對象,而每個IPersistentVector對象有多個元素,每個元素對應一個field的實際數據。
一個TridentTupleView只有一個_delegates的,但它包括多個delegate,可以通過_delegate.nth(i)來定位。ValuePointer中的_delegateIndex就是作這個使用的。其具體的數據結構由clojure來實現,就不細說了。
(三)TridentTuple接口
public interface TridentTuple extends ITuple, List<Object> {public static interface Factory extends Serializable {Map<String, ValuePointer> getFieldIndex();List<String> getOutputFields();int numDelegates();} }它有一個內部接口,內部接口的三個方法分別表示: 
 * field和ValuePointer的映射關系 
 * 所有的輸出filed組成的List 
 * 有多少個IPersistentVector對象。注意一個TridentTupleView可能有多個IPersistentVector對象。
(四)TridentTupleView
TridentTupleView主要定義了(1)如何構建一個消息(2)如何讀取一個消息內的具體內容。
1、成員變量
ValuePointer[] _index; Map<String, ValuePointer> _fieldIndex; IPersistentVector _delegates;每個TridentTupleView都會保存著索引信息ValuePointer,以及實現的數據 IPersistentVector。其它方法主要就是通過ValuePointer的索引信息如何在IPersistentVector中找到實際的數據。
其中_delegates可以理解為有多個delegate,然后通過nth(i)的方法定位到具體的一個,這具體的一個delegate本身也是一個集合。這是clojure.lang包自帶的類,不分析其實現了。
2、主要方法
正如上面據說的,TridentTupleView中的方法主要用于獲取TridentTupleView中的數據。比如:
@Overridepublic Integer getInteger(int i) {return (Integer) getValue(i);}不管是使用哪種方法獲取消息內容,最終都是調用這個方法:
private Object getValueByPointer(ValuePointer ptr) {return ((List<Object>)_delegates.nth(ptr.delegateIndex)).get(ptr.index); }即使用一個VauluePorint對象作索引查找PersistenceVector中的實際數據。先在_delegates內部找定位到一個具體的delegate,然后再定位到具體的元素。這里使用的nth()方法大致意思就是在_delegates中定位到一個delegate,其實現未查看clojure代碼。
3、內部類
TridentTupleView有4個內部類,它們均是實現了Factory接口,用于創建Trident消息。這些Factory子類的create()方法會被spout/bolt的各種操作調用來創建一個TridentTuple,我們以后再介紹是誰在調用這些方法。目前我們只需要知道: 
 * ProjectionFactory:它不會創建一個新的消息,而只是保留parent的部分字段(由projectFields定義) 
 * FreshOutputFactory:根據輸入的字段名和值來產生一個新的消息 
 * OperationOutputFactory:通過創建selfFields創建一個新的_delegate,然后與parent一起組成一個新的TridentTupleView。因此它的_delegates數量會+1. 
 * RootFactory:為操作的入口工廠,對輸入的消息起適配作用
4、ProjectionFactory
ProjectionFactory根據輸入的parent以及projectFields重新構建一下TridentTupleView,它不會創建一個新的消息,而只是保留parent的部分字段(由projectFields定義)。
public ProjectionFactory(Factory parent, Fields projectFields) {_parent = parent;if(projectFields==null) projectFields = new Fields();Map<String, ValuePointer> parentFieldIndex = parent.getFieldIndex();_fieldIndex = new HashMap<>();for(String f: projectFields) {_fieldIndex.put(f, parentFieldIndex.get(f));} _index = ValuePointer.buildIndex(projectFields, _fieldIndex);}public TridentTuple create(TridentTuple parent) {if(_index.length==0) return EMPTY_TUPLE;else return new TridentTupleView(((TridentTupleView)parent)._delegates, _index, _fieldIndex);}它返回的delegate的數量和parent相同,因此再次證明它不會產生新的delegate:
@Overridepublic int numDelegates() {return _parent.numDelegates();}5、FreshOutputFactory
FreshOutputFactory根據輸入的字段名和值來產生一個新的消息。
public FreshOutputFactory(Fields selfFields) {_fieldIndex = new HashMap<>();for(int i=0; i<selfFields.size(); i++) {String field = selfFields.get(i);_fieldIndex.put(field, new ValuePointer(0, i, field));}_index = ValuePointer.buildIndex(selfFields, _fieldIndex);}public TridentTuple create(List<Object> selfVals) {return new TridentTupleView(PersistentVector.EMPTY.cons(selfVals), _index, _fieldIndex);}(1)先是在構建函數中通過field的值構建一個ValuePointer對象ValuePointer(0, i, field),也就是說這是第0個_delegate的第i個field,field的名稱是field。 
 (2)然后通過調用create()方法來創建一個PersistentVector對象,并與ValuePointer一起創建一個TridentTupleView。
FreshOutputFactory返回的_delegate對象永遠是1:
@Overridepublic int numDelegates() {return 1;}6、OperationOutputFactory
OperationOutputFactory通過創建selfFields創建一個新的_delegate,然后與parent一起組成一個新的TridentTupleView。因此它的_delegates數量會+1.
這里也證明了一個TridentTupleView會有多個_delegate的。這里指的多個是numDelegates()返回的數量,而不是指多個IPersistentVector對象。事實上每個TridentTupleView只有一個IPersistentVector對象。
public OperationOutputFactory(Factory parent, Fields selfFields) {_parent = parent;_fieldIndex = new HashMap<>(parent.getFieldIndex());int myIndex = parent.numDelegates();for(int i=0; i<selfFields.size(); i++) {String field = selfFields.get(i);_fieldIndex.put(field, new ValuePointer(myIndex, i, field));}List<String> myOrder = new ArrayList<>(parent.getOutputFields());Set<String> parentFieldsSet = new HashSet<>(myOrder);for(String f: selfFields) {if(parentFieldsSet.contains(f)) {throw new IllegalArgumentException("Additive operations cannot add fields with same name as already exists. "+ "Tried adding " + selfFields + " to " + parent.getOutputFields());}myOrder.add(f);}_index = ValuePointer.buildIndex(new Fields(myOrder), _fieldIndex);}public TridentTuple create(TridentTupleView parent, List<Object> selfVals) {IPersistentVector curr = parent._delegates;curr = (IPersistentVector) RT.conj(curr, selfVals);return new TridentTupleView(curr, _index, _fieldIndex);}再確認一下_delegate的數量:
@Overridepublic int numDelegates() {return _parent.numDelegates() + 1;}沒錯,就是parent的數量再加1.
7、RootFactory
RootFactory為操作的入口工廠,對輸入的消息起適配作用。它會根據輸入消息產生一個TridentTupleView類型的消息,這個產生的消息可以被其他工作方法使用。
(五)ValuePointer
1、成員變量&構造函數
public int delegateIndex; protected int index; protected String field;public ValuePointer(int delegateIndex, int index, String field) {this.delegateIndex = delegateIndex;this.index = index;this.field = field; }ValuePointer有3個成員變量,分別表示: 
 (1)delegateIndex表示TridentTupleView中的哪個IPersistentVector對象。正如前面據說的,TridentTupleView可能有多個IPersistentVector對象。 
 (2)index表示IPersistentVector這個集合中的哪個元素。 
 (3)field表示這個field的名稱。
因此,通過ValuePointer可以定位到哪個IPersistentVector對象,然后是IPersistentVector對象的哪個元素,以及這個元素對應的field的名稱是什么
2、2個方法
這2個方法主要用于ValuePointer[]與Map
總結
以上是生活随笔為你收集整理的storm的消息格式分析的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: Kafka 副本OffsetOutOfR
- 下一篇: 使用ResourceBundle加载pr
