storm metric的使用说明
storm metric的使用說明
@(STORM)[storm]
- storm metric的使用說明
- 一概述
- 二使用storm metric的關鍵步驟
- 1在bolt的prepare中注冊metric
- 2在bolt的execute方法中更新metric
- 3在topo中指定將metric consumer這里使用了storm自帶的consumer將其輸出到日志文件中也可以自定義consumer見下面
- 4示例程序完整代碼
- 5輸出示例
- 三詳細說明
- 1metric和metric consumer
- 2metric
- 3自帶的Metric實現
- 4Metric Consumer
- 5自定義MetricsConsumer
- 6storm ui的變化
- 四應用場景
- 1調試
- 2監控
(一)概述
storm metric類似于hadoop的counter,用于收集應用程序中的特定指標,輸出到外部。在storm中是存儲到各個機器logs目錄下的metric.log文件中。有時我們想保存一些計算的中間變量,當達到一定狀態時,統一在一個位置輸出,或者統計整個應用的一些指標,metric是個很好的選擇。
(二)使用storm metric的關鍵步驟
1、在bolt的prepare中注冊metric
transient CountMetric _countMetric; transient MultiCountMetric _wordCountMetric; transient ReducedMetric _wordLengthMeanMetric;void initMetrics(TopologyContext context) {_countMetric = new CountMetric();_wordCountMetric = new MultiCountMetric();_wordLengthMeanMetric = new ReducedMetric(new MeanReducer());context.registerMetric("execute_count", _countMetric, 5);context.registerMetric("word_count", _wordCountMetric, 60);context.registerMetric("word_length", _wordLengthMeanMetric, 60); }(1)metric都定義為了transient。因為這些Metric實現都沒有實現Serializable,而在storm的spout/bolt中,所有非transient的變量都必須Serializable
(2)三個參數為metric名稱,metric對象,以及時間間隔。時間間隔表示多久一次metric將數據發送給metric consumer。
2、在bolt的execute方法中更新metric
void updateMetrics(String word) {_countMetric.incr();_wordCountMetric.scope(word).incr();_wordLengthMeanMetric.update(word.length()); }3、在topo中指定將metric consumer,這里使用了storm自帶的consumer將其輸出到日志文件中,也可以自定義consumer。見下面。
conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);4、示例程序完整代碼
package com.lujinhong.demo.storm.metrics;import java.util.Map;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.metric.LoggingMetricsConsumer; import backtype.storm.metric.api.CountMetric; import backtype.storm.metric.api.MeanReducer; import backtype.storm.metric.api.MultiCountMetric; import backtype.storm.metric.api.ReducedMetric; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.testing.TestWordSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils;public class ExclamationTopology {public static class ExclamationBolt extends BaseRichBolt {OutputCollector _collector;// 定義指標統計對象transient CountMetric _countMetric;transient MultiCountMetric _wordCountMetric;transient ReducedMetric _wordLengthMeanMetric;@Overridepublic void prepare(Map conf, TopologyContext context,OutputCollector collector) {_collector = collector;initMetrics(context);}@Overridepublic void execute(Tuple tuple) {_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));_collector.ack(tuple);updateMetrics(tuple.getString(0));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}// 初始化計數器void initMetrics(TopologyContext context) {_countMetric = new CountMetric();_wordCountMetric = new MultiCountMetric();_wordLengthMeanMetric = new ReducedMetric(new MeanReducer());context.registerMetric("execute_count", _countMetric, 5);context.registerMetric("word_count", _wordCountMetric, 60);context.registerMetric("word_length", _wordLengthMeanMetric, 60);}// 更新計數器void updateMetrics(String word) {_countMetric.incr();_wordCountMetric.scope(word).incr();_wordLengthMeanMetric.update(word.length());}}public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("word", new TestWordSpout(), 10);builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");Config conf = new Config();conf.setDebug(true);// 輸出統計指標值到日志文件中conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);if (args != null && args.length > 0) {conf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf,builder.createTopology());} else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("test", conf, builder.createTopology());Utils.sleep(10000);cluster.killTopology("test");cluster.shutdown();}} }5、輸出示例
2015-11-05 14:31:06,801 38311 1446705066 gdc-storm11-storm.i.nease.net:6727 8:exclaim1 execute_count 165 2015-11-05 14:31:06,835 38345 1446705066 gdc-storm08-storm.i.nease.net:6727 6:exclaim1 execute_count 165 2015-11-05 14:31:11,801 43311 1446705071 gdc-storm11-storm.i.nease.net:6727 8:exclaim1 execute_count 166 2015-11-05 14:31:11,835 43345 1446705071 gdc-storm08-storm.i.nease.net:6727 9:exclaim2 execute_count 2502015-11-05 14:32:32,391 123901 1446705152 gdc-storm08-storm.i.nease.net:6727 6:exclaim1 word_length 5.852204408817635 2015-11-05 14:33:32,373 183883 1446705212 gdc-storm11-storm.i.nease.net:6727 8:exclaim1 word_length 5.797094188376754 2015-11-05 14:33:32,391 183901 1446705212 gdc-storm08-storm.i.nease.net:6727 6:exclaim1 word_length 5.7766064257028112015-11-05 14:31:32,374 63884 1446705092 gdc-storm11-storm.i.nease.net:6727 8:exclaim1 word_count {bertels=413, jackson=354, nathan=432, mike=380, golda=412} 2015-11-05 14:31:32,391 63901 1446705092 gdc-storm08-storm.i.nease.net:6727 6:exclaim1 word_count {bertels=374, jackson=396, nathan=411, mike=416, golda=392} 2015-11-05 14:31:32,404 63914 1446705092 gdc-storm08-storm.i.nease.net:6727 9:exclaim2 word_count {bertels!!!=593, golda!!!=615, jackson!!!=571, nathan!!!=646, mike!!!=562}(三)詳細說明
1、metric和metric consumer
storm中關于metric的API主要有2部分:metric和metric consumer。前者用于生成一些統計值,后者將其處理后輸出。更詳細請見下面。
2、metric
(1)metric在用于在spout/bolt中收集度量值。
(2)metric通過TopologyContext.registerMetric(…)來注冊到相應的spout或者bolt中,一般是在prepare/open方法中注冊metric,然后在execute方法中更新metric值。
(3)Metric必須實現backtype.storm.metric.api.IMetric接口。
3、自帶的Metric實現
storm提供了幾個常用的metric實現:
AssignableMetric – set the metric to the explicit value you supply. Useful if it’s an external value or in the case that you are already calculating the summary statistic yourself. Note: Useful for statsd Gauges.
CombinedMetric – generic interface for metrics that can be updated associatively.
CountMetric – a running total of the supplied values. Call incr() to increment by one, incrBy(n) to add/subtract the given number.
Note: Useful for statsd counters.
MultiCountMetric– a hashmap of count metrics. Note: Useful for many Counters where you may not know the name of the metric a priori or where creating many Counter’s manually is burdensome.
MeanReducer – an implementation of ReducedMetric that tracks a running average of values given to its reduce() method. (It accepts Double, Integer or Long values, and maintains the internal average as a Double.) Despite his reputation, the MeanReducer is actually a pretty nice guy in person.
4、Metric Consumer
(1)MetricsConsumer用于收集拓撲中注冊的所有Metric,并進行處理、輸出等。處理的對象為DataPoint類的對象,同時包括了一些額外信息,如worker主機,worker端口,組件ID,taskID,時間戳,更新周期等。
(2)MetricsConsumer使用backtype.storm.Config.registerMetricsConsumer(…)在創建topo時注冊,或者在storm的配置文件中指定topology.metrics.consumer.register。
(3)MetricsConsumer必須實現backtype.storm.metric.api.IMetricsConsumer接口。
5、自定義MetricsConsumer
上面例子中的
conf.registerMetricsConsumer(LoggingMetricsConsumer.class, 2);其中LoggingMetricsConsumer是由系統提供的類,它將內容寫入metrics.log文件中,格式可參考上面的輸出。
如果對輸出有特殊要求,比如要將日志輸出到其它地方,或者是輸出的內容不同,可以自定義一個consumer。例如:
6、storm ui的變化
在storm的UI中會有一個metric的節點,它與所有的節點都有交集。
沒有metric的情況:
加上metric的情況:
(四)應用場景
1、調試
將一些調整級別的信息輸出到metric中,以協助定位分析問題
2、監控
將一些核心指標定期輸出到外部系統,如監控系統,當發生異常時自動報警。
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的storm metric的使用说明的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java.util.logging.Lo
- 下一篇: storm hook的使用