RabbitMQ 入门系列(6)— 如何保证 RabbitMQ 消息不丢失
1. 消息丟失源頭
RabbitMQ 消息丟失的源頭主要有以下三個(gè):
- 生產(chǎn)者丟失消息
RabbitMQ丟失消息- 消費(fèi)者丟失消息
下面主要從 3 個(gè)方面進(jìn)行說(shuō)明并提供應(yīng)對(duì)措施
2. 生產(chǎn)者丟失消息
RabbitMQ 生產(chǎn)者將數(shù)據(jù)發(fā)送到 rabbitmq 的時(shí)候,可能數(shù)據(jù)在網(wǎng)絡(luò)傳輸中搞丟了,這個(gè)時(shí)候 RabbitMQ 收不到消息,消息就丟了。
解決方法:
2.1 事務(wù)方式
在生產(chǎn)者發(fā)送消息之前,通過(guò) channel.txSelect 開(kāi)啟一個(gè)事務(wù),接著發(fā)送消息,
- 如果消息沒(méi)有成功被
RabbitMQ接收到,生產(chǎn)者會(huì)收到異常,此時(shí)就可以進(jìn)行事務(wù)回滾channel.txRollback然后重新發(fā)送; - 假如
RabbitMQ收到了這個(gè)消息,就可以提交事務(wù)channel.txCommit;
但是這樣一來(lái),生產(chǎn)者的吞吐量和性能都會(huì)降低很多,現(xiàn)在一般不這么干。
2.2 confirm 機(jī)制
confirm 機(jī)制是在生產(chǎn)者設(shè)置的,就是每次寫(xiě)消息的時(shí)候會(huì)分配一個(gè)唯一的 id,然后 RabbitMQ 收到之后會(huì)回傳一個(gè) ack,告訴生產(chǎn)者這個(gè)消息 ok 了。如果 rabbitmq 沒(méi)有處理到這個(gè)消息,那么就回調(diào)一個(gè) nack 的接口,這個(gè)時(shí)候生產(chǎn)者就可以重發(fā)。
事務(wù)機(jī)制和 confirm 機(jī)制最大的不同點(diǎn):
- 事務(wù)機(jī)制是同步的,提交一個(gè)事務(wù)之后會(huì)阻塞在那兒,它性能太差,官方主動(dòng)棄用;
- 但是
confirm機(jī)制是異步的,發(fā)送一個(gè)消息之后就可以發(fā)送下一個(gè)消息,然后那個(gè)消息rabbitmq接收了之后會(huì)異步回調(diào)你一個(gè)接口通知你這個(gè)消息接收到了;
所以一般在生產(chǎn)者這塊避免數(shù)據(jù)丟失,都是用 confirm 機(jī)制的。
在 confirm 機(jī)制下,我們可以將 channel 設(shè)置成 confirm 模式,一旦 channel 進(jìn)入 confirm 模式,所有在該 channel 上面發(fā)布的消息都將會(huì)被指派一個(gè)唯一的 ID(從 1 開(kāi)始),一旦消息被投遞到所有匹配的隊(duì)列之后,RabbitMQ 就會(huì)發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息的唯一 ID),這就使得生產(chǎn)者知道消息已經(jīng)正確到達(dá)目的隊(duì)列了,如果消息和隊(duì)列是可持久化的,那么確認(rèn)消息會(huì)在將消息寫(xiě)入磁盤(pán)之后發(fā)出,RabbitMQ 回傳給生產(chǎn)者的確認(rèn)消息中 delivery-tag 域包含了確認(rèn)消息的序列號(hào),此外 RabbitMQ 也可以設(shè)置 basic.ack 的 multiple 域,表示到這個(gè)序列號(hào)之前的所有消息都已經(jīng)得到了處理。
confirm 模式最大的好處在于它是異步的,一旦發(fā)布一條消息,生產(chǎn)者應(yīng)用程序就可以在等信道返回確認(rèn)的同時(shí)繼續(xù)發(fā)送下一條消息,當(dāng)消息最終得到確認(rèn)之后,生產(chǎn)者應(yīng)用便可以通過(guò)回調(diào)方法來(lái)處理該確認(rèn)消息,如果 RabbitMQ 因?yàn)樽陨韮?nèi)部錯(cuò)誤導(dǎo)致消息丟失,就會(huì)發(fā)送一條 nack 消息,生產(chǎn)者應(yīng)用程序同樣可以在回調(diào)方法中處理該 nack 消息。
confirm 機(jī)制和 transaction 事務(wù)模式是不能夠共存的,已經(jīng)處于 transaction 事務(wù)模式的 channel不能被設(shè)置為 confirm模式,同理,反過(guò)來(lái)也一樣。
通常我們可以通過(guò)調(diào)用 channel 的 confirmSelect 方法將 channel 設(shè)置為 confirm 模式。如果沒(méi)有設(shè)置 no-wait 標(biāo)志的話,RabbitMQ會(huì)返回 confirm.select-ok 表示同意生產(chǎn)者當(dāng)前 channel 信道設(shè)置為 confirm 模式。
客戶端生產(chǎn)者側(cè):生產(chǎn)者將消息發(fā)送到 RabbitMQ 然后寫(xiě)入到磁盤(pán)后通知生成者已收到生產(chǎn)者消息,保證生產(chǎn)者發(fā)送的消息不會(huì)丟失。
支持兩種通知方式:
- 同步方式,即每發(fā)一條消息生成者等待
RabbitMQ確認(rèn)后再繼續(xù)發(fā)送消息; - 異步方式,即生產(chǎn)者提供回調(diào)函數(shù)入口,生產(chǎn)者發(fā)送完消息后不等待
RabbitMQ回應(yīng)繼續(xù)發(fā)送消息,RabbitMQ會(huì)回調(diào)通知生產(chǎn)者是否收到消息,一般實(shí)際生產(chǎn)環(huán)境用此方式比較多。
import com.rabbitmq.client.Channel;import com.rabbitmq.client.ConfirmListener;import com.rabbitmq.client.Connection;public class ConfirmSend {private static String exchange_name = "";private static String queue_name = "tx_queue";/*** confirm機(jī)制:確認(rèn)publisher發(fā)送消息到broker,由broker進(jìn)行應(yīng)答(不能確認(rèn)是否被有效消費(fèi))* confirmSelect,進(jìn)入confirm消息確認(rèn)模式* ,確認(rèn)方式:1、異步ConfirmListener;2、同步waitForConfirms* ConfirmListener、waitForConfirms均需要配合confirm機(jī)制使用* @param mes* @throws Exception*/public static void txSend(Serializable mes) throws Exception {Connection conn = MqManager.newConnection();Channel channel = conn.createChannel();// 開(kāi)啟confirm機(jī)制channel.confirmSelect();channel.queueDeclare(queue_name, false, false, true, null);// 異步實(shí)現(xiàn)發(fā)送消息的確認(rèn)(此部分的消息確認(rèn)是指發(fā)送消息到隊(duì)列,并非確認(rèn)消息的有效消費(fèi))channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {// multiple:測(cè)試發(fā)現(xiàn)multiple隨機(jī)true或false,原因未知System.out.println("Nack deliveryTag:" + deliveryTag + ",multiple:" + multiple);}@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("Ack deliveryTag:" + deliveryTag + ",multiple:" + multiple);}});for (int i = 0; i < 10; i++) {System.out.println("---------消息發(fā)送-----");channel.basicPublish(exchange_name, queue_name, null, SerializationUtils.serialize(mes.toString() + i));}// channel.waitForConfirms();//同步實(shí)現(xiàn)發(fā)送消息的確認(rèn)System.out.println("-----------");channel.close();conn.close();}public static void main(String[] args) throws Exception {txSend("hello world!");}}
3. RabbitMQ 丟失消息
RabbitMQ 集群也會(huì)弄丟消息,就是說(shuō)在消息發(fā)送到 RabbitMQ 之后,默認(rèn)是沒(méi)有保存到磁盤(pán)的,萬(wàn)一 RabbitMQ 宕機(jī)了,這個(gè)時(shí)候消息就丟失了。
所以為了解決這個(gè)問(wèn)題,RabbitMQ 提供了一個(gè)持久化的機(jī)制,消息寫(xiě)入之后會(huì)持久化到磁盤(pán),哪怕是宕機(jī)了,恢復(fù)之后也會(huì)自動(dòng)恢復(fù)之前存儲(chǔ)的數(shù)據(jù),這樣的機(jī)制可以確保消息不會(huì)丟失。
設(shè)置持久化步驟:
- 創(chuàng)建
queue的時(shí)候?qū)⑵湓O(shè)置為持久化的,這樣就可以保證rabbitmq持久化queue的元數(shù)據(jù),但是不會(huì)持久化queue里的數(shù)據(jù); - 發(fā)送消息的時(shí)候?qū)⑾⒌?
deliveryMode設(shè)置為 2,就是將消息設(shè)置為持久化的,此時(shí)rabbitmq就會(huì)將消息持久化到磁盤(pán)上去。
但是這樣一來(lái)可能會(huì)有人說(shuō):萬(wàn)一消息發(fā)送到 RabbitMQ 之后,還沒(méi)來(lái)得及持久化到磁盤(pán)就掛掉了,數(shù)據(jù)也丟失了。
對(duì)于這個(gè)問(wèn)題,其實(shí)是配合上面的 confirm 機(jī)制一起來(lái)保證的,就是在消息持久化到磁盤(pán)之后才會(huì)給生產(chǎn)者發(fā)送 ack 消息。
RabbitMQ 主要是采用持久化的方式保證消息不丟,啟用隊(duì)列、交換機(jī)、消息的持久化,確保不會(huì)因?yàn)?RabbitMQ 服務(wù)器的宕機(jī)導(dǎo)致消息丟失。
channel.basicPublish(exchange_name, "routingKey",true, MessageProperties.PERSISTENT_BASIC, "xiao ming".getBytes());
MessageProperties.PERSISTENT_BASIC 即可表示消息是要進(jìn)行持久化的。
消息持久化成功的條件:
- 投遞消息的時(shí)候
durable設(shè)置為true,消息持久化,代碼:
channel.queueDeclare(x, true, false, false, null)
參數(shù) 2 設(shè)置為 true 持久化;
- 設(shè)置投遞模式
deliveryMode設(shè)置為 2(持久),代碼:
channel.basicPublish(x, x, MessageProperties.PERSISTENTTEXTPLAIN,x)
參數(shù) 3 設(shè)置為存儲(chǔ)純文本到磁盤(pán);
- 消息已經(jīng)到達(dá)持久化交換器上;
- 消息已經(jīng)到達(dá)持久化的隊(duì)列;
四個(gè)條件都需要滿足。
持久化工作原理:
RabbitMQ 會(huì)將你的持久化消息寫(xiě)入磁盤(pán)上的持久化日志文件,等消息被消費(fèi)之后,RabbitMQ 會(huì)把這條消息標(biāo)識(shí)為等待垃圾回收。
持久化的缺點(diǎn):
消息持久化的優(yōu)點(diǎn)顯而易見(jiàn),但缺點(diǎn)也很明顯,那就是性能,因?yàn)橐獙?xiě)入硬盤(pán)要比寫(xiě)入內(nèi)存性能較低很多,從而降低了服務(wù)器的吞吐量,盡管使用 SSD 硬盤(pán)可以使事情得到緩解,但他仍然吸干了 RabbitMQ 的性能,當(dāng)消息成千上萬(wàn)條要寫(xiě)入磁盤(pán)的時(shí)候,性能是很低的。
4. 消費(fèi)者丟失消息
RabbitMQ 消費(fèi)者在消費(fèi)消息的時(shí)候,剛拿到消息,結(jié)果進(jìn)程掛了,這個(gè)時(shí)候 RabbitMQ 就會(huì)認(rèn)為你已經(jīng)消費(fèi)成功了,這條數(shù)據(jù)就丟了。
RabbitMQ 提供了一個(gè)消息確認(rèn)的概念:當(dāng)一個(gè)消息從隊(duì)列中投遞給消費(fèi)者后,消費(fèi)者會(huì)通知一下消息中間件(RabbitMQ),這個(gè)可以是系統(tǒng)自動(dòng) autoACK 的也可以由處理消息的應(yīng)用操作。
當(dāng)“消息確認(rèn)”被啟用的時(shí)候,RabbitMQ 不會(huì)完全將消息從隊(duì)列中刪除,直到它收到來(lái)自消費(fèi)者的確認(rèn)回執(zhí)(acknowledgement)。
為了解決這個(gè)問(wèn)題,RabbitMQ 提供了 2 種處理模式來(lái)解決這個(gè)問(wèn)題:
- 自動(dòng)確認(rèn)模式:當(dāng)
RabbbitMQ將消息發(fā)送給應(yīng)用后,消費(fèi)者端自動(dòng)回送一個(gè)確認(rèn)消息。(使用 方法:basic.deliver或basic.get-ok)。 - 顯式確認(rèn)模式:
RabbitMQ不會(huì)完全將消息從隊(duì)列中刪除,直到消費(fèi)者發(fā)送一個(gè)確認(rèn)回執(zhí)(acknowledgement)后再刪除消息。(使用方法:basic.ack)。
在顯式確認(rèn)模式下,消費(fèi)者可以自由選擇什么時(shí)候發(fā)送確認(rèn)回執(zhí)(acknowledgement)。消費(fèi)者可以在收到消息后立即發(fā)送,或?qū)⑽刺幚淼南⒋鎯?chǔ)后發(fā)送,或等到消息被處理完畢后再發(fā)送確認(rèn)回執(zhí)。
如果一個(gè)消費(fèi)者在尚未發(fā)送確認(rèn)回執(zhí)的情況下掛掉了,那 RabbitMQ 會(huì)將消息重新投遞給另一個(gè)消費(fèi)者。如果當(dāng)時(shí)沒(méi)有可用的消費(fèi)者了,消息代理會(huì)死等下一個(gè)注冊(cè)到此隊(duì)列的消費(fèi)者,然后再次嘗試投遞。
消費(fèi)者在獲取隊(duì)列消息時(shí),可以指定 autoAck 參數(shù),采用顯式確認(rèn)模式,需要指定 autoAck = false,在顯式確認(rèn)模式,RabbitMQ 不會(huì)為未 ack 的消息設(shè)置超時(shí)時(shí)間,它判斷此消息是否需要重新投遞給消費(fèi)者的唯一依據(jù)是消費(fèi)該消息的消費(fèi)者連接是否已經(jīng)斷開(kāi)。如果斷開(kāi)連接,RabbitMQ 也沒(méi)有收到 ACK,則 RabbitMQ 會(huì)安排該消息重新進(jìn)入隊(duì)列,等待投遞給下一個(gè)消費(fèi)者。
但是默認(rèn)情況下這個(gè)發(fā)送 ack 的操作是自動(dòng)提交的,也就是說(shuō)消費(fèi)者一收到這個(gè)消息就會(huì)自動(dòng)返回 ack 給 RabbitMQ,所以會(huì)出現(xiàn)丟消息的問(wèn)題。
所以針對(duì)這個(gè)問(wèn)題的解決方案就是:關(guān)閉 RabbitMQ 消費(fèi)者的自動(dòng)提交 ack,在消費(fèi)者處理完這條消息之后再手動(dòng)提交 ack。
import java.io.IOException;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;public class ConsumerTest {private static String queue_name = "tx_queue";/*** @param args*/public static void main(String[] args) {Connection conn;try {conn = MqManager.newConnection();Channel channel = conn.createChannel();// 消費(fèi)消息boolean autoAck = false;channel.basicConsume(queue_name, autoAck, "myConsumer Tag", new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {String routingKey = envelope.getRoutingKey();String convernType = properties.getContentType();long deliveryTag = envelope.getDeliveryTag();System.out.println("routingKey:"+routingKey+",convernType:"+convernType+",deliveryTag:"+deliveryTag+",Msg body:"+new String(body));channel.basicAck(deliveryTag, false);}});}catch (Exception e) {e.printStackTrace();}}}
5. 業(yè)務(wù)邏輯
在該流程中,一個(gè)分布式事務(wù)由 A 和 B 兩個(gè)服務(wù)共同完成,在 A 和 B 都執(zhí)行成功時(shí),分布式事務(wù)結(jié)果不會(huì)出現(xiàn)以外,但是如果該流程中某一個(gè)步驟出現(xiàn)問(wèn)題,很可能就會(huì)導(dǎo)致 AB 的數(shù)據(jù)不一致的問(wèn)題。接下來(lái),我們仔細(xì)分析一下該流程中的問(wèn)題所在。
在 A 服務(wù)中,由于是本地事務(wù)控制,可以保證 a、b 操作的原子性。這里要特別說(shuō)明的是 b 操作所涉及到的內(nèi)容:
- 服務(wù) A 向 MQ 服務(wù)發(fā)送一個(gè)消息;
- MQ 持久化保存消息;
- MQ 向服務(wù) A 發(fā)送 ack 確認(rèn)。
待這一系列操作完成后,A 認(rèn)為所有操作完成,提交事務(wù)保存。
在 B 服務(wù)中,由于是本地事務(wù)控制,可以保證 c、d 操作的原子性。這里要特別說(shuō)明的是 c 操作所涉及到的內(nèi)容:
- MQ 向 B 服務(wù)發(fā)送消息(B 接收消息);
- B 向 MQ 發(fā)送 ack 確認(rèn)消息(該步驟也可以是在 d 操作完成后返回給 MQ)。
本章節(jié)部分參考
https://gitbook.cn/books/5d65124b2b27dd24ed390665/index.html
https://gitbook.cn/books/5f30bfb0be80f0592e70f206/index.html
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ 入门系列(6)— 如何保证 RabbitMQ 消息不丢失的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 但求一死下一句是什么呢?
- 下一篇: 从面试官角度观察到的程序员工资瓶颈,同时