给你1分钟,回答下RabbitMQ如何保证消息不丢?
一條消費成功被消費經歷了生產者->MQ->消費者,因此在這三個步驟中都有可能造成消息丟失。
一 消息生產者沒有把消息成功發送到MQ
1.1 事務機制
AMQP協議提供了事務機制,在投遞消息時開啟事務支持,如果消息投遞失敗,則回滾事務。
自定義事務管理器
@Configuration public?class?RabbitTranscation?{@Beanpublic?RabbitTransactionManager?rabbitTransactionManager(ConnectionFactory?connectionFactory){return?new?RabbitTransactionManager(connectionFactory);}@Beanpublic?RabbitTemplate?rabbitTemplate(ConnectionFactory?connectionFactory){return?new?RabbitTemplate(connectionFactory);} }修改yml
spring:rabbitmq:#?消息在未被隊列收到的情況下返回publisher-returns:?true開啟事務支持
rabbitTemplate.setChannelTransacted(true);消息未接收時調用ReturnCallback
rabbitTemplate.setMandatory(true);生產者投遞消息
@Service public?class?ProviderTranscation?implements?RabbitTemplate.ReturnCallback?{@AutowiredRabbitTemplate?rabbitTemplate;@PostConstructpublic?void?init(){//?設置channel開啟事務rabbitTemplate.setChannelTransacted(true);rabbitTemplate.setReturnCallback(this);}@Overridepublic?void?returnedMessage(Message?message,?int?replyCode,?String?replyText,?String?exchange,?String?routingKey)?{System.out.println("這條消息發送失敗了"+message+",請處理");}@Transactional(rollbackFor?=?Exception.class,transactionManager?=?"rabbitTransactionManager")public?void?publishMessage(String?message)?throws?Exception?{rabbitTemplate.setMandatory(true);rabbitTemplate.convertAndSend("javatrip",message);} }但是,很少有人這么干,因為這是同步操作,一條消息發送之后會使發送端阻塞,以等待RabbitMQ-Server的回應,之后才能繼續發送下一條消息,生產者生產消息的吞吐量和性能都會大大降低。
1.2 發送方確認機制
發送消息時將信道設置為confirm模式,消息進入該信道后,都會被指派給一個唯一ID,一旦消息被投遞到所匹配的隊列后,RabbitMQ就會發送給生產者一個確認。
開啟消息確認機制
spring:rabbitmq:#?消息在未被隊列收到的情況下返回publisher-returns:?true#?開啟消息確認機制publisher-confirm-type:?correlated消息未接收時調用ReturnCallback
rabbitTemplate.setMandatory(true);生產者投遞消息
@Service public?class?ConfirmProvider?implements?RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback?{@AutowiredRabbitTemplate?rabbitTemplate;@PostConstructpublic?void?init()?{rabbitTemplate.setReturnCallback(this);rabbitTemplate.setConfirmCallback(this);}@Overridepublic?void?confirm(CorrelationData?correlationData,?boolean?ack,?String?cause)?{if(ack){System.out.println("確認了這條消息:"+correlationData);}else{System.out.println("確認失敗了:"+correlationData+";出現異常:"+cause);}}@Overridepublic?void?returnedMessage(Message?message,?int?replyCode,?String?replyText,?String?exchange,?String?routingKey)?{System.out.println("這條消息發送失敗了"+message+",請處理");}public?void?publisMessage(String?message){rabbitTemplate.setMandatory(true);rabbitTemplate.convertAndSend("javatrip",message);} }如果消息確認失敗后,我們可以進行消息補償,也就是消息的重試機制。當未收到確認信息時進行消息的重新投遞。設置如下配置即可完成。
spring:rabbitmq:#?支持消息發送失敗后重返隊列publisher-returns:?true#?開啟消息確認機制publisher-confirm-type:?correlatedlistener:simple:retry:#?開啟重試enabled:?true#?最大重試次數max-attempts:?5#?重試時間間隔initial-interval:?3000二 消息發送到MQ后,MQ宕機導致內存中的消息丟失
消息在MQ中有可能發生丟失,這時候我們就需要將隊列和消息都進行持久化。
@Queue注解為我們提供了隊列相關的一些屬性,具體如下:
name: 隊列的名稱;
durable: 是否持久化;
exclusive: 是否獨享、排外的;
autoDelete: 是否自動刪除;
arguments:隊列的其他屬性參數,有如下可選項,可參看圖2的arguments:
x-message-ttl:消息的過期時間,單位:毫秒;
x-expires:隊列過期時間,隊列在多長時間未被訪問將被刪除,單位:毫秒;
x-max-length:隊列最大長度,超過該最大值,則將從隊列頭部開始刪除消息;
x-max-length-bytes:隊列消息內容占用最大空間,受限于內存大小,超過該閾值則從隊列頭部開始刪除消息;
x-overflow:設置隊列溢出行為。這決定了當達到隊列的最大長度時消息會發生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁隊列類型僅支持drop-head;
x-dead-letter-exchange:死信交換器名稱,過期或被刪除(因隊列長度超長或因空間超出閾值)的消息可指定發送到該交換器中;
x-dead-letter-routing-key:死信消息路由鍵,在消息發送到死信交換器時會使用該路由鍵,如果不設置,則使用消息的原來的路由鍵值
x-single-active-consumer:表示隊列是否是單一活動消費者,true時,注冊的消費組內只有一個消費者消費消息,其他被忽略,false時消息循環分發給所有消費者(默認false)
x-max-priority:隊列要支持的最大優先級數;如果未設置,隊列將不支持消息優先級;
x-queue-mode(Lazy mode):將隊列設置為延遲模式,在磁盤上保留盡可能多的消息,以減少RAM的使用;如果未設置,隊列將保留內存緩存以盡可能快地傳遞消息;
x-queue-master-locator:在集群模式下設置鏡像隊列的主節點信息。
持久化隊列
創建隊列的時候將持久化屬性durable設置為true,同時要將autoDelete設置為false
@Queue(value?=?"javatrip",durable?=?"true",autoDelete?=?"false")持久化消息
發送消息的時候將消息的deliveryMode設置為2,在Spring Boot中消息默認就是持久化的。
三 消費者消費消息的時候,未消費完畢就出現了異常
消費者剛消費了消息,還沒有處理業務,結果發生異常。這時候就需要關閉自動確認,改為手動確認消息。
修改yml為手動簽收模式
spring:rabbitmq:listener:simple:#?手動簽收模式acknowledge-mode:?manual#?每次簽收一條消息prefetch:?1消費者手動簽收
@Component @RabbitListener(queuesToDeclare?=?@Queue(value?=?"javatrip",?durable?=?"true")) public?class?Consumer?{@RabbitHandlerpublic?void?receive(String?message,?@Headers?Map<String,Object>?headers,?Channel?channel)?throws?Exception{System.out.println(message);//?唯一的消息IDLong?deliverTag?=?(Long)?headers.get(AmqpHeaders.DELIVERY_TAG);//?確認該條消息if(...){channel.basicAck(deliverTag,false);}else{//?消費失敗,消息重返隊列channel.basicNack(deliverTag,false,true);}} }四 總結
消息丟失的原因?
生產者、MQ、消費者都有可能造成消息丟失
如何保證消息的可靠性?
發送方采取發送者確認模式
MQ進行隊列及消息的持久化
消費者消費成功后手動確認消息
有道無術,術可成;有術無道,止于術
歡迎大家關注Java之道公眾號
好文章,我在看??
總結
以上是生活随笔為你收集整理的给你1分钟,回答下RabbitMQ如何保证消息不丢?的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2019春第四周作业软件
- 下一篇: arm的开发工具