Storm StreamID Usage Example (Version 1.0.2)
I experimented a bit with Storm's StreamID feature and am keeping these notes for reference.
==Sample Data==
{"Address": "小橋鎮小橋中學對面","CityCode": "511300","CountyCode": "511322","EnterpriseCode": "YUNDA","MailNo": "667748320345","Mobile": "183****5451","Name": "王***","ProvCode": "510000","Weight": "39" }?
==Topology Structure==
Spout1 emits each JSON record on two named streams: Stream1 is consumed by CountBolt1, while Stream2 is consumed by both CountBolt2 and CountBolt3. CountBolt1 counts records per courier company and emits on Stream3; CountBolt2 counts per province and CountBolt3 counts per city, each emitting on a stream it names Stream4. TopBolt subscribes to all three bolt outputs and prints the source StreamID of every tuple it receives.
==Source Code==
<Spout1>
package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import common.simulate.DataRandom;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class Spout1 extends BaseRichSpout {

    private SpoutOutputCollector _collector = null;
    private DataRandom _dataRandom = null;
    private int _timeInterval = 1000;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Declare two named output streams, each carrying a single "json" field.
        declarer.declareStream("Stream1", new Fields("json"));
        declarer.declareStream("Stream2", new Fields("json"));
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _dataRandom = DataRandom.getInstance();
        if (conf.containsKey(Constants.SpoutInterval)) {
            _timeInterval = Integer.valueOf((String) conf.get(Constants.SpoutInterval));
        }
    }

    @Override
    public void nextTuple() {
        try {
            Thread.sleep(_timeInterval);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        JSONObject jsonObject = _dataRandom.getRandomExpressData();
        System.out.print("[---Spout1---]jsonObject=" + jsonObject + "\n");
        // Emit the same record onto both named streams.
        _collector.emit("Stream1", new Values(jsonObject.toJSONString()));
        _collector.emit("Stream2", new Values(jsonObject.toJSONString()));
    }
}
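A side note not from the original post: when a spout or bolt emits without naming a stream, the tuple goes to Storm's default stream (Utils.DEFAULT_STREAM_ID, i.e. "default"). Since Spout1 only emits on "Stream1" and "Stream2", a bolt wired with the two-argument shuffleGrouping("Spout1") would subscribe to the default stream and receive nothing. A small sketch of the distinction, assuming a spout that already holds a _collector field and a jsonObject variable:

// Named stream: only consumers that subscribe to "Stream1" will see this tuple.
_collector.emit("Stream1", new Values(jsonObject.toJSONString()));

// Unnamed emit: equivalent to emitting on Utils.DEFAULT_STREAM_ID ("default").
_collector.emit(new Values(jsonObject.toJSONString()));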
<CountBolt1>
package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class CountBolt1 extends BaseRichBolt {

    private OutputCollector _collector = null;
    private int taskId = 0;
    private Map<String, Integer> _map = new HashMap<>();

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declareStream("Stream3", new Fields("company", "count"));
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        _collector = collector;
        taskId = context.getThisTaskId();
    }

    @Override
    public void execute(Tuple input) {
        String str = input.getStringByField("json");
        JSONObject jsonObject = JSONObject.parseObject(str);
        String company = jsonObject.getString(Constants.EnterpriseCode);
        int count = 0;
        if (_map.containsKey(company)) {
            count = _map.get(company);
        }
        count++;
        _map.put(company, count);
        _collector.emit("Stream3", new Values(company, count));
        System.out.print("[---CountBolt1---]" +
                "taskId=" + taskId + ", company=" + company + ", count=" + count + "\n");
    }
}
<CountBolt2>
package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class CountBolt2 extends BaseRichBolt {

    private OutputCollector _collector = null;
    private int _taskId = 0;
    private Map<String, Integer> _map = new HashMap<>();

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector = outputCollector;
        _taskId = topologyContext.getThisTaskId();
    }

    @Override
    public void execute(Tuple tuple) {
        String str = tuple.getStringByField("json");
        JSONObject jsonObject = JSONObject.parseObject(str);
        String prov = jsonObject.getString(Constants.ProvCode);
        int count = 0;
        if (_map.containsKey(prov)) {
            count = _map.get(prov);
        }
        count++;
        _map.put(prov, count);
        // Emits an extra random UUID field alongside the province count.
        _collector.emit("Stream4", new Values(prov, count, UUID.randomUUID()));
        System.out.print("[---CountBolt2---]" +
                "taskId=" + _taskId + ", prov=" + prov + ", count=" + count + "\n");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declareStream("Stream4", new Fields("prov", "count", "random"));
    }
}
<CountBolt3>
package test;

import com.alibaba.fastjson.JSONObject;
import common.constants.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class CountBolt3 extends BaseRichBolt {

    private OutputCollector _collector = null;
    private int _taskId = 0;
    private Map<String, Integer> _map = new HashMap<>();

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        _collector = outputCollector;
        _taskId = topologyContext.getThisTaskId();
    }

    @Override
    public void execute(Tuple tuple) {
        String str = tuple.getStringByField("json");
        JSONObject jsonObject = JSONObject.parseObject(str);
        String city = jsonObject.getString(Constants.CityCode);
        int count = 0;
        if (_map.containsKey(city)) {
            count = _map.get(city);
        }
        count++;
        _map.put(city, count);
        _collector.emit("Stream4", new Values(city, count));
        System.out.print("[---CountBolt3---]" +
                "taskId=" + _taskId + ", city=" + city + ", count=" + count + "\n");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // Also named "Stream4", but with different fields than CountBolt2's stream.
        // Stream IDs are scoped per component, so the two do not clash.
        outputFieldsDeclarer.declareStream("Stream4", new Fields("city", "count"));
    }
}
<TopBolt>
package test;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.List;
import java.util.Map;

public class TopBolt extends BaseRichBolt {

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
    }

    @Override
    public void execute(Tuple tuple) {
        // getSourceStreamId() tells the bolt which named stream this tuple arrived on.
        System.out.print("[---TopBolt---]StreamID=" + tuple.getSourceStreamId() + "\n");
        List<Object> values = tuple.getValues();
        for (Object value : values) {
            System.out.print("[---TopBolt---]value=" + value + "\n");
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
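TopBolt here only prints the stream ID; in practice a bolt consuming several streams usually branches on it. A hedged sketch of that pattern (the class TopBoltByStream is not part of the original post; it only reuses the stream names and fields declared above):

package test;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

import java.util.Map;

// Hypothetical variant of TopBolt that dispatches on the source stream.
public class TopBoltByStream extends BaseRichBolt {

    @Override
    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
    }

    @Override
    public void execute(Tuple tuple) {
        String streamId = tuple.getSourceStreamId();
        if ("Stream3".equals(streamId)) {
            // Stream3 comes from CountBolt1 with fields ("company", "count").
            System.out.println("company=" + tuple.getStringByField("company")
                    + ", count=" + tuple.getIntegerByField("count"));
        } else if ("Stream4".equals(streamId)) {
            // Both CountBolt2 and CountBolt3 call their stream "Stream4";
            // getSourceComponent() tells which component this tuple came from.
            System.out.println("from=" + tuple.getSourceComponent()
                    + ", code=" + tuple.getString(0)
                    + ", count=" + tuple.getIntegerByField("count"));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
}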
<TestTopology>
package test;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class TestTopology {

    public static void main(String[] args)
            throws InvalidTopologyException, AuthorizationException, AlreadyAliveException {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Spout1", new Spout1());
        // Each bolt subscribes to a specific named stream of its upstream component.
        builder.setBolt("Count1", new CountBolt1()).shuffleGrouping("Spout1", "Stream1");
        builder.setBolt("Count2", new CountBolt2()).shuffleGrouping("Spout1", "Stream2");
        builder.setBolt("Count3", new CountBolt3()).shuffleGrouping("Spout1", "Stream2");
        builder.setBolt("Top", new TopBolt())
                .fieldsGrouping("Count1", "Stream3", new Fields("company"))
                .fieldsGrouping("Count2", "Stream4", new Fields("prov"))
                .fieldsGrouping("Count3", "Stream4", new Fields("city"));

        Config config = new Config();
        config.setNumWorkers(1);
        config.put(common.constants.Constants.SpoutInterval, args[1]);

        if (Boolean.valueOf(args[0])) {
            StormSubmitter.submitTopology("TestTopology1", config, builder.createTopology());
        } else {
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("TestTopology1", config, builder.createTopology());
        }
    }
}
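One practical note, not from the original post: in local mode the LocalCluster keeps running until the process is killed. If you want a test run to end cleanly on its own, you can sleep for a while, then kill the topology and shut the cluster down. A minimal sketch of that variant of the else branch (the 30-second window is an arbitrary choice):

LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("TestTopology1", config, builder.createTopology());
try {
    // Let the topology run for a while before tearing the local cluster down.
    Thread.sleep(30 * 1000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
localCluster.killTopology("TestTopology1");
localCluster.shutdown();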
==Result Log==
[---Spout1---]jsonObject={"CityCode":"511300","CountyCode":"511322","Address":"小橋鎮小橋中學對面","MailNo":"667748320345","ProvCode":"510000","Mobile":"183****5451","EnterpriseCode":"YUNDA","Weight":"39","Name":"王***"}
[---CountBolt1---]taskId=1, company=YUNDA, count=1
[---CountBolt3---]taskId=3, city=511300, count=1
[---CountBolt2---]taskId=2, prov=510000, count=1
[---TopBolt---]StreamID=Stream4
[---TopBolt---]value=510000
[---TopBolt---]value=1
[---TopBolt---]value=99bd1cdb-d5c1-4ac8-b1a1-a4cfffb5a616
[---TopBolt---]StreamID=Stream4
[---TopBolt---]value=511300
[---TopBolt---]value=1
[---TopBolt---]StreamID=Stream3
[---TopBolt---]value=YUNDA
[---TopBolt---]value=1
As the log shows, TopBolt receives tuples from all three upstream bolts and can tell, via getSourceStreamId(), whether each tuple arrived on Stream3 or Stream4.
Reposted from: https://www.cnblogs.com/quchunhui/p/8302192.html