javascript
Spring Cloud Stream 体系及原理介绍
https://mp.weixin.qq.com/s/e_pDTFmFcSqHH-uSIzNmMg
Spring Cloud Stream?在 Spring Cloud 體系內用于構建高度可擴展的基于事件驅動的微服務,其目的是為了簡化消息在 Spring Cloud 應用程序中的開發(fā)。
?
Spring Cloud Stream?(后面以 SCS 代替 Spring Cloud Stream)?本身內容很多,而且它還有很多外部的依賴,想要熟悉 SCS,必須要先了解 Spring Messaging 和 Spring Integration 這兩個項目,接下來,文章將從圍繞以下三點進行展開:
?
-
什么是 Spring Messaging;
-
什么是 Spring Integration;
-
什么是 SCS 體系及其原理;
?
Spring Messaging
Spring Messaging 是 Spring Framework 中的一個模塊,其作用就是統(tǒng)一消息的編程模型。
-
比如消息 Messaging 對應的模型就包括一個消息體 Payload 和消息頭 Header:
?
package?org.springframework.messaging;
public?interface?Message<T>?{
? ? T?getPayload();
? ? MessageHeaders?getHeaders();
}
-
消息通道 MessageChannel 用于接收消息,調用 send 方法可以將消息發(fā)送至該消息通道中 :
?
@FunctionalInterface
public?interface?MessageChannel?{
? ? long?INDEFINITE_TIMEOUT?=?-1;
? ? default?boolean?send(Message<?>?message) {
? ? ? ? ?return?send(message,?INDEFINITE_TIMEOUT);
? ? ?}
? ? ?boolean?send(Message<?>?message,?long?timeout);
}
消息通道里的消息如何被消費呢?
-
由消息通道的子接口可訂閱的消息通道 SubscribableChannel 實現(xiàn),被 MessageHandler 消息處理器所訂閱:
public?interface?SubscribableChannel?extends?MessageChannel?{
? ? boolean?subscribe(MessageHandler?handler);
? ? boolean?unsubscribe(MessageHandler?handler);
}
-
由MessageHandler 真正地消費/處理消息:
@FunctionalInterface
public?interface?MessageHandler?{
? ? void?handleMessage(Message<?>?message)?throws?MessagingException;
}
Spring Messaging 內部在消息模型的基礎上衍生出了其它的一些功能,如:
1. 消息接收參數(shù)及返回值處理:消息接收參數(shù)處理器 HandlerMethodArgumentResolver 配合 @Header, @Payload 等注解使用;消息接收后的返回值處理器 HandlerMethodReturnValueHandler 配合 @SendTo 注解使用;
2. 消息體內容轉換器 MessageConverter;
3. 統(tǒng)一抽象的消息發(fā)送模板 AbstractMessageSendingTemplate;
4. 消息通道攔截器 ChannelInterceptor;
?
Spring Integration
Spring Integration 提供了 Spring 編程模型的擴展用來支持企業(yè)集成模式(Enterprise Integration Patterns),是對 Spring Messaging 的擴展。
它提出了不少新的概念,包括消息路由 MessageRoute、消息分發(fā) MessageDispatcher、消息過濾 Filter、消息轉換 Transformer、消息聚合 Aggregator、消息分割 Splitter 等等。同時還提供了 MessageChannel 和MessageHandler 的實現(xiàn),分別包括 DirectChannel、ExecutorChannel、PublishSubscribeChannel 和MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等內容。
這里為大家介紹幾種消息的處理方式:
-
消息的分割:
?
-
消息的聚合:
?
?
-
消息的過濾:
?
-
消息的分發(fā):
?
?
接下來,我們以一個最簡單的例子來嘗試一下 Spring Integration:
這段代碼解釋為:
?
SubscribableChannel messageChannel =new DirectChannel(); // 1
messageChannel.subscribe(msg-> { // 2
?System.out.println("receive: " +msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msgfrom alibaba").build()); // 3
?
1. 構造一個可訂閱的消息通道 messageChannel;
2. 使用 MessageHandler 去消費這個消息通道里的消息;
3. 發(fā)送一條消息到這個消息通道,消息最終被消息通道里的 MessageHandler 所消費。
最后控制臺打印出: receive: msg from alibaba;
DirectChannel 內部有個 UnicastingDispatcher 類型的消息分發(fā)器,會分發(fā)到對應的消息通道 MessageChannel 中,從名字也可以看出來,UnicastingDispatcher 是個單播的分發(fā)器,只能選擇一個消息通道。那么如何選擇呢? 內部提供了 LoadBalancingStrategy 負載均衡策略,默認只有輪詢的實現(xiàn),可以進行擴展。
我們對上段代碼做一點修改,使用多個 MessageHandler 去處理消息:
SubscribableChannel?messageChannel?=?new?DirectChannel();
messageChannel.subscribe(msg?->?{
? ? ?System.out.println("receive1: "?+?msg.getPayload());
});
messageChannel.subscribe(msg?->?{
? ? ?System.out.println("receive2: "?+?msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
由于 DirectChannel 內部的消息分發(fā)器是 UnicastingDispatcher 單播的方式,并且采用輪詢的負載均衡策略,所以這里兩次的消費分別對應這兩個 MessageHandler??刂婆_打印出:
receive1: msg from alibaba
receive2: msg from alibaba
既然存在單播的消息分發(fā)器 UnicastingDispatcher,必然也會存在廣播的消息分發(fā)器,那就是 BroadcastingDispatcher,它被 PublishSubscribeChannel 這個消息通道所使用。廣播消息分發(fā)器會把消息分發(fā)給所有的 MessageHandler:
SubscribableChannel?messageChannel?=?new?PublishSubscribeChannel();
messageChannel.subscribe(msg?->?{
? ? ?System.out.println("receive1: "?+?msg.getPayload());
});
messageChannel.subscribe(msg?->?{
? ? ?System.out.println("receive2: "?+?msg.getPayload());
});
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
發(fā)送兩個消息,都被所有的 MessageHandler 所消費??刂婆_打印:
receive1: msg from alibaba
receive2: msg from alibaba
receive1: msg from alibaba
receive2: msg from alibaba
?
Spring Cloud Stream
SCS與各模塊之間的關系是:
-
SCS 在 Spring Integration 的基礎上進行了封裝,提出了 Binder, Binding, @EnableBinding, @StreamListener 等概念;
-
SCS 與 Spring Boot Actuator 整合,提供了 /bindings, /channels endpoint;
-
SCS 與 Spring Boot Externalized Configuration 整合,提供了 BindingProperties, BinderProperties 等外部化配置類;
-
SCS 增強了消息發(fā)送失敗的和消費失敗情況下的處理邏輯等功能。
-
SCS 是 Spring Integration 的加強,同時與 Spring Boot 體系進行了融合,也是 Spring Cloud Bus 的基礎。它屏蔽了底層消息中間件的實現(xiàn)細節(jié),希望以統(tǒng)一的一套 API 來進行消息的發(fā)送/消費,底層消息中間件的實現(xiàn)細節(jié)由各消息中間件的 Binder 完成。
Binder 是提供與外部消息中間件集成的組件,為構造 Binding提供了 2 個方法,分別是 bindConsumer 和 bindProducer ,它們分別用于構造生產者和消費者。目前官方的實現(xiàn)有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 內部已經實現(xiàn)了 RocketMQ Binder。
?
從圖中可以看出,Binding 是連接應用程序跟消息中間件的橋梁,用于消息的消費和生產。我們來看一個最簡單的使用 RocketMQ Binder 的例子,然后分析一下它的底層處理原理:
-
啟動類及消息的發(fā)送:
@SpringBootApplication
@EnableBinding({?Source.class,?Sink.class?})?// 1
public?class?SendAndReceiveApplication?{
?
? ? public?static?void?main(String[]?args) {
? ? ? ? SpringApplication.run(SendAndReceiveApplication.class,?args);
? ? }
?
? ? ? ?@Bean?// 2
? ? public?CustomRunner?customRunner() {
? ? ? ? return?new?CustomRunner();
? ? }
? ? public?static?class?CustomRunner?implements?CommandLineRunner?{
? ? ? ? @Autowired
? ? ? ? private?Source?source;
? ? ? ? @Override
? ? ? ? public?void?run(String...?args)?throws?Exception?{
? ? ? ? ? ? int?count?=?5;
? ? ? ? ? ? for?(int?index?=?1;?index?<=?count;?index++) {
? ? ? ? ? ? ? ? source.output().send(MessageBuilder.withPayload("msg-"?+?index).build());?// 3
? ? ? ? ? ? }
? ? ? ? }
? ? }
}
-
消息的接收:
@Service
public?class?StreamListenerReceiveService?{
? ? @StreamListener(Sink.INPUT)?// 4
? ? public?void?receiveByStreamListener1(String?receiveMsg) {
? ? ? ? System.out.println("receiveByStreamListener: "?+?receiveMsg);
? ? }
}
這段代碼很簡單,沒有涉及到 RocketMQ 相關的代碼,消息的發(fā)送和接收都是基于 SCS 體系完成的。如果想切換成 RabbitMQ 或 Kafka,只需修改配置文件即可,代碼無需修改。
我們來分析下這段代碼的原理:
?
1.?@EnableBinding?對應的兩個接口屬性?Source?和?Sink?是 SCS 內部提供的。SCS 內部會基于?Source?和?Sink?構造?BindableProxyFactory,且對應的 output 和 input 方法返回的 MessageChannel 是?DirectChannel。output 和 input 方法修飾的注解對應的 value 是配置文件中 binding 的 name。
public?interface?Source?{
? ? String?OUTPUT?=?"output";
? ? @Output(Source.OUTPUT)
? ? MessageChannel?output();
}
public?interface?Sink?{
? ? String?INPUT?=?"input";
? ? @Input(Sink.INPUT)
? ? SubscribableChannel?input();
}
配置文件里 bindings 的 name 為 output 和 input,對應 Source 和 Sink 接口的方法上的注解里的 value:
spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group1
2. 構造 CommandLineRunner,程序啟動的時候會執(zhí)行 CustomRunner 的 run 方法。
3. 調用 Source 接口里的 output 方法獲取 DirectChannel,并發(fā)送消息到這個消息通道中。這里跟之前 Spring Integration 章節(jié)里的代碼一致。
-
Source 里的 output 發(fā)送消息到 DirectChannel 消息通道之后會被 AbstractMessageChannelBinder#SendingHandler 這個 MessageHandler 處理,然后它會委托給 AbstractMessageChannelBinder#createProducerMessageHandler 創(chuàng)建的 MessageHandler 處理(該方法由不同的消息中間件實現(xiàn));
-
不同的消息中間件對應的 AbstractMessageChannelBinder#createProducerMessageHandler 方法返回的 MessageHandler 內部會把 Spring Message 轉換成對應中間件的 Message 模型并發(fā)送到對應中間件的 broker;
4. 使用 @StreamListener 進行消息的訂閱。請注意,注解里的 Sink.input 對應的值是 "input",會根據(jù)配置文件里 binding 對應的 name 為 input 的值進行配置:
-
不同的消息中間件對應的 AbstractMessageChannelBinder#createConsumerEndpoint 方法會使用 Consumer 訂閱消息,訂閱到消息后內部會把中間件對應的 Message 模型轉換成 Spring Message;
-
消息轉換之后會把 Spring Message 發(fā)送至 name 為 input 的消息通道中;
-
@StreamListener 對應的 StreamListenerMessageHandler 訂閱了 name 為 input 的消息通道,進行了消息的消費;
這個過程文字描述有點啰嗦,用一張圖總結一下(黃色部分涉及到各消息中間件的 Binder 實現(xiàn)以及 MQ 基本的訂閱發(fā)布功能):
?
SCS 章節(jié)的最后,我們來看一段 SCS 關于消息的處理方式的一段代碼:
@StreamListener(value?=?Sink.INPUT,?condition?=?"headers['index']=='1'")
public?void?receiveByHeader(Message?msg) {
? ? ?System.out.println("receive by headers['index']=='1': "?+?msg);
}
@StreamListener(value?=?Sink.INPUT,?condition?=?"headers['index']=='9999'")
public?void?receivePerson(@Payload?Person?person) {
? ? ?System.out.println("receive Person: "?+?person);
}
@StreamListener(value?=?Sink.INPUT)
public?void?receiveAllMsg(String?msg) {
? ? ?System.out.println("receive allMsg by StreamListener. content: "?+?msg);
}
@StreamListener(value?=?Sink.INPUT)
public?void?receiveHeaderAndMsg(@Header("index")?String?index,?Message?msg) {
? ? ?System.out.println("receive by HeaderAndMsg by StreamListener. content: "?+?msg);
}
有沒有發(fā)現(xiàn)這段代碼跟 Spring MVC Controller 中接收請求的代碼很像? 實際上他們的架構都是類似的,Spring MVC 對于 Controller 中參數(shù)和返回值的處理類分別是org.springframework.web.method.support.HandlerMethodArgumentResolver、 org.springframework.web.method.support.HandlerMethodReturnValueHandler。
Spring Messaging 中對于參數(shù)和返回值的處理類之前也提到過,分別是 org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver、org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler。
它們的類名一模一樣,甚至內部的方法名也一樣。
?
總結
?
上圖是 SCS 體系相關類說明的總結,關于 SCS 以及 RocketMQ Binder 更多相關的示例,可以參考 RocketMQ Binder Demos(Demos 地址:點擊“閱讀原文”),包含了消息的聚合、分割、過濾;消息異常處理;消息標簽、SQL過濾;同步、異步消費等等。
下一篇文章,我們將分析消息總線(Spring Cloud Bus) 在 Spring Cloud 體系中的作用,并逐步展開,分析 Spring Cloud Alibaba 中的 RocketMQ Binder 是如何實現(xiàn) Spring Cloud Stream 標準的。
轉載于:https://www.cnblogs.com/davidwang456/articles/10653269.html
總結
以上是生活随笔為你收集整理的Spring Cloud Stream 体系及原理介绍的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 追源索骥:透过源码看懂Flink核心框架
- 下一篇: 基于 Flink 的严选实时数仓实践