storm hook的使用
storm hook的使用
@(STORM)[storm]
- storm hook的使用- 一原理
- 二入門例子
- 三hook的類型
- 四應用場景
 
(一)原理
1、先看一下storm的hook是什么東西: http://storm.apache.org/documentation/Hooks.html
Storm provides hooks with which you can insert custom code to run on any number of events within Storm. You create a hook by extending the BaseTaskHook class and overriding the appropriate method for the event you want to catch. There are two ways to register your hook: 
 In the open method of your spout or prepare method of your bolt using the TopologyContext method. 
 Through the Storm configuration using the “topology.auto.task.hooks” config. These hooks are automatically registered in every spout or bolt, and are useful for doing things like integrating with a custom monitoring system.
2、storm的hook也是一個典型的鉤子,當某些事情發生時(比如說執行execute方法,執行ack方法時),相應的代碼會自動被調用。
3、通過繼承BaseTaskHook,并覆蓋其方法來創建一個hook。
4、如何把hook注冊到一個拓撲中,有2種方法: 
 (1)在spout的open方法或者bolt的prepare方法中調用: 
 TopologyContext.addTaskHook(new **Hook()) 
 (2)在storm的配置文件中修改topology.auto.task.hooks項,這會自己注冊到每一個spout和bolt。這種情況對于一些集成應用或者監控之類的有用。
(二)入門例子
1、先創建hook 
 這個hook很簡單,就是當execute或者ack方法被調用時,將相應的信息打印出來:
2、創建topo,并且在topo中注冊鉤子
(1)拓撲很簡單,就是storm-starter中的ExlamationTopoloty,它的spout會隨機發送名字,然后經過2個bolt,每個bolt均在后面加!!!,最后10S后將拓撲kill掉。 
 (2)然后在bolt中定義hook:
完整代碼如下:
package com.lujinhong.demo.storm.hook;import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.testing.TestWordSpout; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils;import java.util.Map;public class ExclamationTopology {public static class ExclamationBolt extends BaseRichBolt {OutputCollector _collector;@Overridepublic void prepare(Map conf, TopologyContext context, OutputCollector collector) {_collector = collector;context.addTaskHook(new TraceTaskHook());}@Overridepublic void execute(Tuple tuple) {_collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));_collector.ack(tuple);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));}}public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("word", new TestWordSpout(), 10);builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");Config conf = new Config();conf.setDebug(true);if (args != null && args.length > 0) {conf.setNumWorkers(3);StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());}else {LocalCluster cluster = new LocalCluster();cluster.submitTopology("test", conf, builder.createTopology());Utils.sleep(10000);cluster.killTopology("test");cluster.shutdown();}} }3、運行結果 
 類似于以下的內容
(三)hook的類型
storm在BaseTaskHook中支持的類型可見下面的類定義。一般而言,就是在topo發生某項事件(如發射,確認,cleanup等)執行一些操作
public class BaseTaskHook implements ITaskHook {@Overridepublic void prepare(Map conf, TopologyContext context) {}@Overridepublic void cleanup() {} @Overridepublic void emit(EmitInfo info) {}@Overridepublic void spoutAck(SpoutAckInfo info) {}@Overridepublic void spoutFail(SpoutFailInfo info) {}@Overridepublic void boltAck(BoltAckInfo info) {}@Overridepublic void boltFail(BoltFailInfo info) {}@Overridepublic void boltExecute(BoltExecuteInfo info) {} }(四)應用場景
1、可以用于調試代碼,比如確認某個方法(execute, prepare等)是否被調用
2、用于將topo中的一些信息發送出去,可以作為監控,日志等。
總結
以上是生活随笔為你收集整理的storm hook的使用的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: storm metric的使用说明
- 下一篇: git基础指南
