第五章 限时订单实战笔记
什么是限時訂單?在各種電商網站下訂單后會保留一個時間段,時間段內未支付則自動將訂單狀態設置為已過期,這種訂單稱之為限時訂單。
代碼地址:https://gitee.com/hankin_chj/rocketmq-platform.git?(rocket-delay-order)
?
一、如何實現限時訂單
1、限時訂單的流程
電商平臺都會包含以下 5 種狀態。
待付款:代表買家下單了但是還沒有付款。
待發貨:代表買家付款了賣家還沒有發貨。
已發貨:代表賣家已經發貨并寄出商品了。
已完成:代表買家已經確認收到貨了。
已關閉:代表訂單過期了買家也沒付款、或者賣家關閉了訂單。?
2、限時訂單實現的關鍵
我們可以看到,訂單中的很多狀態都是可以用戶觸發的,唯獨訂單過期了買家也沒付款我們需要自動的把訂單給關閉,這個操作是沒有用戶或者是人工干預的,所以限時訂單的關鍵就是如何檢查訂單狀態,如果訂單過期了則把該訂單設置為關閉狀態。
3、輪詢數據庫?
輪詢數據庫在實現限時訂單上是可行的,而且實現起來很簡單,寫個定時器去每隔一段時間掃描數據庫,檢查到訂單過期了,做適當的業務處理。
但是輪詢會帶來什么問題?
1)輪詢大部分時間其實是在做無用功,我們假設一張訂單是45分鐘過期,每1分鐘我們掃描一次,對這張訂單來說,要掃描45次以后,才會檢查到這張訂單過期,這就意味著數據庫的資源(連接,IO)被白白浪費了;
2)處理上的不及時,一個待支付的電影票訂單我們假設是12:00:35過期,但是上次掃描的時間是 12:00:30,那么這個訂單實際的過期時間是12:01:30,和我本來的過期時間差了55秒鐘。放在業務上,會帶來什么問題?這張電影票,假設是最后一張,有個人12:00:55來買票,買得到嗎?當然買不到了。那么這張電影票很有可能就浪費了。如果縮短掃描的時間間隔,第一只能改善不能解決,第二,又會對數據庫造成更大的壓力。 那么我們能否有種機制,不用定時掃描,當訂單到期了,自然通知我們的應用去處理這些到期的訂單呢?
4、Java本身的提供的解決方案
java其實已經為我們提供了解決問題的方法。我們想要處理限時支付的問題,肯定是要有個地方保存這些限時訂單的信息的,意味著我們需要一個容器,于是我們在Java容器中去尋找Map? List? Queue?
看看java為我們提供的容器,我們是個多線程下的應用,會有多個用戶同時下訂單,所以所有并發不安全的容器首先被排除,并發安全的容器有哪些?java在阻塞隊列里為我們提供了一種叫延遲隊列delayQueue的容器,剛好可以為我們解決問題。
DelayQueue:阻塞隊列(先進先出)
1)支持阻塞的插入方法:意思是當隊列滿時,隊列會阻塞插入元素的線程,直到隊列不滿。
2)支持阻塞的移除方法:意思是在隊列為空時,獲取元素的線程會等待隊列變為非空。延遲期滿時才能從中提取元素(光隊列里有元素還不行)。
Delayed接口使對象成為延遲對象,它使存放在DelayQueue類中的對象具有了激活日期,該接口強制實現下列兩個方法:
? CompareTo(Delayed o):Delayed接口繼承了Comparable接口,因此有了這個方法,讓元素按激活日期排隊。
? getDelay(TimeUnit unit):這個方法返回到激活日期的剩余時間,時間單位由單位參數指定。 阻塞隊列更多詳情,參考《并發編程》。
5、架構師應該多考慮一點
架構師在設計和實現系統時需要考慮些什么?
功能:這個沒什么好說,實現一個應用,連基本的功能都沒實現,要這個應用有何用?
高性能:能不能盡快的為用戶提供服務和能為多少用戶同時提供服務,性能這個東西是個很綜合性的東西,從前端到后端,從架構(緩存機制、異步機制)到 web 容器、數據庫本身再到虛擬機到算法、java 代碼、sql語句的編寫,全部都對性能有影響。如何提升性能,要建立在充分的性能測試的基礎上,然后一個個的去解決性能瓶頸。對上面提到的應用來講,我們不想去輪詢數據庫,其實跟性能有非常大的關系。
高可用:應用正確處理業務,服務用戶的時間,這個時間當然是越長越好,希望可以7*24小時。而且哪怕服務器出現了升級,宕機等等情況下,能夠以最短的時間恢復,為用戶繼續服務,但是實際過程中沒有哪個網站可以說做到100%,不管是Google、FaceBook、阿里、騰訊,一般來說可以做到99.99%的可用性,已經是相當厲害了,這個水平大概就是一個服務在一年可以做到只有50分鐘不可用。這個需要技術、資金、技術人員的水平和責任心,還要運氣。
高伸縮:伸縮性是指通過不斷向集群中加入服務器的手段來緩解不斷上升的用戶并發訪問壓力和不斷增長的數據存儲需求。就像彈簧一樣掛東西一樣,用戶多,伸一點,用戶少,縮一點。衡量架構是否高伸縮性的主要標準就是是否可用多臺服務器構建集群,是否容易向集群中添加新的服務器。加入新的服務器后是否可以提供和原來服務器無差別的服務。集群中可容納的總的服務器數量是否有限制。
高擴展:的主要標準就是在網站增加新的業務產品時,是否可以實現對現有產品透明無影響,不需要任何改動或者很少改動既有業務功能就可以上線新產品。比如購買電影票的應用,用戶購買電影票,現在我們要增加一個功能,用戶買了票后,隨機抽取用戶送限量周邊。怎么做到不改動用戶下訂單功能的基礎上增加這個功能。熟悉設計模式的同學,應該很眼熟,這是設計模式中的開閉原則(對擴展開放,對修改關閉)在架構層面的一個原則。
6、從系統可用性角度考慮
應用重啟帶來的問題:
保存在Queue中的訂單會丟失,這些丟失的訂單會在什么時候過期,因為隊列里已經沒有這個訂單了,無法檢查了,這些訂單就得不到處理了。
已過期的訂單不會被處理,在應用的重啟階段,可能會有一部分訂單過期,這部分過期未支付的訂單同樣也得不到處理,會一直放在數據庫里,過期未支付訂單所對應的資源比如電影票所對應的座位,就不能被釋放出來,讓別的用戶來購買。
解決之道 :在系統啟動時另行處理
7、從系統伸縮性角度考慮
集群化了會帶來什么問題?應用之間會相互搶奪訂單,特別是在應用重啟的時候,重新啟動的那個應用會把不屬于自己的訂單,也全部加載到自己的隊列里去,一是造成內存的浪費,二來會造成訂單的重復處理,而且加大了數據庫的壓力。
解決方案:讓應用分區處理
1)給每臺服務器編號,然后在訂單表里登記每條訂單的服務器編號;
2)更簡單的,在訂單表里登記每臺服務器的IP地址,修改相應的sql語句即可。
幾個問題:如果有一臺服務器掛了怎么辦?如果是某臺服務器下線或者宕機,起不來怎么搞?這個還是還是稍微有點麻煩,需要人工干預一下,手動把庫里的每條訂單數據的服務器編號改為目前正常的服務器的編號,不過也就是一條sql語句的事,然后想辦法讓正常的服務器進行處理(重啟正常的服務器)。
二、用RocketMQ實現限時訂單
引入RocketMQ使用延時消息,一舉解決我們限時訂單的伸縮性和擴展性問題。
1、延時消息
概念介紹
延時消息:Producer將消息發送到消息隊列RocketMQ服務端,但并不期望這條消息立馬投遞,而是延遲一定時間后才投遞到Consumer進行消費,該消息即延時消息。
適用場景
消息生產和消費有時間窗口要求:比如在電商交易中超時未支付關閉訂單的場景,在訂單創建時會發送一條延時消息。這條消息將會在30分鐘以后投遞給消費者,消費者收到此消息后需要判斷對應的訂單是否已完成支付;如支付未完成,則關閉訂單,如已完成支付則忽略。
2、核心的代碼
整個代碼見delayOrder包,Git地址:https://gitee.com/hankin_chj/rocketmq-platform.git
2.1、配置部分
<!-- rocketMq生產者配置 -->
<bean id="rocketMQProducer" class="com.chj.service.mq.RocketMQProducer"
?????init-method="init" destroy-method="destroy">
???<property name="producerGroup" value="DelayOrderProducer" />
???<property name="namesrvAddr" value="127.0.0.1:9876" />
</bean>
<!-- 消費者監聽 -->
<bean id="messageListeners" class="com.chj.service.mq.MessageListenerImpl"></bean>
<!-- 消費者配置 -->
<bean id="rocketmqConsumer" class="org.apache.rocketmq.client.consumer.DefaultMQPushConsumer"
?????init-method="start" destroy-method="shutdown">
???<property name="consumerGroup" value="TimeOrderGroup" />
???<property name="namesrvAddr" value="127.0.0.1:9876" />
???<property name="messageModel" value="CLUSTERING" />
???<property name="consumeFromWhere" value="CONSUME_FROM_LAST_OFFSET" />
???<property name="messageListener" ref="messageListeners" />
???<property name="subscription">
??????<map>
?????????<entry key="TimeOrder" value="*" />
??????</map>
???</property>
</bean>
訂單處理的控制器代碼實現:
@Controller public class OrderController {private static final String SUCCESS = "suc";private static final String FAILUER = "failure";@Autowiredprivate SaveOrder saveOrder;@RequestMapping("/index")public String userOrder(){return "order";}//保存訂單(界面生成幾個訂單)@RequestMapping("/submitOrder")@ResponseBodypublic String saveOrder(@RequestParam("orderNumber")int orderNumber){saveOrder.insertOrders(orderNumber);return SUCCESS;} }2.2、核心代碼實現
1)保存訂單SaveOrder.java的時候,作為生產者往消息隊列里推入訂單,核心RocketMQProducer,這個類當然是要繼承IDelayOrder,同時也是RocketMQ的生產者。
訂單相關的服務SaveOrder.java代碼實現:
@Service public class SaveOrder {private Logger logger = LoggerFactory.getLogger(SaveOrder.class);public final static short UNPAY = 0;public final static short PAYED = 1;public final static short EXPIRED = -1;@Autowiredprivate OrderExpDao orderExpDao;@Autowired@Qualifier("rocketmq")private IDelayOrder delayOrder;/*** 接收前端頁面參數,生成訂單* @param orderNumber 訂單個數*/public void insertOrders(int orderNumber){Random r = new Random();OrderExp orderExp ;for(int i=0;i<orderNumber;i++) {//這個是設置延時消息的屬性//"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" ?18個等級long expire_duration =30;long expireTime =4;orderExp = new OrderExp();String orderNo = "DD00_30S";orderExp.setOrderNo(orderNo);orderExp.setOrderNote("享學訂單——"+orderNo);orderExp.setOrderStatus(UNPAY);orderExpDao.insertDelayOrder(orderExp,expire_duration);logger.info("保存訂單到DB:"+orderNo);//TODO 這里需要把訂單信息存入RocketMQdelayOrder.orderDelay(orderExp, expireTime);}}@PostConstructpublic void initDelayOrder() {logger.info("系統啟動,掃描表中過期未支付的訂單并處理.........");int counts = orderExpDao.updateExpireOrders();logger.info("系統啟動,處理了表中["+counts+"]個過期未支付的訂單!");List<OrderExp> orderList = orderExpDao.selectUnPayOrders();logger.info("系統啟動,發現了表中還有["+orderList.size()+"]個未到期未支付的訂單!推入檢查隊列準備到期檢查....");for(OrderExp order:orderList) {long expireTime = order.getExpireTime().getTime()-(new Date().getTime());delayOrder.orderDelay(order, expireTime);}} }消息隊列的實現RocketMQProducer:
@Service @Qualifier("rocketmq") public class RocketMQProducer implements IDelayOrder {@Autowiredprivate DlyOrderProcessor processDelayOrder;private Thread takeOrder;private static final Logger logger = LoggerFactory.getLogger(RocketMQProducer.class);private DefaultMQProducer defaultMQProducer;private String producerGroup;private String namesrvAddr;@PostConstructpublic void init() throws MQClientException {this.defaultMQProducer = new DefaultMQProducer(this.producerGroup);defaultMQProducer.setNamesrvAddr(this.namesrvAddr);defaultMQProducer.start();logger.info("rocketMQ初始化生產者完成[producerGroup:" + producerGroup + "]");}@PreDestroypublic void destroy() {defaultMQProducer.shutdown();logger.info("rocketMQ生產者[producerGroup: " + producerGroup + "]已停止");}public DefaultMQProducer getDefaultMQProducer() {return defaultMQProducer;}public void setProducerGroup(String producerGroup) {this.producerGroup = producerGroup;}public void setNamesrvAddr(String namesrvAddr) {this.namesrvAddr = namesrvAddr;}public void orderDelay(OrderExp order, long timeLevel) {try {//TODO 使用Gson序列化Gson gson = new Gson();String txtMsg = gson.toJson(order);//TODO 發送延時消息Message msg = new Message("TimeOrder", null, txtMsg.getBytes());//這個是設置延時消息的屬性//"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" ?18個等級msg.setDelayTimeLevel((int)timeLevel);SendResult result = defaultMQProducer.send(msg);if(result.getSendStatus() !=null && result.getSendStatus()== SendStatus.SEND_OK){System.out.println("訂單被推入延遲隊列,訂單詳情:"+order);logger.info("訂單被推入延遲隊列,訂單詳情:"+order);}else{logger.error("訂單推入RocketMq失敗,訂單詳情:"+order+"SendStatus:"+result.getSendStatus());}} catch (Exception e) {logger.error("單推入RocketMq失敗,失敗詳情:"+e.toString());}} }2)消息隊列會把延時的訂單發給消費者MessageListenerImpl,它是一個RocketMQ的消費者監聽,它來負責檢查訂單是否過期,有消息過來,證明消息訂單過期了,則把訂單狀態修改為過期訂單。RocketMQ本身又如何保證可用性和伸縮性?這個就需要RocketMQ的主從同步(HA機制)。
處理消息隊列返回的延時訂單MessageListenerImpl:
@Service public class MessageListenerImpl implements MessageListenerConcurrently {private Logger logger = LoggerFactory.getLogger(MessageListenerImpl.class);@Autowiredprivate DlyOrderProcessor processDlyOrder;public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {try {//TODO 使用GSON反序列化String txtMsg = new String(msg.getBody());Gson gson = new Gson();System.out.println("接收到RocketMQ的消息:"+txtMsg);OrderExp order = (OrderExp)gson.fromJson(txtMsg, OrderExp.class);//TODO 修改訂單狀態為過期if(order.getId()!=null){processDlyOrder.checkDelayOrder(order);}} catch (Exception e) {e.printStackTrace();return ConsumeConcurrentlyStatus.RECONSUME_LATER;}}// 如果沒有異常會認為都成功消費return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;} }處理過期訂單的服務:
@Service public class DlyOrderProcessor {private Logger logger = LoggerFactory.getLogger(DlyOrderProcessor.class);@Autowiredprivate OrderExpDao orderExpDao;/*** 檢查數據庫中指定id的訂單的狀態,如果為未支付,則修改為已過期* */public void checkDelayOrder(OrderExp record) {OrderExp dbOrder = orderExpDao.selectByPrimaryKey(record.getId());if(dbOrder.getOrderStatus()==SaveOrder.UNPAY) {logger.info("訂單【"+record+"】未支付已過期,需要更改為過期訂單!");orderExpDao.updateExpireOrder(record.getId());}else {logger.info("已支付訂單【"+record+"】,無需修改!");}} }?
總結
以上是生活随笔為你收集整理的第五章 限时订单实战笔记的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: cs224n第二讲:简单的词向量表示:w
- 下一篇: 使用Flickr的图片拼出你的句子