RabbitMQ消息
生活随笔
收集整理的這篇文章主要介紹了
RabbitMQ消息
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
如何確保RabbitMQ消息的可靠性?
- 開(kāi)啟生產(chǎn)者確認(rèn)機(jī)制,確保生產(chǎn)者的消息能到達(dá)隊(duì)列
- 開(kāi)啟持久化功能,確保消息未消費(fèi)前在隊(duì)列中不會(huì)丟失
- 開(kāi)啟消費(fèi)者確認(rèn)機(jī)制為auto,由spring確認(rèn)消息處理成功后完成ack
- 開(kāi)啟消費(fèi)者失敗重試機(jī)制,并設(shè)置MessageRecoverer,多次重試失敗后將消息投遞到異常交換機(jī),交由人工處理
1.生產(chǎn)者確認(rèn)機(jī)制
- 對(duì)應(yīng)配置:
- 啟動(dòng)配置類
每個(gè)RabbitTemplate只能配置一個(gè)ReturnCallback,因此需要在項(xiàng)目啟動(dòng)過(guò)程中配置
ApplicationContextAware ->bean工廠通知->拿到rabbitTemplate
@Slf4j @Configuration //生產(chǎn)者消息確認(rèn),確認(rèn)信心到達(dá)隊(duì)列 public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//獲取RabbitTemplate對(duì)象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//配置ReturnCallbackrabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {//判斷是否是延遲消息if(message.getMessageProperties().getReceivedDelay()>0){return;}//失敗時(shí)才會(huì)回調(diào)//處理:記錄日志log.error("消息發(fā)送到隊(duì)列失敗,響應(yīng)碼:{},失敗原因:{},交換機(jī):{},路由key:{},消息:{}",replyCode,replyText,exchange,routingKey,message);//可以得到所有的錯(cuò)誤信息,有需要的話,可以選擇重發(fā)信息});} }- 消息發(fā)送
2.消息持久化
- 交換機(jī)持久化
RabbitMQ中交換機(jī)默認(rèn)是非持久化的,mq重啟后就丟失
默認(rèn)情況下,由SpringAMQP聲明的交換機(jī)都是持久化的
- 隊(duì)列持久化
RabbitMQ中隊(duì)列默認(rèn)是非持久化的,mq重啟后就丟失
默認(rèn)情況下,由SpringAMQP聲明的隊(duì)列都是持久化的
- 消息持久化
利用SpringAMQP發(fā)送消息時(shí),可以設(shè)置消息的屬性(MessageProperties),指定delivery-mode
默認(rèn)情況下,SpringAMQP發(fā)出的任何消息都是持久化的,不用特意指定
3.1消費(fèi)者確認(rèn)機(jī)制
RabbitMQ是閱后即焚機(jī)制,RabbitMQ確認(rèn)消息被消費(fèi)者消費(fèi)后會(huì)立刻刪除。
而RabbitMQ是通過(guò)消費(fèi)者回執(zhí)來(lái)確認(rèn)消費(fèi)者是否成功處理消息的:消費(fèi)者獲取消息后,應(yīng)該向RabbitMQ發(fā)送ACK回執(zhí),表明自己已經(jīng)處理消息。
設(shè)想這樣的場(chǎng)景:
這樣,消息就丟失了。因此消費(fèi)者返回ACK的時(shí)機(jī)非常重要。
而SpringAMQP則允許配置三種確認(rèn)模式:
- manual:手動(dòng)ack,需要在業(yè)務(wù)代碼結(jié)束后,調(diào)用api發(fā)送ack。
- auto:自動(dòng)ack,由spring監(jiān)測(cè)listener代碼是否出現(xiàn)異常,沒(méi)有異常則返回ack;拋出異常則返回nack
- none:關(guān)閉ack,MQ假定消費(fèi)者獲取消息后會(huì)成功處理,因此消息投遞后立即被刪除
由此可知:
- none模式下,消息投遞是不可靠的,可能丟失
- auto模式類似事務(wù)機(jī)制,出現(xiàn)異常時(shí)返回nack,消息回滾到mq;沒(méi)有異常,返回ack
- manual:自己根據(jù)業(yè)務(wù)情況,判斷什么時(shí)候該ack
一般,我們都是使用默認(rèn)的auto即可
3.2消費(fèi)失敗重試機(jī)制
- 重試接收的交換機(jī)及隊(duì)列配置類
消費(fèi)者兩種模式配置
logging:pattern:dateformat: HH:mm:ss:SSSlevel:cn.itcast: debug spring:rabbitmq:host: 192.168.23.130 # rabbitMQ的ip地址port: 5672 # 端口username: itcastpassword: 123321virtual-host: /listener:simple:prefetch: 1#acknowledge-mode: none # 關(guān)閉ack 消息處理拋異常時(shí),消息依然被RabbitMQ刪除acknowledge-mode: auto # ack 自動(dòng)返回結(jié)果retry:enabled: true # 開(kāi)啟消費(fèi)者失敗重試 在消費(fèi)者本地重試,不會(huì)返回隊(duì)列initial-interval: 1000 # 初識(shí)的失敗等待時(shí)長(zhǎng)為1秒multiplier: 1 # 失敗的等待時(shí)長(zhǎng)倍數(shù),下次等待時(shí)長(zhǎng) = multiplier * last-intervalmax-attempts: 3 # 最大重試次數(shù)stateless: true # true無(wú)狀態(tài);false有狀態(tài)。如果業(yè)務(wù)中包含事務(wù),這里改為false總結(jié)
以上是生活随笔為你收集整理的RabbitMQ消息的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 手机充电应该注意什么
- 下一篇: Flex布局 让你的布局更完美