mall整合RabbitMQ实现延迟消息
摘要
本文主要講解mall整合RabbitMQ實(shí)現(xiàn)延遲消息的過程,以發(fā)送延遲消息取消超時(shí)訂單為例。RabbitMQ是一個(gè)被廣泛使用的開源消息隊(duì)列。它是輕量級(jí)且易于部署的,它能支持多種消息協(xié)議。RabbitMQ可以部署在分布式和聯(lián)合配置中,以滿足高規(guī)模、高可用性的需求。
項(xiàng)目使用框架介紹
RabbitMQ
RabbitMQ是一個(gè)被廣泛使用的開源消息隊(duì)列。它是輕量級(jí)且易于部署的,它能支持多種消息協(xié)議。RabbitMQ可以部署在分布式和聯(lián)合配置中,以滿足高規(guī)模、高可用性的需求。
RabbitMQ的安裝和使用
輸入賬號(hào)密碼并登錄:guest guest
創(chuàng)建帳號(hào)并設(shè)置其角色為管理員:mall mall
RabbitMQ的消息模型
| P | 生產(chǎn)者 | Producer | 消息的發(fā)送者,可以將消息發(fā)送到交換機(jī) |
| C | 消費(fèi)者 | Consumer | 消息的接收者,從隊(duì)列中獲取消息進(jìn)行消費(fèi) |
| X | 交換機(jī) | Exchange | 接收生產(chǎn)者發(fā)送的消息,并根據(jù)路由鍵發(fā)送給指定隊(duì)列 |
| Q | 隊(duì)列 | Queue | 存儲(chǔ)從交換機(jī)發(fā)來的消息 |
| type | 交換機(jī)類型 | type | direct表示直接根據(jù)路由鍵(orange/black)發(fā)送消息 |
Lombok
Lombok為Java語言添加了非常有趣的附加功能,你可以不用再為實(shí)體類手寫getter,setter等方法,通過一個(gè)注解即可擁有。
注意:需要安裝idea的Lombok插件,并在項(xiàng)目中的pom文件中添加依賴。
業(yè)務(wù)場(chǎng)景說明
用于解決用戶下單以后,訂單超時(shí)如何取消訂單的問題。
- 用戶進(jìn)行下單操作(會(huì)有鎖定商品庫存、使用優(yōu)惠券、積分一系列的操作);
- 生成訂單,獲取訂單的id;
- 獲取到設(shè)置的訂單超時(shí)時(shí)間(假設(shè)設(shè)置的為60分鐘不支付取消訂單);
- 按訂單超時(shí)時(shí)間發(fā)送一個(gè)延遲消息給RabbitMQ,讓它在訂單超時(shí)后觸發(fā)取消訂單的操作;
- 如果用戶沒有支付,進(jìn)行取消訂單操作(釋放鎖定商品庫存、返還優(yōu)惠券、返回積分一系列操作)。
整合RabbitMQ實(shí)現(xiàn)延遲消息
在pom.xml中添加相關(guān)依賴
<!--消息隊(duì)列相關(guān)依賴--> <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!--lombok依賴--> <dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional> </dependency> 復(fù)制代碼修改SpringBoot配置文件
修改application.yml文件,在spring節(jié)點(diǎn)下添加Mongodb相關(guān)配置。
rabbitmq: host: localhost # rabbitmq的連接地址 port: 5672 # rabbitmq的連接端口號(hào) virtual-host: /mall # rabbitmq的虛擬host username: mall # rabbitmq的用戶名 password: mall # rabbitmq的密碼 publisher-confirms: true #如果對(duì)異步消息需要回調(diào)必須設(shè)置為true 復(fù)制代碼添加消息隊(duì)列的枚舉配置類QueueEnum
用于延遲消息隊(duì)列及處理取消訂單消息隊(duì)列的常量定義,包括交換機(jī)名稱、隊(duì)列名稱、路由鍵名稱。
package com.macro.mall.tiny.dto;import lombok.Getter;/*** 消息隊(duì)列枚舉配置* Created by macro on 2018/9/14.*/ public enum QueueEnum {/*** 消息通知隊(duì)列*/QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),/*** 消息通知ttl隊(duì)列*/QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");/*** 交換名稱*/private String exchange;/*** 隊(duì)列名稱*/private String name;/*** 路由鍵*/private String routeKey;QueueEnum(String exchange, String name, String routeKey) {this.exchange = exchange;this.name = name;this.routeKey = routeKey;} }復(fù)制代碼添加RabbitMQ的配置
用于配置交換機(jī)、隊(duì)列及隊(duì)列與交換機(jī)的綁定關(guān)系。
package com.macro.mall.tiny.config;import com.macro.mall.tiny.dto.QueueEnum; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** 消息隊(duì)列配置* Created by macro on 2018/9/14.*/ public class RabbitMqConfig {/*** 訂單消息實(shí)際消費(fèi)隊(duì)列所綁定的交換機(jī)*/DirectExchange orderDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 訂單延遲隊(duì)列隊(duì)列所綁定的交換機(jī)*/DirectExchange orderTtlDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 訂單實(shí)際消費(fèi)隊(duì)列*/public Queue orderQueue() {return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());}/*** 訂單延遲隊(duì)列(死信隊(duì)列)*/public Queue orderTtlQueue() {return QueueBuilder.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()).withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后轉(zhuǎn)發(fā)的交換機(jī).withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后轉(zhuǎn)發(fā)的路由鍵.build();}/*** 將訂單隊(duì)列綁定到交換機(jī)*/Binding orderBinding(DirectExchange orderDirect,Queue orderQueue){return BindingBuilder.bind(orderQueue).to(orderDirect).with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());}/*** 將訂單延遲隊(duì)列綁定到交換機(jī)*/Binding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){return BindingBuilder.bind(orderTtlQueue).to(orderTtlDirect).with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());}} 復(fù)制代碼在RabbitMQ管理頁面可以看到以下交換機(jī)和隊(duì)列
交換機(jī)及隊(duì)列說明
- mall.order.direct(取消訂單消息隊(duì)列所綁定的交換機(jī)):綁定的隊(duì)列為mall.order.cancel,一旦有消息以mall.order.cancel為路由鍵發(fā)過來,會(huì)發(fā)送到此隊(duì)列。
- mall.order.direct.ttl(訂單延遲消息隊(duì)列所綁定的交換機(jī)):綁定的隊(duì)列為mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl為路由鍵發(fā)送過來,會(huì)轉(zhuǎn)發(fā)到此隊(duì)列,并在此隊(duì)列保存一定時(shí)間,等到超時(shí)后會(huì)自動(dòng)將消息發(fā)送到mall.order.cancel(取消訂單消息消費(fèi)隊(duì)列)。
添加延遲消息的發(fā)送者CancelOrderSender
用于向訂單延遲消息隊(duì)列(mall.order.cancel.ttl)里發(fā)送消息。
package com.macro.mall.tiny.component;import com.macro.mall.tiny.dto.QueueEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;/*** 取消訂單消息的發(fā)出者* Created by macro on 2018/9/14.*/ public class CancelOrderSender {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);private AmqpTemplate amqpTemplate;public void sendMessage(Long orderId,final long delayTimes){//給延遲隊(duì)列發(fā)送消息amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {public Message postProcessMessage(Message message) throws AmqpException {//給消息設(shè)置延遲毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;}});LOGGER.info("send delay message orderId:{}",orderId);} }復(fù)制代碼添加取消訂單消息的接收者CancelOrderReceiver
用于從取消訂單的消息隊(duì)列(mall.order.cancel)里接收消息。
package com.macro.mall.tiny.component;import com.macro.mall.tiny.service.OmsPortalOrderService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;/*** 取消訂單消息的處理者* Created by macro on 2018/9/14.*/ (queues = "mall.order.cancel") public class CancelOrderReceiver {private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);private OmsPortalOrderService portalOrderService;public void handle(Long orderId){LOGGER.info("receive delay message orderId:{}",orderId);portalOrderService.cancelOrder(orderId);} }復(fù)制代碼添加OmsPortalOrderService接口
package com.macro.mall.tiny.service;import com.macro.mall.tiny.common.api.CommonResult; import com.macro.mall.tiny.dto.OrderParam; import org.springframework.transaction.annotation.Transactional;/*** 前臺(tái)訂單管理Service* Created by macro on 2018/8/30.*/ public interface OmsPortalOrderService {/*** 根據(jù)提交信息生成訂單*/CommonResult generateOrder(OrderParam orderParam);/*** 取消單個(gè)超時(shí)訂單*/void cancelOrder(Long orderId); }復(fù)制代碼添加OmsPortalOrderService的實(shí)現(xiàn)類OmsPortalOrderServiceImpl
package com.macro.mall.tiny.service.impl;import com.macro.mall.tiny.common.api.CommonResult; import com.macro.mall.tiny.component.CancelOrderSender; import com.macro.mall.tiny.dto.OrderParam; import com.macro.mall.tiny.service.OmsPortalOrderService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;/*** 前臺(tái)訂單管理Service* Created by macro on 2018/8/30.*/ public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);private CancelOrderSender cancelOrderSender;public CommonResult generateOrder(OrderParam orderParam) {//todo 執(zhí)行一系類下單操作,具體參考mall項(xiàng)目LOGGER.info("process generateOrder");//下單完成后開啟一個(gè)延遲消息,用于當(dāng)用戶沒有付款時(shí)取消訂單(orderId應(yīng)該在下單后生成)sendDelayMessageCancelOrder(11L);return CommonResult.success(null, "下單成功");}public void cancelOrder(Long orderId) {//todo 執(zhí)行一系類取消訂單操作,具體參考mall項(xiàng)目LOGGER.info("process cancelOrder orderId:{}",orderId);}private void sendDelayMessageCancelOrder(Long orderId) {//獲取訂單超時(shí)時(shí)間,假設(shè)為60分鐘long delayTimes = 30 * 1000;//發(fā)送延遲消息cancelOrderSender.sendMessage(orderId, delayTimes);}}復(fù)制代碼添加OmsPortalOrderController定義接口
package com.macro.mall.tiny.controller;import com.macro.mall.tiny.dto.OrderParam; import com.macro.mall.tiny.service.OmsPortalOrderService; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.ResponseBody;/*** 訂單管理Controller* Created by macro on 2018/8/30.*/ (tags = "OmsPortalOrderController", description = "訂單管理") ("/order") public class OmsPortalOrderController {private OmsPortalOrderService portalOrderService;("根據(jù)購物車信息生成訂單")(value = "/generateOrder", method = RequestMethod.POST)public Object generateOrder(@RequestBody OrderParam orderParam) {return portalOrderService.generateOrder(orderParam);} }復(fù)制代碼進(jìn)行接口測(cè)試
調(diào)用下單接口
注意:已經(jīng)將延遲消息時(shí)間設(shè)置為30秒
項(xiàng)目源碼地址
github.com/macrozheng/…
公眾號(hào)
mall項(xiàng)目全套學(xué)習(xí)教程連載中,關(guān)注公眾號(hào)第一時(shí)間獲取。
轉(zhuǎn)載于:https://juejin.im/post/5cff98986fb9a07ed36ea139
總結(jié)
以上是生活随笔為你收集整理的mall整合RabbitMQ实现延迟消息的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Oracle加密解密
- 下一篇: docker操作之mysql容器