Sorm进阶(1):storm实现github提交数监控看板
生活随笔
收集整理的這篇文章主要介紹了
Sorm进阶(1):storm实现github提交数监控看板
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
1. 基礎(chǔ)組件及其API
- storm中有關(guān)spout類(lèi)的層次
在本例中,spout基于github的API監(jiān)控某指定項(xiàng)目倉(cāng)庫(kù)的動(dòng)態(tài),并將變動(dòng)情況發(fā)射為元組,每個(gè)元組包含針對(duì)該倉(cāng)庫(kù)的全部提交消息。緊接著,spout類(lèi)文件的changelog.txt文件包含了所期望格式的提交消息,如下所示:
代碼實(shí)現(xiàn):
import org.apache.logging.log4j.core.util.IOUtils; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.spout.SpoutOutputCollector; //發(fā)射元組 import org.apache.storm.topology.OutputFieldsDeclarer; // 為spout發(fā)射的所有元組定義字段命名 import org.apache.storm.tuple.Fields; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Values;import java.io.IOException; import java.nio.charset.Charset; import java.util.List; import java.lang.String; import java.util.Map;public class CommitFeedListener extends BaseRichSpout {private SpoutOutputCollector outputCollector; // 負(fù)責(zé)發(fā)射元組或者讓元組失效private List<String> commits; //從 changelog.txt文件中讀取提交的消息列表// 為spout發(fā)射的所有元組定義字段命名public void declareOutputFields(OutputFieldsDeclarer declarer){// Field類(lèi)構(gòu)造函數(shù)中的命名順序,必須與發(fā)射元組中值的順序匹配,而發(fā)射元值的順序由Value類(lèi)來(lái)定義declarer.declare(new Fields("commit")); // spout發(fā)射一個(gè)命名為commit的字段}// storm準(zhǔn)備運(yùn)行spout時(shí)候調(diào)用,連接數(shù)據(jù)源public void open(Map configMap,TopologyContext context,SpoutOutputCollector outputCollector){this.outputCollector = outputCollector;try{ // 讀取changelog.txt到Listcommits = org.apache.storm.shade.org.apache.commons.io.IOUtils.readLines(ClassLoader.getSystemResourceAsStream("changelog.txt"),Charset.defaultCharset().name());} catch(IOException e) {throw new RuntimeException(e);}}public void nextTuple(){ // 當(dāng)spout讀取下一個(gè)元組時(shí),由storm定時(shí)調(diào)用,數(shù)據(jù)源準(zhǔn)備好一個(gè)完整的數(shù)據(jù)之后才會(huì)觸發(fā)for (String commit:commits) {// 為每個(gè)提交消息發(fā)射一個(gè)元組outputCollector.emit(new Values(commit));}} }- storm中有關(guān)bolt的類(lèi)層次
這里我們需要用到兩個(gè)bolt,可以直接繼承BaseBasicBolt,一個(gè)負(fù)責(zé)從元組中接受完整的提交消息,并提取提交Github代碼用戶的email地址,然后發(fā)射包含email地址的元組;另一個(gè)bolt維護(hù)一個(gè)內(nèi)存映射表,并在映射表中更新用戶提交的次數(shù)。
代碼實(shí)現(xiàn):
import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values;public class EmailExtractor extends BaseBasicBolt{//public void declareOutputFields(OutputFieldsDeclarer declearer){// 用于聲明bolt發(fā)射的元組中字段的命名為emaildeclearer.declare(new Fields("email"));}// 當(dāng)一個(gè)tuple被發(fā)射到該bolt上時(shí)被調(diào)用public void execute(Tuple tuple,BasicOutputCollector outputCollector) {// 獲取字段為commit的值String commit = tuple.getStringByField("commit");String[] parts = commit.split(" ");// 發(fā)射一個(gè)字段為email的新元組outputCollector.emit(new Values(parts[1]));} } import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple;import java.util.HashMap; import java.util.Map;public class EmailCounter extends BaseBasicBolt{private Map<String, Integer> counts;public void declareOutputFields(OutputFieldsDeclarer declarer){// does not emit anything}private Integer countFor(String email){Integer count = counts.get(email);return count == null? 0:count;}private void printCounts(){for(String email:counts.keySet()){System.out.println(String.format("s% has count of %s", email, counts.get(email)));}}// 獲取字段為email的值public void execute(Tuple tuple,BasicOutputCollector outputCollector){String email = tuple.getStringByField("email");counts.put(email, countFor(email) + 1);printCounts();}// storm在bolt準(zhǔn)備運(yùn)行時(shí)調(diào)用public void prepare(Map config,TopologyContext context){counts = new HashMap<String, Integer>();} }2. Storm實(shí)現(xiàn)
完成spout和bolt部分,我們需要告訴storm數(shù)據(jù)流的位置以及每個(gè)流的分組策略,并構(gòu)建spout-bolt的拓?fù)溆?jì)算圖。作為統(tǒng)籌的環(huán)節(jié),需要完成三項(xiàng)工作:
- 構(gòu)建拓?fù)溆?jì)算圖,并告訴storm數(shù)據(jù)流的位置,以及指明每個(gè)數(shù)據(jù)流的流分組策略
- 創(chuàng)建配置,建議打開(kāi)日志
- 生成拓?fù)?#xff0c;連同配置提交到storm集群,最后kill并關(guān)閉
示例代碼:
import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.generated.StormTopology; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.utils.Utils;public class LocalTopologyRunner {private static final int TEN_MINUTES = 600000;// 構(gòu)建拓?fù)溆?jì)算圖,將spout-bolt鏈接起來(lái)// 本地運(yùn)行拓?fù)涞膍ainpublic static void main (String[] args){TopologyBuilder builder = new TopologyBuilder();// 添加spout到拓?fù)鋱D,并指定idbuilder.setSpout("commit-feed-listener", new CommitFeedListener());// 添加listener-bolt到拓?fù)鋱D,并鏈接commit-spoutbuilder.setBolt("email-extractor", new EmailExtractor()).shuffleGrouping("commit-feed-listener");// 添加count-bolt到拓?fù)鋱D,并鏈接extractoe-boltbuilder.setBolt("email-counter", new EmailCounter()).shuffleGrouping("email-extractor");// 拓?fù)鋵拥呐渲妙?lèi),為了保持調(diào)試,開(kāi)啟了debugConfig config = new Config();config.setDebug(true);// 創(chuàng)建拓?fù)溆?jì)算圖StormTopology topology = builder.createTopology();// 定義本地集群LocalCluster cluster = new LocalCluster();// 提交拓?fù)溆?jì)算圖,并配置到本地集群cluster.submitTopology("github-commit-count", config, topology);Utils.sleep(TEN_MINUTES);cluster.killTopology("github-commit-count");cluster.shutdown();} }3. 小結(jié)
- 一個(gè)拓?fù)溆?jì)算圖是一個(gè)結(jié)點(diǎn)圖集,圖中每個(gè)節(jié)點(diǎn)代表一個(gè)進(jìn)程或者計(jì)算處理,每條邊代表上個(gè)節(jié)點(diǎn)計(jì)算的輸出,下個(gè)結(jié)點(diǎn)計(jì)算的輸入
- 元組是一個(gè)有序地?cái)?shù)值序列,其中每個(gè)數(shù)值都被賦予一個(gè)命名
- spout是數(shù)據(jù)流的源頭,目的就是為了從數(shù)據(jù)源讀取數(shù)據(jù),并且發(fā)射元組作為輸出流到數(shù)據(jù)流中
- bolt是實(shí)現(xiàn)核心業(yè)務(wù)邏輯的地方,執(zhí)行過(guò)濾/聚合/連接或者數(shù)據(jù)庫(kù)交互等操作
- spout/bolt都可以執(zhí)行一個(gè)或者多個(gè)實(shí)例,這個(gè)是線程控制的
- main中需要完成統(tǒng)籌的工作,包括添加節(jié)點(diǎn)到拓?fù)鋱D,配置拓?fù)鋱D,創(chuàng)建拓?fù)鋱D,最后將拓?fù)鋱D提交到storm集群
- 所有的結(jié)點(diǎn)都有id唯一識(shí)別,所有的元組都有自己的字段名,并且結(jié)點(diǎn)上聲明的Fieds類(lèi)型的字段名必須與同節(jié)點(diǎn)發(fā)射的Value類(lèi)型字段名相同
總結(jié)
以上是生活随笔為你收集整理的Sorm进阶(1):storm实现github提交数监控看板的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 美国凤凰号探测器从火星土壤中提取到水
- 下一篇: 问一个AddDevice和设备符号链的问