RabbitMQ消息幂等性问题
文章目錄
- 1. 什么是冪等性?
- 1.1 消息隊列的冪等性
- 1.2 模擬重試機制
- 1.2.1 生產(chǎn)者代碼
- 1.2.2 消費者代碼
- 1.2.3 消費者 application.yml 配置
- 2. 如何保證消息冪等性,不被重復(fù)消費?
- 解決方法
1. 什么是冪等性?
在編程中一個冪等操作的特點是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。
HTTP方法的冪等性是指一次和多次請求某一個資源應(yīng)該具有同樣的副作用。冪等性屬于語義范疇,正如編譯器只能幫助檢查語法錯誤一樣,HTTP規(guī)范也沒有辦法通過消息格式等語法手段來定義它。
簡之:一個請求,不管重復(fù)來多少次,結(jié)果是不會改變的。
1.1 消息隊列的冪等性
如同HTTP方法的冪等性,消息隊列同樣會出現(xiàn)冪等性問題。
消費者在消費 MQ 中的消息時,MQ 已把消息發(fā)送給消費者,消費者在給 MQ 返回 ack 時網(wǎng)絡(luò)中斷,故 MQ 未收到確認信息,該條消息會重新發(fā)給其他的消費者,或者在網(wǎng)絡(luò)重連后再次發(fā)送給該消費者,但實際上該消費者已成功消費了該條消息,造成消費者消費了重復(fù)的消息;注意,RabbitMQ 這種消息重試(補償)機制是默認的。
所以,MQ 消費者的冪等性問題,主要在于 MQ 的重試機制,因為網(wǎng)絡(luò)原因或客戶端延遲消費導(dǎo)致重復(fù)消費。
那么,如何合適選擇重試機制?我們來看兩種情況。
情況1: 消費者獲取到消息后,調(diào)用第三方接口,但接口暫時無法訪問,是否需要重試?
需要重試
情況2: 消費者獲取到消息后,拋出數(shù)據(jù)轉(zhuǎn)換異常,是否需要重試?
不需要重試
總結(jié):對于情況2,如果消費者代碼拋出異常是需要發(fā)布新版本才能解決的問題,那么不需要重試,重試也無濟于事。應(yīng)該采用日志記錄+定時任務(wù) job 健康檢查+人工進行補償
1.2 模擬重試機制
我們采用一種短信消費者客戶端異常的情況來模擬 RabbitMQ 的重試機制。
@RabbitListener(queues = "fanout_sms_queue") public void process(String msg) {System.out.println("短信消費者獲取生產(chǎn)者消息msg:" + msg);int i = 1/0; }如上代碼,很顯然會報錯,一擔(dān)報錯生產(chǎn)者的消息時不會被消費的?
@RabbitListener 底層使用 AOP 進行異常通知攔截,如果程序沒有拋出異常信息,那么就會自動提交事務(wù);如果 AOP 異常通知攔截有捕獲到異常信息的話,就會自動實現(xiàn)重試(補償)機制,同時,這個補償機制的消息會緩存到 RabbitMQ 服務(wù)器端進行存放,一直重試到不拋出異常為止。
1.2.1 生產(chǎn)者代碼
@Component public class FanoutProducer {@Autowiredprivate AmqpTemplate amqpTemplate;/*** 發(fā)送消息** @param queueName 隊列名稱*/public void send(String queueName) {String msg = "my_fanout_msg:" + System.currentTimeMillis();Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();System.out.println(msg + ":" + msg);amqpTemplate.convertAndSend(queueName, message);} }1.2.2 消費者代碼
@Component public class FanoutEamilConsumer {@RabbitListener(queues = "fanout_eamil_queue")public void process(Message message) throws Exception {String revMessage = Thread.currentThread().getName() + ",郵件消費者獲取生產(chǎn)者消息msg:" + new String(message.getBody(), "UTF-8")+ ",messageId:" + message.getMessageProperties().getMessageId();System.out.println(revMessage);} }1.2.3 消費者 application.yml 配置
spring:rabbitmq:####連接地址host: 127.0.0.1####端口號 port: 5672####賬號 username: guest####密碼 password: guest### 地址virtual-host: /admin_hostlistener:simple:retry:####開啟消費者重試enabled: true####最大重試次數(shù)max-attempts: 5####重試間隔次數(shù)initial-interval: 3000server:port: 8081我們通過 RabbitMQ 配置,增加了 RabbitMQ 重試時間以及重試次數(shù)限制,在一定程度上解決了重復(fù)消費的問題,接下來看一道常問的面試題。
2. 如何保證消息冪等性,不被重復(fù)消費?
其實,這個問題也算是 MQ 面試當中經(jīng)??疾斓囊稽c,因為無論是什么 MQ 都會有這個問題。
首先通過上邊我們了解了什么是“冪等性”,以及 MQ 冪等性問題的產(chǎn)生,所以我們要清楚為什么會出現(xiàn)重復(fù)性消費?在什么場景會出現(xiàn)重復(fù)消費?
解決方法
使用全局 MessageID 判斷消費方使用同一個,解決冪等性問題。
或者使用業(yè)務(wù)邏輯保證唯一(比如訂單號碼)
生產(chǎn)者關(guān)鍵代碼:
@Autowired private AmqpTemplate amqpTemplate;/*** 發(fā)送消息** @param queueName 隊列名稱*/ public void send(String queueName) {String msg = "my_fanout_msg:" + System.currentTimeMillis();Message message = MessageBuilder.withBody(msg.getBytes()).setContentType(MessageProperties.CONTENT_TYPE_JSON).setContentEncoding("utf-8").setMessageId(UUID.randomUUID() + "").build();System.out.println(msg + ":" + msg);amqpTemplate.convertAndSend(queueName, message); }如上,生產(chǎn)者在發(fā)送消息時(convertAndSend),給消息對象設(shè)置了唯一的 MessageID,只有該 MessageID 沒有被消費者標記方能在重試機制中再次被消費。
消費者關(guān)鍵代碼:
@RabbitListener(queues = "fanout_eamil_queue") public void process(Message message) throws Exception {String revMessage = Thread.currentThread().getName()+ ",郵件消費者獲取生產(chǎn)者消息msg:"+ new String(message.getBody(), "UTF-8")+ ",messageId:" + message.getMessageProperties().getMessageId();System.out.println(revMessage);發(fā)送郵件的邏輯XXX }如上,通過 message.getMessageProperties().getMessageId() 獲取 MessageID,獲取的 MessageID 可以用來判斷是否已經(jīng)被消費者消費過了,如果已經(jīng)消費則取消再次消費。
通常怎么判斷呢?
比如上方是一個郵件發(fā)送的消費者,在做補償時,假如上一步郵件發(fā)送成功了,我們會把該 ID 存至 redis中,下次再調(diào)用時,先去 redis 判斷是否存在該 ID 了,如果存在表明已經(jīng)消費過了則直接返回,不再消費,否則消費,然后將記錄存至 redis。
我創(chuàng)建了一個java相關(guān)的公眾號,用來記錄自己的學(xué)習(xí)之路,感興趣的小伙伴可以關(guān)注一下微信公眾號哈:niceyoo
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ消息幂等性问题的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 在linux上安装jdk(转载)
- 下一篇: Unity中UGUI之Canvas属性解