javascript
介绍Spring Cloud Stream与RabbitMQ集成
一. 首先安裝rabbitmq-management
這里用的是rabbitmq的docker鏡像,我們可以在Docker Hub中搜索rabbitmq, 找到最新的版本安裝
sudo docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.7.8-management安裝之后使用
docker ps -a檢查下, rabbitmq的鏡像是否啟動, 正常啟動狀態如下:
通過http://192.168.12.12:15672, 訪問到rabbitmq的管理端,
默認賬戶/密碼是:?guest/guest
?
二. Spring Cloud Stream與RabbitMQ集成
?引入依賴
<!-- Spring Cloud Stream RabbitMQ --> <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>創建消息接受類
@Component @Slf4j @EnableBinding(Processor.class) public class MyMQReciver {@StreamListener(Processor.INPUT)public void process(String message){log.info("hahahah : "+message);System.out.println("hahahah : "+message);} }代碼解讀兩處:
1. @StreamListener(Processor.INPUT)
? ? 這里其實是要聲明一個訂閱的鍵值, Processor類是一個org.springframework.cloud.stream.messaging jar包中內置的接口,查看其源碼可以看到它繼承了Source和Sink兩個類
? ?結構很簡單, 我們也可以仿照去實現自己的Processor
2. @EnableBinding(Processor.class)
? ?這里綁定的就是Processor(或者我們自己實現的Processor)
?
創建發送消息測試類
@RestController public class SendController {@Autowiredprivate Processor pipe;@GetMapping("/send")public void send(@RequestParam String message){pipe.output().send(MessageBuilder.withPayload(message).build());} }在application.yml中增加配置
spring:cloud:stream: bindings: input: destination: jeecgCloudExchangebinder: local_rabbitgroup: logMessageConsumersoutput: destination: jeecgCloudExchangebinder: local_rabbitbinders: local_rabbit: type: rabbitenvironment: spring: rabbitmq: host: localhostport: 5672username: guestpassword: guestvirtual-host: /啟動SpringCloud項目
首先瀏覽器進入rabbitmq管理端查看, 發現我們在application.yml中創建的output destination被自動創建出來了
input destination也被自動創建出來了,并且自動添加了綁定
?
確認rabbitmq這邊沒有問題后, 我們通過訪問消息接口測試,?http://localhost:8001/send?message=hello
發現MyMQReciver已經成功接受到了消息
?
三. 理論知識點
Spring Cloud Stream核心架構1
兩個比較重要的地方:inputs(輸入)消息接收端、outputs(輸出)消息發送端
一個 Spring Cloud Stream 應用以消息中間件為核心,應用通過Spring Cloud Stream注入的輸入/輸出通道 channels 與外部進行通信。channels 通過特定的Binder實現與外部消息中間件進行通信。
Spring Cloud Stream核心架構2
黃色:表示RabbitMQ
綠色:插件,消息的輸入輸出都套了一層插件,插件可以用于各種各樣不同的消息,也可以用于消息中間件的替換。
核心概念:
Barista接口:Barista接口是定義來作為后面類的參數,這一接口定義來通道類型和通道名稱,通道名稱是作為配置用,通道類型則決定了app會使用這一通道進行發送消息還是從中接收消息。
通道接口如何定義:
@Output:輸出注解,用于定義發送消息接口
@Input:輸入注解,用于定義消息的消費者接口
@StreamListener:用于定義監聽方法的注解
使用Spring Cloud Stream 非常簡單,只需要使用好這3個注解即可,在實現高性能消息的生產和消費的場景非常合適,但是使用SpringCloudStream框架有一個非常大的問題,就是不能實現可靠性的投遞,也就是沒法保證消息的100%可靠性,會存在少量消息丟失的問題。目前SpringCloudStream整合了RabbitMQ與Kafka,我們都知道Kafka是無法進行消息可靠性投遞的,這個原因是因為SpringCloudStream框架為了和Kafka兼顧所以在實際工作中使用它的目的就是針對高性能的消息通信的!這點就是在當前版本SpringCloudStream的定位。
因此在實際的工作中,可以采用SpringCloudStream,如果需要保證可靠性投遞,也可以單獨采用RabbitMQ,也是可以的。
總結
以上是生活随笔為你收集整理的介绍Spring Cloud Stream与RabbitMQ集成的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: P1862
- 下一篇: Istio微服务平台集成实践