Spring整合基础
本文是我們名為“ Spring Integration for EAI ”的學(xué)院課程的一部分。
在本課程中,向您介紹了企業(yè)應(yīng)用程序集成模式以及Spring Integration如何解決它們。 接下來,您將深入研究Spring Integration的基礎(chǔ)知識(shí),例如通道,轉(zhuǎn)換器和適配器。 在這里查看 !
目錄
-  1.簡(jiǎn)介 2.什么是Spring Integration? 3. Spring Integration消息傳遞系統(tǒng)的核心概念 
- 
-  3.1訊息 3.2消息通道 3.3消息端點(diǎn) 
 
- 
-  4.1通道適配器 4.2變壓器 4.3過濾器 4.4路由器 4.5拆分器和聚合器 4.6輪詢器 4.7消息橋 4.8消息處理程序鏈 
 
- 
-  5.1信息渠道 5.2網(wǎng)關(guān) 
 
4.組成
5.同步和異步通信
6.錯(cuò)誤處理
1.簡(jiǎn)介
在第二篇教程中,您將學(xué)習(xí)構(gòu)成Spring Integration核心的基本概念。 在解釋了這些概念之后,我們將審查項(xiàng)目隨附的不同組件。 此修訂版基于3.0.1版本。 考慮到4.0.0版本即將發(fā)布,您可能會(huì)發(fā)現(xiàn)一些本教程中未介紹的新組件。 無論如何,您將獲得足夠的框架知識(shí),以了解未來組件的行為。
總結(jié)本教程,您將學(xué)習(xí)Spring Integration如何支持不同類型的通信(異步和同步),以及該決定如何影響您的設(shè)計(jì)。 錯(cuò)誤處理是一種特殊情況,上一節(jié)對(duì)此進(jìn)行了說明。
本教程由以下部分組成:
- 介紹
- 什么是Spring Integration?
- Spring Integration消息傳遞系統(tǒng)的核心概念
- 組件
- 同步和異步通訊
- 錯(cuò)誤處理
2.什么是Spring Integration?
如上一節(jié)所述,Spring Integration基于Enterprise Integration Patterns一書中解釋的概念。 這是一個(gè)輕量級(jí)的消息傳遞解決方案,它將為您的Spring應(yīng)用程序添加集成功能。 作為消息傳遞策略,它提供了一種快速共享信息的方式,并且所涉及的組件或應(yīng)用程序之間具有高度的去耦性。 您將學(xué)習(xí)如何在Spring處理任何底層基礎(chǔ)架構(gòu)問題的同時(shí)完成此任務(wù)。 這將使您可以專注于業(yè)務(wù)邏輯。
當(dāng)前,Spring Integration配置主要基于xml,盡管一些注釋已開始包含在內(nèi)。 本教程中顯示的示例也將基于xml,盡管我將盡可能顯示其各自的注釋。
在解釋了這一點(diǎn)之后,出現(xiàn)了一個(gè)問題:Spring Integration可以做什么? 該框架基本上允許您執(zhí)行以下操作:
-  它允許基于內(nèi)存消息傳遞在應(yīng)用程序中的組件之間進(jìn)行通信。 這允許這些應(yīng)用程序組件彼此松散耦合,并通過消息通道共享數(shù)據(jù)。  
圖1 
-  它允許與外部系統(tǒng)通信。 您只需要發(fā)送信息; Spring Integration將處理將其發(fā)送到指定的外部系統(tǒng),并在必要時(shí)帶回響應(yīng)。 當(dāng)然,這是相反的。 Spring Integration將處理從外部系統(tǒng)到您的應(yīng)用程序的傳入調(diào)用。 本教程稍后將對(duì)此進(jìn)行解釋。  
圖2 
Spring Integration面向Spring框架的最佳實(shí)踐,例如使用接口進(jìn)行編程或在繼承技術(shù)上進(jìn)行組合。 它的主要優(yōu)點(diǎn)是:
- 組件之間的耦合松散。
- 面向事件的體系結(jié)構(gòu)。
- 集成邏輯(由框架處理)與業(yè)務(wù)邏輯分離。
在下一節(jié)中,您將學(xué)習(xí)此消息傳遞系統(tǒng)所基于的三個(gè)基本概念。
3. Spring Integration消息傳遞系統(tǒng)的核心概念
消息驅(qū)動(dòng)的體系結(jié)構(gòu)的基本概念是: 消息 , 消息通道和消息端點(diǎn) 。
該API非常簡(jiǎn)單:
- 消息發(fā)送到端點(diǎn)
- 端點(diǎn)之間通過MessageChannels連接
- 端點(diǎn)可以從MessageChannel接收消息
3.1訊息
一條消息包含將在應(yīng)用程序的不同組件之間共享或發(fā)送到外部系統(tǒng)的信息。 但是,這是什么信息? 消息的結(jié)構(gòu)如下:
圖3
如您在以下代碼片段中所見,消息是一個(gè)接口,以GenericMessage作為其主要實(shí)現(xiàn)(也由框架提供):
圖4
- 標(biāo)頭 :包含有關(guān)消息的元信息。 如果檢查MessageHeaders類,您將看到它只是Map的包裝,但是其插入操作被標(biāo)記為不支持。 框架這樣標(biāo)記它們,因?yàn)橄⒈徽J(rèn)為是不可變的。 創(chuàng)建消息后,您將無法對(duì)其進(jìn)行修改。 您可以以鍵值對(duì)的形式添加自己的標(biāo)頭,但它們主要用于傳遞傳輸信息。 例如,如果您要發(fā)送電子郵件,它將包含標(biāo)題,例如to,subject,from。
- 有效載荷 :這只是一個(gè)普通的Java類,其中包含您要共享的信息。 它可以是任何Java類型。
如果要?jiǎng)?chuàng)建消息,則有兩種選擇。 第一個(gè)涉及使用構(gòu)建器類( MessageBuilder )。
Message<String> message = MessageBuilder.withPayload("my message payload").setHeader("key1", "value1").setHeader("key2", "value2").build();構(gòu)建消息之前,您必須設(shè)置有效負(fù)載和必需的標(biāo)頭,因?yàn)橐坏﹦?chuàng)建了消息,您將無法執(zhí)行該操作,除非您創(chuàng)建新消息。
另一個(gè)選擇是使用框架提供的實(shí)現(xiàn):
Map<String, Object> headers = new HashMap<>();
headers.put("key1", "value1");
headers.put("key2", "value2");Message<String> message = new GenericMessage<String>("my message payload", headers);3.2消息通道
消息通道是連接端點(diǎn)和消息通過的管道。 生產(chǎn)者向通道發(fā)送消息,而消費(fèi)者從通道接收消息。 通過這種機(jī)制,您不需要任何類型的經(jīng)紀(jì)人。
消息通道也可以用作攔截點(diǎn)或用于消息監(jiān)視。
圖5
根據(jù)消息的使用方式,消息通道分類如下:
3.2.1點(diǎn)對(duì)點(diǎn)
消息通道上只有一個(gè)接收器。 好吧,這并非完全是100%正確。 如果是可訂閱的頻道,則可以有多個(gè)接收者,但只有一個(gè)可以處理該消息。 現(xiàn)在,請(qǐng)忘記這一點(diǎn),因?yàn)檫@是一個(gè)高級(jí)主題,將在本課程的后面部分介紹(調(diào)度程序配置)。 這種類型的渠道有幾種實(shí)現(xiàn)方式:
圖6
-  DirectChannel :實(shí)現(xiàn)SubscribableChannel 。 該消息通過同一接收者的線程發(fā)送給訂戶。 此通信是同步的,并且生產(chǎn)方將阻塞,直到收到響應(yīng)為止。 怎么運(yùn)行的:
- 生產(chǎn)者將消息發(fā)送到通道。
 
-  QueueChannel :實(shí)現(xiàn)PollableChannel 。 有一個(gè)端點(diǎn)連接到通道,沒有用戶。 這種通信是異步的。 接收者將通過其他線程檢索消息。 怎么運(yùn)行的:
- 生產(chǎn)者將消息發(fā)送到通道。
 
-  ExecutorChannel :實(shí)現(xiàn)SubscribableChannel。 發(fā)送被委托給TaskExecutor。 這意味著send()方法將不會(huì)阻塞。
-  PriorityChannel :實(shí)現(xiàn)PollableChannel。 與QueueChannel相似,但消息按優(yōu)先級(jí)而不是FIFO排序。
-  RendezvousChannel :實(shí)現(xiàn)PollableChannel。 與QueueChannel類似,但容量為零。 生產(chǎn)者將阻塞,直到接收者調(diào)用其receive()方法。
3.2.2發(fā)布-訂閱
該通道可以有多個(gè)端點(diǎn)訂閱。 因此,該消息將由不同的接收者處理。
圖7
-  PublishSubscribeChannel :實(shí)現(xiàn)SubscribableChannel。 訂閱的接收者可以通過生產(chǎn)者的線程連續(xù)調(diào)用。 如果我們指定TaskExecutor,則接收者將通過不同的線程并行調(diào)用。
3.2.3臨時(shí)頻道
 這是一種特殊的通道,由沒有明確定義輸出通道的端點(diǎn)自動(dòng)創(chuàng)建。 創(chuàng)建的通道是點(diǎn)對(duì)點(diǎn)匿名通道。 您可以在消息頭中的replyChannel名稱下看到它的定義。 
發(fā)送響應(yīng)后,會(huì)自動(dòng)刪除這些類型的通道。 建議您不要顯式定義輸出通道(如果不需要)。 該框架將為您處理。
圖8
3.3消息端點(diǎn)
它的目標(biāo)是以非侵入方式將應(yīng)用程序與消息傳遞框架連接。 如果您熟悉Spring MVC,則端點(diǎn)將以與MVC控制器處理HTTP請(qǐng)求相同的方式處理消息。 端點(diǎn)將以MVC控制器映射到URL模式的相同方式映射到消息通道。
圖9
以下是帶有可用消息端點(diǎn)的簡(jiǎn)要說明的列表:
- 通道適配器 :將應(yīng)用程序連接到外部系統(tǒng)(單向)。
- 網(wǎng)關(guān) :將應(yīng)用程序連接到外部系統(tǒng)(雙向)。
- 服務(wù)激活器 :可以調(diào)用服務(wù)對(duì)象上的操作。
- 變壓器 :轉(zhuǎn)換消息的內(nèi)容。
- 過濾器 :確定消息是否可以繼續(xù)發(fā)送到輸出通道。
- 路由器 :決定將消息發(fā)送到哪個(gè)通道。
- 拆分器 :將郵件拆分為幾個(gè)部分。
- 聚合器 :將多個(gè)消息合并為一個(gè)消息。
本教程的下一部分將說明這些端點(diǎn)中的每個(gè)端點(diǎn)。
4.組成
在本節(jié)中,您將學(xué)習(xí)什么是不同的端點(diǎn),以及如何在Spring Integration中使用它們。
4.1通道適配器
通道適配器是允許您的應(yīng)用程序與外部系統(tǒng)連接的端點(diǎn)。 如果您查看參考,您將看到所提供的類型,例如連接到JMS隊(duì)列,MongoDB數(shù)據(jù)庫(kù),RMI,Web服務(wù)等。
適配器有四種類型:
- 入站通道適配器 :單向。 它從外部系統(tǒng)接收消息。 然后,它通過消息通道進(jìn)入我們的消息傳遞系統(tǒng),我們將在其中進(jìn)行處理。
- 出站通道適配器 :單向。 我們的消息系統(tǒng)創(chuàng)建一條消息并將其發(fā)送到外部系統(tǒng)。
- 入站網(wǎng)關(guān) :雙向。 一條消息進(jìn)入應(yīng)用程序,并期望得到響應(yīng)。 響應(yīng)將發(fā)送回外部系統(tǒng)。
- 出站網(wǎng)關(guān) :雙向。 該應(yīng)用程序創(chuàng)建一條消息并將其發(fā)送到外部系統(tǒng)。 然后,網(wǎng)關(guān)將等待響應(yīng)。
4.2變壓器
該端點(diǎn)用于有效負(fù)載轉(zhuǎn)換。 它將有效負(fù)載的類型轉(zhuǎn)換為另一種類型。 例如,從String到XML文檔。 只要考慮到轉(zhuǎn)換有效負(fù)載會(huì)產(chǎn)生一條新消息(請(qǐng)記住該消息是不可變的!)。 這種類型的端點(diǎn)增加了生產(chǎn)者與消費(fèi)者之間的松散耦合,因?yàn)橄M(fèi)者不需要知道生產(chǎn)者是什么類型的。 轉(zhuǎn)換器將負(fù)責(zé)處理并交付用戶正在等待的內(nèi)容類型。
Spring Integration提供了Transformer的幾種實(shí)現(xiàn) 。 這里有些例子:
- HeaderEnricher:允許在消息中添加標(biāo)題值。
- ObjectToMapTransformer:將對(duì)象轉(zhuǎn)換為地圖,將其屬性轉(zhuǎn)換為地圖值。
- ObjectToStringTransformer:將對(duì)象轉(zhuǎn)換為字符串。 它通過調(diào)用其toString()操作對(duì)其進(jìn)行轉(zhuǎn)換。
- PayloadSerializingTransformer / PayloadDeserializingTransformer:從Object轉(zhuǎn)換為字節(jié)數(shù)組, 反之亦然 。
讓我們看幾個(gè)例子:
假設(shè)我們有以下模型:
public class Order implements Serializable {private static final long serialVersionUID = 1L;private int id;private String description;public Order() {}public Order(int id, String description) {this.id = id;this.description = description;}@Overridepublic String toString() {return String.valueOf(this.getId());}//Setters & Getters
}將其發(fā)送到名為“ requestChannel”的消息通道時(shí),以下代碼段將通過調(diào)用Order實(shí)例的toString()方法將其自動(dòng)轉(zhuǎn)換為String:
<int:object-to-string-transformer input-channel="requestChannel" output-channel="transformedChannel"/> 結(jié)果字符串將被發(fā)送到名為transformedChannel的輸出通道。 
如果需要更定制的轉(zhuǎn)換,則可以實(shí)現(xiàn)自己的轉(zhuǎn)換器,這是一個(gè)普通的bean。 您將需要在transformer元素中指定引用的bean,如下所示:
<int:transformer ref="myTransformer" method="transform"input-channel="requestChannel" output-channel="transformedChannel"/>轉(zhuǎn)換器將調(diào)用名為“ myTransformer”的bean的“ transform”方法。 該bean如下所示:
@Component("myTransformer")
public class MyTransformer {public Order transform(Order requestOrder) {return new Order(requestOrder.getId(), requestOrder.getDescription()+"_modified");}
} 在此示例中,變壓器元素的method屬性不是必需的,因?yàn)樽儔浩髦挥幸环N方法。 如果它有幾種方法,則需要設(shè)置“ method”屬性以告知框架要調(diào)用的方法。 或者,如果您更喜歡注釋,則可以在方法級(jí)別使用@Transformer注釋指定方法: 
@Component("myTransformer")
public class MyTransformer {@Transformerpublic Order transform(Order requestOrder) {return new Order(requestOrder.getId(), requestOrder.getDescription()+"_modified");}public Order doOtherThings(Order requestOrder) {//do other things}
}4.3過濾器
過濾器用于確定消息是否應(yīng)繼續(xù)其發(fā)送方式,或者相反,是否已丟棄。 要決定要做什么,它基于一些標(biāo)準(zhǔn)。
以下過濾器實(shí)現(xiàn)將從輸入通道接收Order實(shí)例,并丟棄帶有無效描述的實(shí)例。 有效訂單將發(fā)送到輸出通道:
<int:filter ref="myFilter" method="filterInvalidOrders" input-channel="requestChannel" output-channel="filteredChannel"/>過濾器方法返回布爾類型。 如果返回false,則該消息將被丟棄:
@Component("myFilter")
public class MyFilter {public boolean filterInvalidOrders(Order order) {if (order == null || "invalid order".equals(order.getDescription())) {return false;}return true;}
} 與轉(zhuǎn)換器一樣,僅當(dāng)在filter bean中定義了多個(gè)method , method屬性才是必需的。 要指定您要調(diào)用的方法,請(qǐng)使用@Filter批注: 
@Filter
public boolean filterInvalidOrders(Order order) {Spring表達(dá)語言
如果您的過濾器非常簡(jiǎn)單,則可以跳過任何Java類來實(shí)現(xiàn)過濾器。 您可以使用SpEL定義過濾器。 例如,以下代碼片段將實(shí)現(xiàn)與上述相同的過濾器,但沒有Java代碼:
<int:filter expression="!payload.description.equals('invalid order')" input-channel="requestChannel" output-channel="filteredChannel"/>丟棄消息
使用默認(rèn)配置,丟棄的消息只是被靜默丟棄。 我們可以更改它,如果我們決定這樣做,我們有兩個(gè)選擇:
1.我們可能不想丟失任何消息。 在這種情況下,我們可以拋出一個(gè)異常:
<int:filter expression="!payload.description.equals('invalid order')" input-channel="requestChannel" output-channel="filteredChannel"throw-exception-on-rejection="true"/>2.我們要注冊(cè)所有丟棄的消息。 我們可以配置一個(gè)丟棄通道:
<int:filter expression="!payload.description.equals('invalid order')" input-channel="requestChannel" output-channel="filteredChannel"discard-channel="discardedOrders"/>4.4路由器
路由器允許您根據(jù)條件將消息重定向到特定的消息通道。
與往常一樣,該框架提供了一些最基本的實(shí)現(xiàn)。 以下示例使用有效負(fù)載類型路由器。 它將從請(qǐng)求通道接收消息,并且根據(jù)有效負(fù)載的類型,它將把它發(fā)送到另一個(gè)輸出通道:
<int:payload-type-router input-channel="requestChannel"><int:mapping type="String" channel="stringChannel"/><int:mapping type="Integer" channel="integerChannel"/>
</int:payload-type-router>您可以在此處查看完整列表。
現(xiàn)在讓我們回到訂單示例,我們將實(shí)現(xiàn)一個(gè)路由器,該路由器將根據(jù)訂單描述重定向消息。
<int:router ref="myRouter" input-channel="requestChannel" default-output-channel="genericOrders"/>路由器實(shí)現(xiàn)包含一個(gè)方法,該方法返回將消息重定向到的消息通道的名稱:
@Component("myRouter")
public class MyRouter {public String routeOrder(Order order) {String returnChannel = "genericOrders";if (order.getDescription().startsWith("US-")) {returnChannel = "usOrders";}else if (order.getDescription().startsWith("EU-")) {returnChannel = "europeOrders";}return returnChannel;}
} 如果有幾種方法,可以使用@Router批注: 
@Router
public String routeOrder(Order order) {與過濾器相同,您可以基于Spring表達(dá)式語言路由消息。
4.5拆分器和聚合器
拆分器的目標(biāo)是接收消息并將其劃分為幾個(gè)部分。 這些零件然后分別發(fā)送,以便可以獨(dú)立處理。 該端點(diǎn)通常與聚合器組合。
聚合器獲取消息列表,并將它們組合為一條消息。 這與拆分器相反。
您將通過一個(gè)示例更好地看到這一點(diǎn):
我們將修改訂單示例,以便拆分器接收訂單包。 該軟件包包含拆分器將分離的幾個(gè)相關(guān)訂單。 拆分器獲取訂單包并返回訂單列表:
<int:splitter input-channel="requestChannel" ref="mySplitter" output-channel="splitChannel"/>拆分器的實(shí)現(xiàn)非常簡(jiǎn)單:
@Component("mySplitter")
public class MySplitter {public List<Order> splitOrderPackage(OrderPackage orderPackage) {return orderPackage.getOrders();}
}拆分器返回訂單列表,但它可以返回以下任意值:
- 消息的集合或數(shù)組。
- Java對(duì)象的集合或數(shù)組。 每個(gè)列表元素將作為消息有效內(nèi)容包含在內(nèi)。
- 一個(gè)消息。
- 一個(gè)Java對(duì)象(將包含在消息有效負(fù)載中)。
在此示例之后,有一個(gè)聚合器端點(diǎn),該端點(diǎn)連接到“ splitChannel”通道。 該聚合器獲取列表并合并其訂單以形成訂單確認(rèn),并添加每個(gè)訂單的數(shù)量:
<int:channel id="splitChannel"/><int:aggregator ref="myAggregator" input-channel="splitChannel" output-channel="outputChannel"/>聚合器實(shí)現(xiàn):
@Component("myAggregator")
public class MyAggregator {public OrderConfirmation confirmOrders(List<Order> orders) {int total = 0;for (Order order:orders) {total += order.getQuantity();}OrderConfirmation confirmation = new OrderConfirmation("3");confirmation.setQuantity(total);return confirmation;}
}4.5.1相關(guān)和發(fā)布策略
當(dāng)消息由拆分器端點(diǎn)拆分時(shí),將設(shè)置兩個(gè)標(biāo)頭:
- MessageHeaders.CORRELATION_ID
- MessageHeaders.SEQUENCE_SIZE
聚合器端點(diǎn)使用這些標(biāo)頭能夠正確組合消息。 它將保留消息,直到準(zhǔn)備好一組具有相同相關(guān)性ID的消息為止。 何時(shí)準(zhǔn)備就緒? 達(dá)到序列大小后即可準(zhǔn)備就緒。
 相關(guān)策略 
 允許對(duì)郵件進(jìn)行分組。 默認(rèn)情況下,它將在CORRELATION_ID標(biāo)頭中將所有具有相同值的消息分組。 有幾種策略可供選擇。 
 發(fā)布策略 
 默認(rèn)情況下,當(dāng)一組消息的大小達(dá)到消息頭SEQUENCE_SIZE指定的值時(shí),它將被視為完整。 
4.6輪詢器
在Spring Integration中,有兩種類型的使用者:
- 活躍的消費(fèi)者
- 被動(dòng)消費(fèi)者
被動(dòng)組件是那些訂閱了可訂閱頻道的組件。 這樣,當(dāng)消息發(fā)送到這種類型的通道時(shí),該通道將調(diào)用其訂戶。 消費(fèi)者的方法將被被動(dòng)調(diào)用。
活動(dòng)組件是連接到可輪詢通道的組件。 這樣,消息將排隊(duì)進(jìn)入通道,等待用戶主動(dòng)從通道中檢索消息。
輪詢程序用于指定活動(dòng)使用者如何檢索這些消息。 以下是幾個(gè)示例:
基本輪詢器配置
它將在一秒鐘的間隔內(nèi)輪詢消息通道
<int:service-activator method="processOrder" input-channel="pollableChannel" ref="orderProcessor"><int:poller fixed-rate="1000"/>
</int:service-activator>使用Cron表達(dá)式配置的輪詢器
它將每30分鐘輪詢一次消息通道
<int:service-activator method="processOrder" input-channel="pollableChannel" ref="orderProcessor"><int:poller cron="0 0/30 * * * ?"/>
</int:service-activator>要考慮的一件事是,如果使用者連接到可輪詢的頻道,則將需要一個(gè)輪詢器。 如果不是,將引發(fā)異常。 如果不想為每個(gè)活動(dòng)的使用者配置輪詢器,則可以定義一個(gè)默認(rèn)輪詢器:
<int:poller id="defaultPoller" fixed-rate="1000" default="true"/> 不要忘記設(shè)置default和id屬性。 
4.7消息橋
 這種類型的端點(diǎn)連接兩個(gè)消息通道或兩個(gè)通道適配器。 例如,您可以將SubscribableChannel通道連接到PollableChannel通道。 
這是一個(gè)示例:
<int:channel id="requestChannel"/><int:bridge input-channel="requestChannel" output-channel="pollableChannel"/><int:channel id="pollableChannel"><int:queue capacity="5"/>
</int:channel><int:service-activator method="processOrder" input-channel="pollableChannel" ref="orderProcessor"/><int:poller id="defaultPoller" fixed-rate="1000" default="true"/>在此示例中,消息傳遞橋從輸入通道接收消息,并將其發(fā)布到輸出通道。 在這種情況下,我們將服務(wù)激活器連接到輸出通道。 訂單處理器(服務(wù)激活器)將每隔一秒鐘輪詢一次消息通道。
4.8消息處理程序鏈
當(dāng)您以線性方式連接多個(gè)消息處理程序時(shí),消息處理程序鏈用于簡(jiǎn)化配置。 以下示例顯示了將通過處理程序鏈進(jìn)行簡(jiǎn)化的消息傳遞配置:
<int:channel id="requestChannel"/>
<int:channel id="responseChannel"/><int:filter ref="myFilter" method="filterInvalidOrders" input-channel="requestChannel" output-channel="filteredChannel"/><int:channel id="filteredChannel"/><int:transformer ref="myTransformer" method="transform"input-channel="filteredChannel" output-channel="transformedChannel"/><int:channel id="transformedChannel"/><int:service-activator method="processOrder" input-channel="transformedChannel" ref="orderProcessor" output-channel="responseChannel"/>消息通過過濾器,然后到達(dá)轉(zhuǎn)換器,最后,消息將由服務(wù)激活器處理。 完成后,消息將發(fā)送到輸出通道“ responseChannel”。
使用消息過濾器鏈,配置將簡(jiǎn)化如下:
<int:channel id="requestChannel"/>
<int:channel id="responseChannel"/><int:chain input-channel="requestChannel" output-channel="responseChannel"><int:filter ref="myFilter" method="filterInvalidOrders"/><int:transformer ref="myTransformer" method="transform"/><int:service-activator ref="orderProcessor" method="processOrder"/>
</int:chain>5.同步和異步通信
如本課程的第一篇教程中所述,通信可以同步或異步執(zhí)行。 本節(jié)說明如何更改此通信。
5.1信息渠道
根據(jù)您配置消息通道的方式,將同步或異步檢索消息。 無需更改很多東西,只需更改配置即可。
例如,假設(shè)我們有一個(gè)類似下面的點(diǎn)對(duì)點(diǎn)直接渠道:
<int:channel id="requestChannel"/>發(fā)送到該通道的消息將立即傳遞給被動(dòng)使用者(訂戶)。 如果期望得到響應(yīng),則發(fā)件人將等待直到將其發(fā)送給他。 為了改變這一點(diǎn),我們只需要添加一個(gè)隊(duì)列:
<int:channel id="requestChannel"><int:queue capacity="5"/>
</int:channel>而已。 現(xiàn)在,該通道最多可以將五個(gè)消息排隊(duì)。 使用者將從與發(fā)件人不同的線程中主動(dòng)檢索在此通道中排隊(duì)的消息。
現(xiàn)在,發(fā)布-訂閱頻道如何? 讓我們以配置同步通道為例:
<int:publish-subscribe-channel id="mySubscribableChannel"/>在這種情況下,我們將使用任務(wù)執(zhí)行程序來更改其行為:
<int:publish-subscribe-channel id="mySubscribableChannel" task-executor="myTaskExecutor"/><task:executor id="myTaskExecutor" pool-size="5"/>5.2網(wǎng)關(guān)
網(wǎng)關(guān)是一種通道適配器,可用于:
- 提供消息傳遞系統(tǒng)的進(jìn)入/退出機(jī)制。 這樣,應(yīng)用程序可以將消息發(fā)送到消息傳遞系統(tǒng),消息傳遞系統(tǒng)將通過其消息端點(diǎn)對(duì)其進(jìn)行處理。
- 向外部系統(tǒng)發(fā)送消息并等待響應(yīng)(輸出網(wǎng)關(guān))
- 接收來自外部系統(tǒng)的消息,并在處理后發(fā)送響應(yīng)(入站網(wǎng)關(guān))。
本示例使用第一種情況。 該應(yīng)用程序?qū)⑼ㄟ^網(wǎng)關(guān)發(fā)送消息,并等待消息傳遞系統(tǒng)對(duì)其進(jìn)行處理。 在這里,我們將使用同步網(wǎng)關(guān)。 因此,測(cè)試應(yīng)用程序?qū)l(fā)送消息并阻止,等待響應(yīng)。
介面
 網(wǎng)關(guān)將捕獲對(duì)它的sendOrder方法的所有調(diào)用。 看到?jīng)]有該接口的實(shí)現(xiàn)。 網(wǎng)關(guān)將包裝它以攔截那些呼叫。 
public interface OrderService {@Gatewaypublic OrderConfirmation sendOrder(Order order);
}配置
網(wǎng)關(guān)鏈接到接口,以便攔截其呼叫并將消息發(fā)送到消息傳遞系統(tǒng)。
<int:gateway default-request-channel="requestChannel" service-interface="xpadro.spring.integration.service.OrderService"/><int:channel id="requestChannel"/>考試
 服務(wù)接口(網(wǎng)關(guān))被注入到應(yīng)用程序中。 調(diào)用"sendOrder"方法會(huì)將Order對(duì)象發(fā)送到消息傳遞系統(tǒng),并包裝在消息中。 
@Autowired
private OrderService service;@Test
public void testSendOrder() {OrderConfirmation confirmation = service.sendOrder(new Order(3, "a correct order"));Assert.assertNotNull(confirmation);Assert.assertEquals("confirmed", confirmation.getId());
}在另一個(gè)示例中,測(cè)試類將阻塞,直到將訂單確認(rèn)發(fā)送回為止。 現(xiàn)在我們將對(duì)其進(jìn)行配置以使其異步:
介面
唯一的變化是返回未來
public interface OrderService {@Gatewaypublic Future<OrderConfirmation> sendFutureOrder(Order order);
}考試
現(xiàn)在,測(cè)試必須處理將從網(wǎng)關(guān)返回的Future對(duì)象。
@Autowired
private OrderService service;@Test
public void testSendCorrectOrder() throws ExecutionException {Future<OrderConfirmation> confirmation = service.sendFutureOrder(new Order(3, "a correct order"));OrderConfirmation orderConfirmation = confirmation.get();Assert.assertNotNull(orderConfirmation);Assert.assertEquals("confirmed", orderConfirmation.getId());
}6.錯(cuò)誤處理
本教程的最后一部分將說明錯(cuò)誤處理的差異,具體取決于我們配置的通信類型是同步還是異步。
在同步通信中,發(fā)送者在使用同一線程將消息發(fā)送到消息傳遞系統(tǒng)時(shí)阻塞。 顯然,如果引發(fā)異常,它將到達(dá)應(yīng)用程序(上一節(jié)示例中的測(cè)試)。
但是,在異步通信中,使用者從另一個(gè)線程檢索消息。 如果引發(fā)異常,它將無法到達(dá)應(yīng)用程序。 Spring Integration如何處理它? 這是錯(cuò)誤通道進(jìn)入的地方。
引發(fā)異常時(shí),它將包裝到MessagingException中 ,成為新消息的有效內(nèi)容。 該消息發(fā)送至:
- 錯(cuò)誤通道:此通道在原始消息頭中定義為名為“ errorChannel”的頭。
- 全局錯(cuò)誤通道:如果消息頭中未定義錯(cuò)誤通道,則將其發(fā)送到全局錯(cuò)誤通道。 這個(gè)通道是Spring Integration默認(rèn)定義的。
全局錯(cuò)誤通道
該頻道是發(fā)布-訂閱頻道。 這意味著我們可以將自己的端點(diǎn)訂閱到此通道,并接收引發(fā)的任何錯(cuò)誤。 實(shí)際上,Spring Integration已經(jīng)預(yù)訂了一個(gè)端點(diǎn):一個(gè)日志處理程序。 該處理程序記錄發(fā)送到全局錯(cuò)誤通道的所有消息的有效負(fù)載。
要訂閱另一個(gè)端點(diǎn)以處理異常,我們只需要按以下方式進(jìn)行配置:
<int:service-activator input-channel="errorChannel" ref="myExceptionHandler" method="handleInvalidOrder"/><bean id="myExceptionHandler" class="xpadro.spring.integration.activator.MyExceptionHandler"/> 我們的服務(wù)激活器端點(diǎn)的handleInvalidOrder方法將收到消息傳遞異常: 
public class MyExceptionHandler {@ServiceActivatorpublic void handleInvalidOrder(Message<MessageHandlingException> message) {//Retrieve the failed order (payload of the source message)Order requestedOrder = (Order) message.getPayload().getFailedMessage().getPayload();//Handle exception...}
}翻譯自: https://www.javacodegeeks.com/2015/09/spring-integration-fundamentals.html
總結(jié)
以上是生活随笔為你收集整理的Spring整合基础的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: Java中多线程的性能比较
- 下一篇: java与java ee_Java EE
