javascript
SpringCloud 微服务 (十) 消息队列MQ 基础
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
壹
之前學(xué)習(xí)了SpringCloud Bus結(jié)合MQ,沒有多學(xué)習(xí)MQ,本次學(xué)習(xí)相關(guān)內(nèi)容,先了解異步,同步就不說了
?
異步: 客戶端非阻塞進(jìn)程,服務(wù)端響應(yīng)可以是非即時(shí)的
應(yīng)用場景:?
①通知類的服務(wù)->發(fā)出去即可,無需回應(yīng);?
②請求的異步響應(yīng)->就是客戶端發(fā)送請求,服務(wù)端異步響應(yīng)請求,客戶端不會(huì)產(chǎn)生阻塞且是默認(rèn)響應(yīng),但不會(huì)立刻送達(dá);
①②都屬于1對1交互模式;
③消息->可以實(shí)現(xiàn)1對多的交互模式,比如發(fā)布訂閱模式下,客戶端發(fā)布消息通知,可以被0到N個(gè)服務(wù)消費(fèi),再例如客戶端發(fā)送消息等待其他服務(wù)的響應(yīng)
?
MQ在分布式系統(tǒng)中都會(huì)用到的組件,還是很重要的
應(yīng)用場景:?
①異步處理->比如用戶注冊的時(shí)候,需要手機(jī)(郵箱)驗(yàn)證,有的平臺(tái)用戶注冊還可以加用戶積分(經(jīng)驗(yàn))獎(jiǎng)勵(lì),還有什么身份證認(rèn)證等等很多,這時(shí)候在用戶信息寫入數(shù)據(jù)庫后,可以通過異步消息讓驗(yàn)證服務(wù)或者獎(jiǎng)勵(lì)各自執(zhí)行各自的服務(wù),提升效率,提升用戶體驗(yàn);
②流量控制->比如電商秒殺活動(dòng),秒殺時(shí)一般會(huì)因?yàn)榱髁客蝗槐q,過大流量導(dǎo)致服務(wù)掛掉,解決此問題,一般會(huì)在服務(wù)前端加入消息隊(duì)列,控制可容流量,如果消息隊(duì)列長度超過最大可容數(shù)量,需要丟棄額外的請求(或者跳轉(zhuǎn)其他頁面)來控制流量,接著秒殺業(yè)務(wù)可以根據(jù)消息隊(duì)列中有效的請求信息再做后續(xù)的處理;
③日志處理->比如kafka消息隊(duì)列,其最初設(shè)計(jì)是為了處理日志,大數(shù)據(jù)中應(yīng)用的比較多,當(dāng)日志數(shù)據(jù)采集的時(shí)候,定時(shí)寫入kafka隊(duì)列,然后kafka對數(shù)據(jù)進(jìn)行儲(chǔ)存,轉(zhuǎn)發(fā)等處理;
④服務(wù)解耦->接著上面秒殺話題,假設(shè)用戶搶到,會(huì)有訂單order服務(wù),商品product服務(wù),用戶下單后,order服務(wù)需要通知product服務(wù),如果order服務(wù)直接調(diào)用product服務(wù)的接口,這兩個(gè)服務(wù)之間是耦合的;那么使用MQ,用戶下單后,order完成持久化并將消息寫入MQ隊(duì)列,返回order訂單完成,product服務(wù)訂閱MQ隊(duì)列中order的消息,采用推拉的方式獲取order下單信息,product服務(wù)根據(jù)order下單的信息,進(jìn)行相關(guān)product商品的信息的變動(dòng)(扣庫存),如果過程中product服務(wù)不能正常執(zhí)行,也不會(huì)影響order服務(wù),因?yàn)閛rder服務(wù)寫入MQ隊(duì)列之后,就不在關(guān)心其他訂閱服務(wù)的后續(xù)操作了,這樣就現(xiàn)實(shí)了服務(wù)解耦;
?
貳
就用order,product應(yīng)用來做測試
order 服務(wù)
第一步老套路先引入maven依賴
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId> </dependency>第二步y(tǒng)ml配置還是和上篇差不多,將rabbitmq的配置寫入了碼云git倉庫里,利用SpringCloud Bus讀取
spring:application:name: ordercloud:config:discovery:enabled: trueservice-id: CONFIGprofile: deveureka:client:service-url:defaultZone: http://localhost:8761/eureka/希望碼云git早點(diǎn)支持動(dòng)態(tài)SpringCloud Bus,能用還是能用,先用著,自己手動(dòng)post請求好了
MQ接收方: 新建一個(gè)ReceiverMsg類,用來測試接收MQ的消息
package com.cloud.order.MQmsg;import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;@Component @Slf4j public class ReceiverMsg {//此處注解指定去獲取名為myQueue的queues隊(duì)列消息@RabbitListener(queues = "myQueue")public void printMQ(String message){log.info("【隊(duì)列消息】ReceiverMsg ,printMQ={}",message);}}MQ發(fā)送方: 這邊直接寫在單元測試中,測試是否能成功,代碼如下
package order;import com.cloud.order.ServerApplication; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;//由于之前對項(xiàng)目進(jìn)行了項(xiàng)目多模塊化,運(yùn)行單元測試的時(shí)候需要引導(dǎo)main啟動(dòng)類 //沒有可以不考慮classes = ServerApplication.class @SpringBootTest(classes = ServerApplication.class) @RunWith(SpringRunner.class) public class MQTest extends OrderApplicationTests{//AmqpTemplate 操作MQ的API@Autowiredprivate AmqpTemplate amqpTemplate;@Testpublic void send() {amqpTemplate.convertAndSend("myQueue","hello , MQ");}}RabbitMQ : 開啟docker中的rabbitmq,在http://192.168.99.100:15672/#/queues中新增一個(gè)名為myQueue的隊(duì)列,用于上面測試
啟動(dòng)服務(wù),可以看到有已經(jīng)有兩個(gè)SpringCloudBus隊(duì)列了,一個(gè)是SpringCloud Config,一個(gè)是上面的order服務(wù),注冊中心eureka一直都是開啟狀態(tài)
接著執(zhí)行單元測試,在myQueue中的波動(dòng),說明有流量:?↓↓↓
調(diào)用方法中的print,也在控制臺(tái)中打印日志 :?↓↓↓
?
循序漸進(jìn),每次操作推拉隊(duì)列信息的時(shí)候,需要方法寫一下,rabbitmq面板中手動(dòng)加一次,肯定是很lower的
將ReceiverMsg類中的注解,改成以下方式,即可自動(dòng)批量創(chuàng)建Queue
@RabbitListener(queuesToDeclare = {@Queue("myQueue"),@Queue("myQueue2")})啟動(dòng)項(xiàng)目后,即可生成Queue
?
Exchange交換機(jī),消息不直接發(fā)送到隊(duì)列,而是發(fā)送到了交換機(jī),通過隊(duì)列綁定交換機(jī)轉(zhuǎn)給隊(duì)列
如果需要綁定Exchange,可以改動(dòng)注解
@RabbitListener(bindings = @QueueBinding(value = @Queue("myQueue"),exchange = @Exchange("MyExchange") ))?
繼續(xù)order服務(wù)中,假設(shè)有很多賣方參與,賣奶茶的、賣mac的等等
不同賣方服務(wù)會(huì)發(fā)出不同的MQ消息,比如賣奶茶的只關(guān)心奶茶訂單、賣mac的只關(guān)心mac訂單,相互不關(guān)心其他的組的信息,機(jī)子跑不動(dòng)了,就模擬多服務(wù)處理方式?↓↓↓
接收方:?
@Component @Slf4j public class ReceiverMsg {@RabbitListener(bindings = @QueueBinding(key = "milkTea",value = @Queue("milkTeaOrder"),exchange = @Exchange("MyOrder")))public void milkTeaMQ(String message){log.info("【隊(duì)列消息】ReceiverMsg.milkTeaMQ ,milkTea={}",message);}@RabbitListener(bindings = @QueueBinding(key = "mac",value = @Queue("macOrder"),exchange = @Exchange("MyOrder")))public void macMQ(String message){log.info("【隊(duì)列消息】ReceiverMsg.macMQ ,macMQ={}",message);} }測試 發(fā)送方:
@SpringBootTest(classes = ServerApplication.class) @RunWith(SpringRunner.class) public class MQTest extends OrderApplicationTests{@Autowiredprivate AmqpTemplate amqpTemplate;@Testpublic void sendMilkTea() {//第一個(gè)參數(shù)exchange; 第二個(gè)參數(shù)key; 第三個(gè)參數(shù)發(fā)送的msgamqpTemplate.convertAndSend("myOrder","milkTea","hello , milkTeaMQ");}@Testpublic void sendMac() {amqpTemplate.convertAndSend("myOrder","mac","hello , macMQ");} }?
以上是RabbitMQ的基礎(chǔ)使用學(xué)習(xí),還有很多高級(jí)內(nèi)容,比如消息延遲處理,優(yōu)先級(jí)等等
之后慢慢研究,有相關(guān)學(xué)習(xí)的同學(xué),可以分享一下地址,Mark一起學(xué)習(xí)學(xué)習(xí),謝謝
?
----------------------------------------------------------------
轉(zhuǎn)載于:https://my.oschina.net/u/3829444/blog/1836946
總結(jié)
以上是生活随笔為你收集整理的SpringCloud 微服务 (十) 消息队列MQ 基础的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 前端组件库 - 搭建web app常用的
- 下一篇: elasticsearch 内存溢出,节