消息队列常见面试题
文章目錄
- 2. 消息隊(duì)列
- 2.1 MQ有什么用?
- 2.2 說一說生產(chǎn)者與消費(fèi)者模式
- 2.3 消息隊(duì)列如何保證順序消費(fèi)?
- 2.4 消息隊(duì)列如何保證消息不丟?
- 2.5 消息隊(duì)列如何保證不重復(fù)消費(fèi)?
- 2.6 MQ處理消息失敗了怎么辦?
- 2.7 請介紹消息隊(duì)列推和拉的使用場景
- 2.8 RabbitMQ和Kafka有什么區(qū)別?
- 2.9 Kafka為什么速度快?
- 2.10 RabbitMQ如何保證消息已達(dá)?
2. 消息隊(duì)列
2.1 MQ有什么用?
參考答案
消息隊(duì)列有很多使用場景,比較常見的有3個(gè):解耦、異步、削峰。
2.2 說一說生產(chǎn)者與消費(fèi)者模式
參考答案
所謂生產(chǎn)者-消費(fèi)者問題,實(shí)際上主要是包含了兩類線程。一種是生產(chǎn)者線程用于生產(chǎn)數(shù)據(jù),另一種是消費(fèi)者線程用于消費(fèi)數(shù)據(jù),為了解耦生產(chǎn)者和消費(fèi)者的關(guān)系,通常會(huì)采用共享的數(shù)據(jù)區(qū)域,就像是一個(gè)倉庫。生產(chǎn)者生產(chǎn)數(shù)據(jù)之后直接放置在共享數(shù)據(jù)區(qū)中,并不需要關(guān)心消費(fèi)者的行為。而消費(fèi)者只需要從共享數(shù)據(jù)區(qū)中去獲取數(shù)據(jù),就不再需要關(guān)心生產(chǎn)者的行為。但是,這個(gè)共享數(shù)據(jù)區(qū)域中應(yīng)該具備這樣的線程間并發(fā)協(xié)作的功能:
在Java語言中,實(shí)現(xiàn)生產(chǎn)者消費(fèi)者問題時(shí),可以采用三種方式:
2.3 消息隊(duì)列如何保證順序消費(fèi)?
參考答案
在生產(chǎn)中經(jīng)常會(huì)有一些類似報(bào)表系統(tǒng)這樣的系統(tǒng),需要做 MySQL 的 binlog 同步。比如訂單系統(tǒng)要同步訂單表的數(shù)據(jù)到大數(shù)據(jù)部門的 MySQL 庫中用于報(bào)表統(tǒng)計(jì)分析,通常的做法是基于 Canal 這樣的中間件去監(jiān)聽訂單數(shù)據(jù)庫的 binlog,然后把這些 binlog 發(fā)送到 MQ 中,再由消費(fèi)者從 MQ 中獲取 binlog 落地到大數(shù)據(jù)部門的 MySQL 中。
在這個(gè)過程中,可能會(huì)有對某個(gè)訂單的增刪改操作,比如有三條 binlog 執(zhí)行順序是增加、修改、刪除。消費(fèi)者愣是換了順序給執(zhí)行成刪除、修改、增加,這樣能行嗎?肯定是不行的。不同的消息隊(duì)列產(chǎn)品,產(chǎn)生消息錯(cuò)亂的原因,以及解決方案是不同的。下面我們以RabbitMQ、Kafka、RocketMQ為例,來說明保證順序消費(fèi)的辦法。
RabbitMQ:
對于 RabbitMQ 來說,導(dǎo)致上面順序錯(cuò)亂的原因通常是消費(fèi)者是集群部署,不同的消費(fèi)者消費(fèi)到了同一訂單的不同的消息。如消費(fèi)者A執(zhí)行了增加,消費(fèi)者B執(zhí)行了修改,消費(fèi)者C執(zhí)行了刪除,但是消費(fèi)者C執(zhí)行比消費(fèi)者B快,消費(fèi)者B又比消費(fèi)者A快,就會(huì)導(dǎo)致消費(fèi) binlog 執(zhí)行到數(shù)據(jù)庫的時(shí)候順序錯(cuò)亂,本該順序是增加、修改、刪除,變成了刪除、修改、增加。如下圖:
RabbitMQ 的問題是由于不同的消息都發(fā)送到了同一個(gè) queue 中,多個(gè)消費(fèi)者都消費(fèi)同一個(gè) queue 的消息。解決這個(gè)問題,我們可以給 RabbitMQ 創(chuàng)建多個(gè) queue,每個(gè)消費(fèi)者固定消費(fèi)一個(gè) queue 的消息,生產(chǎn)者發(fā)送消息的時(shí)候,同一個(gè)訂單號的消息發(fā)送到同一個(gè) queue 中,由于同一個(gè) queue 的消息是一定會(huì)保證有序的,那么同一個(gè)訂單號的消息就只會(huì)被一個(gè)消費(fèi)者順序消費(fèi),從而保證了消息的順序性。如下圖:
Kafka:
對于 Kafka 來說,一個(gè) topic 下同一個(gè) partition 中的消息肯定是有序的,生產(chǎn)者在寫的時(shí)候可以指定一個(gè) key,通過我們會(huì)用訂單號作為 key,這個(gè) key 對應(yīng)的消息都會(huì)發(fā)送到同一個(gè) partition 中,所以消費(fèi)者消費(fèi)到的消息也一定是有序的。
那么為什么 Kafka 還會(huì)存在消息錯(cuò)亂的問題呢?問題就出在消費(fèi)者身上。通常我們消費(fèi)到同一個(gè) key 的多條消息后,會(huì)使用多線程技術(shù)去并發(fā)處理來提高消息處理速度,否則一條消息的處理需要耗時(shí)幾十 毫秒,1 秒也就只能處理幾十條消息,吞吐量就太低了。而多線程并發(fā)處理的話,binlog 執(zhí)行到數(shù)據(jù)庫的時(shí)候就不一定還是原來的順序了。如下圖:
Kafka 從生產(chǎn)者到消費(fèi)者消費(fèi)消息這一整個(gè)過程其實(shí)都是可以保證有序的,導(dǎo)致最終亂序是由于消費(fèi)者端需要使用多線程并發(fā)處理消息來提高吞吐量,比如消費(fèi)者消費(fèi)到了消息以后,開啟 32 個(gè)線程處理消息,每個(gè)線程線程處理消息的快慢是不一致的,所以才會(huì)導(dǎo)致最終消息有可能不一致。
所以對于 Kafka 的消息順序性保證,其實(shí)我們只需要保證同一個(gè)訂單號的消息只被同一個(gè)線程處理的就可以了。由此我們可以在線程處理前增加個(gè)內(nèi)存隊(duì)列,每個(gè)線程只負(fù)責(zé)處理其中一個(gè)內(nèi)存隊(duì)列的消息,同一個(gè)訂單號的消息發(fā)送到同一個(gè)內(nèi)存隊(duì)列中即可。如下圖:
RocketMQ:
對于 RocketMQ 來說,每個(gè) Topic 可以指定多個(gè) MessageQueue,當(dāng)我們寫入消息的時(shí)候,會(huì)把消息均勻地分發(fā)到不同的 MessageQueue 中,比如同一個(gè)訂單號的消息,增加 binlog 寫入到 MessageQueue1 中,修改 binlog 寫入到 MessageQueue2 中,刪除 binlog 寫入到 MessageQueue3 中。
但是當(dāng)消費(fèi)者有多臺機(jī)器的時(shí)候,會(huì)組成一個(gè) Consumer Group,Consumer Group 中的每臺機(jī)器都會(huì)負(fù)責(zé)消費(fèi)一部分 MessageQueue 的消息,所以可能消費(fèi)者A消費(fèi)了 MessageQueue1 的消息執(zhí)行增加操作,消費(fèi)者B消費(fèi)了 MessageQueue2 的消息執(zhí)行修改操作,消費(fèi)者C消費(fèi)了 MessageQueue3 的消息執(zhí)行刪除操作,但是此時(shí)消費(fèi) binlog 執(zhí)行到數(shù)據(jù)庫的時(shí)候就不一定是消費(fèi)者A先執(zhí)行了,有可能消費(fèi)者C先執(zhí)行刪除操作,因?yàn)閹着_消費(fèi)者是并行執(zhí)行,是不能夠保證他們之間的執(zhí)行順序的。如下圖:
RocketMQ 的消息亂序是由于同一個(gè)訂單號的 binlog 進(jìn)入了不同的 MessageQueue,進(jìn)而導(dǎo)致一個(gè)訂單的 binlog 被不同機(jī)器上的 Consumer 處理。
要解決 RocketMQ 的亂序問題,我們只需要想辦法讓同一個(gè)訂單的 binlog 進(jìn)入到同一個(gè) MessageQueue 中就可以了。因?yàn)橥粋€(gè) MessageQueue 內(nèi)的消息是一定有序的,一個(gè) MessageQueue 中的消息只能交給一個(gè) Consumer 來進(jìn)行處理,所以 Consumer 消費(fèi)的時(shí)候就一定會(huì)是有序的。
2.4 消息隊(duì)列如何保證消息不丟?
參考答案
丟數(shù)據(jù)一般分為兩種,一種是mq把消息丟了,一種就是消費(fèi)時(shí)將消息丟了。下面從rabbitmq和kafka分別說一下,丟失數(shù)據(jù)的場景。
RabbitMQ:
RabbitMQ丟失消息分為如下幾種情況:
生產(chǎn)者丟消息:
生產(chǎn)者將數(shù)據(jù)發(fā)送到RabbitMQ的時(shí)候,可能在傳輸過程中因?yàn)榫W(wǎng)絡(luò)等問題而將數(shù)據(jù)弄丟了。
RabbitMQ自己丟消息:
如果沒有開啟RabbitMQ的持久化,那么RabbitMQ一旦重啟數(shù)據(jù)就丟了。所以必須開啟持久化將消息持久化到磁盤,這樣就算RabbitMQ掛了,恢復(fù)之后會(huì)自動(dòng)讀取之前存儲(chǔ)的數(shù)據(jù),一般數(shù)據(jù)不會(huì)丟失。除非極其罕見的情況,RabbitMQ還沒來得及持久化自己就掛了,這樣可能導(dǎo)致一部分?jǐn)?shù)據(jù)丟失。
消費(fèi)端丟消息:
主要是因?yàn)橄M(fèi)者消費(fèi)時(shí),剛消費(fèi)到還沒有處理,結(jié)果消費(fèi)者就掛了,這樣你重啟之后,RabbitMQ就認(rèn)為你已經(jīng)消費(fèi)過了,然后就丟了數(shù)據(jù)。
針對上述三種情況,RabbitMQ可以采用如下方式避免消息丟失:
生產(chǎn)者丟消息:
- 可以選擇使用RabbitMQ提供是事務(wù)功能,就是生產(chǎn)者在發(fā)送數(shù)據(jù)之前開啟事務(wù),然后發(fā)送消息,如果消息沒有成功被RabbitMQ接收到,那么生產(chǎn)者會(huì)受到異常報(bào)錯(cuò),這時(shí)就可以回滾事務(wù),然后嘗試重新發(fā)送。如果收到了消息,那么就可以提交事務(wù)。這種方式有明顯的缺點(diǎn),即RabbitMQ事務(wù)開啟后,就會(huì)變?yōu)橥阶枞僮?#xff0c;生產(chǎn)者會(huì)阻塞等待是否發(fā)送成功,太耗性能會(huì)造成吞吐量的下降。
- 可以開啟confirm模式。在生產(chǎn)者那里設(shè)置開啟了confirm模式之后,每次寫的消息都會(huì)分配一個(gè)唯一的id,然后如何寫入了RabbitMQ之中,RabbitMQ會(huì)給你回傳一個(gè)ack消息,告訴你這個(gè)消息發(fā)送OK了。如果RabbitMQ沒能處理這個(gè)消息,會(huì)回調(diào)你一個(gè)nack接口,告訴你這個(gè)消息失敗了,你可以進(jìn)行重試。而且你可以結(jié)合這個(gè)機(jī)制知道自己在內(nèi)存里維護(hù)每個(gè)消息的id,如果超過一定時(shí)間還沒接收到這個(gè)消息的回調(diào),那么你可以進(jìn)行重發(fā)。
事務(wù)機(jī)制是同步的,你提交了一個(gè)事物之后會(huì)阻塞住,但是confirm機(jī)制是異步的,發(fā)送消息之后可以接著發(fā)送下一個(gè)消息,然后RabbitMQ會(huì)回調(diào)告知成功與否。 一般在生產(chǎn)者這塊避免丟失,都是用confirm機(jī)制。
RabbitMQ自己丟消息:
設(shè)置消息持久化到磁盤,設(shè)置持久化有兩個(gè)步驟:
- 創(chuàng)建queue的時(shí)候?qū)⑵湓O(shè)置為持久化的,這樣就可以保證RabbitMQ持久化queue的元數(shù)據(jù),但是不會(huì)持久化queue里面的數(shù)據(jù)。
- 發(fā)送消息的時(shí)候講消息的deliveryMode設(shè)置為2,這樣消息就會(huì)被設(shè)為持久化方式,此時(shí)RabbitMQ就會(huì)將消息持久化到磁盤上。 必須要同時(shí)開啟這兩個(gè)才可以。
而且持久化可以跟生產(chǎn)的confirm機(jī)制配合起來,只有消息持久化到了磁盤之后,才會(huì)通知生產(chǎn)者ack,這樣就算是在持久化之前RabbitMQ掛了,數(shù)據(jù)丟了,生產(chǎn)者收不到ack回調(diào)也會(huì)進(jìn)行消息重發(fā)。
消費(fèi)端丟消息:
使用RabbitMQ提供的ack機(jī)制,首先關(guān)閉RabbitMQ的自動(dòng)ack,然后每次在確保處理完這個(gè)消息之后,在代碼里手動(dòng)調(diào)用ack。這樣就可以避免消息還沒有處理完就ack。
Kafka:
Kafka丟失消息分為如下幾種情況:
生產(chǎn)者丟消息:
生產(chǎn)者沒有設(shè)置相應(yīng)的策略,發(fā)送過程中丟失數(shù)據(jù)。
Kafka自己丟消息:
比較常見的一個(gè)場景,就是Kafka的某個(gè)broker宕機(jī)了,然后重新選舉partition的leader時(shí)。如果此時(shí)follower還沒來得及同步數(shù)據(jù),leader就掛了,然后某個(gè)follower成為了leader,它就少了一部分?jǐn)?shù)據(jù)。
消費(fèi)端丟消息:
消費(fèi)者消費(fèi)到了這個(gè)數(shù)據(jù),然后消費(fèi)之自動(dòng)提交了offset,讓Kafka知道你已經(jīng)消費(fèi)了這個(gè)消息,當(dāng)你準(zhǔn)備處理這個(gè)消息時(shí),自己掛掉了,那么這條消息就丟了。
針對上述三種情況,Kafka可以采用如下方式避免消息丟失:
生產(chǎn)者丟消息:
關(guān)閉自動(dòng)提交offset,在自己處理完畢之后手動(dòng)提交offset,這樣就不會(huì)丟失數(shù)據(jù)。
Kafka自己丟消息:
一般要求設(shè)置4個(gè)參數(shù)來保證消息不丟失:
- 給topic設(shè)置 replication.factor 參數(shù),這個(gè)值必須大于1,表示要求每個(gè)partition必須至少有2個(gè)副本。
- 在kafka服務(wù)端設(shè)置 min.isync.replicas 參數(shù),這個(gè)值必須大于1,表示 要求一個(gè)leader至少感知到有至少一個(gè)follower在跟自己保持聯(lián)系正常同步數(shù)據(jù),這樣才能保證leader掛了之后還有一個(gè)follower。
- 在生產(chǎn)者端設(shè)置 acks=all ,表示 要求每條每條數(shù)據(jù),必須是寫入所有replica副本之后,才能認(rèn)為是寫入成功了。
- 在生產(chǎn)者端設(shè)置 retries=MAX (很大的一個(gè)值),表示這個(gè)是要求一旦寫入事變,就無限重試。
消費(fèi)端丟消息:
如果按照上面設(shè)置了ack=all,則一定不會(huì)丟失數(shù)據(jù),要求是,你的leader接收到消息,所有的follower都同步到了消息之后,才認(rèn)為本次寫成功了。如果沒滿足這個(gè)條件,生產(chǎn)者會(huì)自動(dòng)不斷的重試,重試無限次。
數(shù)據(jù),大數(shù)據(jù)量的數(shù)據(jù)處理上。
架構(gòu)模型方面
RabbitMQ:以broker為中心,有消息的確認(rèn)機(jī)制。
Kafka:以consumer為中心,沒有消息的確認(rèn)機(jī)制。
吞吐量方面
RabbitMQ:支持消息的可靠的傳遞,支持事務(wù),不支持批量操作,基于存儲(chǔ)的可靠性的要求存儲(chǔ)可以采用內(nèi)存或硬盤,吞吐量小。
Kafka:內(nèi)部采用消息的批量處理,數(shù)據(jù)的存儲(chǔ)和獲取是本地磁盤順序批量操作,消息處理的效率高,吞吐量高。
集群負(fù)載均衡方面
RabbitMQ:本身不支持負(fù)載均衡,需要loadbalancer的支持。
Kafka:采用zookeeper對集群中的broker,consumer進(jìn)行管理,可以注冊topic到zookeeper上,通過zookeeper的協(xié)調(diào)機(jī)制,producer保存對應(yīng)的topic的broker信息,可以隨機(jī)或者輪詢發(fā)送到broker上,producer可以基于語義指定分片,消息發(fā)送到broker的某個(gè)分片上。
2.5 消息隊(duì)列如何保證不重復(fù)消費(fèi)?
參考答案
先大概說一說可能會(huì)有哪些重復(fù)消費(fèi)的問題。首先就是比如rabbitmq、rocketmq、kafka,都有可能會(huì)出現(xiàn)消費(fèi)重復(fù)消費(fèi)的問題,正常。因?yàn)檫@問題通常不是mq自己保證的,是給你保證的。然后我們挑一個(gè)kafka來舉個(gè)例子,說說怎么重復(fù)消費(fèi)吧。
kafka實(shí)際上有個(gè)offset的概念,就是每個(gè)消息寫進(jìn)去,都有一個(gè)offset,代表他的序號,然后consumer消費(fèi)了數(shù)據(jù)之后,每隔一段時(shí)間,會(huì)把自己消費(fèi)過的消息的offset提交一下,代表我已經(jīng)消費(fèi)過了,下次我要是重啟啥的,你就讓我繼續(xù)從上次消費(fèi)到的offset來繼續(xù)消費(fèi)吧。
但是凡事總有意外,比如我們之前生產(chǎn)經(jīng)常遇到的,就是你有時(shí)候重啟系統(tǒng),看你怎么重啟了,如果碰到點(diǎn)著急的,直接kill進(jìn)程了,再重啟。這會(huì)導(dǎo)致consumer有些消息處理了,但是沒來得及提交offset,尷尬了。重啟之后,少數(shù)消息會(huì)再次消費(fèi)一次。
其實(shí)重復(fù)消費(fèi)不可怕,可怕的是你沒考慮到重復(fù)消費(fèi)之后,怎么保證冪等性。舉個(gè)例子,假設(shè)你有個(gè)系統(tǒng),消費(fèi)一條往數(shù)據(jù)庫里插入一條,要是你一個(gè)消息重復(fù)兩次,你不就插入了兩條,這數(shù)據(jù)不就錯(cuò)了?但是你要是消費(fèi)到第二次的時(shí)候,自己判斷一下已經(jīng)消費(fèi)過了,直接扔了,不就保留了一條數(shù)據(jù)?
一條數(shù)據(jù)重復(fù)出現(xiàn)兩次,數(shù)據(jù)庫里就只有一條數(shù)據(jù),這就保證了系統(tǒng)的冪等性冪等性。通俗點(diǎn)說,就一個(gè)數(shù)據(jù),或者一個(gè)請求,給你重復(fù)來多次,你得確保對應(yīng)的數(shù)據(jù)是不會(huì)改變的,不能出錯(cuò)。
想要保證不重復(fù)消費(fèi),其實(shí)還要結(jié)合業(yè)務(wù)來思考,這里給幾個(gè)思路:
還有比如基于數(shù)據(jù)庫的唯一鍵來保證重復(fù)數(shù)據(jù)不會(huì)重復(fù)插入多條,我們之前線上系統(tǒng)就有這個(gè)問題,就是拿到數(shù)據(jù)的時(shí)候,每次重啟可能會(huì)有重復(fù),因?yàn)閗afka消費(fèi)者還沒來得及提交offset,重復(fù)數(shù)據(jù)拿到了以后我們插入的時(shí)候,因?yàn)橛形ㄒ绘I約束了,所以重復(fù)數(shù)據(jù)只會(huì)插入報(bào)錯(cuò),不會(huì)導(dǎo)致數(shù)據(jù)庫中出現(xiàn)臟數(shù)據(jù)。
2.6 MQ處理消息失敗了怎么辦?
參考答案
一般生產(chǎn)環(huán)境中,都會(huì)在使用MQ的時(shí)候設(shè)計(jì)兩個(gè)隊(duì)列:一個(gè)是核心業(yè)務(wù)隊(duì)列,一個(gè)是死信隊(duì)列。核心業(yè)務(wù)隊(duì)列,就是比如專門用來讓訂單系統(tǒng)發(fā)送訂單消息的,然后另外一個(gè)死信隊(duì)列就是用來處理異常情況的。
比如說要是第三方物流系統(tǒng)故障了,此時(shí)無法請求,那么倉儲(chǔ)系統(tǒng)每次消費(fèi)到一條訂單消息,嘗試通知發(fā)貨和配送,都會(huì)遇到對方的接口報(bào)錯(cuò)。此時(shí)倉儲(chǔ)系統(tǒng)就可以把這條消息拒絕訪問,或者標(biāo)志位處理失敗!注意,這個(gè)步驟很重要。
一旦標(biāo)志這條消息處理失敗了之后,MQ就會(huì)把這條消息轉(zhuǎn)入提前設(shè)置好的一個(gè)死信隊(duì)列中。然后你會(huì)看到的就是,在第三方物流系統(tǒng)故障期間,所有訂單消息全部處理失敗,全部會(huì)轉(zhuǎn)入死信隊(duì)列。然后你的倉儲(chǔ)系統(tǒng)得專門有一個(gè)后臺線程,監(jiān)控第三方物流系統(tǒng)是否正常,能否請求的,不停的監(jiān)視。一旦發(fā)現(xiàn)對方恢復(fù)正常,這個(gè)后臺線程就從死信隊(duì)列消費(fèi)出來處理失敗的訂單,重新執(zhí)行發(fā)貨和配送的通知邏輯。死信隊(duì)列的使用,其實(shí)就是MQ在生產(chǎn)實(shí)踐中非常重要的一環(huán),也就是架構(gòu)設(shè)計(jì)必須要考慮的。
整個(gè)過程,如下圖所示:
2.7 請介紹消息隊(duì)列推和拉的使用場景
參考答案
推模式:
推模式是服務(wù)器端根據(jù)用戶需要,由目的、按時(shí)將用戶感興趣的信息主動(dòng)發(fā)送到用戶的客戶端。
優(yōu)點(diǎn):
- 對用戶要求低,方便用戶獲取需要的信息;
- 及時(shí)性好,服務(wù)器端及時(shí)地向客戶端推送更新動(dòng)態(tài)信息,吞吐量大。
缺點(diǎn):
- 不能確保發(fā)送成功,推模式采用廣播方式,只有服務(wù)器端和客戶端在同一個(gè)頻道上,推模式才有效,用戶才能接收到信息;
- 沒有信息狀態(tài)跟蹤,推模式采用開環(huán)控制技術(shù),一個(gè)信息推送后的狀態(tài),比如客戶端是否接收等,無從得知;
- 針對性較差。推送的信息可能并不能滿足客戶端的個(gè)性化需求。
拉模式:
拉模式是客戶端主動(dòng)從服務(wù)器端獲取信息。
優(yōu)點(diǎn):
- 針對性強(qiáng),能滿足客戶端的個(gè)性化需求;
- 信息傳輸量較小,網(wǎng)絡(luò)中傳輸?shù)闹皇强蛻舳说恼埱蠛头?wù)器端對該請求的響應(yīng);
- 服務(wù)器端的任務(wù)輕。服務(wù)器端只是被動(dòng)接收查詢,對客戶端的查詢請求做出響應(yīng)。
缺點(diǎn):
- 實(shí)時(shí)性較差,針對于服務(wù)器端實(shí)時(shí)更新的信息,客戶端難以獲取實(shí)時(shí)信息;
- 對于客戶端用戶的要求較高,需要對服務(wù)器端具有一定的了解。
2.8 RabbitMQ和Kafka有什么區(qū)別?
參考答案
在實(shí)際生產(chǎn)應(yīng)用中,通常會(huì)使用Kafka作為消息傳輸?shù)臄?shù)據(jù)管道,RabbitMQ作為交易數(shù)據(jù)作為數(shù)據(jù)傳輸管道,主要的取舍因素則是是否存在丟數(shù)據(jù)的可能。RabbitMQ在金融場景中經(jīng)常使用,具有較高的嚴(yán)謹(jǐn)性,數(shù)據(jù)丟失的可能性更小,同事具備更高的實(shí)時(shí)性。而Kafka優(yōu)勢主要體現(xiàn)在吞吐量上,雖然可以通過策略實(shí)現(xiàn)數(shù)據(jù)不丟失,但從嚴(yán)謹(jǐn)性角度來講,大不如RabbitMQ。而且由于Kafka保證每條消息最少送達(dá)一次,有較小的概率會(huì)出現(xiàn)數(shù)據(jù)重復(fù)發(fā)送的情況。詳細(xì)來說,它們之間主要有如下的區(qū)別:
應(yīng)用場景方面
RabbitMQ:用于實(shí)時(shí)的,對可靠性要求較高的消息傳遞上。
Kafka:用于處于活躍的流式數(shù)據(jù),大數(shù)據(jù)量的數(shù)據(jù)處理上。
架構(gòu)模型方面
RabbitMQ:以broker為中心,有消息的確認(rèn)機(jī)制。
Kafka:以consumer為中心,沒有消息的確認(rèn)機(jī)制。
吞吐量方面
RabbitMQ:支持消息的可靠的傳遞,支持事務(wù),不支持批量操作,基于存儲(chǔ)的可靠性的要求存儲(chǔ)可以采用內(nèi)存或硬盤,吞吐量小。
Kafka:內(nèi)部采用消息的批量處理,數(shù)據(jù)的存儲(chǔ)和獲取是本地磁盤順序批量操作,消息處理的效率高,吞吐量高。
集群負(fù)載均衡方面
RabbitMQ:本身不支持負(fù)載均衡,需要loadbalancer的支持。
Kafka:采用zookeeper對集群中的broker,consumer進(jìn)行管理,可以注冊topic到zookeeper上,通過zookeeper的協(xié)調(diào)機(jī)制,producer保存對應(yīng)的topic的broker信息,可以隨機(jī)或者輪詢發(fā)送到broker上,producer可以基于語義指定分片,消息發(fā)送到broker的某個(gè)分片上。
2.9 Kafka為什么速度快?
參考答案
Kafka的消息是保存或緩存在磁盤上的,一般認(rèn)為在磁盤上讀寫數(shù)據(jù)是會(huì)降低性能的,因?yàn)閷ぶ窌?huì)比較消耗時(shí)間,但是實(shí)際上,Kafka的特性之一就是高吞吐率。即使是普通的服務(wù)器,Kafka也可以輕松支持每秒百萬級的寫入請求,超過了大部分的消息中間件,這種特性也使得Kafka在日志處理等海量數(shù)據(jù)場景廣泛應(yīng)用。
下面從數(shù)據(jù)寫入和讀取兩方面分析,為什么Kafka速度這么快:
寫入數(shù)據(jù):
Kafka會(huì)把收到的消息都寫入到硬盤中,它絕對不會(huì)丟失數(shù)據(jù)。為了優(yōu)化寫入速度Kafka采用了兩個(gè)技術(shù),順序?qū)懭牒蚆MFile 。
一、順序?qū)懭?/p>
磁盤讀寫的快慢取決于你怎么使用它,也就是順序讀寫或者隨機(jī)讀寫。在順序讀寫的情況下,磁盤的順序讀寫速度和內(nèi)存持平。因?yàn)橛脖P是機(jī)械結(jié)構(gòu),每次讀寫都會(huì)尋址->寫入,其中尋址是一個(gè)“機(jī)械動(dòng)作”,它是最耗時(shí)的。所以硬盤最討厭隨機(jī)I/O,最喜歡順序I/O。為了提高讀寫硬盤的速度,Kafka就是使用順序I/O。
而且Linux對于磁盤的讀寫優(yōu)化也比較多,包括read-ahead和write-behind,磁盤緩存等。如果在內(nèi)存做這些操作的時(shí)候,一個(gè)是JAVA對象的內(nèi)存開銷很大,另一個(gè)是隨著堆內(nèi)存數(shù)據(jù)的增多,JAVA的GC時(shí)間會(huì)變得很長,使用磁盤操作有以下幾個(gè)好處:
下圖就展示了Kafka是如何寫入數(shù)據(jù)的, 每一個(gè)Partition其實(shí)都是一個(gè)文件 ,收到消息后Kafka會(huì)把數(shù)據(jù)插入到文件末尾(虛框部分):
這種方法有一個(gè)缺陷——沒有辦法刪除數(shù)據(jù) ,所以Kafka是不會(huì)刪除數(shù)據(jù)的,它會(huì)把所有的數(shù)據(jù)都保留下來,每個(gè)消費(fèi)者(Consumer)對每個(gè)Topic都有一個(gè)offset用來表示讀取到了第幾條數(shù)據(jù) 。
二、Memory Mapped Files
即便是順序?qū)懭胗脖P,硬盤的訪問速度還是不可能追上內(nèi)存。所以Kafka的數(shù)據(jù)并不是實(shí)時(shí)的寫入硬盤 ,它充分利用了現(xiàn)代操作系統(tǒng)分頁存儲(chǔ)來利用內(nèi)存提高I/O效率。Memory Mapped Files(后面簡稱mmap)也被翻譯成 內(nèi)存映射文件,在64位操作系統(tǒng)中一般可以表示20G的數(shù)據(jù)文件,它的工作原理是直接利用操作系統(tǒng)的Page來實(shí)現(xiàn)文件到物理內(nèi)存的直接映射。完成映射之后你對物理內(nèi)存的操作會(huì)被同步到硬盤上(操作系統(tǒng)在適當(dāng)?shù)臅r(shí)候)。
通過mmap,進(jìn)程像讀寫硬盤一樣讀寫內(nèi)存(當(dāng)然是虛擬機(jī)內(nèi)存),也不必關(guān)心內(nèi)存的大小有虛擬內(nèi)存為我們兜底。使用這種方式可以獲取很大的I/O提升,省去了用戶空間到內(nèi)核空間復(fù)制的開銷(調(diào)用文件的read會(huì)把數(shù)據(jù)先放到內(nèi)核空間的內(nèi)存中,然后再復(fù)制到用戶空間的內(nèi)存中。)
但也有一個(gè)很明顯的缺陷——不可靠,寫到mmap中的數(shù)據(jù)并沒有被真正的寫到硬盤,操作系統(tǒng)會(huì)在程序主動(dòng)調(diào)用flush的時(shí)候才把數(shù)據(jù)真正的寫到硬盤。Kafka提供了一個(gè)參數(shù)——producer.type來控制是不是主動(dòng)flush,如果Kafka寫入到mmap之后就立即flush然后再返回Producer叫 同步 (sync);寫入mmap之后立即返回Producer不調(diào)用flush叫異步 (async)。
讀取數(shù)據(jù):
一、基于sendfile實(shí)現(xiàn)Zero Copy
傳統(tǒng)模式下,當(dāng)需要對一個(gè)文件進(jìn)行傳輸?shù)臅r(shí)候,其具體流程細(xì)節(jié)如下:
- 調(diào)用read函數(shù),文件數(shù)據(jù)被copy到內(nèi)核緩沖區(qū);
- read函數(shù)返回,文件數(shù)據(jù)從內(nèi)核緩沖區(qū)copy到用戶緩沖區(qū);
- write函數(shù)調(diào)用,將文件數(shù)據(jù)從用戶緩沖區(qū)copy到內(nèi)核與socket相關(guān)的緩沖區(qū);
- 數(shù)據(jù)從socket緩沖區(qū)copy到相關(guān)協(xié)議引擎。
以上細(xì)節(jié)是傳統(tǒng)read/write方式進(jìn)行網(wǎng)絡(luò)文件傳輸?shù)姆绞?#xff0c;我們可以看到,在這個(gè)過程當(dāng)中,文件數(shù)據(jù)實(shí)際上是經(jīng)過了四次copy操作:硬盤->內(nèi)核buf->用戶buf->socket相關(guān)緩沖區(qū)->協(xié)議引擎。而sendfile系統(tǒng)調(diào)用則提供了一種減少以上多次copy,提升文件傳輸性能的方法。
在內(nèi)核版本2.1中,引入了sendfile系統(tǒng)調(diào)用,以簡化網(wǎng)絡(luò)上和兩個(gè)本地文件之間的數(shù)據(jù)傳輸。sendfile的引入不僅減少了數(shù)據(jù)復(fù)制,還減少了上下文切換。運(yùn)行流程如下:
- sendfile系統(tǒng)調(diào)用,文件數(shù)據(jù)被copy至內(nèi)核緩沖區(qū);
- 再從內(nèi)核緩沖區(qū)copy至內(nèi)核中socket相關(guān)的緩沖區(qū);
- 最后再socket相關(guān)的緩沖區(qū)copy到協(xié)議引擎。
相較傳統(tǒng)read/write方式,2.1版本內(nèi)核引進(jìn)的sendfile已經(jīng)減少了內(nèi)核緩沖區(qū)到user緩沖區(qū),再由user緩沖區(qū)到socket相關(guān)緩沖區(qū)的文件copy,而在內(nèi)核版本2.4之后,文件描述符結(jié)果被改變,sendfile實(shí)現(xiàn)了更簡單的方式,再次減少了一次copy操作。
在Apache、Nginx、lighttpd等web服務(wù)器當(dāng)中,都有一項(xiàng)sendfile相關(guān)的配置,使用sendfile可以大幅提升文件傳輸性能。Kafka把所有的消息都存放在一個(gè)一個(gè)的文件中,當(dāng)消費(fèi)者需要數(shù)據(jù)的時(shí)候Kafka直接把文件發(fā)送給消費(fèi)者,配合mmap作為文件讀寫方式,直接把它傳給sendfile。
二、批量壓縮
在很多情況下,系統(tǒng)的瓶頸不是CPU或磁盤,而是網(wǎng)絡(luò)IO,對于需要在廣域網(wǎng)上的數(shù)據(jù)中心之間發(fā)送消息的數(shù)據(jù)流水線尤其如此。進(jìn)行數(shù)據(jù)壓縮會(huì)消耗少量的CPU資源,不過對于kafka而言,網(wǎng)絡(luò)IO更應(yīng)該需要考慮。
- 如果每個(gè)消息都壓縮,但是壓縮率相對很低,所以Kafka使用了批量壓縮,即將多個(gè)消息一起壓縮而不是單個(gè)消息壓縮;
- Kafka允許使用遞歸的消息集合,批量的消息可以通過壓縮的形式傳輸并且在日志中也可以保持壓縮格式,直到被消費(fèi)者解壓縮;
- Kafka支持多種壓縮協(xié)議,包括Gzip和Snappy壓縮協(xié)議。
總結(jié):
Kafka速度的秘訣在于,它把所有的消息都變成一個(gè)批量的文件,并且進(jìn)行合理的批量壓縮,減少網(wǎng)絡(luò)IO損耗,通過mmap提高I/O速度,寫入數(shù)據(jù)的時(shí)候由于單個(gè)Partion是末尾添加所以速度最優(yōu)。讀取數(shù)據(jù)的時(shí)候配合sendfile直接暴力輸出。
2.10 RabbitMQ如何保證消息已達(dá)?
參考答案
RabbitMQ可能丟失消息分為如下幾種情況:
生產(chǎn)者丟消息:
生產(chǎn)者將數(shù)據(jù)發(fā)送到RabbitMQ的時(shí)候,可能在傳輸過程中因?yàn)榫W(wǎng)絡(luò)等問題而將數(shù)據(jù)弄丟了。
RabbitMQ自己丟消息:
如果沒有開啟RabbitMQ的持久化,那么RabbitMQ一旦重啟數(shù)據(jù)就丟了。所以必須開啟持久化將消息持久化到磁盤,這樣就算RabbitMQ掛了,恢復(fù)之后會(huì)自動(dòng)讀取之前存儲(chǔ)的數(shù)據(jù),一般數(shù)據(jù)不會(huì)丟失。除非極其罕見的情況,RabbitMQ還沒來得及持久化自己就掛了,這樣可能導(dǎo)致一部分?jǐn)?shù)據(jù)丟失。
消費(fèi)端丟消息:
主要是因?yàn)橄M(fèi)者消費(fèi)時(shí),剛消費(fèi)到還沒有處理,結(jié)果消費(fèi)者就掛了,這樣你重啟之后,RabbitMQ就認(rèn)為你已經(jīng)消費(fèi)過了,然后就丟了數(shù)據(jù)。
針對上述三種情況,RabbitMQ可以采用如下方式避免消息丟失:
生產(chǎn)者丟消息:
- 可以選擇使用RabbitMQ提供是事務(wù)功能,就是生產(chǎn)者在發(fā)送數(shù)據(jù)之前開啟事務(wù),然后發(fā)送消息,如果消息沒有成功被RabbitMQ接收到,那么生產(chǎn)者會(huì)受到異常報(bào)錯(cuò),這時(shí)就可以回滾事務(wù),然后嘗試重新發(fā)送。如果收到了消息,那么就可以提交事務(wù)。這種方式有明顯的缺點(diǎn),即RabbitMQ事務(wù)開啟后,就會(huì)變?yōu)橥阶枞僮?#xff0c;生產(chǎn)者會(huì)阻塞等待是否發(fā)送成功,太耗性能會(huì)造成吞吐量的下降。
- 可以開啟confirm模式。在生產(chǎn)者那里設(shè)置開啟了confirm模式之后,每次寫的消息都會(huì)分配一個(gè)唯一的id,然后如何寫入了RabbitMQ之中,RabbitMQ會(huì)給你回傳一個(gè)ack消息,告訴你這個(gè)消息發(fā)送OK了。如果RabbitMQ沒能處理這個(gè)消息,會(huì)回調(diào)你一個(gè)nack接口,告訴你這個(gè)消息失敗了,你可以進(jìn)行重試。而且你可以結(jié)合這個(gè)機(jī)制知道自己在內(nèi)存里維護(hù)每個(gè)消息的id,如果超過一定時(shí)間還沒接收到這個(gè)消息的回調(diào),那么你可以進(jìn)行重發(fā)。
事務(wù)機(jī)制是同步的,你提交了一個(gè)事物之后會(huì)阻塞住,但是confirm機(jī)制是異步的,發(fā)送消息之后可以接著發(fā)送下一個(gè)消息,然后RabbitMQ會(huì)回調(diào)告知成功與否。 一般在生產(chǎn)者這塊避免丟失,都是用confirm機(jī)制。
RabbitMQ自己丟消息:
設(shè)置消息持久化到磁盤,設(shè)置持久化有兩個(gè)步驟:
- 創(chuàng)建queue的時(shí)候?qū)⑵湓O(shè)置為持久化的,這樣就可以保證RabbitMQ持久化queue的元數(shù)據(jù),但是不會(huì)持久化queue里面的數(shù)據(jù)。
- 發(fā)送消息的時(shí)候講消息的deliveryMode設(shè)置為2,這樣消息就會(huì)被設(shè)為持久化方式,此時(shí)RabbitMQ就會(huì)將消息持久化到磁盤上。 必須要同時(shí)開啟這兩個(gè)才可以。
而且持久化可以跟生產(chǎn)的confirm機(jī)制配合起來,只有消息持久化到了磁盤之后,才會(huì)通知生產(chǎn)者ack,這樣就算是在持久化之前RabbitMQ掛了,數(shù)據(jù)丟了,生產(chǎn)者收不到ack回調(diào)也會(huì)進(jìn)行消息重發(fā)。
消費(fèi)端丟消息:
息,然后RabbitMQ會(huì)回調(diào)告知成功與否。 一般在生產(chǎn)者這塊避免丟失,都是用confirm機(jī)制。
RabbitMQ自己丟消息:
設(shè)置消息持久化到磁盤,設(shè)置持久化有兩個(gè)步驟:
- 創(chuàng)建queue的時(shí)候?qū)⑵湓O(shè)置為持久化的,這樣就可以保證RabbitMQ持久化queue的元數(shù)據(jù),但是不會(huì)持久化queue里面的數(shù)據(jù)。
- 發(fā)送消息的時(shí)候講消息的deliveryMode設(shè)置為2,這樣消息就會(huì)被設(shè)為持久化方式,此時(shí)RabbitMQ就會(huì)將消息持久化到磁盤上。 必須要同時(shí)開啟這兩個(gè)才可以。
而且持久化可以跟生產(chǎn)的confirm機(jī)制配合起來,只有消息持久化到了磁盤之后,才會(huì)通知生產(chǎn)者ack,這樣就算是在持久化之前RabbitMQ掛了,數(shù)據(jù)丟了,生產(chǎn)者收不到ack回調(diào)也會(huì)進(jìn)行消息重發(fā)。
消費(fèi)端丟消息:
使用RabbitMQ提供的ack機(jī)制,首先關(guān)閉RabbitMQ的自動(dòng)ack,然后每次在確保處理完這個(gè)消息之后,在代碼里手動(dòng)調(diào)用ack。這樣就可以避免消息還沒有處理完就ack。
總結(jié)
- 上一篇: 面向对象语言的技术特点
- 下一篇: 2022届互联网秋招备战