关于Storm Tick
轉(zhuǎn)載自kqdongnanf-博客園;Email:kqdongnanf@yahoo.com。
1. tick的功能
Apache Storm中內(nèi)置了一種定時(shí)機(jī)制——tick,它能夠讓任何bolt的所有task每隔一段時(shí)間(精確到秒級(jí),用戶可以自定義)收到一個(gè)來自__systemd的__tick stream的tick tuple,bolt收到這樣的tuple后可以根據(jù)業(yè)務(wù)需求完成相應(yīng)的處理。
Tick功能從Apache Storm 0.8.0版本開始支持,本文在Apache Storm 0.9.1上測試。
2. 在代碼中使用tick及其作用
在代碼中如需使用tick,可以參照下面的方式:
2.1. 為bolt設(shè)置tick
若希望某個(gè)bolt每隔一段時(shí)間做一些操作,那么可以將bolt繼承BaseBasicBolt/BaseRichBolt,并重寫getComponentConfiguration()方法。在方法中設(shè)置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值,單位是秒。
getComponentConfiguration()是backtype.storm.topology.IComponent接口中定義的方法,在此方法的實(shí)現(xiàn)中可以定義以”Topology.*”開頭的此bolt特定的Config。
這樣設(shè)置之后,此bolt的所有task都會(huì)每隔一段時(shí)間收到一個(gè)來自__systemd的__tick stream的tick tuple,因此execute()方法可以實(shí)現(xiàn)如下:
2.2. 為Topology全局設(shè)置tick
若希望Topology中的每個(gè)bolt都每隔一段時(shí)間做一些操作,那么可以定義一個(gè)Topology全局的tick,同樣是設(shè)置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS的值:
2.3. tick設(shè)置的優(yōu)先級(jí)
與Linux中的環(huán)境變量的優(yōu)先級(jí)類似,storm中的tick也有優(yōu)先級(jí),即全局tick的作用域是全局bolt,但對(duì)每個(gè)bolt其優(yōu)先級(jí)低于此bolt定義的tick。
這個(gè)參數(shù)的名字TOPOLOGY_TICK_TUPLE_FREQ_SECS具有一定的迷惑性,一眼看上去應(yīng)該是Topology全局的,但實(shí)際上每個(gè)bolt也可以自己定義。
2.4. tick的精確度
Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS是精確到秒級(jí)的。例如某bolt設(shè)置Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS為10s,理論上說bolt的每個(gè)task應(yīng)該每個(gè)10s收到一個(gè)tick tuple。
實(shí)際測試發(fā)現(xiàn),這個(gè)時(shí)間間隔的精確性是很高的,一般延遲(而不是提前)時(shí)間在1ms左右。測試環(huán)境:3臺(tái)虛擬機(jī)做supervisor,每臺(tái)配置:4Cpu、16G內(nèi)存、千兆網(wǎng)卡。
3. storm tick的實(shí)現(xiàn)原理
在bolt中的getComponentConfiguration()定義了該bolt的特定的配置后,storm框架會(huì)在TopologyBuilder.setBolt()方法中調(diào)用bolt的getComponentConfiguration()方法,從而設(shè)置該bolt的配置。
調(diào)用路徑為:TopologyBuilder.setBolt()-> TopologyBuilder.initCommon()-> getComponentConfiguration()
4. 附件
測試使用的代碼:
package storm.starter;import backtype.storm.Config; import backtype.storm.Constants; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.task.ShellBolt; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.IRichBolt; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import storm.starter.spout.RandomSentenceSpout;import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.Map;public class MyTickTestTopology {public static class WordCount extends BaseBasicBolt {Map<String, Integer> counts = new HashMap<String, Integer>();@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {if (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){System.out.println("################################WorldCount bolt: "+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));}else{collector.emit(new Values("a", 1));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word", "count"));}@Overridepublic Map<String, Object> getComponentConfiguration() {Config conf = new Config();conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,10);return conf;}}public static class TickTest extends BaseBasicBolt{@Overridepublic void execute(Tuple tuple, BasicOutputCollector collector) {// 收到的tuple是tick tupleif (tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)&& tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)){System.out.println("################################TickTest bolt: "+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS").format(new Date()));}// 收到的tuple時(shí)正常的tupleelse{collector.emit(new Values("a"));}}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("test"));}@Overridepublic Map<String, Object> getComponentConfiguration() {Config conf = new Config();conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS,20);return conf;}}public static void main(String[] args) throws Exception {TopologyBuilder builder = new TopologyBuilder();builder.setSpout("spout", new RandomSentenceSpout(), 3);builder.setBolt("count", new WordCount(), 3).shuffleGrouping("spout");builder.setBolt("tickTest", new TickTest(), 3).shuffleGrouping("count");Config conf = new Config();conf.put(conf.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 7);conf.setDebug(false);if (args != null && args.length > 0) {conf.setNumWorkers(3);StormSubmitter.submitTopology(args[0], conf, builder.createTopology());}else {conf.setMaxTaskParallelism(3);LocalCluster cluster = new LocalCluster();cluster.submitTopology("word-count", conf, builder.createTopology());// Thread.sleep(10000); // cluster.shutdown();}} }總結(jié)
以上是生活随笔為你收集整理的关于Storm Tick的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: YARN体系学习笔记
- 下一篇: Leet Code OJ 344. Re