MQ-消息延迟
?
目錄
首先了解死信交換機
?死信交換機與之前消費者的RepublishMessageRecoverer策略
?總結:
?1.死信交換機的配置
?2.Publisher發送延遲消息
?3.Consumer服務中,定義一個消費者監聽,并且聲明死信交換機以及隊列和key
4.消息超時的兩種方式
延遲隊列(使用mq的插件)
場景:
DelayExchange原理+使用
1.就要注解方式,將消息轉到延遲隊列
結果:
延遲交換機(delayed)優點:
為什么會打印異常,怎么樣進行處理?
首先了解死信交換機
當一個消息隊列滿足以下情況則可以稱為死信(dead letter)
消費者使用basic.reject或 basic.nack聲明消費失敗,并且消息的requeue參數設置為false。
消息是一個過期消息,超時無人消費。
要投遞的隊列消息堆積滿了,最早的消息可能成為死信。
一般來說,消息成為死信就會被我們丟棄,但是有了死信交換機就會變得不一樣
如果這個包含死信的隊列配置了dead-letter-exchange屬性,指定了一個交換機,那么隊列中的死信就會投遞到這個交換機中,而這個交換機稱為死信交換機(Dead Letter Exchange,簡稱DLX)
其實呢,所謂的死信交換機就是一個普通交換機,只不過是某個隊列用dead-letter-exchange這個屬性綁定到一起了,當這個隊列出現了死信,就會丟到我們這個死信交換機里了,就有點像垃圾桶一樣的了。
當一個消息被消費者拒絕,變成了死信
拒絕有三種:1.配置聲明,2.消息的過期,3.消息的堆積
?死信交換機與之前消費者的RepublishMessageRecoverer策略
?消費者這個是從消費者處理消息失敗后,retry模式后消息也依然失敗,就可以用這個策略,消費者將失敗消息投遞到指定的交換機,這樣消費者的壓力會比較大;
?(30條消息) RabbitMQ的高級特性(消息可靠性)_Fairy要carry的博客-CSDN博客
死信交換機:?
?因為simple.queue綁定了死信交換機所以死信會投遞給這個交換機,然后這個死信交換機又綁定了一個隊列,所以消息最終會進入存放的死信隊列
?
?作用:
1.對失敗消息做兜底服務,2.做超時處理
?總結:
?
TTL:Time-To-Love
TTL,也就是Time-To-Live。如果一個隊列中的消息TTL結束仍未消費,則會變為死信,TTL超時分為兩種情況:
-
消息所在的隊列設置了超時時間
-
消息本身設置了超時時間
?1.死信交換機的配置
?聲明隊列,并指定死信傳遞到哪個交換機:QueueBuilder.ttl(xxx):xxx后消息變為死信給到deadLetterExchange死信交換機
package cn.itcast.mq.config;import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** @author diao 2022/6/19*/ @Configuration public class TTLMessageConfig {//交換機@Beanpublic DirectExchange ttlDirectExchange() {return new DirectExchange("ttl.direct");}//隊列@Beanpublic Queue ttlQueue() {return QueueBuilder.durable("ttl.queue")//10s后消息變成死信.ttl(10000).deadLetterExchange("dl.direct").deadLetterRoutingKey("dl").build();}//綁定@Beanpublic Binding ttlBinding(){return BindingBuilder.bind(ttlQueue()).to(ttlDirectExchange()).with("ttl");} }?2.Publisher發送延遲消息
setExpiration("xxx"):xxxs后進入死信,消費者才能消費,達到延遲效果
/*** 測試消息發送*/@Testpublic void testTTLMessage(){//1.準備消息Message message = MessageBuilder.withBody("hello,ttlMessage".getBytes(StandardCharsets.UTF_8)) // .setDeliveryMode(MessageDeliveryMode.PERSISTENT)//5s后進入死信(給消息設置ttl).setExpiration("5000").build();//2.發送消息rabbitTemplate.convertAndSend("ttl.direct","ttl",message);//3.記錄日志log.info("消息已經成功發送");}?3.Consumer服務中,定義一個消費者監聽,并且聲明死信交換機以及隊列和key
/*** 消費者監聽死信隊列,延遲監聽*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "dl.queue", durable = "true"),exchange = @Exchange(name = "dl.direct"),key = "dl"))public void listenDlQueue(){log.info("消費者收到了dl.queue的延遲消息");}?因為隊列的TTL值是10000ms,也就是10秒。可以看到消息發送與接收之間的時差剛好是10秒。
注:當發送消息MessageBuilder.setExpiration()指定了延遲時間時,會與基于隊列的延遲做比較,看誰時間短
4.消息超時的兩種方式
1.給隊列設置ttl屬性,當消息進入隊列后,超過ttl時間的消息就會變成死信
2.給消息設置ttl屬性,當消息發送超過ttl時間后變為死信
延遲隊列(使用mq的插件)
我們用啟動mq容器綁定數據卷,然后再在數據卷掛載映射到本機的目錄上下載文件,最后exec容器執行加載文件命令即可
或者進入mq的pluginmul,直接將文件放入,然后執行容器命令即可
?涉及命令:
#啟動容器 docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3.8-management docker exec -it mq bash rabbitmq-plugins enable rabbitmq_delayed_message_exchange場景:
1.延遲發送短信
2.用戶下單,xxx后沒支付就自動取消
3.預約工作會議,xx后通知所有參會人員
DelayExchange原理+使用
DelayExchange的本質還是官方的三種交換機,只是添加了延遲功能。因此使用時只需要聲明一個交換機,交換機的類型可以是任意類型,然后設定delayed屬性為true即可。
1.就要注解方式,將消息轉到延遲隊列
?2.基于Bean的方式
給交換機配置delayed屬性:ExchangeBuilder.delayed()
?3.發送消息
發送消息時,需要攜帶x-delay屬性,來指定延遲時間:MessageBuilder.setHeader("x-delay",5000)
結果:
延遲交換機(delayed)優點:
它的交換機能夠儲存消息DelayExchange,到內存中
聲明交換機類型(延遲...)以及模式(Fanout、topic...)
使用:
使用交換機指定模式以及類型,然后發消息
?自定義延遲交換機
為什么會打印異常,怎么樣進行處理?
?我們的延遲交換機對于普通的,不會立馬將消息轉發而是儲存起來(因為交換機是不具備儲存消息的功能,一般是立馬轉發),因為沒有轉發消息,根據之前的配置,交換機沒有將消息給到隊列,所以log.error了——>之前我們配置這個目的:交換機沒有將消息給到隊列,然后消息重發,但是我們這個是延遲消息,所以具體還得具體判斷;
我們的全局配置重新定義
判斷是否為延遲消息,如果延遲時間>0,就return不再記錄日志和重發消息
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware {@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {//獲取RabbitTemplate對象RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//配置ReturnCallbackrabbitTemplate.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) ->{//判斷是否是延遲消息(判斷延遲時間),如果是就returnif(message.getMessageProperties().getReceivedDelay()>0){return;}//記錄日志log.error("消息發送隊列失敗,響應碼:{},失敗原因:{},交換機:{},路由key:{},消息:{}",replyCode,replyText,exchange,routingKey,message.toString());//重發消息rabbitTemplate.convertAndSend("camq.topic",routingKey,"push again");} ));} }總結
- 上一篇: 看雪安全论坛
- 下一篇: 联想计算机管理员权限设置,联想电脑win