javascript
Spring Cloud【Finchley】实战-07异步下单
文章目錄
- 概述
- Product微服務改造
- 接入配置中心
- Step1 引入依賴
- Step2 遠端Git存儲中心,新增artisan product模塊的配置文件
- 接入消息隊列
- Step1 引入依賴
- Step2 配置RabbitMQ的信息
- 商品扣減完成后通知訂單模塊創建訂單
- Step1 分析
- Step2 扣減庫存方法中增加發送消息隊列的代碼
- Step3 驗證發送消息隊列的數據
- 訂單模塊接收消息隊列中的消息
- Step1 開發消息接收類
- Step2 驗證
- 糾正錯誤
- JackSon 的操作
- 代碼
概述
學習了RabbitMQ、Spring Boot整合RabbitMQ以及使用Spring Cloud Stream操作RabbitMQ之后,我們著手改造下吧
Product微服務改造
接入配置中心
既然引入了配置中心,那么我們把artisan product微服務也接入到配置中心吧 。
Step1 引入依賴
作為客戶端引入spring-cloud-config-client即可
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-config-client</artifactId></dependency>Step2 遠端Git存儲中心,新增artisan product模塊的配置文件
將原來配置在application.yml中的配置信息拆分到bootstrap.yml及遠端的git文件中
規劃一下:
- application.yml —端口信息
- 遠端git:artisan-product-dev.yml --數據庫等
- bootstrap.yml: Config Server,Eureka等信息
application.yml
server:port: 8080遠端git:artisan-product-dev.yml
spring: # datasourcedatasource:driver-class-name: com.mysql.jdbc.Driverurl: jdbc:mysql://localhost:3306/o2o?serverTimezone=UTC&useUnicode=true&characterEncoding=utf8&useSSL=falseusername: rootpassword: root# jpa 輸出sqljpa:show-sql: truebootstrap.yml
spring:application:name: artisan-ordercloud:config: # profile: dev# 可配置多個,不推薦使用,因為需要設置具體的ip.服務端修改或者新增IP后,要同步修改# uri: http://localhost:9898/,http://localhost:9999/discovery:# 指定Config Server在服務發現中的service Id ,默認為configserverservice-id: ARTISAN-CONFIG# 表示使用服務發現組件中的Config Server,而不自己指定Config Server的uri,默認為falseenabled: trueprofile: home# 修復github webhook 只能刷新config server 無法刷新config client的問題bus:#Workaround for defect in https://github.com/spring-cloud/spring-cloud-bus/issues/124id: ${vcap.application.name:${spring.application.name:application}}:${vcap.application.instance_index:${spring.cloud.config.profile:${local.server.port:${server.port:0}}}}:${vcap.application.instance_id:${random.value}} #Eureka eureka:client:service-url:defaultZone: http://localhost:8762/eureka/通過config server訪問下遠端的Git配置文件
http://localhost:9898/artisan-product-dev.yml
OK,訪問正常,說明配置讀取沒問題。
啟動artisan product微服務,查看下Eureka 上的注冊情況
成功注冊上來了,8080端口
接入消息隊列
Step1 引入依賴
這里我們選擇使用 spring-boot-starter-amqp
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>Step2 配置RabbitMQ的信息
我們放到遠端的Git上
spring:rabbitmq:host: 192.168.31.34password: guestport: 5672username: guest通過config server訪問下 http://localhost:9898/artisan-product-dev.yml
OK ,說明配置沒有問題 .
商品扣減完成后通知訂單模塊創建訂單
Step1 分析
流程如下:
商品----------->消息隊列<---------------訂單
商品在扣減完成之后,通知消息隊列,訂單模塊訂閱消息隊列處理請求。
那我們先看下商品扣減的方法原來的邏輯吧,不求一步到位,步步分析,逐步完善
Controller入口
/*** 扣減庫存* 提供給Order微服務用** @param decreaseStockInputList* @return*/@PostMapping("/decreseProduct")private void decreseProduct(@RequestBody List<DecreaseStockInput> decreaseStockInputList) {productService.decreaseProduct(decreaseStockInputList);}調用ProductServiceImpl#decreaseProduct方法,按照設計,商品扣減完成以后就要發送消息到消息隊列 。
為解決Spring Cloud實戰-04將訂單微服務與商品微服務分別拆分為多模塊 中的問題一,我們把數據模型對應的實體類又封裝了一層,因此發送給消息隊列的對象,建議也是封裝后的對象,所以使用ProductOutput 。
Step2 扣減庫存方法中增加發送消息隊列的代碼
增加如下代碼:
// 發送消息隊列ProductOutput productOutput = new ProductOutput();BeanUtils.copyProperties(product,productOutput);amqpTemplate.convertAndSend("productOutput",JsonUtil.toJson(productOutput));log.info("發送消息到MQ,內容為:{}",JsonUtil.toJson(productOutput));為了方便觀察,將productOutput轉成了Json格式,建議這樣做,萬一有消息擠壓的話,方便在RabbitMQ的管理頁面查看擠壓的消息。 這里我們使用了Jackson。 你選擇其他的庫也可以,比如我們前面用到的Gson 。
Step3 驗證發送消息隊列的數據
因為還沒有寫接收方,我們先在RabbitMQ中手工創建一個名為productOutput的消息隊列,先看下是否正確。
啟動product 微服務, 在postman測試如下
[{"productId": "1","productQuantity": 2} ]先看下數據庫中的數據 productId=1的商品,目前還剩下81件,我們這次扣除兩件,應該剩余79件,待會驗證下 。
發送請求,觀察RabbitMQ 和 數據
DB 數據OK
RabbitMQ 接收正常,只是沒有被消費
查看擠壓的消息:
到目前為止,起碼消息發送到了消息隊列,數據扣減正常。 下一步就是該在訂單服務中去寫消息接收方的業務邏輯了。
訂單模塊接收消息隊列中的消息
Step1 開發消息接收類
package com.artisan.order.message;import com.artisan.order.utils.JsonUtil; import com.artisan.product.common.ProductOutput; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @Slf4j public class ProductOutputReceive {// 自動創建productOutput隊列@RabbitListener(queuesToDeclare = @Queue("productOutput"))public void process(String message) {// message --> ProductOutputProductOutput productOutput = JsonUtil.JsonToBean(message, ProductOutput.class);log.info("接收到的消息為:{}",productOutput);}}Step2 驗證
我們在上面的消息接收方中使用了@RabbitListener(queuesToDeclare = @Queue("productOutput"))會自動創建消息隊列,因此我們測試之前,把之前手工新建的productOutput刪掉吧,驗證下消息隊列通過代碼自動創建
啟動后,在postman中發送消息,觀察RabbitMQ和日志
RabbitMQ:
日志
糾正錯誤
上述雖然實現了發送消息到消息隊列,并接收消息處理業務。但業務上有個比較大的邏輯錯誤,那就是前臺傳遞的是個List啊,Product中扣減庫存方法中 有異常的判斷,如果前臺買了3個商品, 但庫存只有2個,數據如何保持一致?
建議: 將商品扣減的DB操作的部分作為一個整體,如果都成功了,將List發送到消息隊列。 同樣的Order微服務也要做下相應的調整
Product
@Overridepublic void decreaseProduct(List<DecreaseStockInput> decreaseStockInputList) {List<Product> productList = operateProducts(decreaseStockInputList);List<ProductOutput> productOutputList = productList.stream().map(e -> {ProductOutput productOutput = new ProductOutput();BeanUtils.copyProperties(e, productOutput);return productOutput;}).collect(Collectors.toList());// 發送消息隊列amqpTemplate.convertAndSend("productOutput", JsonUtil.toJson(productOutputList));log.info("發送消息到MQ,內容為:{}", JsonUtil.toJson(productOutputList));}// 因為是對List操作,所以加個事務控制@Transactionalpublic List<Product> operateProducts(List<DecreaseStockInput> decreaseStockInputList) {List<Product> productList = new ArrayList<>();// 遍歷DecreaseStockInputfor (DecreaseStockInput decreaseStockInput : decreaseStockInputList) {// 根據productId查詢ProductOptional<Product> productOptional = productRepository.findById(decreaseStockInput.getProductId());// 商品是否存在if (!productOptional.isPresent()) {throw new ProductException(ResultEnum.PRODUCT_NOT_EXIST);}// 是否庫存充足Product product = productOptional.get();int leftStock = product.getProductStock() - decreaseStockInput.getProductQuantity();if (leftStock < 0) {throw new ProductException(ResultEnum.PRODUCT_STOCK_ERROR);}// 將剩余庫存設置到product,并更新數據庫product.setProductStock(leftStock);productRepository.save(product);productList.add(product);}return productList;}Order
package com.artisan.order.message;import com.artisan.order.utils.JsonUtil; import com.artisan.product.common.ProductOutput; import com.fasterxml.jackson.core.type.TypeReference; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.util.List;@Component @Slf4j public class ProductOutputReceive {// 自動創建productOutput隊列@RabbitListener(queuesToDeclare = @Queue("productOutput"))public void process(String message) {// message --> ProductOutput // ProductOutput productOutput = JsonUtil.JsonToBean(message, ProductOutput.class); // log.info("接收到的消息為:{}",productOutput);// message -> List<ProductOutput>List<ProductOutput> productOutputList = JsonUtil.JsonToList(message,new TypeReference<List<ProductOutput>>(){});log.info("接收到的消息為:{}",productOutputList);}}測試一把
[{"productId": "1","productQuantity": 2},{"productId": "2","productQuantity": 5} ]product的日志
2019-04-14 01:28:24.058 INFO 22272 --- [nio-8080-exec-1] c.a.p.service.impl.ProductServiceImpl : 發送消息到MQ,內容為:[{"productId":"1","productName":"拿鐵咖啡","productPrice":20.99,"productStock":71,"productDescription":"咖啡,提神醒腦","productIcon":null,"productStatus":0,"categoryType":99},{"productId":"2","productName":"青島純生","productPrice":7.50,"productStock":180,"productDescription":"啤酒","productIcon":null,"productStatus":0,"categoryType":98}]order的日志:
2019-04-14 01:28:24.086 INFO 18036 --- [cTaskExecutor-1] c.a.order.message.ProductOutputReceive : 接收到的消息為:[ProductOutput(productId=1, productName=拿鐵咖啡, productPrice=20.99, productStock=71, productDescription=咖啡,提神醒腦, productIcon=null, productStatus=0, categoryType=99), ProductOutput(productId=2, productName=青島純生, productPrice=7.50, productStock=180, productDescription=啤酒, productIcon=null, productStatus=0, categoryType=98)]消息接收方接收到消息后,比如可以把 List<ProductOutput>信息放到redis里,查詢商品服務的話,就可以從redis中查詢了。
JackSon 的操作
這里我們選擇了JackSon 來操作JavaBean和Json之間的互轉,當然了你也可以選擇其他的API,比如我們上次用的Gson。。
https://blog.csdn.net/qq_37936542/article/details/79268402
package com.artisan.order.utils;import com.fasterxml.jackson.core.JsonParseException; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.ObjectMapper;import java.io.IOException;public class JsonUtil {private static ObjectMapper objectMapper = new ObjectMapper();/*** 對象轉換為json字符串** @param object* @return*/public static String toJson(Object object) {try {return objectMapper.writeValueAsString(object);} catch (JsonProcessingException e) {e.printStackTrace();}return null;}/*** json 轉 javabean* @param jsonStr* @param objClass* @param <T>* @return*/public static <T> T JsonToBean(String jsonStr, Class<T> objClass) {try {return objectMapper.readValue(jsonStr, objClass);} catch (IOException e) {e.printStackTrace();}return null;}/*** json 轉 List* @param jsonStr* @param typeReference* @param <T>* @return*/public static <T> T JsonToList(String jsonStr, TypeReference typeReference) {try {return objectMapper.readValue(jsonStr, typeReference);} catch (IOException e) {e.printStackTrace();}return null;} }代碼
https://github.com/yangshangwei/springcloud-o2o/tree/master/artisan_order
https://github.com/yangshangwei/springcloud-o2o/tree/master/artisan-product
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的Spring Cloud【Finchley】实战-07异步下单的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring Cloud【Finchle
- 下一篇: Elasticsearch-01Cent