Storm + Kafka + Redis: a real-time word-frequency counting example
Goal of the example
Type text into a Kafka console producer; Storm fetches the data from Kafka in real time, counts how often each word occurs, and continuously stores the counts in Redis.
Writing the code
First, write the POM file:
```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>cn.toto.storm.kafkastormredis</groupId>
    <artifactId>kafkastormredis</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <!--<scope>provided</scope>-->
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.7.3</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.8.2</artifactId>
            <version>0.8.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <!-- The main class to run; adjust the package name to match your own project -->
                            <mainClass>cn.toto.storm.kafkastormredis.StormTopologyDriver</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
```
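With this POM in place, the deployable jar can be built the standard Maven way. A quick usage note (the command is ordinary Maven usage, not something shown in the original post):

```bash
# Compile and run the assembly plugin bound to the package phase;
# the runnable fat jar lands under target/ as
# kafkastormredis-1.0-SNAPSHOT-jar-with-dependencies.jar
mvn clean package
```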
A Storm topology needs a spout to bring data in. In simple teaching examples the data usually comes from a file; the typical code looks like this (this class is not used in the Storm + Kafka + Redis example, it only illustrates the pattern):

```java
package cn.toto.storm.kafkastormredis;

import org.apache.commons.lang.StringUtils;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Simulates reading data from a file. Not used in the
 * Storm + Kafka + Redis example.
 *
 * @author tuzq
 * @create 2017-06-20 23:41
 */
public class MyLocalFileSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private BufferedReader bufferedReader;

    /**
     * Initialization: keep the collector and open the input file.
     */
    @Override
    public void open(Map map, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            this.bufferedReader = new BufferedReader(new FileReader(new File("E:/wordcount/input/1.txt")));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Storm processes data one tuple at a time; conceptually the framework runs:
     *   while (true) { this.nextTuple(); }
     */
    @Override
    public void nextTuple() {
        // Each call emits one line of the file.
        try {
            String line = bufferedReader.readLine();
            if (StringUtils.isNotBlank(line)) {
                List<Object> arrayList = new ArrayList<Object>();
                arrayList.add(line);
                collector.emit(arrayList);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("juzi"));
    }
}
```
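If you did want to run locally off this file spout instead of Kafka, the wiring in the driver and the field lookup in the split bolt would change accordingly. A hypothetical sketch (not part of the original topology):

```java
// Hypothetical local-file wiring in StormTopologyDriver:
topologyBuilder.setSpout("MyLocalFileSpout", new MyLocalFileSpout(), 1);

// ...and in MySplitBolt.execute(), read the "juzi" field the spout declared,
// which carries a String rather than the raw bytes KafkaSpout emits:
String juzi = (String) input.getValueByField("juzi");
String[] strings = juzi.split(" ");
```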
Once the spout is in place, a Bolt typically splits the text. The bolt below receives data from the KafkaSpout and splits each line into words:

```java
package cn.toto.storm.kafkastormredis;

import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

/**
 * Receives data from the KafkaSpout and forwards the split words
 * to MyWordCountAndPrintBolt.
 *
 * @author tuzq
 * @create 2017-06-21 9:14
 */
public class MySplitBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // 1. Fetch the data.
        // If the driver were configured with MyLocalFileSpout, you would read the
        // "juzi" field it declares:
        // String juzi = (String) input.getValueByField("juzi");
        // Here the driver uses KafkaSpout, whose declareOutputFields declares a
        // field named "bytes", so we read the raw bytes coming from Kafka.
        byte[] juzi = (byte[]) input.getValueByField("bytes");
        // 2. Split the line into words.
        String[] strings = new String(juzi).split(" ");
        // 3. Emit one tuple per word; Values builds the list for us.
        for (String word : strings) {
            collector.emit(new Values(word, 1));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "num"));
    }
}
```
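As a concrete illustration (sample input of my own, not from the original post): if the producer sends the line `hello storm hello`, this bolt emits three tuples — ("hello", 1), ("storm", 1), ("hello", 1) — which the next bolt aggregates.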
After splitting, the words need to be counted. A second Bolt does the aggregation and stores the result in Redis:

```java
package cn.toto.storm.kafkastormredis;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;

import java.util.HashMap;
import java.util.Map;

/**
 * Aggregates the word counts and stores the result in Redis.
 *
 * @author tuzq
 * @create 2017-06-21 9:22
 */
public class MyWordCountAndPrintBolt extends BaseBasicBolt {
    private Jedis jedis;
    private Map<String, String> wordCountMap = new HashMap<String, String>();

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Connect to Redis (any external resource would be initialized here).
        jedis = new Jedis("hadoop11", 6379);
        super.prepare(stormConf, context);
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = (String) input.getValueByField("word");
        Integer num = (Integer) input.getValueByField("num");
        // 1. Look up the current count for the word (0 if absent) and add num.
        int current = wordCountMap.get(word) == null ? 0 : Integer.parseInt(wordCountMap.get(word));
        wordCountMap.put(word, (current + num) + "");
        // 2. Save the whole map to Redis under the hash key "wordcount".
        System.out.println(wordCountMap);
        jedis.hmset("wordcount", wordCountMap);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Terminal bolt: it emits nothing, so no output fields are declared.
    }
}
```
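Writing the whole in-memory map on every tuple works for a demo, but Redis can also maintain the counter itself. A minimal alternative sketch (my suggestion, not in the original code) uses the atomic HINCRBY command:

```java
// Alternative execute() body: let Redis hold the counter atomically,
// so no in-memory map is needed and concurrent bolt instances do not
// overwrite each other's snapshots.
String word = (String) input.getValueByField("word");
Integer num = (Integer) input.getValueByField("num");
jedis.hincrBy("wordcount", word, num);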
Next, a driver strings the Spout and Bolts together into a real-time topology:

```java
package cn.toto.storm.kafkastormredis;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.topology.TopologyBuilder;

/**
 * This driver ties Kafka, Storm and Redis together.
 *
 * Before running it, create the Kafka topic:
 *   [root@hadoop1 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 --partitions 3 --topic wordCount
 *
 * Then open a console producer to feed data into Kafka:
 *   [root@hadoop1 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic wordCount
 * and type some input.
 *
 * @author tuzq
 * @create 2017-06-21 9:39
 */
public class StormTopologyDriver {
    public static void main(String[] args) throws Exception {
        // 1. Describe the topology.
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("KafkaSpout",
                new KafkaSpout(new SpoutConfig(new ZkHosts("hadoop11:2181"), "wordCount", "/wordCount", "wordCount")), 2);
        topologyBuilder.setBolt("bolt1", new MySplitBolt(), 4).shuffleGrouping("KafkaSpout");
        topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt(), 2).shuffleGrouping("bolt1");

        // 2. Submit the topology.
        Config config = new Config();
        config.setNumWorkers(2);
        StormTopology stormTopology = topologyBuilder.createTopology();

        // Local mode:
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("wordcount", config, stormTopology);

        // Cluster mode:
        // StormSubmitter.submitTopology("wordcount1", config, stormTopology);
    }
}
```
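One caveat worth noting (my observation, not in the original post): because bolt2 runs with parallelism 2 behind a shuffleGrouping, tuples for the same word can land on different bolt instances, each keeping its own partial map and overwriting the shared Redis hash. A fieldsGrouping on "word" routes each word to a single instance:

```java
// Route all tuples with the same "word" value to the same counting-bolt
// instance, so per-word counts are not split across executors.
// (Requires: import org.apache.storm.tuple.Fields;)
topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt(), 2)
        .fieldsGrouping("bolt1", new Fields("word"));
```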
Running the program
1. Start the Kafka cluster; startup is covered in this post:
http://blog.csdn.net/tototuzuoquan/article/details/73430874
2. Start Redis; installation and startup are covered in this post:
http://blog.csdn.net/tototuzuoquan/article/details/43611535
3. Create the topic on Kafka; see this post:
http://blog.csdn.net/tototuzuoquan/article/details/73432256
這里我們使用:
```bash
# Create the Kafka topic
[root@hadoop1 ~]# cd $KAFKA_HOME
[root@hadoop1 kafka]# bin/kafka-topics.sh --create --zookeeper hadoop11:2181 --replication-factor 1 --partitions 3 --topic wordCount
```

Next, start a console producer to send data to Kafka:
```bash
[root@hadoop1 kafka]# bin/kafka-console-producer.sh --broker-list hadoop1:9092 --topic wordCount
```

Type the input data into this producer.
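For example (sample lines of my own; any whitespace-separated text works):

```
hello storm hello kafka
hello redis
```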
4. Run the program: open StormTopologyDriver and run its main method (in the IDE, right-click → Run). The console prints the running word-count map as tuples arrive.
5、最后如果想看MyWordCountAndPrintBolt中記錄到redis的wordcount內容,可以編寫如下代碼案例:
```java
package cn.toto.storm.kafkastormredis;

import redis.clients.jedis.Jedis;

import java.util.Map;

/**
 * Reads back the word counts stored in Redis.
 *
 * @author tuzq
 * @create 2017-06-21 10:13
 */
public class TestRedis {
    public static void main(String[] args) {
        Jedis jedis = new Jedis("hadoop11", 6379);
        Map<String, String> wordcount = jedis.hgetAll("wordcount");
        System.out.println(wordcount);
    }
}
```

Running it prints the stored hash; a sample is shown below.
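A hypothetical sample, assuming the producer input shown earlier (the actual content depends on what you typed):

```
{hello=3, storm=1, kafka=1, redis=1}
```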
Summary
Typing text into a Kafka console producer feeds a Storm topology in which a KafkaSpout reads the messages, a split bolt breaks them into words, and a counting bolt tallies each word and stores the running totals in Redis.