Storm的Trident单词计数代码
生活随笔
收集整理的這篇文章主要介紹了
Storm的Trident单词计数代码
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
?
1 /** 2 * 單詞計數(shù) 3 */ 4 public class LocalTridentCount { 5 6 public static class MyBatchSpout implements IBatchSpout { 7 8 Fields fields; 9 HashMap<Long, List<List<Object>>> batches = new HashMap<Long, List<List<Object>>>(); 10 11 public MyBatchSpout(Fields fields) { 12 this.fields = fields; 13 } 14 @Override 15 public void open(Map conf, TopologyContext context) { 16 } 17 18 @Override 19 public void emitBatch(long batchId, TridentCollector collector) { 20 List<List<Object>> batch = this.batches.get(batchId); 21 if(batch == null){ 22 batch = new ArrayList<List<Object>>(); 23 Collection<File> listFiles = FileUtils.listFiles(new File("d:\\stormtest"), new String[]{"txt"}, true); 24 for (File file : listFiles) { 25 List<String> readLines; 26 try { 27 readLines = FileUtils.readLines(file); 28 for (String line : readLines) { 29 batch.add(new Values(line)); 30 } 31 FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis())); 32 } catch (IOException e) { 33 e.printStackTrace(); 34 } 35 36 } 37 if(batch.size()>0){ 38 this.batches.put(batchId, batch); 39 } 40 } 41 for(List<Object> list : batch){ 42 collector.emit(list); 43 } 44 } 45 46 @Override 47 public void ack(long batchId) { 48 this.batches.remove(batchId); 49 } 50 51 @Override 52 public void close() { 53 } 54 55 @Override 56 public Map getComponentConfiguration() { 57 Config conf = new Config(); 58 conf.setMaxTaskParallelism(1); 59 return conf; 60 } 61 62 @Override 63 public Fields getOutputFields() { 64 return fields; 65 } 66 67 } 68 69 /** 70 * 對一行行的數(shù)據(jù)進行切割成一個個單詞 71 */ 72 public static class MySplit extends BaseFunction{ 73 74 @Override 75 public void execute(TridentTuple tuple, TridentCollector collector) { 76 String line = tuple.getStringByField("lines"); 77 String[] words = line.split("\t"); 78 for (String word : words) { 79 collector.emit(new Values(word)); 80 } 81 } 82 83 } 84 85 public static class MyWordAgge extends BaseAggregator<Map<String, Integer>>{ 86 87 @Override 88 
public Map<String, Integer> init(Object batchId, 89 TridentCollector collector) { 90 return new HashMap<String, Integer>(); 91 } 92 93 @Override 94 public void aggregate(Map<String, Integer> val, TridentTuple tuple, 95 TridentCollector collector) { 96 String key = tuple.getString(0); 97 /*Integer integer = val.get(key); 98 if(integer==null){ 99 integer=0; 100 } 101 integer++; 102 val.put(key, integer);*/ 103 val.put(key, MapUtils.getInteger(val, key, 0)+1); 104 } 105 106 @Override 107 public void complete(Map<String, Integer> val, 108 TridentCollector collector) { 109 collector.emit(new Values(val)); 110 } 111 112 } 113 114 /** 115 * 匯總局部的map,并且打印結果 116 * 117 */ 118 public static class MyCountPrint extends BaseFunction{ 119 120 HashMap<String, Integer> hashMap = new HashMap<String, Integer>(); 121 @Override 122 public void execute(TridentTuple tuple, TridentCollector collector) { 123 Map<String, Integer> map = (Map<String, Integer>)tuple.get(0); 124 for (Entry<String, Integer> entry : map.entrySet()) { 125 String key = entry.getKey(); 126 Integer value = entry.getValue(); 127 Integer integer = hashMap.get(key); 128 if(integer==null){ 129 integer=0; 130 } 131 hashMap.put(key, integer+value); 132 } 133 134 Utils.sleep(1000); 135 System.out.println("=================================="); 136 for (Entry<String, Integer> entry : hashMap.entrySet()) { 137 System.out.println(entry); 138 } 139 } 140 141 } 142 143 144 public static void main(String[] args) { 145 //大體流程:首先設置一個數(shù)據(jù)源MyBatchSpout,會監(jiān)控指定目錄下文件的變化,當發(fā)現(xiàn)有新文件的時候把文件中的數(shù)據(jù)取出來, 146 //然后封裝到一個batch中發(fā)射出來.就會對tuple中的數(shù)據(jù)進行處理,把每個tuple中的數(shù)據(jù)都取出來,然后切割..切割成一個個的單詞. 147 //單詞發(fā)射出來之后,會對單詞進行分組,會對一批假設有10個tuple,會對這10個tuple分完詞之后的單詞進行分組, 相同的單詞分一塊 148 //分完之后聚合 把相同的單詞使用同一個聚合器聚合 然后出結果 每個單詞出現(xiàn)多少次... 149 //進行匯總 先每一批數(shù)據(jù)局部匯總 最后全局匯總.... 150 //這個代碼也不是很簡單...挺多....就是使用批處理的方式. 
151 152 TridentTopology tridentTopology = new TridentTopology(); 153 154 tridentTopology.newStream("spoutid", new MyBatchSpout(new Fields("lines"))) 155 .each(new Fields("lines"), new MySplit(), new Fields("word")) 156 .groupBy(new Fields("word"))//用到了分組 對一批tuple中的單詞進行分組.. 157 .aggregate(new Fields("word"), new MyWordAgge(), new Fields("wwwww"))//用到了聚合 158 .each(new Fields("wwwww"), new MyCountPrint(), new Fields("")); 159 160 LocalCluster localCluster = new LocalCluster(); 161 String simpleName = TridentMeger.class.getSimpleName(); 162 localCluster.submitTopology(simpleName, new Config(), tridentTopology.build()); 163 } 164 }指定路徑下文件中的內容:
程序運行結果:
?
轉載于:https://www.cnblogs.com/DreamDrive/p/6676021.html
總結
以上是生活随笔為你收集整理的Storm的Trident单词计数代码的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 作业帮app如何取消自动扣费项目(千万不
- 下一篇: PHP7安装扩展