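Queues are a good tool for smoothing out traffic peaks and are mainstream in the industry; publish-subscribe effectively decouples two applications. Dapr wraps both patterns, which makes them simpler and more efficient to use.
In this article's example, once an order has been placed, a message is published to Redis (other brokers work as well). The notification system and the payment system both subscribe to this message. Each of the two systems runs two instances, and only one instance per system receives and processes a given message. The call flow is shown in the following diagram:
The project structure is as follows: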
1. Configuration
The services are deployed with docker-compose. The contents of docker-compose.yml:
version: '3.4'

services:
  #┌────────────────────────────────┐
  #│ ordersystem app + Dapr sidecar │
  #└────────────────────────────────┘
  ordersystem:
    image: ${DOCKER_REGISTRY-}ordersystem
    depends_on:
      - redis
      - placement
    build:
      context: ../
      dockerfile: OrderSystem/Dockerfile
    ports:
      - "3500:3500"
    volumes:
      - ../OrderSystem:/OrderSystem
    networks:
      - b2c-dapr

  ordersystem-dapr:
    image: "daprio/daprd:latest"
    command: [ "./daprd", "-app-id", "order", "-app-port", "80", "-placement-host-address", "placement:50006", "-components-path", "/components" ]
    depends_on:
      - ordersystem
    network_mode: "service:ordersystem"
    volumes:
      - ../components:/components

  #┌───────────────────────────────────┐
  #│ paymentsystem1 app + Dapr sidecar │
  #└───────────────────────────────────┘
  paymentsystem1:
    image: ${DOCKER_REGISTRY-}paymentsystem
    build:
      context: ../
      dockerfile: PaymentSystem/Dockerfile
    ports:
      - "3601:3500"
    volumes:
      - ../PaymentSystem:/PaymentSystem
    networks:
      - b2c-dapr

  paymentsystem1-dapr:
    image: "daprio/daprd:latest"
    command: [ "./daprd", "-app-id", "pay", "-app-port", "80", "-placement-host-address", "placement:50006", "-components-path", "/components" ]
    depends_on:
      - paymentsystem1
    network_mode: "service:paymentsystem1"
    volumes:
      - ../components:/components

  #┌───────────────────────────────────┐
  #│ paymentsystem2 app + Dapr sidecar │
  #└───────────────────────────────────┘
  paymentsystem2:
    image: ${DOCKER_REGISTRY-}paymentsystem
    build:
      context: ../
      dockerfile: PaymentSystem/Dockerfile
    volumes:
      - ../PaymentSystem:/PaymentSystem
    ports:
      - "3602:3500"
    networks:
      - b2c-dapr

  paymentsystem2-dapr:
    image: "daprio/daprd:latest"
    command: [ "./daprd", "-app-id", "pay", "-app-port", "80", "-placement-host-address", "placement:50006", "-components-path", "/components" ]
    depends_on:
      - paymentsystem2
    network_mode: "service:paymentsystem2"
    volumes:
      - ../components:/components

  #┌───────────────────────────────────┐
  #│ noticesystem1 app + Dapr sidecar  │
  #└───────────────────────────────────┘
  noticesystem1:
    image: ${DOCKER_REGISTRY-}noticesystem
    build:
      context: ../
      dockerfile: NoticeSystem/Dockerfile
    ports:
      - "3701:3500"
    volumes:
      - ../NoticeSystem:/NoticeSystem
    networks:
      - b2c-dapr

  noticesystem1-dapr:
    image: "daprio/daprd:latest"
    command: [ "./daprd", "-app-id", "notice", "-app-port", "80", "-placement-host-address", "placement:50006", "-components-path", "/components" ]
    depends_on:
      - noticesystem1
    network_mode: "service:noticesystem1"
    volumes:
      - ../components:/components

  #┌───────────────────────────────────┐
  #│ noticesystem2 app + Dapr sidecar  │
  #└───────────────────────────────────┘
  noticesystem2:
    image: ${DOCKER_REGISTRY-}noticesystem
    build:
      context: ../
      dockerfile: NoticeSystem/Dockerfile
    ports:
      - "3702:3500"
    volumes:
      - ../NoticeSystem:/NoticeSystem
    networks:
      - b2c-dapr

  noticesystem2-dapr:
    image: "daprio/daprd:latest"
    command: [ "./daprd", "-app-id", "notice", "-app-port", "80", "-placement-host-address", "placement:50006", "-components-path", "/components" ]
    depends_on:
      - noticesystem2
    network_mode: "service:noticesystem2"
    volumes:
      - ../components:/components

  #┌────────────────────────┐
  #│ Dapr placement service │
  #└────────────────────────┘
  placement:
    image: "daprio/dapr"
    command: ["./placement", "-port", "50006"]
    ports:
      - "50006:50006"
    networks:
      - b2c-dapr

  #┌───────────────────┐
  #│ Redis state store │
  #└───────────────────┘
  redis:
    image: "redis:latest"
    ports:
      - "6380:6379"
    networks:
      - b2c-dapr

networks:
  b2c-dapr:
pubsub.yaml (in the components folder) keeps the default contents:
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: redis:6379
    - name: redisPassword
      value: ""
The subscription configuration file, subscription.yaml (also in the components folder):
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: myevent-subscription
spec:
  topic: orderComplete
  route: /ordercomplete
  pubsubname: pubsub
scopes:
- pay
- notice
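For comparison, Dapr also accepts programmatic subscriptions: each subscribing app answers GET /dapr/subscribe with a JSON array describing its topics, instead of relying on a shared subscription.yaml with scopes. The sketch below only builds that JSON payload for the topic used in this article; wiring it to an actual endpoint in PaymentSystem or NoticeSystem is not part of the original projects and is left as an assumption.

```csharp
using System;
using System.Text.Json;

// One entry per topic the app wants to receive.
// The values mirror subscription.yaml: pubsub component "pubsub",
// topic "orderComplete", delivered to the app route "/ordercomplete".
var subscriptions = new[]
{
    new { pubsubname = "pubsub", topic = "orderComplete", route = "/ordercomplete" }
};

// This is the body an app would return from GET /dapr/subscribe.
var json = JsonSerializer.Serialize(subscriptions);
Console.WriteLine(json);
```

With the declarative file, the scopes list decides which app ids (pay and notice) receive the topic; with the programmatic form, each app declares its own subscriptions, so no scopes entry is needed.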
2. Code
In the OrderSystem project's appsettings.json:
"PublishUrl": "http://localhost:3500/v1.0/publish/pubsub/orderComplete"
The publish method in the OrderSystem project:
[HttpGet("/orderpub/{orderno}")]
public async Task<IActionResult> OrderPub(string orderno)
{
    try
    {
        _logger.LogInformation("Order,publish");
        await Task.Delay(400); // simulate the time spent placing the order
        var client = _clientFactory.CreateClient();
        // Serialize the order as the JSON message body
        var json = Newtonsoft.Json.JsonConvert.SerializeObject(new { OrderNo = orderno, Amount = 30000, OrderTime = DateTime.UtcNow });
        var stringContent = new StringContent(json, System.Text.Encoding.UTF8, "application/json");
        // Log the payload itself; StringContent.ToString() would only log the type name
        _logger.LogInformation("Publishing payload: {Payload}", json);
        // POST to the Dapr sidecar's publish endpoint (PublishUrl in appsettings.json)
        var content = await client.PostAsync(_publishUrl, stringContent);
        return new JsonResult(new { order_result = "Order success,and publish", pay_result = content });
    }
    catch (Exception exc)
    {
        _logger.LogCritical(exc, exc.Message);
        return new JsonResult(new { order_result = "Order success,and publish,pay exception", message = exc.Message });
    }
}
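The PublishUrl setting used by the method above follows the pattern of the Dapr HTTP publish endpoint: sidecar address, then /v1.0/publish/, then the pub/sub component name and the topic. A minimal sketch of composing it from its parts is shown here; BuildPublishUrl is a hypothetical helper for illustration and does not exist in the OrderSystem project.

```csharp
using System;

// Hypothetical helper: composes the sidecar publish URL
// http://localhost:<daprHttpPort>/v1.0/publish/<pubsubName>/<topic>
static string BuildPublishUrl(int daprHttpPort, string pubsubName, string topic) =>
    $"http://localhost:{daprHttpPort}/v1.0/publish/" +
    $"{Uri.EscapeDataString(pubsubName)}/{Uri.EscapeDataString(topic)}";

// Values taken from this article: port 3500, component "pubsub", topic "orderComplete".
var url = BuildPublishUrl(3500, "pubsub", "orderComplete");
Console.WriteLine(url); // prints http://localhost:3500/v1.0/publish/pubsub/orderComplete
```

The component name must match metadata.name in pubsub.yaml and the topic must match the topic in subscription.yaml, otherwise the subscribers never see the message.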
The subscription implementation in the PaymentSystem and NoticeSystem projects
Two entity classes:
public class PubBody
{
    public string id { get; set; }
    public string source { get; set; }
    public string pubsubname { get; set; }
    public string traceid { get; set; }
    public PubOrder data { get; set; }
    public string specversion { get; set; }
    public string datacontenttype { get; set; }
    public string type { get; set; }
    public string topic { get; set; }
}

public class PubOrder
{
    public string OrderNo { get; set; }
    public decimal Amount { get; set; }
    public DateTime OrderTime { get; set; }
}
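PubBody mirrors the CloudEvents envelope that Dapr wraps around a published message, with the order itself carried in the data field. The sketch below builds a made-up envelope and reads the order back out of it; the id, source, and timestamp values are invented for illustration, and it uses System.Text.Json rather than the Newtonsoft.Json calls in the article's projects.

```csharp
using System;
using System.Text.Json;

// A hypothetical envelope shaped like PubBody; all values are illustrative only.
var envelope = JsonSerializer.Serialize(new
{
    id = "hypothetical-id",
    source = "order",
    specversion = "1.0",
    type = "com.dapr.event.sent",
    topic = "orderComplete",
    pubsubname = "pubsub",
    datacontenttype = "application/json",
    data = new { OrderNo = "NO0001", Amount = 30000, OrderTime = new DateTime(2021, 1, 1, 0, 0, 0, DateTimeKind.Utc) }
});

// The subscriber receives the whole envelope and picks the order out of "data".
using var doc = JsonDocument.Parse(envelope);
var data = doc.RootElement.GetProperty("data");
var orderNo = data.GetProperty("OrderNo").GetString();
var amount = data.GetProperty("Amount").GetDecimal();

Console.WriteLine($"OrderNo:{orderNo},Amount:{amount}");
```

The publisher only sends the inner order object; the surrounding fields are added on the way through the sidecar, which is why the subscriber needs the extra PubBody class.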
The subscription method in both NoticeSystem and PaymentSystem:
[HttpPost("/ordercomplete")]
public async Task<IActionResult> OrderComplete()
{
    try
    {
        _logger.LogInformation("PaymentSystem OrderComplete running...");
        // Read the raw request body; the message arrives wrapped in the PubBody envelope
        using var reader = new StreamReader(Request.Body, System.Text.Encoding.UTF8);
        var content = await reader.ReadToEndAsync();
        var pubBody = Newtonsoft.Json.JsonConvert.DeserializeObject<PubBody>(content);
        // data may be null when the envelope carries no payload, hence the second ?.
        _logger.LogInformation($"--------- HostName:{Dns.GetHostName()},OrderNo:{pubBody?.data?.OrderNo},OrderAmount:{pubBody?.data?.Amount},OrderTime:{pubBody?.data?.OrderTime} -----------");
        await Task.Delay(200); // simulate processing
        _logger.LogInformation("subscription pay complete");
        _logger.LogInformation("return SUCCESS");
        // SUCCESS tells Dapr the message was processed
        return new JsonResult(new { Status = "SUCCESS" });
    }
    catch (Exception exc)
    {
        _logger.LogCritical(exc, exc.Message);
        _logger.LogInformation("return RETRY");
        // RETRY asks Dapr to deliver the message again
        return new JsonResult(new { Status = "RETRY" });
    }
}
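The handler above answers with SUCCESS or RETRY. Dapr also understands a third status, DROP, which tells it to discard the message. One possible refinement, sketched here under the assumption that a body which cannot be parsed will never succeed on redelivery, is to drop malformed messages and retry only when processing itself fails. DecideStatus is a hypothetical helper, not code from the article's projects.

```csharp
using System;
using System.Text.Json;

// Hypothetical helper: maps the outcome of handling one message to a Dapr status.
//   DROP    - the body is not a usable envelope, redelivery would not help
//   RETRY   - processing threw, possibly a transient failure
//   SUCCESS - processing completed
static string DecideStatus(string body, Action<JsonElement> process)
{
    JsonDocument doc;
    try
    {
        doc = JsonDocument.Parse(body);
    }
    catch (JsonException)
    {
        return "DROP";
    }

    using (doc)
    {
        var root = doc.RootElement;
        if (root.ValueKind != JsonValueKind.Object ||
            !root.TryGetProperty("data", out var data) ||
            data.ValueKind != JsonValueKind.Object)
        {
            return "DROP";
        }

        try
        {
            process(data);
            return "SUCCESS";
        }
        catch (Exception)
        {
            return "RETRY";
        }
    }
}

var sampleBody = "{\"data\":{\"OrderNo\":\"NO0001\"}}";
var status = DecideStatus(sampleBody, d => Console.WriteLine(d.GetProperty("OrderNo").GetString()));
Console.WriteLine(status);
```

Whether dropping is acceptable depends on the business rules; a payment system may prefer to park bad messages somewhere for inspection rather than lose them.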
3. Deploy and test
Change into the B2C directory and start docker compose from the command line:
docker-compose up -d
Now the setup can be tested. Call the OrderSystem's external address to place orders NO0001 and NO0002:
localhost:3500/v1.0/invoke/order/method/orderpub/NO0001 and
localhost:3500/v1.0/invoke/order/method/orderpub/NO0002
View container noticesystem1
View container noticesystem2
View container paymentsystem1
View container paymentsystem2
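NoticeSystem and PaymentSystem both subscribe to the orderComplete topic published by OrderSystem. Because the two instances of each system share one app id (notice or pay), they act as a single subscriber, and the instances take turns handling the messages. In this way Dapr wraps the complexity of publish-subscribe into something as simple as calling and receiving an API request, and the project code carries no trace of the underlying broker.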