當前位置:
首頁 >
前端技术
> javascript
>内容正文
javascript
SpringBoot2.x Nacos RocketMQ 事务消息
生活随笔
收集整理的這篇文章主要介紹了
SpringBoot2.x Nacos RocketMQ 事务消息
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
需求背景:
現在有內容中心(content-center)和 用戶中心(user-center)2個微服務,請求內容中心,發送消息給用戶中心,完成為指定用戶添加積分操作。
文章目錄
- 一、準備工作
- 1. 版本對照
- 2. 下載啟動RocketMQ
- 3. 引入maven依賴
- 二、內容中心(服務端)
- 2.1. 表結構設計
- 2.2. 配置MQ信息
- 2.3. 控制層
- 2.4. service層
- 2.5. RocketMQ 事務消息監聽
- 三、用戶中心(客戶端)
- 3.1. 依賴
- 3.2.配置
- 3.3. 消息監聽
- 開源項目:
一、準備工作
1. 版本對照
| RocketMQ 4.8.0 | 支持RocketMQ 4.8.0 | 2.2.0 |
2. 下載啟動RocketMQ
linux 環境 RocketMQ 4.8.0 安裝、部署控制臺
https://blog.csdn.net/weixin_40816738/article/details/116269833
windows下RocketMQ下載、安裝、部署、控制臺
https://blog.csdn.net/weixin_40816738/article/details/115734482
3. 引入maven依賴
<!--集成rocketmq--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>二、內容中心(服務端)
消息發送端代碼編寫
2.1. 表結構設計
share分享表和rocketmq_transaction_logRocketMQ事務日志表2張表,
share
rocketmq_transaction_logRocketMQ
-- ----------------------------------------------------- -- Table `rocketmq_transaction_log` -- ----------------------------------------------------- create table rocketmq_transaction_log (id int auto_increment comment 'id'primary key,transaction_Id varchar(45) not null comment '事務id',log varchar(45) not null comment '日志' )comment 'RocketMQ事務日志表';具體詳情:見項目源碼
2.2. 配置MQ信息
- 項目內部yml配置
- nacos服務端配置
2.3. 控制層
package com.gblfy.lyrocketmq.controller;import com.gblfy.common.dto.ShareAuditDTO; import com.gblfy.lyrocketmq.entity.Share; import com.gblfy.lyrocketmq.service.ShareService; import lombok.RequiredArgsConstructor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*;@RestController @RequestMapping("/admin/shares") @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class ShareAdminController {private final ShareService shareService;@PutMapping("/audit/{id}")public Share auditById(@PathVariable Integer id, @RequestBody ShareAuditDTO auditDTO) {//TODO 認證授權return this.shareService.auditById(id, auditDTO);} }2.4. service層
package com.gblfy.lyrocketmq.service;import com.gblfy.api.RemoteProductService; import com.gblfy.common.dto.ShareAuditDTO; import com.gblfy.common.dto.ShareDTO; import com.gblfy.common.dto.UserAddBonusMsgDTO; import com.gblfy.common.dto.UserDTO; import com.gblfy.common.enums.AuditStatusEnum; import com.gblfy.lyrocketmq.entity.RocketmqTransactionLog; import com.gblfy.lyrocketmq.entity.Share; import com.gblfy.lyrocketmq.mapper.RocketmqTransactionLogMapper; import com.gblfy.lyrocketmq.mapper.ShareMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional;import java.util.Objects; import java.util.UUID;@Slf4j @Service @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class ShareService {private final ShareMapper shareMapper;private final RemoteProductService userCenterFeignClient;private final RocketMQTemplate rocketMQTemplate;private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;public ShareDTO findById(Integer id) {Share share = this.shareMapper.selectByPrimaryKey(id);Integer userId = share.getUserId();UserDTO userDTO = this.userCenterFeignClient.findById(userId);ShareDTO shareDTO = new ShareDTO();BeanUtils.copyProperties(share, shareDTO);//設置發布人shareDTO.setWxNickname(userDTO.getWxNickname());return shareDTO;}public Share auditById(Integer id, ShareAuditDTO auditDTO) {// 1. 查詢share是否存在,不存在或者當前的audit_status != NOT_YET,那么拋異常Share share = this.shareMapper.selectByPrimaryKey(id);if (share == null) {throw new IllegalArgumentException("參數非法!該分享不存在!");}if (!Objects.equals("NOT_YET", share.getAuditStatus())) {throw new IllegalArgumentException("參數非法!該分享已審核通過或審核不通過!");}//----------------------------------------發送半消息----------------------------------------// 3. 如果是PASS,那么發送消息給rocketmq,讓用戶中心去消費,并為發布人添加積分if (AuditStatusEnum.PASS.equals(auditDTO.getAuditStatusEnum())) {//消息idString transactionId = UUID.randomUUID().toString();this.rocketMQTemplate.sendMessageInTransaction("tx-add-bonus-group",MessageBuilder.withPayload(UserAddBonusMsgDTO.builder().userId(share.getUserId()).bonus(50).build()// Header有妙用).setHeader(RocketMQHeaders.TRANSACTION_ID, transactionId).setHeader("share_id", id).build(),//arg有大用處auditDTO);} else {this.auditByIdInDB(id, auditDTO);}return share;}/*** 審批** @param id* @param auditDTO*/public void auditByIdInDB(Integer id, ShareAuditDTO auditDTO) {Share share = Share.builder().id(id).auditStatus(auditDTO.getAuditStatusEnum().toString()).reason(auditDTO.getReason()).build();this.shareMapper.updateByPrimaryKeySelective(share);}@Transactional(rollbackFor = Exception.class)public void auditByIdWithRoketMqlog(Integer id, ShareAuditDTO auditDTO, String transactionId) {this.auditByIdInDB(id, auditDTO);this.rocketmqTransactionLogMapper.insertSelective(RocketmqTransactionLog.builder().transactionId(transactionId).log("審核分享..").build());} }2.5. RocketMQ 事務消息監聽
package com.gblfy.lyrocketmq.listener;import com.gblfy.common.dto.ShareAuditDTO; import com.gblfy.lyrocketmq.entity.RocketmqTransactionLog; import com.gblfy.lyrocketmq.mapper.RocketmqTransactionLogMapper; import com.gblfy.lyrocketmq.service.ShareService; import lombok.RequiredArgsConstructor; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener; import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState; import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.MessageHeaders;@RocketMQTransactionListener @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class AddBonusTransactionListener implements RocketMQLocalTransactionListener {private final ShareService shareService;private final RocketmqTransactionLogMapper rocketmqTransactionLogMapper;/*** 執行本地事務** @param msg 消息header信息* @param arg 消息體* @return*/@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {MessageHeaders headers = msg.getHeaders();String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);Integer share_id = Integer.valueOf((String) headers.get("share_id"));try {this.shareService.auditByIdWithRoketMqlog(share_id, (ShareAuditDTO) arg, transactionId);return RocketMQLocalTransactionState.COMMIT;} catch (Exception e) {e.printStackTrace();return RocketMQLocalTransactionState.ROLLBACK;}}/*** 本地事務的檢查,檢查本地事務是否成功** @param msg* @return*/@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(Message msg) {MessageHeaders headers = msg.getHeaders();String transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);RocketmqTransactionLog rocketmqTransactionLog = this.rocketmqTransactionLogMapper.selectOne(RocketmqTransactionLog.builder().transactionId(transactionId).build());if (rocketmqTransactionLog != null) {return RocketMQLocalTransactionState.COMMIT;}return RocketMQLocalTransactionState.ROLLBACK;} }詳細見源碼:本文底部
三、用戶中心(客戶端)
消息消費端代碼編寫
3.1. 依賴
<!--集成rocketmq--><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version></dependency>3.2.配置
- 項目內部yml配置
- nacos服務端配置
3.3. 消息監聽
package com.gblfy.product.listenner;import com.gblfy.common.dto.UserAddBonusMsgDTO; import com.gblfy.product.entity.BonusEventLog; import com.gblfy.product.entity.User; import com.gblfy.product.mapper.BonusEventLogMapper; import com.gblfy.product.mapper.UserMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service;import java.util.Date;@Slf4j @Service @RequiredArgsConstructor(onConstructor = @__(@Autowired)) @RocketMQMessageListener(topic = "tx-add-bonus-group", consumerGroup = "consumer-group") public class AddBonusListener implements RocketMQListener<UserAddBonusMsgDTO> {private final UserMapper userMapper;private final BonusEventLogMapper bonusEventLogMapper;@Overridepublic void onMessage(UserAddBonusMsgDTO message) {// 1. 為用戶添加積分Integer userId = message.getUserId();Integer bonus = message.getBonus();User user = this.userMapper.selectByPrimaryKey(userId);user.setBonus(user.getBonus() + bonus);this.userMapper.updateByPrimaryKeySelective(user);// 2.記錄日志到bonus_event_log表中this.bonusEventLogMapper.insert(BonusEventLog.builder().userId(userId).value(bonus).event("CONTRIBUTE").createTime(new Date()).description("投稿加積分...").build());log.info("積分添加完畢...");} }開源項目:
https://gitee.com/gb_90/micro-service-parent
總結
以上是生活随笔為你收集整理的SpringBoot2.x Nacos RocketMQ 事务消息的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: IntelliJ IDEA 2020.x
- 下一篇: TortoiseGit 基础5部曲