[Storm] Storm installation, configuration, and usage, with a walkthrough of a Storm word-count program
Preface: reading notes
A Storm cluster looks a lot like a Hadoop cluster. On Hadoop you run MapReduce jobs; on Storm you run topologies. The key difference is that a MapReduce job eventually finishes, whereas a topology runs forever until you kill it. A Storm cluster has two kinds of nodes: master and worker. The master node runs a daemon called Nimbus, similar to Hadoop's JobTracker: Nimbus distributes code around the cluster, assigns tasks to the worker nodes, and monitors for failures. Each worker node runs a daemon called the Supervisor. The Supervisor listens for work assigned to its machine and starts and stops worker processes to handle the tasks Nimbus assigns to it. Each worker process executes a subset of a topology; a running topology consists of many worker processes spread across many machines.
All coordination between Nimbus and the Supervisors is done through a ZooKeeper cluster. In addition, Nimbus and the Supervisors are fail-fast and stateless; all state is kept in ZooKeeper or on local disk. The daemons can therefore be killed or restarted without affecting the health of the whole system.

Running Storm:

storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

The storm jar command connects to Nimbus and uploads the jar. Storm's basic primitives for stream transformation are spouts and bolts.
Spouts and bolts are the interfaces you implement to provide the application logic.
A spout is the source of a stream: it reads in data and emits it as a stream. A bolt consumes input streams and either processes them or emits new streams. Storm automatically reassigns failed tasks and guarantees that no data is lost, even when a machine goes down.
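That guarantee relies on tuple anchoring and acking. The word-count bolts later in this post use BaseBasicBolt, which anchors and acks automatically; with the lower-level BaseRichBolt you do both yourself. Below is a minimal sketch of that pattern (the class name and the "upper" field are made up for illustration, they are not part of the example program):

package com.cmcc.chiwei.storm;

import java.util.Map;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// Hypothetical bolt illustrating explicit anchoring and acking.
public class AnchoredUpperBolt extends BaseRichBolt {
    private static final long serialVersionUID = 1L;
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        try {
            // Anchor the emitted tuple to the input tuple, so a downstream failure
            // triggers the spout's fail() and the tuple is replayed.
            collector.emit(input, new Values(input.getString(0).toUpperCase()));
            collector.ack(input);   // mark this tuple as fully processed
        } catch (Exception e) {
            collector.fail(input);  // ask the spout to replay it
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("upper"));
    }
}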
Download and install
http://storm.apache.org/downloads.html
1. Install dependencies
yum install uuid -y
yum install e2fsprogs -y
yum install libuuid*
yum install libtool -y
yum install *c++* -y
yum install git -y

2. ZooKeeper cluster
http://blog.csdn.net/simonchi/article/details/43019401
3. ZeroMQ & JZMQ
tar -xzvf zeromq-4.0.5.tar.gz
./autogen.sh
./configure && make && make install

jzmq:
git clone git://github.com/nathanmarz/jzmq.git
./autogen.sh
./configure && make && make install

4. Python
./configure
make
make install
rm -f /usr/bin/python
ln /usr/local/bin/python3.4 /usr/bin/python
python -V

vi /usr/bin/yum
# change the first line from "#!/usr/bin/python" to "#!/usr/bin/python2.4"

Configure and run
storm.yaml
storm.zookeeper.servers:
    - 192.168.11.176
    - 192.168.11.177
    - 192.168.11.178
storm.zookeeper.port: 2181
nimbus.host: "192.168.11.176"
storm.local.dir: "/home/storm/workdir"
supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

1. Nimbus: on the Storm master node, run "bin/storm nimbus >/dev/null 2>&1 &" to start the Nimbus daemon in the background.
2. Supervisor: on each Storm worker node, run "bin/storm supervisor >/dev/null 2>&1 &" to start the Supervisor daemon in the background.
3. UI: on the Storm master node, run "bin/storm ui >/dev/null 2>&1 &" to start the UI daemon in the background. Once it is up, visit http://{nimbus host}:8080 to see the cluster's worker resource usage, the status of running topologies, and other information.
Word count program
Spout
package com.cmcc.chiwei.storm;

import java.io.BufferedReader;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class WordReader extends BaseRichSpout {

    private static final long serialVersionUID = 1L;
    private SpoutOutputCollector collector;
    private FileReader fileReader;
    private boolean completed = false;

    public boolean isDistributed() {
        return false;
    }

    public void ack(Object msgId) {
        System.out.println("OK:" + msgId);
    }

    public void close() {
    }

    public void fail(Object msgId) {
        System.out.println("FAIL:" + msgId);
    }

    /**
     * The only thing this method does is emit each line of the file.
     */
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Do nothing
            }
            return;
        }
        String str;
        // Open the reader
        BufferedReader reader = new BufferedReader(fileReader);
        try {
            // Read all lines
            while ((str = reader.readLine()) != null) {
                // Emit one value per line, using the line itself as the message id
                this.collector.emit(new Values(str), str);
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading tuple", e);
        } finally {
            completed = true;
        }
    }

    /**
     * Open the file and keep a reference to the collector object.
     */
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        try {
            this.fileReader = new FileReader(conf.get("wordsFile").toString());
        } catch (FileNotFoundException e) {
            throw new RuntimeException("Error reading file [" + conf.get("wordsFile") + "]");
        }
        this.collector = collector;
    }

    /**
     * Declare the output field "line".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("line"));
    }
}

Bolt1
package com.cmcc.chiwei.storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordNormalizer extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;

    public void cleanup() {
    }

    /**
     * The bolt receives a line from the words file and normalizes it:
     * the line is split on spaces and each word is lower-cased.
     */
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        String[] words = sentence.split(" ");
        for (String word : words) {
            word = word.trim();
            if (!word.isEmpty()) {
                word = word.toLowerCase();
                collector.emit(new Values(word));
            }
        }
    }

    /**
     * The bolt emits only the field "word".
     */
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}

Bolt2
package com.cmcc.chiwei.storm;

import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;

public class WordCounter extends BaseBasicBolt {

    private static final long serialVersionUID = 1L;
    Integer id;
    String name;
    Map<String, Integer> counters;

    /**
     * At the end of the topology (when the cluster is shut down)
     * we print the word counters.
     */
    @Override
    public void cleanup() {
        System.out.println("-- Word Counter [" + name + "-" + id + "] --");
        for (Map.Entry<String, Integer> entry : counters.entrySet()) {
            System.out.println(entry.getKey() + ": " + entry.getValue());
        }
    }

    /**
     * On create
     */
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters = new HashMap<String, Integer>();
        this.name = context.getThisComponentId();
        this.id = context.getThisTaskId();
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        String str = input.getString(0);
        if (!counters.containsKey(str)) {
            counters.put(str, 1);
        } else {
            Integer c = counters.get(str) + 1;
            counters.put(str, c);
        }
    }
}

Topology
package com.cmcc.chiwei.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

public class TopologyMain {
    public static void main(String[] args) throws InterruptedException {
        // Topology: build the topology, wiring up the components and how they exchange data
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 1)
               .fieldsGrouping("word-normalizer", new Fields("word"));

        // Configuration
        Config conf = new Config();
        conf.put("wordsFile", args[0]);
        conf.setDebug(false);

        // Run the topology in a local in-process cluster
        conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Getting-Started-Topology", conf, builder.createTopology());
        Thread.sleep(2000);
        cluster.shutdown();
    }
}

words.txt
hello
world
storm flume hadoop hdfs
what's wrong flume ?
what's up hdfs ?
Hi,storm,what are you doing ?

Execution results
OK:hello
OK:world
OK:storm flume hadoop hdfs
OK:what's wrong flume ?
OK:what's up hdfs ?
OK:Hi,storm,what are you doing ?
-- Word Counter [word-counter-2] --
what's: 2
flume: 2
hdfs: 2
you: 1
storm: 1
up: 1
hello: 1
hadoop: 1
hi,storm,what: 1
are: 1
doing: 1
wrong: 1
?: 3
world: 1
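Note that "hi,storm,what" is counted as a single word and "?" is counted three times, because WordNormalizer splits only on spaces. If you want word-level tokens, one option (an assumption, not part of the original program) is to split on runs of non-letter characters instead. A sketch of a drop-in replacement for the execute method of WordNormalizer above, using its existing imports:

    // Hypothetical variant of WordNormalizer.execute that splits on any run of
    // non-letter characters (keeping apostrophes), so "Hi,storm,what" becomes
    // "hi", "storm", "what" and the lone "?" tokens disappear.
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getString(0);
        for (String word : sentence.split("[^a-zA-Z']+")) {
            word = word.trim().toLowerCase();
            if (!word.isEmpty()) {
                collector.emit(new Values(word));
            }
        }
    }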
Analysis:
spout — reads the raw data and supplies it to the bolts.
bolt — receives data from the spout or from other bolts and processes it; the result can feed further bolts or be the final result.
nimbus — the daemon on the master node; it distributes tasks to the worker nodes.
topology — the topology structure, Storm's unit of work.
fields — the named fields declared by spouts and bolts and received by downstream bolts.

A Storm topology is essentially a chain of bolts transforming the data a spout emits. For example: the spout reads a line of text and passes it to a bolt; that bolt splits the line into words and passes them to another bolt, and the second bolt accumulates the counts.
Spout
open --> nextTuple
Bolt1
declareOutputFields --> execute
Bolt2
prepare --> execute --> cleanup
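The example above runs in a LocalCluster for testing. To run the same topology on the cluster configured earlier, the main method would submit it through StormSubmitter instead, and the jar would be uploaded with the storm jar command shown in the preface. A rough sketch (the class name, topology name, and worker count are placeholders, not from the original post):

package com.cmcc.chiwei.storm;

import backtype.storm.Config;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

// Hypothetical cluster-mode entry point; mirrors TopologyMain but submits
// to the real cluster instead of a LocalCluster.
public class ClusterTopologyMain {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("word-reader", new WordReader());
        builder.setBolt("word-normalizer", new WordNormalizer()).shuffleGrouping("word-reader");
        builder.setBolt("word-counter", new WordCounter(), 1)
               .fieldsGrouping("word-normalizer", new Fields("word"));

        Config conf = new Config();
        conf.put("wordsFile", args[0]);   // the file must exist on the worker nodes
        conf.setNumWorkers(2);            // number of worker processes to request

        // Nimbus is discovered from storm.yaml; the jar is the one passed to "storm jar".
        StormSubmitter.submitTopology("word-count-topology", conf, builder.createTopology());
    }
}

It would then be packaged and launched with something like "storm jar wordcount.jar com.cmcc.chiwei.storm.ClusterTopologyMain /path/to/words.txt", following the storm jar form shown in the preface.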
More detailed content will be covered gradually in later posts; I am still studying this myself. Comments and corrections are welcome!
Reposted from: https://www.cnblogs.com/yfceshi/p/7088828.html