Storm Trident API
在Storm Trident中有五種操作類型
- Apply Locally:本地操作,所有操作應用在本地節點數據上,不會產生網絡傳輸??? ?
- Repartitioning:數據流重定向,單純的改變數據流向,不會改變數據內容,這部分會有網絡傳輸
- Aggragation:聚合操作,會有網絡傳輸
- Grouped streams上的操作
- Merge和Join
一Apply Locally
1.functions函數操作
函數的作用是接收一個tuple(需指定接收tuple的哪個字段),輸出0個或多個tuples。輸出的新字段值會被追加到原始輸入tuple的后面,如果一個function不輸出tuple,那就意味這這個tuple被過濾掉了,例如下面的例子:
1 class AddAndSubFuction extends BaseFunction{ 2 3 public void execute(TridentTuple tuple, TridentCollector collector) { 4 int res1 = tuple.getInteger(0); 5 int res2 = tuple.getInteger(1); 6 int sub = res1 > res2 ? res1 - res2 : res2 - res1; 7 collector.emit(new Values(res1+res2,sub)); 8 } 9 }?
2.Filter過濾操作
Filters很簡單,接收一個tuple,并決定是否保留這個tuple,例如
1 class ScoreFilter extends BaseFilter{ 2 3 public boolean isKeep(TridentTuple tuple) { 4 return tuple.getInteger(0) >= 60; 5 } 6 }上述Filter過濾調成績小于60的tuple.
3.partitionAggregate
PartitionAggregate的作用對每個Partition中的tuple進行聚合,與前面的函數在原tuple后面追加數據不同,PartitionAggregate的輸出會直接替換掉輸入的tuple,僅數據PartitionAggregate中發射的tuple。
TridentAPI提供了三個聚合器接口:CombinerAggregator,ReducerAggregator,Aggregator
我們先來看一看CombinerAggregator,CombinerAggregator接口的定義如下:
?
CombinerAggregator接口只返回一個tuple,并且這個tuple也只包含一個field。init方法會先執行,它負責預處理每一個接收到的tuple,然后再執行combine函數來計算收到的tuples直到最后一個tuple到達,當所有tuple處理完時,CombinerAggregator會發射zero函數的輸出,比如CombinerAggregator的實現類Count的定義如下:
?
當你使用aggregate?方法代替PartitionAggregate時,CombinerAggregator的好處就體現出來了,因為Trident會自動優化計算,在網絡傳輸tuples之前做局部聚合。
我們再來看一下ReducerAggregator,ReducerAggregator的定義如下:
?
ReducerAggregator通過init方法提供一個初始值,然后為輸入的每個tuple迭代這個值,最終產生一個唯一的tuple并輸出,定義一個實例如下:
?
最后看一下通用的聚合器Aggregator,它的定義如下:
public interface Aggregator<T> extends Operation {T init(Object batchId, TridentCollector collector);void aggregate(T val, TridentTuple tuple, TridentCollector collector);void complete(T val, TridentCollector collector); } Aggregator接口可以發射含任意數量屬性的任意數據量的tuples,并且可以在執行過程中的任何時候發射:
init:在處理數據之前被調用,它的返回值會作為一個狀態值傳遞給aggregate和complete方法
aggregate:用來處理每一個輸入的tuple,它可以更新狀態值也可以發射tuple
complete:當所有tuple都被處理完成后被調用
有時候我們需要執行多個聚合器,這在Trident中稱為chaining
4.projection投影操作
投影操作的作用是僅僅保留stream指定字段的數據,和關系數據庫中投影的概念類似
二Repartitioning重定向操作
重定向操作是如何在各個任務間對tuples進行分區。分區的數量也有可能改變重定向的結果。重定向需要網絡傳輸,下面介紹下重定向函數:
?
三Aggragation聚合操作
Trident有aggregate和 persistentAggregate方法來做聚合操作。aggregate是獨立的運行在Stream的每個Batch上的,而persistentAggregate則是運行在Stream的所有Batch上并把運算結果存儲在state source中。 運行aggregate方法做全局聚合。當你用到 ReducerAggregator或Aggregator時,Stream首先被重定向到一個分區中,然后其中的聚合函數便在這個分區上運行。當你用到CombinerAggregator時,Trident會首先在每個分區上做局部聚合,然后把局部聚合后的結果重定向到一個分區,因此使用CombinerAggregator會更高效,可能的話我們需要優先考慮使用它。
四Grouped streams
GroupBy操作是根據特定的字段對流進行重定向的,還有,在一個分區內部,每個相同字段的tuple也會被Group到一起。如果你在grouped Stream上面運行aggregators,聚合操作會運行在每個Group中而不是整個Batch。persistentAggregate也能運行在GroupedSteam上,不過結果會被保存在MapState中,其中的key便是分組的字段。 當然,aggregators在GroupedStreams上也可以串聯。
五Merge和Join
api的最后一部分便是如何把各種流匯聚到一起。最簡單的方式就是把這些流匯聚成一個流。我們可以這么做:
topology.merge(stream1, stream2, stream3);?
另一種合并流的方式就是join。一個標準的join就像是一個sql,必須有標準的輸入,因此,join只針對符合條件的Stream。join應用在來自Spout的每一個小Batch中。join時候的tuple會包含:
1.join的字段,如Stream1中的key和Stream2中的x
2.所有非join的字段,根據傳入join方法的順序,a和b分別代表steam1的val1和val2,c代表Stream2的val1
當join的是來源于不同Spout的stream時,這些Spout在發射數據時需要同步,一個Batch所包含的tuple會來自各個Spout。
?
?
?
?
轉載于:https://www.cnblogs.com/senlinyang/p/8081447.html
總結
以上是生活随笔為你收集整理的Storm Trident API的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 第一周冲刺_周三总结
- 下一篇: 随机验证码 php