分布式开放 消息系统 (RocketMQ) 的原理与实践
前些天發(fā)現(xiàn)了一個巨牛的人工智能學(xué)習(xí)網(wǎng)站,通俗易懂,風(fēng)趣幽默,忍不住分享一下給大家。點擊跳轉(zhuǎn)到教程。
分布式消息系統(tǒng)作為實現(xiàn)分布式系統(tǒng)可擴(kuò)展、可伸縮性的關(guān)鍵組件,需要具有高吞吐量、高可用等特點。而談到消息系統(tǒng)的設(shè)計,就回避不了兩個問題:
RocketMQ作為阿里開源的一款高性能、高吞吐量的消息中間件,它是怎樣來解決這兩個問題的?RocketMQ 有哪些關(guān)鍵特性?其實現(xiàn)原理是怎樣的?
關(guān)鍵特性以及其實現(xiàn)原理
一、順序消息
消息有序指的是可以按照消息的發(fā)送順序來消費(fèi)。例如:一筆訂單產(chǎn)生了 3 條消息,分別是訂單創(chuàng)建、訂單付款、訂單完成。消費(fèi)時,要按照順序依次消費(fèi)才有意義。與此同時多筆訂單之間又是可以并行消費(fèi)的。首先來看如下示例:
假如生產(chǎn)者產(chǎn)生了2條消息:M1、M2,要保證這兩條消息的順序,應(yīng)該怎樣做?你腦中想到的可能是這樣:
?
你可能會采用這種方式保證消息順序
?
假定M1發(fā)送到S1,M2發(fā)送到S2,如果要保證M1先于M2被消費(fèi),那么需要M1到達(dá)消費(fèi)端被消費(fèi)后,通知S2,然后S2再將M2發(fā)送到消費(fèi)端。
這個模型存在的問題是,如果M1和M2分別發(fā)送到兩臺Server上,就不能保證M1先達(dá)到MQ集群,也不能保證M1被先消費(fèi)。換個角度看,如果M2先于M1達(dá)到MQ集群,甚至M2被消費(fèi)后,M1才達(dá)到消費(fèi)端,這時消息也就亂序了,說明以上模型是不能保證消息的順序的。如何才能在MQ集群保證消息的順序?一種簡單的方式就是將M1、M2發(fā)送到同一個Server上:
保證消息順序,你改進(jìn)后的方法
?
這樣可以保證M1先于M2到達(dá)MQServer(生產(chǎn)者等待M1發(fā)送成功后再發(fā)送M2),根據(jù)先達(dá)到先被消費(fèi)的原則,M1會先于M2被消費(fèi),這樣就保證了消息的順序。
這個模型也僅僅是理論上可以保證消息的順序,在實際場景中可能會遇到下面的問題:
網(wǎng)絡(luò)延遲問題
只要將消息從一臺服務(wù)器發(fā)往另一臺服務(wù)器,就會存在網(wǎng)絡(luò)延遲問題。如上圖所示,如果發(fā)送M1耗時大于發(fā)送M2的耗時,那么M2就仍將被先消費(fèi),仍然不能保證消息的順序。即使M1和M2同時到達(dá)消費(fèi)端,由于不清楚消費(fèi)端1和消費(fèi)端2的負(fù)載情況,仍然有可能出現(xiàn)M2先于M1被消費(fèi)的情況。
那如何解決這個問題?將M1和M2發(fā)往同一個消費(fèi)者,且發(fā)送M1后,需要消費(fèi)端響應(yīng)成功后才能發(fā)送M2。
聰明的你可能已經(jīng)想到另外的問題:如果M1被發(fā)送到消費(fèi)端后,消費(fèi)端1沒有響應(yīng),那是繼續(xù)發(fā)送M2呢,還是重新發(fā)送M1?一般為了保證消息一定被消費(fèi),肯定會選擇重發(fā)M1到另外一個消費(fèi)端2,就如下圖所示。
保證消息順序的正確姿勢
這樣的模型就嚴(yán)格保證消息的順序,細(xì)心的你仍然會發(fā)現(xiàn)問題,消費(fèi)端1沒有響應(yīng)Server時有兩種情況,一種是M1確實沒有到達(dá)(數(shù)據(jù)在網(wǎng)絡(luò)傳送中丟失),另外一種消費(fèi)端已經(jīng)消費(fèi)M1且已經(jīng)發(fā)送響應(yīng)消息,只是MQ Server端沒有收到。如果是第二種情況,重發(fā)M1,就會造成M1被重復(fù)消費(fèi)。也就引入了我們要說的第二個問題,消息重復(fù)問題,這個后文會詳細(xì)講解。
回過頭來看消息順序問題,嚴(yán)格的順序消息非常容易理解,也可以通過文中所描述的方式來簡單處理??偨Y(jié)起來,要實現(xiàn)嚴(yán)格的順序消息,簡單且可行的辦法就是:
保證生產(chǎn)者 - MQServer - 消費(fèi)者是一對一對一的關(guān)系
這樣的設(shè)計雖然簡單易行,但也會存在一些很嚴(yán)重的問題,比如:
但我們的最終目標(biāo)是要集群的高容錯性和高吞吐量。這似乎是一對不可調(diào)和的矛盾,那么阿里是如何解決的?
世界上解決一個計算機(jī)問題最簡單的方法:“恰好”不需要解決它!—— 沈詢
有些問題,看起來很重要,但實際上我們可以通過合理的設(shè)計或者將問題分解來規(guī)避。如果硬要把時間花在解決問題本身,實際上不僅效率低下,而且也是一種浪費(fèi)。從這個角度來看消息的順序問題,我們可以得出兩個結(jié)論:
所以從業(yè)務(wù)層面來保證消息的順序而不僅僅是依賴于消息系統(tǒng),是不是我們應(yīng)該尋求的一種更合理的方式?
最后我們從源碼角度分析RocketMQ怎么實現(xiàn)發(fā)送順序消息。
RocketMQ通過輪詢所有隊列的方式來確定消息被發(fā)送到哪一個隊列(負(fù)載均衡策略)。比如下面的示例中,訂單號相同的消息會被先后發(fā)送到同一個隊列中:
// RocketMQ通過MessageQueueSelector中實現(xiàn)的算法來確定消息發(fā)送到哪一個隊列上 // RocketMQ默認(rèn)提供了兩種MessageQueueSelector實現(xiàn):隨機(jī)/Hash // 當(dāng)然你可以根據(jù)業(yè)務(wù)實現(xiàn)自己的MessageQueueSelector來決定消息按照何種策略發(fā)送到消息隊列中 SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);} }, orderId);在獲取到路由信息以后,會根據(jù)MessageQueueSelector實現(xiàn)的算法來選擇一個隊列,同一個OrderId獲取到的肯定是同一個隊列。
private SendResult send() {// 獲取topic路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {MessageQueue mq = null;// 根據(jù)我們的算法,選擇一個發(fā)送隊列// 這里的arg = orderIdmq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);if (mq != null) {return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);}} }二、消息重復(fù)
上面在解決消息順序問題時,引入了一個新的問題,就是消息重復(fù)。那么RocketMQ是怎樣解決消息重復(fù)的問題呢?還是“恰好”不解決。
造成消息重復(fù)的根本原因是:網(wǎng)絡(luò)不可達(dá)。只要通過網(wǎng)絡(luò)交換數(shù)據(jù),就無法避免這個問題。所以解決這個問題的辦法就是繞過這個問題。那么問題就變成了:如果消費(fèi)端收到兩條一樣的消息,應(yīng)該怎樣處理?
第1條很好理解,只要保持冪等性,不管來多少條重復(fù)消息,最后處理的結(jié)果都一樣。第2條原理就是利用一張日志表來記錄已經(jīng)處理成功的消息的ID,如果新到的消息ID已經(jīng)在日志表中,那么就不再處理這條消息。
第1條解決方案,很明顯應(yīng)該在消費(fèi)端實現(xiàn),不屬于消息系統(tǒng)要實現(xiàn)的功能。第2條可以消息系統(tǒng)實現(xiàn),也可以業(yè)務(wù)端實現(xiàn)。正常情況下出現(xiàn)重復(fù)消息的概率其實很小,如果由消息系統(tǒng)來實現(xiàn)的話,肯定會對消息系統(tǒng)的吞吐量和高可用有影響,所以最好還是由業(yè)務(wù)端自己處理消息重復(fù)的問題,這也是RocketMQ不解決消息重復(fù)的問題的原因。
RocketMQ不保證消息不重復(fù),如果你的業(yè)務(wù)需要保證嚴(yán)格的不重復(fù)消息,需要你自己在業(yè)務(wù)端去重。
三、事務(wù)消息
RocketMQ除了支持普通消息,順序消息,另外還支持事務(wù)消息。首先討論一下什么是事務(wù)消息以及支持事務(wù)消息的必要性。我們以一個轉(zhuǎn)帳的場景為例來說明這個問題:Bob向Smith轉(zhuǎn)賬100塊。
在單機(jī)環(huán)境下,執(zhí)行事務(wù)的情況,大概是下面這個樣子:
單機(jī)環(huán)境下轉(zhuǎn)賬事務(wù)示意圖
當(dāng)用戶增長到一定程度,Bob和Smith的賬戶及余額信息已經(jīng)不在同一臺服務(wù)器上了,那么上面的流程就變成了這樣:
集群環(huán)境下轉(zhuǎn)賬事務(wù)示意圖
這時候你會發(fā)現(xiàn),同樣是一個轉(zhuǎn)賬的業(yè)務(wù),在集群環(huán)境下,耗時居然成倍的增長,這顯然是不能夠接受的。那如何來規(guī)避這個問題?
大事務(wù) = 小事務(wù) + 異步
將大事務(wù)拆分成多個小事務(wù)異步執(zhí)行。這樣基本上能夠?qū)⒖鐧C(jī)事務(wù)的執(zhí)行效率優(yōu)化到與單機(jī)一致。轉(zhuǎn)賬的事務(wù)就可以分解成如下兩個小事務(wù):
?
小事務(wù)+異步消息
?
圖中執(zhí)行本地事務(wù)(Bob賬戶扣款)和發(fā)送異步消息應(yīng)該保證同時成功或者同時失敗,也就是扣款成功了,發(fā)送消息一定要成功,如果扣款失敗了,就不能再發(fā)送消息。那問題是:我們是先扣款還是先發(fā)送消息呢?
首先看下先發(fā)送消息的情況,大致的示意圖如下:
事務(wù)消息:先發(fā)送消息
存在的問題是:如果消息發(fā)送成功,但是扣款失敗,消費(fèi)端就會消費(fèi)此消息,進(jìn)而向Smith賬戶加錢。
先發(fā)消息不行,那就先扣款吧,大致的示意圖如下:
事務(wù)消息-先扣款
存在的問題跟上面類似:如果扣款成功,發(fā)送消息失敗,就會出現(xiàn)Bob扣錢了,但是Smith賬戶未加錢。
可能大家會有很多的方法來解決這個問題,比如:直接將發(fā)消息放到Bob扣款的事務(wù)中去,如果發(fā)送失敗,拋出異常,事務(wù)回滾。這樣的處理方式也符合“恰好”不需要解決的原則。
這里需要說明一下:如果使用Spring來管理事物的話,大可以將發(fā)送消息的邏輯放到本地事物中去,發(fā)送消息失敗拋出異常,Spring捕捉到異常后就會回滾此事物,以此來保證本地事物與發(fā)送消息的原子性。
RocketMQ支持事務(wù)消息,下面來看看RocketMQ是怎樣來實現(xiàn)的。
RocketMQ實現(xiàn)發(fā)送事務(wù)消息
RocketMQ第一階段發(fā)送Prepared消息時,會拿到消息的地址,第二階段執(zhí)行本地事物,第三階段通過第一階段拿到的地址去訪問消息,并修改消息的狀態(tài)。
細(xì)心的你可能又發(fā)現(xiàn)問題了,如果確認(rèn)消息發(fā)送失敗了怎么辦?RocketMQ會定期掃描消息集群中的事物消息,如果發(fā)現(xiàn)了Prepared消息,它會向消息發(fā)送端(生產(chǎn)者)確認(rèn),Bob的錢到底是減了還是沒減呢?如果減了是回滾還是繼續(xù)發(fā)送確認(rèn)消息呢?RocketMQ會根據(jù)發(fā)送端設(shè)置的策略來決定是回滾還是繼續(xù)發(fā)送確認(rèn)消息。這樣就保證了消息發(fā)送與本地事務(wù)同時成功或同時失敗。
那我們來看下RocketMQ源碼,是如何處理事務(wù)消息的??蛻舳税l(fā)送事務(wù)消息的部分(完整代碼請查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):
// =============================發(fā)送事務(wù)消息的一系列準(zhǔn)備工作======================================== // 未決事務(wù),MQ服務(wù)器回查客戶端 // 也就是上文所說的,當(dāng)RocketMQ發(fā)現(xiàn)`Prepared消息`時,會根據(jù)這個Listener實現(xiàn)的策略來決斷事務(wù) TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); // 構(gòu)造事務(wù)消息的生產(chǎn)者 TransactionMQProducer producer = new TransactionMQProducer("groupName"); // 設(shè)置事務(wù)決斷處理類 producer.setTransactionCheckListener(transactionCheckListener); // 本地事務(wù)的處理邏輯,相當(dāng)于示例中檢查Bob賬戶并扣錢的邏輯 TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); producer.start() // 構(gòu)造MSG,省略構(gòu)造參數(shù) Message msg = new Message(......); // 發(fā)送消息 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); producer.shutdown();接著查看sendMessageInTransaction方法的源碼,總共分為3個階段:發(fā)送Prepared消息、執(zhí)行本地事務(wù)、發(fā)送確認(rèn)消息。
// ================================事務(wù)消息的發(fā)送過程============================================= public TransactionSendResult sendMessageInTransaction(.....) {// 邏輯代碼,非實際代碼// 1.發(fā)送消息sendResult = this.send(msg);// sendResult.getSendStatus() == SEND_OK// 2.如果消息發(fā)送成功,處理與消息關(guān)聯(lián)的本地事務(wù)單元LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);// 3.結(jié)束事務(wù)this.endTransaction(sendResult, localTransactionState, localException); }endTransaction方法會將請求發(fā)往broker(mq server)去更新事務(wù)消息的最終狀態(tài):
如果endTransaction方法執(zhí)行失敗,數(shù)據(jù)沒有發(fā)送到broker,導(dǎo)致事務(wù)消息的 狀態(tài)更新失敗,broker會有回查線程定時(默認(rèn)1分鐘)掃描每個存儲事務(wù)狀態(tài)的表格文件,如果是已經(jīng)提交或者回滾的消息直接跳過,如果是prepared狀態(tài)則會向Producer發(fā)起CheckTransaction請求,Producer會調(diào)用DefaultMQProducerImpl.checkTransactionState()方法來處理broker的定時回調(diào)請求,而checkTransactionState會調(diào)用我們的事務(wù)設(shè)置的決斷方法來決定是回滾事務(wù)還是繼續(xù)執(zhí)行,最后調(diào)用endTransactionOneway讓broker來更新消息的最終狀態(tài)。
再回到轉(zhuǎn)賬的例子,如果Bob的賬戶的余額已經(jīng)減少,且消息已經(jīng)發(fā)送成功,Smith端開始消費(fèi)這條消息,這個時候就會出現(xiàn)消費(fèi)失敗和消費(fèi)超時兩個問題,解決超時問題的思路就是一直重試,直到消費(fèi)端消費(fèi)消息成功,整個過程中有可能會出現(xiàn)消息重復(fù)的問題,按照前面的思路解決即可。
消費(fèi)事務(wù)消息
這樣基本上可以解決消費(fèi)端超時問題,但是如果消費(fèi)失敗怎么辦?阿里提供給我們的解決方法是:人工解決。大家可以考慮一下,按照事務(wù)的流程,因為某種原因Smith加款失敗,那么需要回滾整個流程。如果消息系統(tǒng)要實現(xiàn)這個回滾流程的話,系統(tǒng)復(fù)雜度將大大提升,且很容易出現(xiàn)Bug,估計出現(xiàn)Bug的概率會比消費(fèi)失敗的概率大很多。這也是RocketMQ目前暫時沒有解決這個問題的原因,在設(shè)計實現(xiàn)消息系統(tǒng)時,我們需要衡量是否值得花這么大的代價來解決這樣一個出現(xiàn)概率非常小的問題,這也是大家在解決疑難問題時需要多多思考的地方。
20160321補(bǔ)充:在3.2.6版本中移除了事務(wù)消息的實現(xiàn),所以此版本不支持事務(wù)消息,具體情況請參考rocketmq的issues(已失效):
https://github.com/alibaba/RocketMQ/issues/65
https://github.com/alibaba/RocketMQ/issues/138
https://github.com/alibaba/RocketMQ/issues/156
四、Producer如何發(fā)送消息
Producer輪詢某topic下的所有隊列的方式來實現(xiàn)發(fā)送方的負(fù)載均衡,如下圖所示:
producer發(fā)送消息負(fù)載均衡
首先分析一下RocketMQ的客戶端發(fā)送消息的源碼:
?
// 構(gòu)造Producer DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 初始化Producer,整個應(yīng)用生命周期內(nèi),只需要初始化1次 producer.start(); // 構(gòu)造Message Message msg = new Message("TopicTest1",// topic"TagA",// tag:給消息打標(biāo)簽,用于區(qū)分一類消息,可為null"OrderID188",// key:自定義Key,可以用于去重,可為null("Hello MetaQ").getBytes());// body:消息內(nèi)容 // 發(fā)送消息并返回結(jié)果 SendResult sendResult = producer.send(msg); // 清理資源,關(guān)閉網(wǎng)絡(luò)連接,注銷自己 producer.shutdown();在整個應(yīng)用生命周期內(nèi),生產(chǎn)者需要調(diào)用一次start方法來初始化,初始化主要完成的任務(wù)有:
初始化完成后,開始發(fā)送消息,發(fā)送消息的主要代碼如下:
private SendResult sendDefaultImpl(Message msg,......) {// 檢查Producer的狀態(tài)是否是RUNNINGthis.makeSureStateOK();// 檢查msg是否合法:是否為null、topic,body是否為空、body是否超長Validators.checkMessage(msg, this.defaultMQProducer);// 獲取topic路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());// 從路由信息中選擇一個消息隊列MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);// 將消息發(fā)送到該隊列上去sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout); }代碼中需要關(guān)注的兩個方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面說過在producer初始化時,會啟動定時任務(wù)獲取路由信息并更新到本地緩存,所以tryToFindTopicPublishInfo會首先從緩存中獲取topic路由信息,如果沒有獲取到,則會自己去namesrv獲取路由信息。selectOneMessageQueue方法通過輪詢的方式,返回一個隊列,以達(dá)到負(fù)載均衡的目的。
如果Producer發(fā)送消息失敗,會自動重試,重試的策略:
五、消息存儲
RocketMQ的消息存儲是由consume queue和commit log配合完成的。
1、Consume Queue
consume queue是消息的邏輯隊列,相當(dāng)于字典的目錄,用來指定消息在物理文件commit log上的位置。
我們可以在配置中指定consumequeue與commitlog存儲的目錄
每個topic下的每個queue都有一個對應(yīng)的consumequeue文件,比如:
Consume Queue文件組織,如圖所示:
Consume Queue文件組織示意圖
死信隊列(Dead Letter Queue)一般用于存放由于某種原因無法傳遞的消息,比如處理失敗或者已經(jīng)過期的消息。
Consume Queue中存儲單元是一個20字節(jié)定長的二進(jìn)制數(shù)據(jù),順序?qū)戫樞蜃x,如下圖所示:
consumequeue文件存儲單元格式
2、Commit Log
CommitLog:消息存放的物理文件,每臺broker上的commitlog被本機(jī)所有的queue共享,不做任何區(qū)分。
文件的默認(rèn)位置如下,仍然可通過配置文件修改:
CommitLog的消息存儲單元長度不固定,文件順序?qū)?#xff0c;隨機(jī)讀。消息的存儲結(jié)構(gòu)如下表所示,按照編號順序以及編號對應(yīng)的內(nèi)容依次存儲。
?
Commit Log存儲單元結(jié)構(gòu)圖
3、消息存儲實現(xiàn)
消息存儲實現(xiàn),比較復(fù)雜,也值得大家深入了解,后面會單獨(dú)成文來分析(目前正在收集素材),這小節(jié)只以代碼說明一下具體的流程。
// Set the storage time msg.setStoreTimestamp(System.currentTimeMillis()); // Set the message body BODY CRC (consider the most appropriate setting msg.setBodyCRC(UtilAll.crc32(msg.getBody())); StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); synchronized (this) {long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();// Here settings are stored timestamp, in order to ensure an orderly globalmsg.setStoreTimestamp(beginLockTimestamp);// MapedFile:操作物理文件在內(nèi)存中的映射以及將內(nèi)存數(shù)據(jù)持久化到物理文件中MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();// 將Message追加到文件commitlogresult = mapedFile.appendMessage(msg, this.appendMessageCallback);switch (result.getStatus()) {case PUT_OK:break;case END_OF_FILE:// Create a new file, re-write the messagemapedFile = this.mapedFileQueue.getLastMapedFile();result = mapedFile.appendMessage(msg, this.appendMessageCallback);break;DispatchRequest dispatchRequest = new DispatchRequest(topic,// 1queueId,// 2result.getWroteOffset(),// 3result.getWroteBytes(),// 4tagsCode,// 5msg.getStoreTimestamp(),// 6result.getLogicsOffset(),// 7msg.getKeys(),// 8/*** Transaction*/msg.getSysFlag(),// 9msg.getPreparedTransactionOffset());// 10// 1.分發(fā)消息位置到ConsumeQueue// 2.分發(fā)到IndexService建立索引this.defaultMessageStore.putDispatchRequest(dispatchRequest); }4、消息的索引文件
如果一個消息包含key值的話,會使用IndexFile存儲消息索引,文件的內(nèi)容結(jié)構(gòu)如圖:
?
消息索引
?
索引文件主要用于根據(jù)key來查詢消息的,流程主要是:
六、消息訂閱
RocketMQ消息訂閱有兩種模式,一種是Push模式,即MQServer主動向消費(fèi)端推送;另外一種是Pull模式,即消費(fèi)端在需要時,主動到MQServer拉取。但在具體實現(xiàn)時,Push和Pull模式都是采用消費(fèi)端主動拉取的方式。
首先看下消費(fèi)端的負(fù)載均衡:
?
消費(fèi)端負(fù)載均衡
?
消費(fèi)端會通過RebalanceService線程,10秒鐘做一次基于topic下的所有隊列負(fù)載:
如同上圖所示:如果有 5 個隊列,2 個 consumer,那么第一個 Consumer 消費(fèi) 3 個隊列,第二 consumer 消費(fèi) 2 個隊列。這里采用的就是平均分配策略,它類似于分頁的過程,TOPIC下面的所有queue就是記錄,Consumer的個數(shù)就相當(dāng)于總的頁數(shù),那么每頁有多少條記錄,就類似于某個Consumer會消費(fèi)哪些隊列。
通過這樣的策略來達(dá)到大體上的平均消費(fèi),這樣的設(shè)計也可以很方面的水平擴(kuò)展Consumer來提高消費(fèi)能力。
消費(fèi)端的Push模式是通過長輪詢的模式來實現(xiàn)的,就如同下圖:
Push模式示意圖
?
Consumer端每隔一段時間主動向broker發(fā)送拉消息請求,broker在收到Pull請求后,如果有消息就立即返回數(shù)據(jù),Consumer端收到返回的消息后,再回調(diào)消費(fèi)者設(shè)置的Listener方法。如果broker在收到Pull請求時,消息隊列里沒有數(shù)據(jù),broker端會阻塞請求直到有數(shù)據(jù)傳遞或超時才返回。
當(dāng)然,Consumer端是通過一個線程將阻塞隊列LinkedBlockingQueue<PullRequest>中的PullRequest發(fā)送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest時,如果發(fā)現(xiàn)沒有消息,就會把PullRequest扔到ConcurrentHashMap中緩存起來。broker在啟動時,會啟動一個線程不停的從ConcurrentHashMap取出PullRequest檢查,直到有數(shù)據(jù)返回。
七、RocketMQ的其他特性
前面的6個特性都是基本上都是點到為止,想要深入了解,還需要大家多多查看源碼,多多在實際中運(yùn)用。當(dāng)然除了已經(jīng)提到的特性外,RocketMQ還支持:
其中涉及到的很多設(shè)計思路和解決方法都值得我們深入研究:
RocketMQ最佳實踐
一、Producer最佳實踐
1、一個應(yīng)用盡可能用一個 Topic,消息子類型用 tags 來標(biāo)識,tags 可以由應(yīng)用自由設(shè)置。只有發(fā)送消息設(shè)置了tags,消費(fèi)方在訂閱消息時,才可以利用 tags 在 broker 做消息過濾。
2、每個消息在業(yè)務(wù)層面的唯一標(biāo)識碼,要設(shè)置到 keys 字段,方便將來定位消息丟失問題。由于是哈希索引,請務(wù)必保證 key 盡可能唯一,這樣可以避免潛在的哈希沖突。
3、消息發(fā)送成功或者失敗,要打印消息日志,務(wù)必要打印 sendresult 和 key 字段。
4、對于消息不可丟失應(yīng)用,務(wù)必要有消息重發(fā)機(jī)制。例如:消息發(fā)送失敗,存儲到數(shù)據(jù)庫,能有定時程序嘗試重發(fā)或者人工觸發(fā)重發(fā)。
5、某些應(yīng)用如果不關(guān)注消息是否發(fā)送成功,請直接使用sendOneWay方法發(fā)送消息。
二、Consumer最佳實踐
1、消費(fèi)過程要做到冪等(即消費(fèi)端去重)
2、盡量使用批量方式消費(fèi)方式,可以很大程度上提高消費(fèi)吞吐量。
3、優(yōu)化每條消息消費(fèi)過程
三、其他配置
線上應(yīng)該關(guān)閉autoCreateTopicEnable,即在配置文件中將其設(shè)置為false。
RocketMQ在發(fā)送消息時,會首先獲取路由信息。如果是新的消息,由于MQServer上面還沒有創(chuàng)建對應(yīng)的Topic,這個時候,如果上面的配置打開的話,會返回默認(rèn)TOPIC的(RocketMQ會在每臺broker上面創(chuàng)建名為TBW102的TOPIC)路由信息,然后Producer會選擇一臺Broker發(fā)送消息,選中的broker在存儲消息時,發(fā)現(xiàn)消息的topic還沒有創(chuàng)建,就會自動創(chuàng)建topic。后果就是:以后所有該TOPIC的消息,都將發(fā)送到這臺broker上,達(dá)不到負(fù)載均衡的目的。
所以基于目前RocketMQ的設(shè)計,建議關(guān)閉自動創(chuàng)建TOPIC的功能,然后根據(jù)消息量的大小,手動創(chuàng)建TOPIC。
RocketMQ設(shè)計相關(guān)
RocketMQ的設(shè)計假定:
每臺PC機(jī)器都可能宕機(jī)不可服務(wù)
任意集群都有可能處理能力不足
最壞的情況一定會發(fā)生
內(nèi)網(wǎng)環(huán)境需要低延遲來提供最佳用戶體驗
RocketMQ的關(guān)鍵設(shè)計:
分布式集群化
強(qiáng)數(shù)據(jù)安全
海量數(shù)據(jù)堆積
毫秒級投遞延遲(推拉模式)
這是RocketMQ在設(shè)計時的假定前提以及需要到達(dá)的效果。我想這些假定適用于所有的系統(tǒng)設(shè)計。隨著我們系統(tǒng)的服務(wù)的增多,每位開發(fā)者都要注意自己的程序是否存在單點故障,如果掛了應(yīng)該怎么恢復(fù)、能不能很好的水平擴(kuò)展、對外的接口是否足夠高效、自己管理的數(shù)據(jù)是否足夠安全...... 多多規(guī)范自己的設(shè)計,才能開發(fā)出高效健壯的程序。
參考資料
轉(zhuǎn)自:https://www.jianshu.com/p/453c6e7ff81c
?
總結(jié)
以上是生活随笔為你收集整理的分布式开放 消息系统 (RocketMQ) 的原理与实践的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 解决:springcloud eurek
- 下一篇: MFC原理 消息传递