Understanding Storm Metrics
In Hadoop, counters are used to record custom statistics while a map/reduce job is running, and a number of counters are built in, such as CPU time and GC time. Storm offers a similar counter-like facility called metrics; a detailed introduction can be found in the official documentation: http://storm.apache.org/releases/1.0.1/Metrics.html

Storm exposes a metrics interface to report summary statistics across the full topology. It's used internally to track the numbers you see in the Nimbus UI console: counts of executes and acks; average process latency per bolt; worker heap usage; and so forth.

Every metric must implement the IMetric interface, which has a single method that returns the current value and resets it to zero:

```java
public interface IMetric {
    public Object getValueAndReset();
}
```
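To make that contract concrete, here is a minimal sketch of a custom metric. LastValueMetric is a hypothetical name, not one of Storm's built-in classes; it simply reports the most recently observed value for each interval:

```java
import org.apache.storm.metric.api.IMetric;

// Hypothetical gauge-style metric: reports the latest observed value, then clears it.
// Illustrative sketch only, not part of Storm's built-in metrics.
public class LastValueMetric implements IMetric {
    private Object lastValue = null;

    public void set(Object value) {
        this.lastValue = value;
    }

    @Override
    public Object getValueAndReset() {
        Object ret = lastValue;  // value reported for this interval
        lastValue = null;        // reset so the next interval starts fresh
        return ret;
    }
}
```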
Storm ships several implementations; take CountMetric, MultiCountMetric and ReducedMetric as examples.

CountMetric simply keeps a single long value that is incremented on each call to incr() or incrBy().

MultiCountMetric holds a Map<String, CountMetric> to track several counters at once; the scope() method selects a counter by key, and every call to getValueAndReset() resets all of the CountMetrics in the map.

ReducedMetric is the special one: rather than recording a single value, it reduces the values reported over an interval, for example into a mean via Storm's MeanReducer. In ReducedMetric, _accumulator holds the accumulated state; it is typed as Object, so any representation can be used:

```java
public class ReducedMetric implements IMetric {
    private final IReducer _reducer;
    private Object _accumulator;

    public ReducedMetric(IReducer reducer) {
        _reducer = reducer;
        _accumulator = _reducer.init();
    }

    public void update(Object value) {
        _accumulator = _reducer.reduce(_accumulator, value);
    }

    public Object getValueAndReset() {
        Object ret = _reducer.extractResult(_accumulator);
        _accumulator = _reducer.init();
        return ret;
    }
}
```

MeanReducer keeps just two fields, a count and a running sum, from which the mean over the interval is easily computed:

```java
class MeanReducerState {
    public int count = 0;
    public double sum = 0.0;
}

public class MeanReducer implements IReducer<MeanReducerState> {
    public MeanReducerState init() {
        return new MeanReducerState();
    }

    public MeanReducerState reduce(MeanReducerState acc, Object input) {
        acc.count++;
        if (input instanceof Double) {
            acc.sum += (Double) input;
        } else if (input instanceof Long) {
            acc.sum += ((Long) input).doubleValue();
        } else if (input instanceof Integer) {
            acc.sum += ((Integer) input).doubleValue();
        } else {
            throw new RuntimeException(
                "MeanReducer::reduce called with unsupported input type `" + input.getClass()
                + "`. Supported types are Double, Long, Integer.");
        }
        return acc;
    }

    public Object extractResult(MeanReducerState acc) {
        if (acc.count > 0) {
            return acc.sum / (double) acc.count;
        } else {
            return null;
        }
    }
}
```

All metrics must be registered when the Spout/Bolt is initialized, i.e. in Spout.open() or Bolt.prepare(). Registration takes the metric's name, the metric instance, and the reporting interval in seconds:

```java
context.registerMetric("execute_count", countMetric, 5);
context.registerMetric("word_count", wordCountMetric, 60);
context.registerMetric("word_length", wordLengthMeanMetric, 60);
```
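To show how these registrations fit together, here is a rough sketch of a bolt that registers and updates the three metrics named above. The WordMetricsBolt class, its field names, and the assumption that the first tuple field holds a word are made up for illustration; CountMetric, MultiCountMetric, ReducedMetric, MeanReducer and TopologyContext.registerMetric are Storm's own API:

```java
import java.util.Map;
import org.apache.storm.metric.api.CountMetric;
import org.apache.storm.metric.api.MeanReducer;
import org.apache.storm.metric.api.MultiCountMetric;
import org.apache.storm.metric.api.ReducedMetric;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

// Hypothetical bolt used only to illustrate metric registration and updates.
public class WordMetricsBolt extends BaseRichBolt {
    private transient CountMetric countMetric;
    private transient MultiCountMetric wordCountMetric;
    private transient ReducedMetric wordLengthMeanMetric;
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        countMetric = new CountMetric();
        wordCountMetric = new MultiCountMetric();
        wordLengthMeanMetric = new ReducedMetric(new MeanReducer());
        // name, metric instance, reporting interval in seconds
        context.registerMetric("execute_count", countMetric, 5);
        context.registerMetric("word_count", wordCountMetric, 60);
        context.registerMetric("word_length", wordLengthMeanMetric, 60);
    }

    @Override
    public void execute(Tuple input) {
        String word = input.getString(0);           // assumes the first field is a word
        countMetric.incr();                         // one more execute() call
        wordCountMetric.scope(word).incr();         // per-word counter
        wordLengthMeanMetric.update(word.length()); // feed the MeanReducer
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output stream in this sketch
    }
}
```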
IMetricsConsumer
Registering a metric only makes Storm record it periodically; how the values are exposed depends on an IMetricsConsumer. A custom metrics consumer can be registered manually on the Config, or you can use the LoggingMetricsConsumer shipped with Storm, which writes the statistics to a log file. From its documentation:

Listens for all metrics, dumps them to log

To use, add this to your topology's configuration:

```java
conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1);
```

Or edit the storm.yaml config file:

```yaml
topology.metrics.consumer.register:
  - class: "org.apache.storm.metric.LoggingMetricsConsumer"
    parallelism.hint: 1
```

In other words, LoggingMetricsConsumer can be registered on the Config by hand; the second argument is the parallelism:

```java
config.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);
```

The Config object (essentially a HashMap) then records the topology.metrics.consumer.register property with the consumer class, its parallelism.hint and its argument. Once registered in Config, the built-in special bolt MetricsConsumerBolt invokes handleDataPoints; its two parameters, taskInfo and dataPoints, carry some state about the source task along with the metric values it reported.
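If LoggingMetricsConsumer is not enough, a custom consumer only needs to implement IMetricsConsumer (org.apache.storm.metric.api). The following is a minimal sketch that dumps every data point to stdout; the StdoutMetricsConsumer name and the println formatting are assumptions for illustration:

```java
import java.util.Collection;
import java.util.Map;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.task.IErrorReporter;
import org.apache.storm.task.TopologyContext;

// Hypothetical consumer that simply prints each data point to stdout.
public class StdoutMetricsConsumer implements IMetricsConsumer {

    @Override
    public void prepare(Map stormConf, Object registrationArgument,
                        TopologyContext context, IErrorReporter errorReporter) {
        // registrationArgument is whatever extra argument was supplied
        // when the consumer was registered on the Config (may be null).
    }

    @Override
    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        for (DataPoint dp : dataPoints) {
            System.out.println(taskInfo.timestamp + " "
                    + taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort + " "
                    + taskInfo.srcTaskId + ":" + taskInfo.srcComponentId + " "
                    + dp.name + " " + dp.value);
        }
    }

    @Override
    public void cleanup() {
        // nothing to release in this sketch
    }
}
```

It would be registered the same way as LoggingMetricsConsumer, e.g. config.registerMetricsConsumer(StdoutMetricsConsumer.class, 1);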
Once the topology is running, the metrics log file shows up under Storm's log directory:

```
/usr/local/apache-storm-1.0.1/logs/workers-artifacts/FirstTopo-46-1468485056/6703
-rw-rw-r-- 1 java java  55K Jul 14 18:47 gc.log.0
-rw-rw-r-- 1 java java  28K Jul 14 18:47 worker.log
-rw-rw-r-- 1 java java    0 Jul 14 16:31 worker.log.err
-rw-rw-r-- 1 java java 1.2M Jul 14 18:47 worker.log.metrics
-rw-rw-r-- 1 java java    0 Jul 14 16:31 worker.log.out
-rw-rw-r-- 1 java java    5 Jul 14 16:31 worker.pid
-rw-rw-r-- 1 java java  120 Jul 14 16:31 worker.yaml
```

worker.log.metrics contains all of the metric records. Note that it holds not only our custom bolt metrics but also system metrics (the names prefixed with __):

```
2016-07-14 16:31:40,700 31721 1468485098 192.168.1.127:6702  6:bolt execute_count 5
2016-07-14 16:31:45,702 36723 1468485103 192.168.1.127:6702  6:bolt execute_count 5
2016-07-14 16:31:50,702 41723 1468485108 192.168.1.127:6702  6:bolt execute_count 5
2016-07-14 16:32:10,705 61726 1468485128 192.168.1.127:6702  6:bolt execute_count 5
2016-07-14 16:32:15,708 66729 1468485133 192.168.1.127:6702  6:bolt execute_count 5
2016-07-14 16:32:25,699 76720 1468485143 192.168.1.127:6702  6:bolt __ack-count {spout:default=60}
2016-07-14 16:32:25,701 76722 1468485143 192.168.1.127:6702  6:bolt __sendqueue {sojourn_time_ms=0.0, write_pos=10, read_pos=10, arrival_rate_secs=0.10267994660642776, overflow=0, capacity=1024, population=0}
2016-07-14 16:32:25,701 76722 1468485143 192.168.1.127:6702  6:bolt word_count {happy=18, angry=19, excited=14}
2016-07-14 16:32:25,702 76723 1468485143 192.168.1.127:6702  6:bolt __receive {sojourn_time_ms=817.6666666666666, write_pos=62, read_pos=61, arrival_rate_secs=1.222992254382389, overflow=0, capacity=1024, population=1}
```
Reposted from: https://www.cnblogs.com/mmaa/p/5789852.html