RabbitMQ消息确认机制
文章目錄
- 1. 事務(wù)機(jī)制
- 2. Confirm模式
- 2.1 生產(chǎn)者
- 2.1.1 普通Confirm模式
- 2.1.2 批量Confirm模式
- 2.1.3 異步Confirm模式
- 2.2 消費(fèi)者
- 3. 其他
消費(fèi)者如何確保消息一定能夠消費(fèi)成功呢?
由于在前面工作隊(duì)列模式里面我們了解了應(yīng)答模式,所以我們可以很自信的回答如上題目。
通過(guò)應(yīng)答形式,默認(rèn)自動(dòng)應(yīng)答,可以修改為手動(dòng)應(yīng)答來(lái)保證消息消費(fèi)成功。
其實(shí)應(yīng)答形式就是 RabbitMQ 消息確認(rèn)機(jī)制的一種體現(xiàn),我們?cè)賮?lái)看看問(wèn)題的產(chǎn)生背景:
生產(chǎn)者發(fā)送消息出去之后,不知道到底有沒(méi)有發(fā)送到 RabbitMQ 服務(wù)器, 默認(rèn)是不知道的。而且有的時(shí)候我們?cè)诎l(fā)送消息之后,后面的邏輯出問(wèn)題了,我們不想要發(fā)送之前的消息了,需要撤回該怎么做。
兩種解決方案:
1. 事務(wù)機(jī)制
事務(wù)機(jī)制分為三部分,開(kāi)啟事務(wù),提交事務(wù),事務(wù)回滾,如下:
我們通過(guò)一個(gè)例子模擬消息生產(chǎn)者發(fā)送消息過(guò)程發(fā)生異常,進(jìn)行事務(wù)回滾的過(guò)程。
public class Producer {/** 隊(duì)列名稱 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {/** 1.獲取連接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.創(chuàng)建通道 */Channel channel = newConnection.createChannel();/** 3.創(chuàng)建隊(duì)列聲明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.發(fā)送消息 */try {/** 4.1 開(kāi)啟事務(wù) */channel.txSelect();String msg = "我是生產(chǎn)者生成的消息";System.out.println("生產(chǎn)者發(fā)送消息:"+msg);channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());/** 4.2 提交事務(wù) - 模擬異常 */int i = 1/0;channel.txCommit();}catch (Exception e){e.printStackTrace();System.out.println("發(fā)生異常,我要進(jìn)行事務(wù)回滾了!");/** 4.3 事務(wù)回滾 */channel.txRollback();}finally {channel.close();newConnection.close();}}}打印結(jié)果:
生產(chǎn)者發(fā)送消息:我是生產(chǎn)者生成的消息
java.lang.ArithmeticException: / by zero at club.sscai.producer.Producer.main(Producer.java:37)
發(fā)生異常,我要進(jìn)行事務(wù)回滾了!
2. Confirm模式
像上方這種采用 AMQP 事務(wù)機(jī)制來(lái)保證消息的準(zhǔn)確到達(dá),在一定程度上是消耗了性能的,所以我們?cè)賮?lái)看看 Confirm 模式。
Confirm 模式分為兩塊,一是生產(chǎn)者的 Confirm 模式,再就是消費(fèi)者的 Confirm 模式。
2.1 生產(chǎn)者
通過(guò)生產(chǎn)者的確認(rèn)模式我們是要保證消息準(zhǔn)確達(dá)到客戶端,而與 AMQP 事務(wù)不同的是 Confirm 是針對(duì)一條消息的,而事務(wù)是可以針對(duì)多條消息的。
Confirm 模式最大的好處在于它是異步的,一旦發(fā)布一條消息,生產(chǎn)者應(yīng)用程序就可以在等信道返回確認(rèn)的同時(shí)繼續(xù)發(fā)送下一條消息,當(dāng)消息最終得到確認(rèn)之后,生產(chǎn)者應(yīng)用便可以通過(guò)回調(diào)方法來(lái)處理該確認(rèn)消息。
Confirm 的三種實(shí)現(xiàn)方式:
2.1.1 普通Confirm模式
public class Producer11 {/** 隊(duì)列名稱 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {/** 1.獲取連接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.創(chuàng)建通道 */Channel channel = newConnection.createChannel();/** 3.創(chuàng)建隊(duì)列聲明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.開(kāi)啟發(fā)送方確認(rèn)模式 */channel.confirmSelect();/** 5.發(fā)送消息 */for (int i = 0; i < 5; i++) {channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, (" Confirm模式, 第" + (i + 1) + "條消息").getBytes());try {if (channel.waitForConfirms()) {System.out.println("發(fā)送成功");}else{System.out.println("進(jìn)行消息重發(fā)");}} catch (InterruptedException e) {e.printStackTrace();}}/** 5.關(guān)閉通道、連接 */channel.close();newConnection.close();} }在推送消息之前,channel.confirmSelect() 聲明開(kāi)啟發(fā)送方確認(rèn)模式,再使用channel.waitForConfirms() 等待消息被服務(wù)器確認(rèn)即可。
2.1.2 批量Confirm模式
public class Producer22 {/** 隊(duì)列名稱 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {/** 1.獲取連接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.創(chuàng)建通道 */Channel channel = newConnection.createChannel();/** 3.創(chuàng)建隊(duì)列聲明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.開(kāi)啟發(fā)送方確認(rèn)模式 */channel.confirmSelect();/** 5.發(fā)送消息 */for (int i = 0; i < 5; i++) {channel.basicPublish("", QUEUE_NAME, null, (" Confirm模式, 第" + (i + 1) + "條消息").getBytes());}/** 6.直到所有信息都發(fā)布,只要有一個(gè)未確認(rèn)就會(huì)IOException */channel.waitForConfirmsOrDie();System.out.println("全部執(zhí)行完成");/** 5.關(guān)閉通道、連接 */channel.close();newConnection.close();} }channel.waitForConfirmsOrDie() 使用同步方式等所有的消息發(fā)送之后才會(huì)執(zhí)行后面代碼,只要有一個(gè)消息未被確認(rèn)就會(huì)拋出 IOException 異常。
2.1.3 異步Confirm模式
public class Producer33 {/** 隊(duì)列名稱 */private static final String QUEUE_NAME = "test_queue";public static void main(String[] args) throws IOException, TimeoutException {/** 1.獲取連接 */Connection newConnection = MQConnectionUtils.newConnection();/** 2.創(chuàng)建通道 */Channel channel = newConnection.createChannel();/** 3.創(chuàng)建隊(duì)列聲明 */channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** 4.開(kāi)啟發(fā)送方確認(rèn)模式 */channel.confirmSelect();for (int i = 0; i < 10; i++) {String message = "我是生產(chǎn)者生成的消息:" + i;channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));}/** 5.發(fā)送消息 異步監(jiān)聽(tīng)確認(rèn)和未確認(rèn)的消息 */channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("未確認(rèn)消息,標(biāo)識(shí):" + deliveryTag);}@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println(String.format("已確認(rèn)消息,標(biāo)識(shí):%d,多個(gè)消息:%b", deliveryTag, multiple));}});/** 6.關(guān)閉通道、連接 *//** channel.close();*//** newConnection.close();*/}}異步模式的優(yōu)點(diǎn),就是執(zhí)行效率高,不需要等待消息執(zhí)行完,只需要監(jiān)聽(tīng)消息即可,以上異步返回的信息如下:
可以看出,代碼是異步執(zhí)行的,消息確認(rèn)有可能是批量確認(rèn)的,是否批量確認(rèn)在于返回的 multiple 的參數(shù),此參數(shù)為 bool 值,如果 true 表示批量執(zhí)行了 deliveryTag 這個(gè)值以前的所有消息,如果為 false 的話表示單條確認(rèn)。
維持異步調(diào)用要求我們不能斷掉連接,因此注釋掉第6步。
2.2 消費(fèi)者
為了保證消息從隊(duì)列可靠地到達(dá)消費(fèi)者,RabbitMQ 提供消息確認(rèn)機(jī)制(message acknowledgment)。消費(fèi)者在聲明隊(duì)列時(shí),可以指定 noAck 參數(shù),當(dāng) noAck=false 時(shí), RabbitMQ 會(huì)等待消費(fèi)者顯式發(fā)回ack信號(hào)后才從內(nèi)存(和磁盤(pán),如果是持久化消息的話)中移去消息。否則,RabbitMQ 會(huì)在隊(duì)列中消息被消費(fèi)后立即刪除它。
在消費(fèi)者中 Confirm 模式又分為手動(dòng)確認(rèn)和自動(dòng)確認(rèn)。
關(guān)于兩者的介紹:
自動(dòng)確認(rèn): 在自動(dòng)確認(rèn)模式下,消息在發(fā)送后立即被認(rèn)為是發(fā)送成功。 這種模式可以提高吞吐量(只要消費(fèi)者能夠跟上),不過(guò)會(huì)降低投遞和消費(fèi)者處理的安全性。 這種模式通常被稱為“發(fā)后即忘”。 與手動(dòng)確認(rèn)模式不同,如果消費(fèi)者的TCP連接或信道在成功投遞之前關(guān)閉,該消息則會(huì)丟失。
手動(dòng)確認(rèn): 使用自動(dòng)確認(rèn)模式時(shí)需要考慮的另一件事是消費(fèi)者過(guò)載。 手動(dòng)確認(rèn)模式通常與有限的信道預(yù)取一起使用,限制信道上未完成(“進(jìn)行中”)傳送的數(shù)量。 然而,對(duì)于自動(dòng)確認(rèn),根據(jù)定義沒(méi)有這樣的限制。 因此,消費(fèi)者可能會(huì)被交付速度所壓倒,可能積壓在內(nèi)存中,堆積如山,或者被操作系統(tǒng)終止。 某些客戶端庫(kù)將應(yīng)用TCP反壓(直到未處理的交付積壓下降超過(guò)一定的限制時(shí)才停止從套接字讀取)。 因此,只建議當(dāng)消費(fèi)者可以有效且穩(wěn)定地處理投遞時(shí)才使用自動(dòng)投遞方式。
綜上:盡量選擇手動(dòng)確認(rèn)方式。
主要實(shí)現(xiàn)代碼:
// 手動(dòng)確認(rèn)消息 channel.basicAck(envelope.getDeliveryTag(), false);// 關(guān)閉自動(dòng)確認(rèn) boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, consumer);3. 其他
1、如果 RabbitMQ 服務(wù)器宕機(jī)了,消息會(huì)丟失嗎?
不會(huì)丟失,RabbitMQ 服務(wù)器支持消息持久化機(jī)制,會(huì)把消息持久化到硬盤(pán)上。
2、如何確保消息正確地發(fā)送至RabbitMQ?
RabbitMQ 使用發(fā)送方確認(rèn)模式,確保消息正確地發(fā)送到 RabbitMQ。
發(fā)送方確認(rèn)模式:將信道設(shè)置成 confirm 模式(發(fā)送方確認(rèn)模式),則所有在信道上發(fā)布的消息都會(huì)被指派一個(gè)唯一的ID。一旦消息被投遞到目的隊(duì)列后,或者消息被寫(xiě)入磁盤(pán)后(可持久化的消息),信道會(huì)發(fā)送一個(gè)確認(rèn)給生產(chǎn)者(包含消息唯一ID)。如果RabbitMQ發(fā)生內(nèi)部錯(cuò)誤從而導(dǎo)致消息丟失,會(huì)發(fā)送一條nack(not acknowledged,未確認(rèn))消息。
發(fā)送方確認(rèn)模式是異步的,生產(chǎn)者應(yīng)用程序在等待確認(rèn)的同時(shí),可以繼續(xù)發(fā)送消息。當(dāng)確認(rèn)消息到達(dá)生產(chǎn)者應(yīng)用程序,生產(chǎn)者應(yīng)用程序的回調(diào)方法就會(huì)被觸發(fā)來(lái)處理確認(rèn)消息。
我創(chuàng)建了一個(gè)java相關(guān)的公眾號(hào),用來(lái)記錄自己的學(xué)習(xí)之路,感興趣的小伙伴可以關(guān)注一下微信公眾號(hào)哈:niceyoo
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎(jiǎng)勵(lì)來(lái)咯,堅(jiān)持創(chuàng)作打卡瓜分現(xiàn)金大獎(jiǎng)總結(jié)
以上是生活随笔為你收集整理的RabbitMQ消息确认机制的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 3D Mapping with an R
- 下一篇: Android 向右滑返回,退出当前ac