生活随笔
收集整理的這篇文章主要介紹了
RocketMQ topic路由
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
為什么80%的碼農都做不了架構師?>>> ??
?
原創文章,轉載請注明出處:http://jameswxx.iteye.com/blog/2096446 這里以消費者為例說明。一組消費者要消費某個topic,得先知道該topic分布在哪些broker上,某個broker上的topic分布可能會變化,一旦變化,生產者和消費者應該都能被通知到。通知模式有推和拉兩種,客戶端都是采取拉的模式,所以broker如有變化,通知都是有延遲的。
一 什么時候啟動topic路由獲取任務 兩個地方: 1 首先是DefaultMQPushConsumerImpl啟動時,見DefaultMQPushConsumerImpl的start方法里的
this .updateTopicSubscribeInfoWhenSubscriptionChanged(); 2 另外DefaultMQPushConsumerImpl的start方法也啟動了MQClientInstance,MQClientInstance的start方法里調用了 startScheduledTask()方法,該方法啟動了獲取路由的定時任務。
| ? ? ? ?? // 定時從Name Server獲取Topic路由信息 ???????? this . scheduledExecutorService? .scheduleAtFixedRate( new? Runnable() { ???????????? @Override ???????????? public? void? run() { ???????????????? try? { ??????????????????? MQClientInstance. this? .updateTopicRouteInfoFromNameServer(); ??????????????? } ???????????????? catch? (Exception e) { ???????????????????? log .error(? "ScheduledTask updateTopicRouteInfoFromNameServer exception" , e); ??????????????? } ??????????? } ??????? }, 10,? this . clientConfig? .getPollNameServerInteval(), TimeUnit. MILLISECONDS? ); |
? 二 每隔多久獲取一次 很簡單,看定時任務每隔多久執行一次就知道了,這里的間隔參數是 this . clientConfig?
.getPollNameServerInteval()。 ClientConfig的pollNameServerInteval?定義如下:
private int pollNameServerInteval = 1000 * 30; DefaultMQPushConsumer繼承了ClientConfig,
pollNameServerInteval?默認是30秒,顯然,這個時間是可以自己定義的,通過 DefaultMQPushConsumer的
setPollNameServerInteval()方法。 ? 三 獲取路由過程 看 MQClientInstance的 updateTopicRouteInfoFromNameServer()方法,該方法最終會調用下面這個方法,需要注意,對于消費者而言, isDefault參數永遠是false。 | ?? public? boolean? updateTopicRouteInfoFromNameServer( final? String topic,? boolean? isDefault, DefaultMQProducer defaultMQProducer) { ???????? try? { ???????????? if? ( this . lockNamesrv? .tryLock( LockTimeoutMillis , TimeUnit. MILLISECONDS? )) { ???????????????? try? { ??????????????????? TopicRouteData topicRouteData; ???????????????????? if? (isDefault && defaultMQProducer !=? null ) { ? ? ? ? ? ? ? ? ? ? ? ?//此處省略不必要的信息,對于消費者,分支不會走到這里來,因為 isDefault為false,且生產者肯定為空 ??????????????????? } ???????????????????? else? { ??????????????????????? topicRouteData = ???????????????????????????????? this . mQClientAPIImpl? .getTopicRouteInfoFromNameServer(topic, 1000 * 3); ??????????????????? } ? ? ? ? ? ? ? ? ? ??//此處省略無關語句 ??????????????? } ???????????????? catch? (Exception e) { ???????????????????? if? (!topic.startsWith(MixAll. RETRY_GROUP_TOPIC_PREFIX? ) ??????????????????????????? && !topic.equals(MixAll. DEFAULT_TOPIC? )) { ???????????????????????? log .warn( "updateTopicRouteInfoFromNameServer Exception"? , e); ??????????????????? } ??????????????? } ???????????????? finally? { ???????????????????? this . lockNamesrv? .unlock(); ??????????????? } ??????????? } ???????????? else? { ???????????????? log .warn( "updateTopicRouteInfoFromNameServer tryLock timeout {}ms" ,? LockTimeoutMillis ); ??????????? } ??????? } ???????? catch? (InterruptedException e) { ???????????? log .warn(? "updateTopicRouteInfoFromNameServer Exception" , e); ??????? } ? ???????? return? false? ; ? } |
其實最終都是通過 this ?. mQClientAPIImpl?
.getTopicRouteInfoFromNameServer(topic, 1000 * 3);得到的。 ? ? ? 四 客戶端與nameserver的連接關系 broker與所有nameserver都是長連接,如有變化,則向所有nameserver都發送消息。但是生產者和消費者只是跟某一臺nameserver保持聯系。 設定一個場景, 如果某個broker的topic配置發生了變化,它向所有nameserver發布通知,但是此時如果某一臺nameserver推送失敗(超時或者掛掉了),則nameserver集群之間的信息是不完整的,因為掛掉的那臺nameserver沒有得到最新變化。 由此衍生三個問題: 1 如果該nameserver不是掛掉,只是那一瞬間沒有響應,那么待可正常服務時,剛才那個borker發生的變化應該能生效,不應該被丟棄,否則nameserver之間的數據是不同步的。 ??解決方案:broker是定時向所有nameserver發送自己的注冊信息的,如果當時某臺nameserver掛掉重啟或者超時,沒關系,下次仍然會接受到上次沒接收到的broker信息 2 如果真的掛掉了,但是很快又恢復了,因為borker和nameserver保持的是長連接,顯然掛掉重新啟動后,broker與nameserver的長連接無效了,應該能自動重連 ??見 getAndCreateChannel方法分析 3 只要某個nameserver不可用,消費者應該能failover,每次應該都檢查長連接是否還有效,若無效則 自動連接其他nameserver。 ??見 getAndCreateNameserverChannel()方法分析 ? 帶著這個疑問,看看 this ?. mQClientAPIImpl?
.getTopicRouteInfoFromNameServer(topic, 1000 * 3)方法。 這個方法向nameserver發起調用,獲取路由結果 | RemotingCommand request =? RemotingCommand.createRequestCommand(RequestCode.? GET_ALL_TOPIC_LIST_FROM_NAMESERVER? ,? null ); RemotingCommand response =? this ?. remotingClient? .invokeSync( ?null , request, timeoutMillis); |
重點在于 remotingClient?
.invokeSync方法,如下 | @Override ???? public ?RemotingCommand invokeSync(String addr,? final ?RemotingCommand request,? long? timeoutMillis) ???????????? throws ?InterruptedException, RemotingConnectException, RemotingSendRequestException, ??????????? RemotingTimeoutException { ? ? ? ??//這里獲取連接,該方法里面會做連接的檢查和恢復 ???????? final ?Channel channel =? this ?.getAndCreateChannel(addr); ? ? ? ? ??//最后如果還是不是有效連接,則關閉連接,拋出異常 ???????? if ?(channel !=? null ?&& channel.isActive()) { ???????????? try ?{ ???????????????? if ?( this? . rpcHook? !=? null ) { ???????????????????? this ?. rpcHook? .doBeforeRequest(addr, request); ??????????????? } ??????????????? RemotingCommand response =? this ?.invokeSyncImpl(channel, request, timeoutMillis); ???????????????? if ?( this? . rpcHook? !=? null ) { ???????????????????? this ?. rpcHook? .doAfterResponse(request, response); ??????????????? } ???????????????? return ?response; ??????????? } ???????????? catch ?(RemotingSendRequestException e) { ???????????????? log ?.warn( "invokeSync: send request exception, so close the channel[{}]" , addr); ???????????????? this ?.closeChannel(addr, channel); ???????????????? throw ?e; ??????????? } ???????????? catch ?(RemotingTimeoutException e) { ???????????????? log ?.warn( "invokeSync: wait response timeout exception, the channel[{}]" , addr); ???????????????? // 超時異常如果關閉連接可能會產生連鎖反應 ???????????????? // this.closeChannel(?addr, channel); ???????????????? throw ?e; ??????????? } ??????? } ???????? else ?{ ???????????? this ?.closeChannel(addr, channel); ???????????? throw ? new? RemotingConnectException(addr); ??????? } ??? } |
這個方法大體分為兩步,第一步獲取連接,第二步通過連接發送請求,獲取連接當然是
getAndCreateChannel方法了, getAndCreateChannel方法非常重要,它包含了客戶端對nameserver的failover,也包含了自動重連功能, 對于客戶端,傳入的addr參數都是null,所以一直會走到 getAndCreateNameserverChannel()方法。 | ?? ?private ?Channel? getAndCreateChannel ( ?final ?String addr)? throws ?InterruptedException { ? ? ? ??//無論是producer還是consumer,傳進來的 addr參數都是null ???????? if ?( null? == addr) ???????????? return ?getAndCreateNameserverChannel(); ? ? ? ? ? //因為客戶端傳入的addr是null,所以客戶端不會走到這里來,只有broker才會走到這里來,因為broker傳入的addr不為null ??????? ChannelWrapper cw =? this ?. channelTables? .get(addr); ???????? if ?(cw !=? null? && cw.isOK()) { ???????????? return ?cw.getChannel(); ??????? } ? ? ? ? ? //注意,如果和某個addr的連接不OK了,則再向該nameserver發起重連 ???????? return ? this? .createChannel(addr); ??? } |
? createChannel方法很簡單,無非就是創建連接嘛,就不細看了,分析下 getAndCreateNameserverChannel(),以下是該方法大致過程: 因為客戶端都是與某一臺nameserver長連接,因此長連接一旦選定,后面不會變化,除非nameserver掛掉,所以已建立的長連接要保存起來。下面這段邏輯就是如此。 | ? ? ? ?String addr =? this ?. namesrvAddrChoosed? .get(); ???????? if ?(addr !=? null ) { ??????????? ChannelWrapper cw =? this ?. channelTables? .get(addr); ? ? ? ? ? ? ?//注意這里,雖然長連接已經建立了,但是每次調用時,仍然要通過“ cw !=?null?&& cw.isOK()”檢查連接是否OK。 ? ? ? ? ? ??? if ?(cw !=? null? && cw.isOK()) { ???????????????? return ?cw.getChannel(); ??????????? } ??????? } |
如果連接沒有建立或連接已經斷開,則繼續往下,真正創建連接時需要加鎖的 ? if ( this . lockNamesrvChannel .tryLock( LockTimeoutMillis , TimeUnit. MILLISECONDS )) 下面的代碼都是在這個if塊里面 這里又執行了一邊上面的獲取連接并檢測的代碼,可以連接,因為有時候連接只是偶爾不OK的 | ? ? ?addr =? this .? namesrvAddrChoosed ?.get(); ???????????????? if ?(addr !=? null ) { ??????????????????? ChannelWrapper cw =? this ?. channelTables? .get(addr); ???????????????????? if ?(cw !=? null? && cw.isOK()) { ???????????????????????? return ?cw.getChannel(); ??????????????????? } ??????????????? } |
接著往下,
這段代碼非常重要 namesrvIndex指示了當前跟哪個nameserver發生連接,初始值是個隨機數,跟nameserver數量取模,走到這一步,要么是首次發起調用,之前連接還未創建現在要創建了,或者是已創建的連接無效了要連接下一個nameserver,就是“cw.isOK()”為false。 ? | ? ? ? ? ?if ?(addrList !=? null ?&& !addrList.isEmpty()) { ???????????????????? for ?( int? i = 0; i < addrList.size(); i++) { ???????????????????????? int ?index =? this ?. namesrvIndex? .incrementAndGet(); ??????????????????????? index = Math.?abs(index); ??????????????????????? index = index % addrList.size(); ??????????????????????? String newAddr = addrList.get(index); ? ???????????????????????? this ?.namesrvAddrChoosed.set(newAddr); ??????????????????????? Channel channelNew =? this ?.createChannel(newAddr); ???????????????????????? if ?(channelNew !=? null ) ???????????????????????????? return ?channelNew; ??????????????????? } ??????????????? } |
?
轉載于:https://my.oschina.net/boltwu/blog/473025
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎
總結
以上是生活随笔為你收集整理的RocketMQ topic路由的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。