领导看了我写的关闭超时订单,让我出门左转!
前幾天領導突然宣布幾年前停用的電商項目又重新啟動了,帶著復雜的心情仔細賞閱“兒時”的代碼,心中的酸楚只有自己能夠體會。
這不,昨天又被領導叫進了“小黑屋”,讓我把代碼重構下進行升級。看到這么“可愛”的代碼,心中一萬只“xx馬”疾馳而過。
讓我最深惡痛覺的就是里邊竟然用定時任務實現了“關閉超時訂單”的功能,現在想來,哭笑不得。我們先分析一波為什么大家都在抵制用定時任務來實現該功能。
?
定時任務
關閉超時訂單是在創建訂單之后的一段時間內未完成支付而關閉訂單的操作,該功能一般要求每筆訂單的超時時間是一致的。
如果我們使用定時任務來進行該操作,很難把握定時任務輪詢的時間間隔:
時間間隔足夠小,在誤差允許的范圍內可以達到我們說的時間一致性問題,但是頻繁掃描數據庫,執行定時任務,會造成網絡IO和磁盤IO的消耗,對實時交易造成一定的沖擊;
時間間隔比較大,由于每個訂單創建的時間不一致,所以上邊的一致性要求很難達到,舉例如下:
假設30分鐘訂單超時自動關閉,定時任務的執行間隔時間為30分鐘:
我們在第5分鐘進行下單操作;
當時間來到第30分鐘時,定時任務執行一次,但是我們的訂單未滿足條件,不執行;
當時間來到第35分鐘時,訂單達到關閉條件,但是定時任務未執行,所以不執行;
當時間來到第60分鐘時,開始執行我們的訂單關閉操作,而此時,誤差達到25分鐘。
經此種種,我們需要舍棄該方式。
?
延時隊列
為了滿足領導的需求,我便將手伸向了消息隊列:RabbitMQ。盡管它本身并沒有提供延時隊列的功能,但是我們可以利用它的存活時間和死信交換機的特性來間接實現。
首先我們先來簡單介紹下什么是存活時間?什么是死信交換機?
存活時間
存活時間的全拼是Time To Live,簡稱 TTL。它既支持對消息本身進行設置(延遲隊列的關鍵),又支持對隊列進行設置(該隊列中所有消息存在相同的過期時間)。
對消息本身進行設置:即使消息過期,也不會馬上從隊列中抹去,因為每條消息是否過期是在即將投遞到消費者之前判定的;
對隊列進行設置:一旦消息過期,就會從隊列中抹去;
如果同時使用這兩種方法,那么以過期時間小的那個數值為準。當消息達到過期時間還沒有被消費,那么該消息就“死了”,我們把它稱為 死信 消息。
消息變為死信的條件:
消息被拒絕(basic.reject/basic.nack),并且requeue=false;
消息的過期時間到期了;
隊列達到最大長度;
隊列設置注意事項
隊列中該屬性的設置要在第一次聲明隊列的時候設置才有效,如果隊列一開始已存在且沒有這個屬性,則要刪掉隊列再重新聲明才可以;
隊列的 ttl 只能被設置為某個固定的值,一旦設置后則不能更改,否則會拋出異常;
死信交換機
死信交換機全拼Dead-Letter-Exchange,簡稱DLX。
當消息在一個隊列中變成死信之后,如果這個消息所在的隊列設置了x-dead-letter-exchange參數,那么它會被發送到x-dead-letter-exchange對應值的交換機上,這個交換機就稱之為死信交換機,與這個死信交換器綁定的隊列就是死信隊列。
x-dead-letter-exchange:出現死信之后將死信重新發送到指定交換機;
x-dead-letter-routing-key:出現死信之后將死信重新按照指定的routing-key發送,如果不設置默認使用消息本身的routing-key
死信隊列與普通隊列的區別就是它的RoutingKey和Exchange需要作為參數,綁定到正常的隊列上。
?
實戰教學
先來張圖感受下我們的整體思路
生產者發送帶有 ttl 的消息放入交換機路由到延時隊列中;
在延時隊列中綁定死信交換機與死信轉發的routing-key;
等延時隊列中的消息達到延時時間之后變成死信轉發到死信交換機并路由到死信隊列中;
最后供消費者消費。
我們在上文的基礎上進行代碼實現:
配置類
@Configuration public?class?DelayQueueRabbitConfig?{public?static?final?String?DLX_QUEUE?=?"queue.dlx";//死信隊列public?static?final?String?DLX_EXCHANGE?=?"exchange.dlx";//死信交換機public?static?final?String?DLX_ROUTING_KEY?=?"routingkey.dlx";//死信隊列與死信交換機綁定的routing-keypublic?static?final?String?ORDER_QUEUE?=?"queue.order";//訂單的延時隊列public?static?final?String?ORDER_EXCHANGE?=?"exchange.order";//訂單交換機public?static?final?String?ORDER_ROUTING_KEY?=?"routingkey.order";//延時隊列與訂單交換機綁定的routing-key/***?定義死信隊列**/@Beanpublic?Queue?dlxQueue(){return?new?Queue(DLX_QUEUE,true);}/***?定義死信交換機**/@Beanpublic?DirectExchange?dlxExchange(){return?new?DirectExchange(DLX_EXCHANGE,?true,?false);}/***?死信隊列和死信交換機綁定*?設置路由鍵:routingkey.dlx**/@BeanBinding?bindingDLX(){return?BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);}/***?訂單延時隊列*?設置隊列里的死信轉發到的DLX名稱*?設置死信在轉發時攜帶的?routing-key?名稱**/@Beanpublic?Queue?orderQueue()?{Map<String,?Object>?params?=?new?HashMap<>();params.put("x-dead-letter-exchange",?DLX_EXCHANGE);params.put("x-dead-letter-routing-key",?DLX_ROUTING_KEY);return?new?Queue(ORDER_QUEUE,?true,?false,?false,?params);}/***?訂單交換機**/@Beanpublic?DirectExchange?orderExchange()?{return?new?DirectExchange(ORDER_EXCHANGE,?true,?false);}/***?把訂單隊列和訂單交換機綁定在一起**/@Beanpublic?Binding?orderBinding()?{return?BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);} }發送消息
@RequestMapping("/order") public?class?OrderSendMessageController?{@Autowiredprivate?RabbitTemplate?rabbitTemplate;@GetMapping("/sendMessage")public?String?sendMessage(){String?delayTime?=?"10000";//將消息攜帶路由鍵值rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,?DelayQueueRabbitConfig.ORDER_ROUTING_KEY,"發送消息!",message->{message.getMessageProperties().setExpiration(delayTime);return?message;});return?"ok";}}消費消息
@Component @RabbitListener(queues?=?DelayQueueRabbitConfig.DLX_QUEUE)//監聽隊列名稱 public?class?OrderMQReciever?{@RabbitHandlerpublic?void?process(String?message){System.out.println("OrderMQReciever接收到的消息是:"+?message);} }測試
通過調用接口,發現10秒之后才會消費消息
?
問題升級
由于開發環境和測試環境使用的是同一個交換機和隊列,所以發送的延時時間都是30分鐘。但是為了在測試環境讓測試同學方便測試,故手動將測試環境的時間改為了1分鐘。
問題復現
接著問題就來了:延時時間為1分鐘的消息并沒有立即被消費,而是等30分鐘的消息被消費完之后才被消費了。至于原因,我們下邊再分析,先用代碼來給大家復現下該問題。
@GetMapping("/sendManyMessage") public?String?sendManyMessage(){send("延遲消息睡10秒",10000+"");send("延遲消息睡2秒",2000+"");send("延遲消息睡5秒",5000+"");return?"ok"; }private?void?send(String?msg,?String?delayTime){rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,?DelayQueueRabbitConfig.ORDER_ROUTING_KEY,msg,message->{message.getMessageProperties().setExpiration(delayTime);return?message;}); }執行結果如下:
OrderMQReciever接收到的消息是:延遲消息睡10秒 OrderMQReciever接收到的消息是:延遲消息睡2秒 OrderMQReciever接收到的消息是:延遲消息睡5秒原因就是延時隊列也滿足隊列先進先出的特征,當10秒的消息未出隊列時,后邊的消息不能順利出隊,造成后邊的消息阻塞了,未能達到精準延時。
問題解決
我們可以利用x-delay-message插件來解決該問題
消息的延遲范圍是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被設置的范圍為 (2^32)-1 毫秒)
生產者發送消息到交換機時,并不會立即進入,而是先將消息持久化到 Mnesia(一個分布式數據庫管理系統);
插件將會嘗試確認消息是否過期;
如果消息過期,消息會通過 x-delayed-type 類型標記的交換機投遞至目標隊列,供消費者消費;
實踐
官網下載:https://www.rabbitmq.com/community-plugins.html
我這邊使用的是v3.8.0.ez,將文件下載下來放到服務器的/usr/local/soft/rabbitmq_server-3.7.14/plugins 路徑下,執行rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令即可。
出現如圖所示,代表安裝成功。
配置類
@Configuration public?class?XDelayedMessageConfig?{public?static?final?String?DIRECT_QUEUE?=?"queue.direct";//隊列public?static?final?String?DELAYED_EXCHANGE?=?"exchange.delayed";//延遲交換機public?static?final?String?ROUTING_KEY?=?"routingkey.bind";//綁定的routing-key/***?定義隊列**/@Beanpublic?Queue?directQueue(){return?new?Queue(DIRECT_QUEUE,true);}/***?定義延遲交換機*?args:根據該參數進行靈活路由,設置為“direct”,意味著該插件具有與直連交換機具有相同的路由行為,*?如果想要不同的路由行為,可以更換現有的交換類型如:“topic”*?交換機類型為?x-delayed-message**/@Beanpublic?CustomExchange?delayedExchange(){Map<String,?Object>?args?=?new?HashMap<String,?Object>();args.put("x-delayed-type",?"direct");return?new?CustomExchange(DELAYED_EXCHANGE,?"x-delayed-message",?true,?false,?args);}/***?隊列和延遲交換機綁定**/@Beanpublic?Binding?orderBinding()?{return?BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();}}發送消息
@RestController @RequestMapping("/delayed") public?class?DelayedSendMessageController?{@Autowiredprivate?RabbitTemplate?rabbitTemplate;@GetMapping("/sendManyMessage")public?String?sendManyMessage(){send("延遲消息睡10秒",10000);send("延遲消息睡2秒",2000);send("延遲消息睡5秒",5000);return?"ok";}private?void?send(String?msg,?Integer?delayTime){//將消息攜帶路由鍵值rabbitTemplate.convertAndSend(XDelayedMessageConfig.DELAYED_EXCHANGE,XDelayedMessageConfig.ROUTING_KEY,msg,message->{message.getMessageProperties().setDelay(delayTime);return?message;});} }消費消息
@Component @RabbitListener(queues?=?XDelayedMessageConfig.DIRECT_QUEUE)//監聽隊列名稱 public?class?DelayedMQReciever?{@RabbitHandlerpublic?void?process(String?message){System.out.println("DelayedMQReciever接收到的消息是:"+?message);} }測試
DelayedMQReciever接收到的消息是:延遲消息睡2秒 DelayedMQReciever接收到的消息是:延遲消息睡5秒 DelayedMQReciever接收到的消息是:延遲消息睡10秒這樣我們的問題就順利解決了。
局限性
延遲的消息存儲在一個Mnesia表中,當前節點上只有一個磁盤副本,它們將在節點重啟后存活。
雖然觸發計劃交付的計時器不會持久化,但它將在節點啟動時的插件激活期間重新初始化。顯然,集群中只有一個預定消息的副本意味著丟失該節點或禁用其上的插件將丟失駐留在該節點上的消息。
該插件的當前設計并不適合延遲消息數量較多的場景(如數萬條或數百萬條),另外該插件的一個可變性來源是依賴于 Erlang 計時器,在系統中使用了一定數量的長時間計時器之后,它們開始爭用調度程序資源,并且時間漂移不斷累積。
有道無術,術可成;有術無道,止于術
歡迎大家關注Java之道公眾號
好文章,我在看??
總結
以上是生活随笔為你收集整理的领导看了我写的关闭超时订单,让我出门左转!的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Dubbo为什么用Go重写?
- 下一篇: 卸载虚拟机出现用户已存在的错误_BATJ