[Repost] Storm Development Series, Part 1: The First Program
Original: http://blog.csdn.net/csfreebird/article/details/49104777
-------------------------------------------------------------------------------------------------
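This article creates a Storm program in a local development environment, keeping things as simple as possible.
First, use mvn to create a simple project named hello_storm: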
    mvn archetype:generate -DgroupId=org.csfreebird -DartifactId=hello_storm -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false
Edit pom.xml and add the storm-core dependency:
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>org.csfreebird</groupId>
      <artifactId>hello_storm</artifactId>
      <version>0.9.5</version>
      <packaging>jar</packaging>
      <name>hello_storm</name>
      <url>http://maven.apache.org</url>
      <dependencies>
        <dependency>
          <groupId>org.apache.storm</groupId>
          <artifactId>storm-core</artifactId>
          <version>${project.version}</version>
          <scope>provided</scope>
        </dependency>
      </dependencies>
    </project>
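The provided scope means the storm-core jar is used only for compiling and testing; when the topology runs on a cluster, it relies entirely on the storm-core jar supplied by the cluster environment.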
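One way to see what the provided scope means in practice is to check at run time whether a Storm class is visible on the classpath. The following is a minimal sketch in plain Java, not part of the original article; ClasspathCheck and isOnClasspath are hypothetical names chosen for illustration, and backtype.storm.Config is the Storm 0.9.x class that the topology in this article imports.

```java
final class ClasspathCheck {

    /** Returns true if the named class can be located by this class's class loader. */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // With a provided-scope dependency, this class must be supplied by
        // whatever environment launches the program.
        String stormClass = "backtype.storm.Config";
        System.out.println(stormClass + " on classpath: " + isOnClasspath(stormClass));
    }
}
```

Run on its own, without storm-core on the classpath, the check reports that the class is missing, which is the situation a jar built with a provided dependency is in until a runtime environment supplies the library.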
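Next, rename App.java to HelloTopology.java and start coding. Following the earlier example, all spout and bolt classes are defined as static nested classes inside HelloTopology.java.
The functionality is as follows: the spout emits a randomly chosen country name about every 100 ms, and the bolt prepends "hello," to each word it receives.
Write the code for HelloTopology.java. The spout code comes from TestWordSpout, with the logging code removed and the underscore-prefixed member variable names changed: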
    package org.csfreebird;

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.StormSubmitter;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.testing.TestWordSpout;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import backtype.storm.utils.Utils;
    import backtype.storm.spout.SpoutOutputCollector;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.Random;

    public class HelloTopology {

        // Spout: emits one randomly chosen word about every 100 ms.
        public static class HelloSpout extends BaseRichSpout {

            boolean isDistributed;
            SpoutOutputCollector collector;

            public HelloSpout() {
                this(true);
            }

            public HelloSpout(boolean isDistributed) {
                this.isDistributed = isDistributed;
            }

            public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
                this.collector = collector;
            }

            public void close() {
            }

            public void nextTuple() {
                Utils.sleep(100);
                final String[] words = new String[] {"china", "usa", "japan", "russia", "england"};
                final Random rand = new Random();
                final String word = words[rand.nextInt(words.length)];
                this.collector.emit(new Values(word));
            }

            public void ack(Object msgId) {
            }

            public void fail(Object msgId) {
            }

            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
            }

            @Override
            public Map<String, Object> getComponentConfiguration() {
                if (!this.isDistributed) {
                    // Non-distributed mode: limit the spout to a single task.
                    Map<String, Object> ret = new TreeMap<String, Object>();
                    ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
                    return ret;
                } else {
                    return null;
                }
            }
        }

        // Bolt: prefixes each incoming word with "hello," and acks the input tuple.
        public static class HelloBolt extends BaseRichBolt {
            OutputCollector collector;

            @Override
            public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
                this.collector = collector;
            }

            @Override
            public void execute(Tuple tuple) {
                this.collector.emit(tuple, new Values("hello," + tuple.getString(0)));
                this.collector.ack(tuple);
            }

            @Override
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("word"));
            }
        }

        public static void main(String[] args) throws Exception {

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("a", new HelloSpout(), 10);
            builder.setBolt("b", new HelloBolt(), 5).shuffleGrouping("a");

            Config conf = new Config();
            conf.setDebug(true);

            if (args != null && args.length > 0) {
                // A topology name was given: submit to a cluster.
                conf.setNumWorkers(3);
                StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
            } else {
                // No arguments: run in an in-process local cluster for 10 seconds.
                String test_id = "hello_test";
                LocalCluster cluster = new LocalCluster();
                cluster.submitTopology(test_id, conf, builder.createTopology());
                Utils.sleep(10000);
                cluster.killTopology(test_id);
                cluster.shutdown();
            }
        }
    }
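To make the data flow easier to follow, here is a minimal sketch that mimics what HelloSpout and HelloBolt do to each word, written in plain Java with no Storm dependency. It is an illustration only, not a replacement for the topology: HelloFlowSketch, nextWord, greet, and run are hypothetical names, and the sketch ignores parallelism, grouping, and acking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class HelloFlowSketch {

    // Same word list as the spout in HelloTopology.
    static final String[] WORDS = {"china", "usa", "japan", "russia", "england"};

    /** Stand-in for HelloSpout.nextTuple(): picks one word at random. */
    static String nextWord(Random rand) {
        return WORDS[rand.nextInt(WORDS.length)];
    }

    /** Stand-in for HelloBolt.execute(): prefixes the word with "hello,". */
    static String greet(String word) {
        return "hello," + word;
    }

    /** Passes n randomly chosen words through the bolt logic and collects the results. */
    static List<String> run(int n, Random rand) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(greet(nextWord(rand)));
        }
        return out;
    }

    public static void main(String[] args) {
        for (String s : run(5, new Random())) {
            System.out.println(s);
        }
    }
}
```

Each printed line has the form hello, followed by one of the five country names; which names appear varies from run to run because the choice is random.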
The project should now compile successfully:
    mvn clean compile
To be able to run in local mode, add the following to pom.xml:
    <build>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>1.2.1</version>
          <executions>
            <execution>
              <goals>
                <goal>exec</goal>
              </goals>
            </execution>
          </executions>
          <configuration>
            <executable>java</executable>
            <includeProjectDependencies>true</includeProjectDependencies>
            <includePluginDependencies>false</includePluginDependencies>
            <classpathScope>compile</classpathScope>
            <mainClass>${storm.topology}</mainClass>
          </configuration>
        </plugin>
      </plugins>
    </build>
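The mainClass element in this configuration is filled in from the storm.topology property, so the class to run is chosen by name at launch time. As a rough illustration of that idea, and not a description of the plugin's actual implementation, the plain-Java sketch below looks up a class by name and invokes its main method reflectively. MainLauncher and DemoTopology are hypothetical names, and a JVM system property is used here only as a stand-in for the Maven property.

```java
import java.lang.reflect.Method;

final class MainLauncher {

    /** Loads the named class and calls its public static main(String[]) method. */
    static void launch(String mainClass, String[] args) throws Exception {
        Class<?> cls = Class.forName(mainClass);
        Method main = cls.getMethod("main", String[].class);
        // Cast to Object so the array is passed as a single argument, not expanded as varargs.
        main.invoke(null, (Object) args);
    }

    public static void main(String[] args) throws Exception {
        // Assumption for this sketch: fall back to DemoTopology when the property is not set.
        String mainClass = System.getProperty("storm.topology", DemoTopology.class.getName());
        launch(mainClass, args);
    }
}

/** Hypothetical stand-in for a topology class with a main method. */
final class DemoTopology {
    static int runs = 0;

    public static void main(String[] args) {
        runs++;
        System.out.println("DemoTopology.main called with " + args.length + " argument(s)");
    }
}
```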
Then run the command:
    mvn compile exec:java -Dstorm.topology=org.csfreebird.HelloTopology
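This command passes no program arguments, so the main method of HelloTopology takes its else branch: it runs the topology in a LocalCluster, sleeps for 10 seconds, then kills the topology and shuts the cluster down. The branch logic can be sketched in isolation as follows; ModeSketch and chooseMode are hypothetical names, and the returned strings are labels invented for this illustration rather than anything Storm produces.

```java
final class ModeSketch {

    /** Mirrors the if/else in HelloTopology.main: any argument selects cluster submission. */
    static String chooseMode(String[] args) {
        if (args != null && args.length > 0) {
            // In HelloTopology, args[0] is used as the topology name for StormSubmitter.
            return "cluster:" + args[0];
        }
        // No arguments: HelloTopology runs in a LocalCluster.
        return "local";
    }

    public static void main(String[] args) {
        System.out.println(chooseMode(args));
    }
}
```

Running the sketch without arguments prints local, while passing a name such as hello prints cluster:hello, matching the two paths in the topology's main method.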