生活随笔
收集整理的這篇文章主要介紹了
【2019春招准备:108.storm(3)】
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
8.DRPC
remote procedure call 遠程過程調用
跨網絡(跨越傳輸和應用兩層),跨進程
hadoopRPC
依賴hadoop client的RPC自己包
DRPC并不是一個storm的特性,可以單獨用,也可以放在一起用(將會很棒–form官網)
DRPC server進行協調:
拿到一個PRC請求,交給一個topology,將產生的結果返回給客戶端。(和PRC的調用流程基本一樣的)
最后是通過id匹配結果
之前不是遠程調用的時候,是直接new一個topologyBuilder;現在是分布式遠程調用,需要用的類是LinearDRPCTopologyBuilder
9.storm整合周邊框架的使用
可以整合hin多的框架
已經有一些現成的實現類:RedisLookupBolt RedisStoreBolt RedisFilterBolt
還是舉一個wordcount的例子,不過最后輸出到ziboris3:6379的redis上面,通過rdm進行結果查看
代碼見最后LocalWordCountRedisStormTopology.java
ConnectionProvider
JDBCMapper
JDBCInsertBolt(寫入table的bolt)
待更新
待更新
待更新
=======================================================================
LocalWordCountRedisStormTopology.java
import org
.apache
.storm
.Config
;
import org
.apache
.storm
.LocalCluster
;
import org
.apache
.storm
.redis
.bolt
.RedisStoreBolt
;
import org
.apache
.storm
.redis
.common
.config
.JedisPoolConfig
;
import org
.apache
.storm
.redis
.common
.mapper
.RedisDataTypeDescription
;
import org
.apache
.storm
.redis
.common
.mapper
.RedisStoreMapper
;
import org
.apache
.storm
.spout
.SpoutOutputCollector
;
import org
.apache
.storm
.task
.OutputCollector
;
import org
.apache
.storm
.task
.TopologyContext
;
import org
.apache
.storm
.topology
.OutputFieldsDeclarer
;
import org
.apache
.storm
.topology
.TopologyBuilder
;
import org
.apache
.storm
.topology
.base
.BaseRichBolt
;
import org
.apache
.storm
.topology
.base
.BaseRichSpout
;
import org
.apache
.storm
.tuple
.Fields
;
import org
.apache
.storm
.tuple
.ITuple
;
import org
.apache
.storm
.tuple
.Tuple
;
import org
.apache
.storm
.tuple
.Values
;
import org
.apache
.storm
.utils
.Utils
;import java
.util
.HashMap
;
import java
.util
.Map
;
import java
.util
.Random
;
public class LocalWordCountRedisStormTopology {public static class DataSourceSpout extends BaseRichSpout {private SpoutOutputCollector collector
;public static final String
[] words
=new String[]{"apple","orange","banana","pinaple"};@Overridepublic void open(Map conf
, TopologyContext context
, SpoutOutputCollector collector
) {this.collector
= collector
;}@Overridepublic void nextTuple() {Random random
=new Random();String word
=words
[random
.nextInt(words
.length
)];this.collector
.emit(new Values(word
));System
.out
.println("emit:"+word
);Utils
.sleep(1000);}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer
) {declarer
.declare(new Fields("line_"));}@Overridepublic void ack(Object msgId
) {super.ack(msgId
);System
.out
.println(msgId
);}}public static class SplitBolt extends BaseRichBolt {private OutputCollector collector
;@Overridepublic void prepare(Map stormConf
, TopologyContext context
, OutputCollector collector
) {this.collector
= collector
;}@Overridepublic void execute(Tuple input
) {String word
= input
.getStringByField("line_");this.collector
.emit(new Values(word
));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer
) {declarer
.declare(new Fields("word_"));}}public static class CountBolt extends BaseRichBolt{Map
<String,Integer> map
=new HashMap<>();private OutputCollector collector
;@Overridepublic void prepare(Map stormConf
, TopologyContext context
, OutputCollector collector
) {this.collector
=collector
;}@Overridepublic void execute(Tuple input
) {String word
=input
.getStringByField("word_");Integer count
=map
.get(word
);if(count
==null
){count
=1;}else{count
++;}map
.put(word
,count
);this.collector
.emit(new Values(word
,map
.get(word
)));}@Overridepublic void declareOutputFields(OutputFieldsDeclarer declarer
) {declarer
.declare(new Fields("word_","count_"));}}public static class WordCountStoreMapper implements RedisStoreMapper{private RedisDataTypeDescription description
;private final String hashKey
="wc";public WordCountStoreMapper(){description
=new RedisDataTypeDescription(RedisDataTypeDescription
.RedisDataType
.HASH
,hashKey
);}@Overridepublic RedisDataTypeDescription
getDataTypeDescription() {return description
;}@Overridepublic String
getKeyFromTuple(ITuple iTuple
) {return iTuple
.getStringByField("word_");}@Overridepublic String
getValueFromTuple(ITuple iTuple
) {return iTuple
.getIntegerByField("count_")+"";}}public static void main(String
[] args
) {TopologyBuilder builder
= new TopologyBuilder();builder
.setSpout("DataSourceSpout_",new DataSourceSpout());builder
.setBolt("SplitBolt_",new SplitBolt()).shuffleGrouping("DataSourceSpout_");builder
.setBolt("CountBolt_",new CountBolt()).shuffleGrouping("SplitBolt_");JedisPoolConfig poolConfig
=new JedisPoolConfig.Builder().setHost("192.168.200.203").setPort(6379).build();WordCountStoreMapper storeMapper
= new WordCountStoreMapper();RedisStoreBolt storeBolt
=new RedisStoreBolt(poolConfig
,storeMapper
);builder
.setBolt("RedisStoreBolt_",storeBolt
).shuffleGrouping("CountBolt_");LocalCluster cluster
=new LocalCluster();cluster
.submitTopology("LocalWordCountStormTopology",new Config(),builder
.createTopology());}
}
總結
以上是生活随笔為你收集整理的【2019春招准备:108.storm(3)】的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。