风袖使用RocketMQ实现订单状态转变
生活随笔
收集整理的這篇文章主要介紹了
风袖使用RocketMQ实现订单状态转变
小編覺得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
搭建MQ
使用docker搭建RocketMQ
我這里是使用docker里面的RocketMQ鏡像和本地idea交互的
編寫代碼
導(dǎo)入依賴pom.xml
<!-- rocketMQ --><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.1.0</version></dependency>在application.yml文件添加
apache:rocketmq:consumer:pushConsumer: PushConsumer # 消費(fèi)者的組名producer:producerGroup: ProducerGroup # 生產(chǎn)者的組名namesrvAddr: localhost:9876 # NameServer地址在原來訂單service里面添加
/*** 生產(chǎn)者的組名*/@Value("${apache.rocketmq.producer.producerGroup}")private String producerGroup;/*** NameServer 地址*/@Value("${apache.rocketmq.namesrvAddr}")private String namesrvAddr;/*** rocketmq 異步發(fā)送訂單** @param oid 訂單id* @param uid user_id* @param couponId 優(yōu)惠券id*/public void asyncProducer(Long oid, Long uid, Long couponId) {//生產(chǎn)者的組名DefaultMQProducer producer = new DefaultMQProducer(producerGroup);//指定NameServer地址,多個(gè)地址以 ; 隔開producer.setNamesrvAddr(namesrvAddr);try {// 生產(chǎn)者開始producer.start();// 生產(chǎn)者發(fā)送失敗重試次數(shù)producer.setRetryTimesWhenSendAsyncFailed(2);// 關(guān)鍵信息String orderMsg = uid.toString() + "," + oid.toString() + "," + couponId.toString();// 發(fā)送的信息Message msg = new Message("order","push",orderMsg.getBytes(RemotingHelper.DEFAULT_CHARSET));/*** 目前RocketMQ免費(fèi)版只支持固定精度級別的定時(shí)消息,服務(wù)器按照1-N定義了如下級別:* “1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h”;* 若要發(fā)送定時(shí)消息,在應(yīng)用層初始化Message消息對象之后,* 調(diào)用setDelayTimeLevel(int level)方法來設(shè)置延遲級別,按照序列取相應(yīng)的延遲級別,例如level=2,則延遲為5s:*///msg.setDelayTimeLevel(16);msg.setDelayTimeLevel(4); // 這里我為了方便查看將世界設(shè)置成為30S//重點(diǎn)在這里 異步發(fā)送回調(diào)producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("發(fā)送異步響應(yīng):MsgId:" + sendResult.getMsgId() + ",發(fā)送狀態(tài):" + sendResult.getSendStatus());// 取消生產(chǎn)者producer.shutdown();}@Overridepublic void onException(Throwable e) {// 自定義錯(cuò)誤e.printStackTrace();// 取消生產(chǎn)者producer.shutdown();}});} catch (Exception e) {e.printStackTrace();}}// 這里是將原來發(fā)送redis修改成為rocketmq// 這里搜索替換吧//加入到延遲消息隊(duì)列 // this.sendToRedis(order.getId(), uid, couponId);this.asyncProducer(order.getId(), uid, couponId);設(shè)置消費(fèi)者(控制訂單狀態(tài))
@Component public class RocketMQServer {/*** 消費(fèi)者的組名*/@Value("${apache.rocketmq.consumer.pushConsumer}")private String consumerGroup;/*** NameServer 地址*/@Value("${apache.rocketmq.namesrvAddr}")private String namesrvAddr;@Resourceprivate OrderCancelService orderCancelService;@Resourceprivate CouponBackService couponBackService;@PostConstruct // 注入了就開始執(zhí)行、監(jiān)聽生產(chǎn)者public void defaultMQPushConsumer() {//消費(fèi)者的組名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);//指定NameServer地址,多個(gè)地址以 ; 隔開consumer.setNamesrvAddr(namesrvAddr);try {//訂閱order下Tag為push的消息consumer.subscribe("order", "push");//設(shè)置Consumer第一次啟動(dòng)是從隊(duì)列頭部開始消費(fèi)還是隊(duì)列尾部開始消費(fèi)//如果非第一次啟動(dòng),那么按照上次消費(fèi)的位置繼續(xù)消費(fèi)consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.registerMessageListener((MessageListenerConcurrently) (list, context) -> {try {for (MessageExt messageExt : list) {System.out.println("messageExt: " + messageExt);//輸出消息內(nèi)容String messageBody = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);System.out.println("消費(fèi)響應(yīng):msgId : " + messageExt.getMsgId() + ", msgBody : " + messageBody);//輸出消息內(nèi)容OrderMessageBO messageBO = new OrderMessageBO(messageBody);// 對訂單進(jìn)行操作this.orderCancelService.cancel(messageBO);this.couponBackService.returnBack(messageBO);}} catch (Exception e) {e.printStackTrace();// 接收失敗重試if (list.get(0).getReconsumeTimes() == 3){// 重試3次return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費(fèi)成功}else {return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費(fèi)成功});consumer.start();} catch (Exception e) {e.printStackTrace();}}}結(jié)果
剛剛下單 數(shù)據(jù)庫狀態(tài)是1
剛剛下單 發(fā)送rocketmq
到了時(shí)間接收rocketmq消費(fèi)的信息狀態(tài)變成5
接收rocketmq消費(fèi)的信息
總結(jié)
以上是生活随笔為你收集整理的风袖使用RocketMQ实现订单状态转变的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: iPhone升级iOS 16后Siri无
- 下一篇: 真菌元胞自动机Python实现