當(dāng)前位置:
                    首頁(yè) >
                            前端技术
>                            javascript
>内容正文                
                        
                    javascript
rabbitmq 取消消息_SpringBoot整合RabbitMQ实现延迟消息
                                                            生活随笔
收集整理的這篇文章主要介紹了
                                rabbitmq 取消消息_SpringBoot整合RabbitMQ实现延迟消息
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.                        
                                ## RabbitMQ
RabbitMQ是一個(gè)被廣泛使用的開源消息隊(duì)列。它是輕量級(jí)且易于部署的,它能支持多種消息協(xié)議。RabbitMQ可以部署在分布式和聯(lián)合配置中,以滿足高規(guī)模、高可用性的需求。RabbitMQ的安裝與部署
在這里就不在陳述了,如果需要的話請(qǐng)看這篇博客https://www.cnblogs.com/chy-op/p/9611124.html
RabbitMQ的消息模型
標(biāo)志中文名英文名描述P生產(chǎn)者Producer消息的發(fā)送者,可以將消息發(fā)送到交換機(jī)C消費(fèi)者Consumer消息的接收者,從隊(duì)列中獲取消息進(jìn)行消費(fèi)X交換機(jī)Exchange接收生產(chǎn)者發(fā)送的消息,并根據(jù)路由鍵發(fā)送給指定隊(duì)列Q隊(duì)列Queue存儲(chǔ)從交換機(jī)發(fā)來(lái)的消息type交換機(jī)類型typedirect表示直接根據(jù)路由鍵(orange/black)發(fā)送消息
半藏商城中應(yīng)用消息隊(duì)列的場(chǎng)景
1、用于解決用戶下單以后,訂單超時(shí)如何取消訂單的問(wèn)題。 - 用戶進(jìn)行提交訂單操作(會(huì)有鎖定商品庫(kù)存等操作); - 生成訂單,獲取訂單的id; - 獲取到設(shè)置的訂單超時(shí)時(shí)間(假設(shè)設(shè)置的為60分鐘不支付取消訂單); - 按訂單超時(shí)時(shí)間發(fā)送一個(gè)延遲消息給RabbitMQ,讓它在訂單超時(shí)后觸發(fā)取消訂單的操作; - 如果用戶沒有支付,進(jìn)行取消訂單操作(釋放鎖定商品庫(kù)存一系列操作)。實(shí)現(xiàn)方法 - 需要一個(gè)訂單延遲消息隊(duì)列 以及一個(gè)取消訂單消息隊(duì)列, - 一旦有消息以延遲訂單設(shè)置的路由鍵發(fā)送過(guò)來(lái),會(huì)轉(zhuǎn)發(fā)到訂單延遲消息隊(duì)列,并在此隊(duì)列保存一定時(shí)間,等到超時(shí)后會(huì)自動(dòng)將消息發(fā)送到取消訂單消息消費(fèi)隊(duì)列。
2、短信驗(yàn)證碼以及郵箱驗(yàn)證碼都采用消息隊(duì)列進(jìn)行消費(fèi)。 - 采用隊(duì)列,交換機(jī),路由鍵進(jìn)行消費(fèi)。一條隊(duì)列,一個(gè)交換機(jī),一個(gè)路由鍵就可以實(shí)現(xiàn)。
在pom.xml中添加相關(guān)依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>修改SpringBoot配置文件
#SpringBoot配置RabbitMqrabbitmq:host: localhost # rabbitmq的連接地址port: 5672 # rabbitmq的連接端口號(hào)virtual-host: /hanzoMall # rabbitmq的虛擬hostusername: hanzoMall # rabbitmq的用戶名password: hanzoMall # rabbitmq的密碼publisher-confirms: true #如果對(duì)異步消息需要回調(diào)必須設(shè)置為true消息隊(duì)列的枚舉配置類QueueEnum
用于延遲消息隊(duì)列及處理取消訂單消息隊(duì)列的常量定義,包括交換機(jī)名稱、隊(duì)列名稱、路由鍵名稱。 以及短信發(fā)送消息隊(duì)列和郵箱驗(yàn)證碼發(fā)送消息隊(duì)列的常量定義,包括交換機(jī)名稱、隊(duì)列名稱、路由鍵名稱。package ltd.hanzo.mall.common;import com.rabbitmq.client.AMQP; import lombok.Getter;/*** @Author 皓宇QAQ* @email 2469653218@qq.com* @Date 2020/5/23 21:00* @link https://github.com/Tianhaoy/hanzomall* @Description: 消息隊(duì)列枚舉配置*/ @Getter public enum QueueEnum {/*** 發(fā)送短信消息通知隊(duì)列*/QUEUE_SMS_SEND("mall.sms.direct", "mall.sms.send", "mall.sms.send"),/*** 發(fā)送郵件消息通知隊(duì)列*/QUEUE_EMAIL_SEND("mall.email.direct", "mall.email.send", "mall.email.send"),/*** 消息通知隊(duì)列* mall.order.direct(取消訂單消息隊(duì)列所綁定的交換機(jī)):綁定的隊(duì)列為mall.order.cancel,一旦有消息以mall.order.cancel為路由鍵發(fā)過(guò)來(lái),會(huì)發(fā)送到此隊(duì)列。*/QUEUE_ORDER_CANCEL("mall.order.direct", "mall.order.cancel", "mall.order.cancel"),/*** 消息通知ttl隊(duì)列* mall.order.direct.ttl(訂單延遲消息隊(duì)列所綁定的交換機(jī)):綁定的隊(duì)列為mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl為路由鍵發(fā)送過(guò)來(lái),會(huì)轉(zhuǎn)發(fā)到此隊(duì)列,并在此隊(duì)列保存一定時(shí)間,等到超時(shí)后會(huì)自動(dòng)將消息發(fā)送到mall.order.cancel(取消訂單消息消費(fèi)隊(duì)列)。*/QUEUE_TTL_ORDER_CANCEL("mall.order.direct.ttl", "mall.order.cancel.ttl", "mall.order.cancel.ttl");/*** 交換機(jī)名稱*/private String exchange;/*** 隊(duì)列名稱*/private String name;/*** 路由鍵*/private String routeKey;QueueEnum(String exchange, String name, String routeKey) {this.exchange = exchange;this.name = name;this.routeKey = routeKey;} }添加RabbitMq的配置
用于配置交換機(jī)、隊(duì)列及隊(duì)列與交換機(jī)的綁定關(guān)系。package ltd.hanzo.mall.config;import ltd.hanzo.mall.common.QueueEnum; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;/*** @Author 皓宇QAQ* @email 2469653218@qq.com* @Date 2020/5/23 21:04* @link https://github.com/Tianhaoy/hanzomall* @Description: 消息隊(duì)列配置*/ @Configuration public class RabbitMqConfig {/*** 1.0發(fā)送短信消息通知隊(duì)列所綁定的->交換機(jī)*/@BeanDirectExchange sendSmsDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_SMS_SEND.getExchange()).durable(true).build();}/*** 1.0發(fā)送短信的->消費(fèi)隊(duì)列*/@Beanpublic Queue sendSmsQueue() {return new Queue(QueueEnum.QUEUE_SMS_SEND.getName());}/*** 1.0將發(fā)送短信 隊(duì)列綁定到->交換機(jī)*/@BeanBinding sendSmsBinding(DirectExchange sendSmsDirect, Queue sendSmsQueue){return BindingBuilder.bind(sendSmsQueue).to(sendSmsDirect).with(QueueEnum.QUEUE_SMS_SEND.getRouteKey());}/*** 2.0發(fā)送郵件消息通知隊(duì)列所綁定的->交換機(jī)*/@BeanDirectExchange sendEmailDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_EMAIL_SEND.getExchange()).durable(true).build();}/*** 2.0發(fā)送郵件的->消費(fèi)隊(duì)列*/@Beanpublic Queue sendEmailQueue() {return new Queue(QueueEnum.QUEUE_EMAIL_SEND.getName());}/*** 2.0將發(fā)送郵件 隊(duì)列綁定到->交換機(jī)*/@BeanBinding sendEmailBinding(DirectExchange sendEmailDirect, Queue sendEmailQueue){return BindingBuilder.bind(sendEmailQueue).to(sendEmailDirect).with(QueueEnum.QUEUE_EMAIL_SEND.getRouteKey());}/*** 3.0訂單消息實(shí)際消費(fèi)隊(duì)列所綁定的->交換機(jī)*/@BeanDirectExchange orderDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 3.0訂單實(shí)際消費(fèi)隊(duì)列*/@Beanpublic Queue orderQueue() {return new Queue(QueueEnum.QUEUE_ORDER_CANCEL.getName());}/*** 3.0將訂單隊(duì)列綁定到交換機(jī)*/@BeanBinding orderBinding(DirectExchange orderDirect,Queue orderQueue){return BindingBuilder.bind(orderQueue).to(orderDirect).with(QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey());}/*** 4.0訂單延遲隊(duì)列隊(duì)列所綁定的->交換機(jī)*/@BeanDirectExchange orderTtlDirect() {return (DirectExchange) ExchangeBuilder.directExchange(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange()).durable(true).build();}/*** 4.0訂單延遲隊(duì)列(死信隊(duì)列)*/@Beanpublic Queue orderTtlQueue() {return QueueBuilder.durable(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getName()).withArgument("x-dead-letter-exchange", QueueEnum.QUEUE_ORDER_CANCEL.getExchange())//到期后轉(zhuǎn)發(fā)的交換機(jī).withArgument("x-dead-letter-routing-key", QueueEnum.QUEUE_ORDER_CANCEL.getRouteKey())//到期后轉(zhuǎn)發(fā)的路由鍵.build();}/*** 4.0將訂單延遲隊(duì)列綁定到交換機(jī)*/@BeanBinding orderTtlBinding(DirectExchange orderTtlDirect,Queue orderTtlQueue){return BindingBuilder.bind(orderTtlQueue).to(orderTtlDirect).with(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey());} }在RabbitMQ管理頁(yè)面可以看到以下交換機(jī)和隊(duì)列
交換機(jī)及隊(duì)列說(shuō)明
- mall.order.direct(取消訂單消息隊(duì)列所綁定的交換機(jī)):綁定的隊(duì)列為mall.order.cancel,一旦有消息以mall.order.cancel為路由鍵發(fā)過(guò)來(lái),會(huì)發(fā)送到此隊(duì)列。
- mall.order.direct.ttl(訂單延遲消息隊(duì)列所綁定的交換機(jī)):綁定的隊(duì)列為mall.order.cancel.ttl,一旦有消息以mall.order.cancel.ttl為路由鍵發(fā)送過(guò)來(lái),會(huì)轉(zhuǎn)發(fā)到此隊(duì)列,并在此隊(duì)列保存一定時(shí)間,等到超時(shí)后會(huì)自動(dòng)將消息發(fā)送到mall.order.cancel(取消訂單消息消費(fèi)隊(duì)列)。
添加延遲消息的發(fā)送者CancelOrderSender
用于向訂單延遲消息隊(duì)列(mall.order.cancel.ttl)里發(fā)送消息。package ltd.hanzo.mall.component;import lombok.extern.slf4j.Slf4j; import ltd.hanzo.mall.common.QueueEnum; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;/*** @Author 皓宇QAQ* @Date 2020/6/6 17:24* @Description:取消訂單消息的發(fā)出者*/ @Component @Slf4j public class CancelOrderSender {@Autowiredprivate AmqpTemplate amqpTemplate;public void sendMessage(String orderNo,final long delayTimes){//給延遲隊(duì)列發(fā)送消息amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderNo, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {//給消息設(shè)置延遲毫秒值message.getMessageProperties().setExpiration(String.valueOf(delayTimes));return message;}});log.info("send delay message orderNo:{}",orderNo);} }添加取消訂單消息的接收者CancelOrderReceiver
用于從取消訂單的消息隊(duì)列(mall.order.cancel)里接收消息。package ltd.hanzo.mall.component;import lombok.extern.slf4j.Slf4j; import ltd.hanzo.mall.service.HanZoMallOrderService; import ltd.hanzo.mall.service.TaskService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;/*** @Author 皓宇QAQ* @Date 2020/6/6 17:25* @Description:取消訂單消息的處理者*/ @Component @RabbitListener(queues = "mall.order.cancel") @Slf4j public class CancelOrderReceiver {@Autowiredprivate HanZoMallOrderService hanZoMallOrderService;@Autowiredprivate TaskService taskService;@RabbitHandlerpublic void handle(String orderNo){log.info("receive delay message orderNo:{}",orderNo);hanZoMallOrderService.cancelOrder(orderNo);taskService.cancelOrderSendSimpleMail(orderNo);} }添加HanZoMallOrderService接口
public interface HanZoMallOrderService {/*** 保存訂單** @param user* @param myShoppingCartItems* @return*/String saveOrder(HanZoMallUserVO user, List<HanZoMallShoppingCartItemVO> myShoppingCartItems);/*** 取消單個(gè)超時(shí)訂單*/@Transactionalvoid cancelOrder(String orderNo);}添加HanZoMallOrderService實(shí)現(xiàn)類HanZoMallOrderServiceImpl
@Slf4j @Service public class HanZoMallOrderServiceImpl implements HanZoMallOrderService {@Resourceprivate HanZoMallOrderMapper hanZoMallOrderMapper;@Resourceprivate HanZoMallOrderItemMapper hanZoMallOrderItemMapper;@Resourceprivate HanZoMallShoppingCartItemMapper hanZoMallShoppingCartItemMapper;@Resourceprivate HanZoMallGoodsMapper hanZoMallGoodsMapper;@Autowiredprivate CancelOrderSender cancelOrderSender;@Override@Transactionalpublic String saveOrder(HanZoMallUserVO user, List<HanZoMallShoppingCartItemVO> myShoppingCartItems) {//todo 執(zhí)行一系類下單操作,代碼在github中//下單完成后開啟一個(gè)延遲消息,用于當(dāng)用戶沒有付款時(shí)取消訂單 sendDelayMessageCancelOrder(orderNo);//所有操作成功后,將訂單號(hào)返回,以供Controller方法跳轉(zhuǎn)到訂單詳情return orderNo; }@Overridepublic void cancelOrder(String orderNo) {HanZoMallOrder hanZoMallOrder = hanZoMallOrderMapper.selectByOrderNo(orderNo);if (hanZoMallOrder != null && hanZoMallOrder.getOrderStatus() == 0) {//超時(shí)取消訂單hanZoMallOrderMapper.closeOrder(Collections.singletonList(hanZoMallOrder.getOrderId()), HanZoMallOrderStatusEnum.ORDER_CLOSED_BY_EXPIRED.getOrderStatus());}}private void sendDelayMessageCancelOrder(String orderNo) {//獲取訂單超時(shí)時(shí)間,假設(shè)為60分鐘long delayTimes = 36 * 100000;//發(fā)送延遲消息cancelOrderSender.sendMessage(orderNo, delayTimes);}添加OrderController定義接口
代碼在Github中>https://github.com/Tianhaoy/hanzomall總結(jié)
到此為止,整合RabbitMQ實(shí)現(xiàn)延遲消息的相關(guān)流程就介紹完畢了,知識(shí)只有分享出來(lái)才有價(jià)值。如果有問(wèn)題的話,可以在關(guān)于我的頁(yè)面,通過(guò)我的郵箱聯(lián)系我進(jìn)行探討。
總結(jié)
以上是生活随笔為你收集整理的rabbitmq 取消消息_SpringBoot整合RabbitMQ实现延迟消息的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
 
                            
                        - 上一篇: 冰箱制冰要多久(冰箱推荐排名)
- 下一篇: jquery去掉数组最后一个元素_从数组
