MQ延迟队列实现延迟消息
在開發中經常會遇到延時任務的需求,例如在12306購買車票,若生成訂單30分鐘未支付則自動取消;還有在線商城完成訂單后48小時不評價 ,自動5星好評。像這類在某事件觸發后一段時間內執行的需求任務我們稱之為?延時任務。
那么如何實現延遲任務呢?
第一反應是利用cron方案來實現:
啟動一個cron定時任務,每隔一段時間執行一次,比如30分鐘,找到那些超時的數據,直接更新狀態,或者拿出來執行一些操作。如果數據量比較大,需要分頁查詢,分頁update,這將是一個for循環更新操作。
cron方案是很常見的一種方案,但是常見的不一定是最好的,主要有以下幾個問題:
- 當數據量大的時候輪詢效率低;
- 時效性不夠好,如果每小時輪詢一次,最差的情況時間誤差會達到1小時;
- 如果通過增加cron輪詢頻率來減少時間誤差,則會出現輪詢低效和重復計算的問題;
既然cron方案不是很理想,那就請出我們今天的主角,使用RocketMQ的延時消息解決。在創建訂單的時候發送一條延時消息到RocketMQ,30分鐘后消費者消費消息去檢查訂單的狀態,如果發現訂單未支付則取消訂單釋放庫存。
實現
RocketMQ延遲隊列的核心思路是:所有的延遲消息由producer發出之后,都會存放到同一個topic(SCHEDULE_TOPIC_XXXX)下,不同的延遲級別會對應不同的隊列序號,當延遲時間到之后,由定時線程讀取轉換為普通的消息存的真實指定的topic下,此時對于consumer端此消息才可見,從而被consumer消費。
注意:?RocketMQ不支持任意時間的延時,只支持以下幾個固定的延時等級
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
下面我們結合SprintBoot利用RocketMQ發送延時消息
- 引入RocketMQ組件
- 增加RocketMQ的配置
rocketmq:??
????????name-server:?172.31.0.44:9876??
????????producer:????
????????group:?delay-group
- 編寫生產者
- 編寫消費者
- 測試
這里delayLevel設置成5,對應RocketMQ的延時等級就是1分鐘后投遞消息。
- 運行結果
發送時間
消費時間
修改延時級別
RocketMQ的延遲等級可以進行修改,以滿足自己的業務需求,可以修改/添加新的level。例如:你想支持1天的延遲,修改最后一個level的值為1d,這個時候依然是18個level;也可以增加一個1d,這個時候總共就有19個level。
- 打開RocketMQ的配置文件,修改messageDelayLevel?屬性
brokerClusterName = DefaultClusterbrokerName = broker-abrokerId = 0deleteWhen = 04fileReservedTime = 48brokerRole = ASYNC_MASTERflushDiskType = ASYNC_FLUSHstorePathRootDir = /app/rocketmq/datamessageDelayLevel=90s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
這次將延時等級1修改成了90s,生產者發送消息后需要90s后再進行消息投遞。修改完成后重啟RocketMQ。
nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &
- 使用延時等級1發送消息
public?void?sendDelayMessage()?{?delayProduce.sendDelayMessage("delay-topic","Hello,JAVA日知錄",1);}
- 測試
發送時間
消費時間
通過比對發送時間與消費時間證明延時等級修改生效。
總結
以上是生活随笔為你收集整理的MQ延迟队列实现延迟消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 爬取斗图网的图片
- 下一篇: Ajax(3)--DOM操作HTML 你