Java秒杀系统实战系列~RabbitMQ死信队列处理超时未支付的订单(转)
轉自: https://juejin.cn/post/6844903903130042376
文末有源代碼,非常棒
摘要:
本篇博文是“Java秒殺系統實戰系列文章”的第十篇,本篇博文我們將采用RabbitMQ的死信隊列的方式處理“用戶秒殺成功生成訂單后,卻遲遲沒有支付”的情況,一起來見識一下RabbitMQ死信隊列在實際業務環境下的強大之處!內容:對于消息中間件RabbitMQ,Debug其實在前面的篇章中已經簡單分享介紹過了,在這里就不再贅述了!在本文我們將采用RabbitMQ的死信隊列實現這樣的業務需求:“用戶在秒殺成功并成功創建一筆訂單記錄后,理論上應該是執行去支付的操作,但是卻存在著一種情況是用戶遲遲不肯去支付~至于原因,不得而知!”對于這種場景,各位小伙伴可以在一些商城平臺體驗一下,即挑選完商品,加入購物車后,點擊去結算,這個時候會有個倒計時,提醒你需要在指定的時間內完成付款,否則訂單將失效!對于這種業務邏輯的處理,傳統的做法是采用“定時器的方式”,定時輪詢獲取已經超過指定時間的訂單,然后執行一系列的處理措施(比如再爭取給用戶發送短信,提醒超過多長時間訂單就要失效了等等。。。),在這個秒殺系統中,我們將借助RabbitMQ死信隊列這一組件,對該訂單執行“失效”的措施!“死信隊列”,顧明思議,是可以延時、延遲一定的時間再處理消息的一種特殊隊列,它相對于“普通的隊列”而言,可以實現“進入死信隊列的消息不立即處理,而是可以等待一定的時間再進行處理”的功能!而普通的隊列則不行,即進入隊列后的消息會立即被對應的消費者監聽消費,如下圖所示為普通隊列的基本消息模型:而對于“死信隊列”,它的構成以及使用相對而言比較復雜一點,在正常情況,死信隊列由三大核心組件組成:死信交換機+死信路由+TTL(消息存活時間~非必需的),而死信隊列又可以由“面向生產者的基本交換機+基本路由”綁定而成,故而生產者首先是將消息發送至“基本交換機+基本路由”所綁定而成的消息模型中,即間接性地進入到死信隊列中,當過了TTL,消息將“掛掉”,從而進入下一個中轉站,即“面下那個消費者的死信交換機+死信路由”所綁定而成的消息模型中。如下圖所示:下面,我們以實際的代碼來構建死信隊列的消息模型,并將此消息模型應用到秒殺系統的上述功能模塊中。(1)首先,需要在RabbitmqConfig配置類創建死信隊列的消息模型,其完整的源代碼如下所示://構建秒殺成功之后-訂單超時未支付的死信隊列消息模型
@Bean
public Queue successKillDeadQueue(){
??? Map<String, Object> argsMap= Maps.newHashMap();
??? argsMap.put("x-dead-letter-exchange",env.getProperty("mq.kill.item.success.kill.dead.exchange"));
??? argsMap.put("x-dead-letter-routing-key",env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
??? return new Queue(env.getProperty("mq.kill.item.success.kill.dead.queue"),true,false,false,argsMap);
}
//基本交換機
@Bean
public TopicExchange successKillDeadProdExchange(){
??? return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"),true,false);
}
//創建基本交換機+基本路由 -> 死信隊列 的綁定
@Bean
public Binding successKillDeadProdBinding(){
??? return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
}
//真正的隊列
@Bean
public Queue successKillRealQueue(){
??? return new Queue(env.getProperty("mq.kill.item.success.kill.dead.real.queue"),true);
}
//死信交換機
@Bean
public TopicExchange successKillDeadExchange(){
??? return new TopicExchange(env.getProperty("mq.kill.item.success.kill.dead.exchange"),true,false);
}
//死信交換機+死信路由->真正隊列 的綁定
@Bean
public Binding successKillDeadBinding(){
??? return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(env.getProperty("mq.kill.item.success.kill.dead.routing.key"));
}
復制代碼其中,環境變量對象實例env讀取的變量是配置在application.properties配置文件中的,取值如下所示:#訂單超時未支付自動失效-死信隊列消息模型
mq.kill.item.success.kill.dead.queue=${mq.env}.kill.item.success.kill.dead.queue
mq.kill.item.success.kill.dead.exchange=${mq.env}.kill.item.success.kill.dead.exchange
mq.kill.item.success.kill.dead.routing.key=${mq.env}.kill.item.success.kill.dead.routing.key
mq.kill.item.success.kill.dead.real.queue=${mq.env}.kill.item.success.kill.dead.real.queue
mq.kill.item.success.kill.dead.prod.exchange=${mq.env}.kill.item.success.kill.dead.prod.exchange
mq.kill.item.success.kill.dead.prod.routing.key=${mq.env}.kill.item.success.kill.dead.prod.routing.key
#單位為ms
mq.kill.item.success.kill.expire=20000
復制代碼(2)成功創建了消息模型之后,緊接著,我們需要在通用的RabbitMQ發送消息服務類RabbitSenderService中開發“發送消息入死信隊列”的功能,在該功能方法中,我們指定了消息的存活時間TTL,取值為配置的變量:mq.kill.item.success.kill.expire 的值,即20s;其完整的源代碼如下所示://秒殺成功后生成搶購訂單-發送信息入死信隊列,等待著一定時間失效超時未支付的訂單
public void sendKillSuccessOrderExpireMsg(final String orderCode){
??? try {
??????? if (StringUtils.isNotBlank(orderCode)){
??????????? KillSuccessUserInfo info=itemKillSuccessMapper.selectByCode(orderCode);
??????????? if (info!=null){
??????????????? rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
??????????????? rabbitTemplate.setExchange(env.getProperty("mq.kill.item.success.kill.dead.prod.exchange"));
??????????????? rabbitTemplate.setRoutingKey(env.getProperty("mq.kill.item.success.kill.dead.prod.routing.key"));
??????????????? rabbitTemplate.convertAndSend(info, new MessagePostProcessor() {
??????????????????? @Override
??????????????????? public Message postProcessMessage(Message message) throws AmqpException {
??????????????????????? MessageProperties mp=message.getMessageProperties();
??????????????????????? mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
??????????????????????? mp.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,KillSuccessUserInfo.class);
??????????????????????? //TODO:動態設置TTL(為了測試方便,暫且設置20s)
??????????????????????? mp.setExpiration(env.getProperty("mq.kill.item.success.kill.expire"));
??????????????????????? return message;
??????????????????? }
??????????????? });
??????????? }
??????? }
??? }catch (Exception e){
??????? log.error("秒殺成功后生成搶購訂單-發送信息入死信隊列,等待著一定時間失效超時未支付的訂單-發生異常,消息為:{}",orderCode,e.fillInStackTrace());
??? }
}
復制代碼從該“發送消息入死信隊列”的代碼中,我們可以看到,消息首先是先入到“基本交換機+基本路由”所綁定的死信隊列的消息模型中的!當消息到了TTL,自然會從死信隊列中出來(即“解脫了”),然后進入下一個中轉站,即:“死信交換機+死信路由” 所綁定而成的真正隊列的消息模型中,最終真正被消費者監聽消費!此時,可以將整個項目、系統運行在外置的tomcat服務器中,然后打開RabbitMQ后端控制臺應用,找到該死信隊列,可以看到該死信隊列的詳細信息,如下圖所示:(3)最后,是需要在RabbitMQ通用的消息監聽服務類RabbitReceiverService 中監聽“真正隊列”中的消息并進行處理:在這里我們是對該訂單進行失效處理(前提是還沒付款的情況下!),其完整的源代碼如下所示://用戶秒殺成功后超時未支付-監聽者
@RabbitListener(queues = {"${mq.kill.item.success.kill.dead.real.queue}"},containerFactory = "singleListenerContainer")
public void consumeExpireOrder(KillSuccessUserInfo info){
??? try {
??????? log.info("用戶秒殺成功后超時未支付-監聽者-接收消息:{}",info);
??????? if (info!=null){
??????????? ItemKillSuccess entity=itemKillSuccessMapper.selectByPrimaryKey(info.getCode());
??????????? if (entity!=null && entity.getStatus().intValue()==0){
??????????????? itemKillSuccessMapper.expireOrder(info.getCode());
??????????? }
??????? }
??? }catch (Exception e){
??????? log.error("用戶秒殺成功后超時未支付-監聽者-發生異常:",e.fillInStackTrace());
??? }
}
復制代碼其中,失效更新訂單的記錄的操作由 itemKillSuccessMapper.expireOrder(info.getCode()); 來實現,其對應的動態Sql的寫法如下所示:<!--失效更新訂單信息-->
<update id="expireOrder">
? UPDATE item_kill_success
? SET status = -1
? WHERE code = #{code} AND status = 0
</update>
復制代碼(4)至此,關于RabbitMQ死信隊列消息模型的代碼實戰已經完畢了!最后我只需要在“用戶秒殺成功創建訂單的那一刻,發送消息入死信隊列”的地方調用即可,其調用代碼如下所示:/**
?* 通用的方法-記錄用戶秒殺成功后生成的訂單-并進行異步郵件消息的通知
?* @param kill
?* @param userId
?* @throws Exception
?*/
private void commonRecordKillSuccessInfo(ItemKill kill, Integer userId) throws Exception{
??? //TODO:記錄搶購成功后生成的秒殺訂單記錄
??? ItemKillSuccess entity=new ItemKillSuccess();
??? String orderNo=String.valueOf(snowFlake.nextId());
??? //entity.setCode(RandomUtil.generateOrderCode());?? //傳統時間戳+N位隨機數
??? entity.setCode(orderNo); //雪花算法
??? entity.setItemId(kill.getItemId());
??? entity.setKillId(kill.getId());
??? entity.setUserId(userId.toString());
??? entity.setStatus(SysConstant.OrderStatus.SuccessNotPayed.getCode().byteValue());
??? entity.setCreateTime(DateTime.now().toDate());
??? //TODO:學以致用,舉一反三 -> 仿照單例模式的雙重檢驗鎖寫法
??? if (itemKillSuccessMapper.countByKillUserId(kill.getId(),userId) <= 0){
??????? int res=itemKillSuccessMapper.insertSelective(entity);
??????? if (res>0){
??????????? //TODO:進行異步郵件消息的通知=rabbitmq+mail
??????????? rabbitSenderService.sendKillSuccessEmailMsg(orderNo);
??????????? //TODO:入死信隊列,用于 “失效” 超過指定的TTL時間時仍然未支付的訂單
??????????? rabbitSenderService.sendKillSuccessOrderExpireMsg(orderNo);
??????? }
??? }
}
復制代碼最后,是進行自測:點擊“搶購”按鈕,用戶秒殺成功后,會發送一條消息入死信隊列(這一點可以在RabbitMQ后端控制臺中可以看到一條正Ready好的消息),等待20s,即可看到消息轉移到真正的隊列,并被真正的消費者監聽消費,如下所示: 好了,關于“RabbitMQ死信隊列”的介紹以及應用實戰本文就暫且介紹到這里了,此種方式可以很靈活對“超時未支付的訂單”,進行很好的處理,而且整個過程是“自動、自然”的,而無需人為去手動點擊按鈕觸發了!當然啦,萬事萬物都并非十全十美的,死信隊列也是如此,在一篇文章中我們將介紹此種方式的瑕疵之處,并采用相應的解決方案進行處理!補充:1、目前,這一秒殺系統的整體構建與代碼實戰已經全部完成了,
?
完整的源代碼數據庫地址可以來這里下載: https://gitee.com/steadyjack/SpringBoot-SecondKill
?
總結
以上是生活随笔為你收集整理的Java秒杀系统实战系列~RabbitMQ死信队列处理超时未支付的订单(转)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: rabbitmq-通配符模式
- 下一篇: 医美产品备案查询(医美备案查询)