javascript
使用Spring WebFlux从Corda节点流式传输数据
自上次發布以來已經有一段時間了,但我終于回來了! 由于我仍在我的項目中,因此我將再次撰寫有關使用Corda的文章。 這次,我們將不再關注Corda,而是將Spring與Corda結合使用。 更具體地說,Spring WebFlux。 為什么這樣 第一,因為我們可以。 第二,因為它允許我們流式傳輸來自Corda節點的事件。 這使我們能夠跟蹤流的進度或檢索對Vault的更新,并將其發送給注冊到相關端點的任何客戶端。 將WebFlux與Corda結合使用確實會帶來一些問題。 有些來自Corda,有些來自Spring。 雖然,Spring問題與我有關,但我期望Spring Boot + WebFlux組合默認對我有更多作用。
在本文中,我假設您對Corda有一定的經驗,但是如果您確實需要有關此主題的更多信息,我建議您閱讀我以前的文章: 什么是Corda和Corda 開發 。 此外,我還建議您看一下使用Spring WebFlux做事作為WebFlux的介紹。
本教程的內容將使用Corda的3.2開源版本。 我實際上是根據3.1開始撰寫這篇文章的,但是在此期間發布了較新的版本。 因此,有一些基于在這些版本之間移動的注釋。
我們還將在Kotlin中實現所有內容,但本文的內容也可以在Java中實現。
示例應用程序簡介
我們將為一個非常簡單的應用程序建模,該應用程序不會提供太多使用,因此,出于這篇博文的目的,我將其合并在一起。 該應用程序將由一方向另一方發送消息(由MessageState表示)組成。 為此, SendMessageFlow將運行,一旦運行,雙方將擁有消息的副本,僅此而已。 簡短而簡單,但應向我們提供足夠的知識來證明WebFlux如何與Corda配合使用。
結構體
通常,我從查看依賴關系開始。 盡管由于將代碼分成了單獨的模塊,所以最好先查看小示例應用程序的結構。
+-- app | +-- {spring code} | +-- build.gradle +-- cordapp | +-- {flow code} | +-- build.gradle +-- contracts-and-states | +-- {contracts and states code} | +-- build.gradle +-- build.gradle這是應用程序結構的快速視圖。 app將包含所有Spring代碼,并將通過RPC委托給Corda節點。 cordapp模塊包含流邏輯, contracts-and-states按照名稱的建議進行操作,并包含契約和狀態代碼。 cordapp模塊和contracts-and-states模塊都打包到Cordapp Jars中,并轉儲到Corda節點中。
這些模塊中的每個模塊都包含一個build.gradle文件,該文件包含其相關的構建信息和相關性。 由于本文不是直接著眼于編寫Corda代碼,因此我們將不繼續詳細研究每個模塊及其構建文件。 取而代之的是,我們僅在帖子末尾重新整理流代碼,以便我們專注于Spring實現。
Spring模塊的依賴
以下是app模塊的build.gradle文件(包含Spring代碼):
我不是Gradle的專家,因此該代碼段中可能有一些可以做得更好的事情,但是它確實可以滿足需要。
因此,我想強調一些事情。 使用Spring Boot 2.0.3.RELEASE ,使用kotlin-spring插件向所有標有某些Spring注釋的Kotlin類添加open 。 在很多情況下都需要這樣做,因為Spring要求某些類是非最終的。 這在Java中不是問題,但對于Kotlin來說是有問題的,因為默認情況下所有類都是final。 有關該插件的更多信息,請訪問kotlinlang.org 。
spring-boot-starter-webflux了WebFlux依賴關系以及常規的Spring Web服務器代碼,以使一切正常運行。
rxjava-reactive-streams ,這是一個有趣的內容,我們將在以后看到它。 由于Corda使用RxJava 1.xx而不是較新的RxJava2,因此其Observable不能實現Spring WebFlux用于返回反應流的Java 8 Publisher接口。 這種依賴性將這些較舊的Observable轉換為Publisher因此它們與WebFlux兼容。 稍后,當我們查看執行轉換的代碼時,將再次涉及到這一點。
最后, netty-all版本被強制為4.1.25.Final以解決依賴關系問題。
路由功能
WebFlux引入了一種功能性方法,用于將請求路由到處理請求的功能。 有關更多信息,請參見使用Spring WebFlux進行操作 。 我不想深入探討WebFlux的工作方式,但我們將快速定義路由功能。 主要原因是由于使用Kotlin而不是Java。 Kotlin提供了一種使用DSL定義功能的不同方法。
下面是定義本教程路由的代碼:
routes bean接收MessageHandler bean(我們將在后面進行介紹),并將兩個URI映射到該MessageHandler找到的函數。 與Java實現相比,DSL允許的版本略短。 此片段中有幾個部分需要重點關注。
("/messages")定義兩個路由功能的基本請求路徑。 DSL允許功能從此基本路徑嵌套自己,并幫助傳達路由的結構。
一個函數在發送請求后返回的響應中接受TEXT_EVENT_STREAM ( text/event-stream ),同時還將APPLICATION_JSON ( application/stream+json )指定為正文的內容。 由于我們已經定義了Content-Type ,因此在大多數情況下,我們可以假設我們將發送一個POST請求(就是這樣)。 POST從先前的配置中進一步嵌套,并添加了另一個MessageHandler函數來接受請求。
第二個功能從Corda節點接收更新。 為此,它返回APPLICATION_STREAM_JSON并期望將GET請求發送到/messages/updates 。
處理函數
在本節中,我們將看一下上一節中幾次提到的MessageHandler 。 此類包含執行實際業務邏輯的所有功能。 路由只是達到這一點的一種方法。
我以前的文章“用Spring WebFlux做事”將比我在本文中更深入地解釋這些示例中更多WebFlux的特定部分。
下面是處理程序代碼:
首先,我們應突出顯示NodeRPCConnection類及其類型為CordaRPCOps的屬性proxy 。 我從示例Corda和Spring應用程序 (由R3員工編寫)中竊取了NodeRPCConnection 。 簡而言之, NodeRPCConnection創建到Corda節點的RPC連接,并且proxy返回CordaRPCOps 。 CordaRPCOps包含所有可用的RPC操作。 這就是Spring與Corda節點交互的方式。
讓我們仔細看看updates功能:
此功能返回新消息,將其保存到Vault中。 如果您有一個監視來自Corda節點的更新的應用程序,則這種端點會很好。
此代碼段中與Corda相關的代碼全部包含在trackNewMessages函數中。 它使用CordaRPCOps的vaultTrackBy訪問保管庫服務,并開始跟蹤對任何MessageState的更新。 由于我們尚未將任何參數傳遞給該函數,因此它將僅跟蹤UNCONSUMED狀態。 vaultTrackBy返回一個DataFeed對象, DataFeed對象可以用于通過snapshot屬性檢索保管庫的snapshot也可以通過訪問updates屬性來返回一個Observable以允許其訂閱更新事件。 我們將使用此RxJava Observable將數據流式傳輸回調用方。
這是我們需要使用我前面提到的rxjava-reactive-streams的第一個實例。 toPublisher方法接受一個Observable并將其轉換為Publisher 。 請記住,WebFlux需要兼容Java 8的反應性流庫,這些庫必須實現Publisher 。 例如,Spring傾向于使用提供Mono和Flux類的Reactor 。
創建Publisher ,需要將其饋送到ServerResponse 。 由于目前一切順利,我們將通過ok方法返回200響應。 然后將Content-Type設置為APPLICATION_STREAM_JSON因為它包含流數據。 最后,響應的主體從trackNewMessages中獲取Publisher trackNewMessages 。 現在,端點已準備好由發出請求的客戶端進行訂閱。
現在,完成了從節點到客戶端的流更新功能。 實際保存新消息怎么辦? 此外,是否有任何信息可以傳遞給發送者有關執行流程的信息? 因此,讓我們回答這兩個問題。 是的,我們可以使用WebFlux保存新消息。 是的,流程可以返回其當前進度。
下面是post函數的代碼,該函數在流的流進度時將新消息保存到發件人和收件人的節點上:
proxy.startTrackedFlow啟動一個流程,該流程的進度可以由添加到該流程的任何ProgressTracker跟蹤。 此類中定義的startTrackedFlow委托給上述函數,并返回其progress屬性; 一個Observable<String>其事件由ProgressTracker的進度組成。
傳遞到流中的MessageState是從請求傳遞的Message對象創建的。 這是因為它包含的信息比MessageState本身少,因此可以更輕松地將消息數據輸入到端點。 parse將Message傳遞的字符串X500名稱轉換為CordaX500Name ,然后假定存在網絡中,轉換為網絡中的Party 。
然后通過created方法將其打包到響應中。 指定Content-Type來告訴客戶端它包含text/event-stream 。 消息的路徑使用在執行流之前創建的UUID 。 例如,這可以用于檢索特定的消息,但是您需要自己實現該消息,因為我太懶了,因此本文不做。
創建一個客戶
現在已經設置了端點,我們應該創建一個可以發送請求并使用發送回它的流的客戶端。 稍后,我們將簡要地看一下流代碼,以更全面地了解正在發生的事情。
為了將請求發送到響應式后端,Spring WebFlux提供了WebClient類。 發送請求后, WebClient可以對響應中發送的每個事件做出反應。 下面的MessageClient就是這樣做的:
MessageClient包裝并使用WebClient將請求發送到WebClient的構建器中指定的地址。 在該課程中,關于反序列化還有一些額外的配置,但是我想暫時重新介紹一下,因為還有一部分內容涉及該主題。
和以前一樣, 使用Spring WebFlux做事提供了WebFlux特定方法的深入解釋。
因此,讓我們單獨查看每個請求,首先將POST請求發送到/messages端點:
post方法創建一個用于指定請求內容的構建器。 這應該與我們之前定義的端點匹配。 建立請求后,請調用exchange方法將其發送到服務器。 然后,將響應的主體映射到Flux<String> ,以使其可以訂閱。 那就是使用反應流的本質。 訂閱響應后,客戶端將決定對每個事件執行他們希望執行的任何處理。 在這種情況下,它只是打印出ProgressTracker的當前步驟。
如果我們通過這段代碼發送請求,我們將收到以下信息:
STEP: Verifying STEP: Signing STEP: Sending to Counterparty STEP: Collecting signatures from counterparties. STEP: Verifying collected signatures. STEP: Done STEP: Finalising STEP: Requesting signature by notary service STEP: Broadcasting transaction to participants STEP: Done STEP: Done這些是SendMessageFlow的ProgressTracker定義的步驟。 是的,我知道我還沒有向您顯示該代碼,但是請相信我。 真的沒有太多其他。 如您所見,流返回的每個字符串值都將“ STEP”附加到自身
現在進入到/messages/update端點的GET請求:
同樣,在這一點上沒有什么可顯示的。 但是,在幕后,實際上需要大量工作才能使它工作。 我需要打通電話才能解決的所有問題都圍繞著序列化和反序列化。 我們將在下一部分中進行介紹。
對此請求的響應如下:
UPDATE: 0 consumed, 1 producedConsumed:Produced: 56781DF3CEBF2CDAFACE1C5BF04D4962B5483FBCD2C2E428352AD82BC951C686(0) : TransactionState(data=MessageState(sender=O=PartyA, L=London, C=GB, recipient=O=PartyB, L=London, C=GB, contents=hello there, linearId=1afc6144-32b1-4265-a06e-73b6bb81aef3_b0fa8491-c9b9-418c-ba6e-8b7840faaf30, participants=[O=PartyA, L=London, C=GB, O=PartyB, L=London, C=GB]), contract=com.lankydanblog.tutorial.contracts.MessageContract, notary=O=Notary, L=London, C=GB, encumbrance=null, constraint=net.corda.core.contracts.WhitelistedByZoneAttachmentConstraint@4a1febb5)關于此端點的好處是,它現在維持與該節點的連接,該節點將繼續將所有相關更新發送回此客戶端。 上面的請求是原始POST消息的更新。 客戶端收到的任何新事件都會在客戶端上輸出更新。 這就是使此類端點非常適合觸發進程或僅在與Corda節點本身分開的前端上顯示最新數據的理想之選。
序列化和反序列化
在本節中,我想集中精力正確設置序列化和反序列化。 從/messages/updates端點檢索的數據需要正確地序列化其數據,以傳遞給客戶端,客戶端也需要能夠反序列化響應數據。
通常,Spring會為您執行很多操作,而且仍然可以執行,但是似乎WebFlux仍需要一些額外的步驟來正確設置它。 免責聲明,這是根據我的經驗,如果您知道執行此操作的更好方法,那么我很想聽聽您的意見。
Corda Jackson支持
Spring傾向于默認使用杰克遜,并且很方便地,Corda本身提供了很多杰克遜設置。 JacksonSupport.cordaModule為諸如Party和CordaX500Name類的類提供了一些序列化和反序列化。 如果在某些基本情況下需要對Corda類進行序列化或反序列化,則這可能會滿足您的需求。 在Spring中,您可以創建一個默認的ObjectMapper將檢索并添加到其自身的bean。
但是,此路線有一些警告。 由于模塊依賴于ObjectMapper可以訪問節點信息,例如通過RPC客戶端CordaRPCOps訪問節點信息,因此無法反序列化某些類。 否則,反序列化Party , AbstractParty或AnonymousParty將會失敗。 不僅如此,由于不安全,此功能現已從Corda 3.2棄用。 JacksonSupport.cordaModule也已移入其自己的類( CordaModule )。
我下面提供的解決方案也是Corda從現在開始建議采用的解決方案。
以下是MessageClient從/messages/updates端點檢索更新時引發的異常(對于本節的其余部分,將使用相同的端點):
com.fasterxml.jackson.databind.ObjectMapper cannot be cast to net.corda.client.jackson.JacksonSupport$PartyObjectMapper由此,我們可以確定我們的ObjectMapper類型錯誤,并且實際上需要是PartyObjectMapper的子類型。 進一步介紹一下,我們可以看到在JacksonSupport類中也找到了該映射器。 現在,剩下要做的就是創建這個映射器,并使用它而不是默認的ObjectMapper 。
因此,讓我們看看如何做到這一點:
這將創建一個RpcObjectMapper ,它實現PartyObjectMapper并利用RPC檢索節點信息,從而可以反序列化各種參與方類。 在createDefaultMapper, CordaModule了之前的CordaModule ,感謝Spring,對于大多數需要序列化或反序列化的實例(以后要CordaModule注意),它現在是默認的對象映射器。
一些更多的序列化和反序列化配置
現在……我實際上處于一個很奇怪的位置。 我想通過所有其他步驟來使端點正常工作。 但是,無論我做什么,我似乎都無法重新創建曾經遇到的所有錯誤,然后才能使其正常工作。 我不知道該說些什么……我的異常被吞沒在某處,阻止我看到正在發生的事情。 無論如何,我們必須繼續。 值得慶幸的是,我知道為什么我添加了其余的代碼,但是我無法再為您提供每個更改都已修復的例外……
Soooo,讓我們看一下我們早先開始研究的rpcObjectMapper的最終產品:
這里有一些補充。 JsonComponentModule作為bean添加,以便它拾取定義的@JsonSerializer和@JsonDeserializer自定義組件(在其他類中)。 似乎即使將它作為模塊添加到映射器,如果要查找和注冊自定義JSON組件,它仍然需要創建bean本身。
接下來是MixinModule 。 此類解決了在反序列化Vault.Update和SecureHash時出現的問題。 讓我們仔細看看。
Mixin允許我們將Jackson注釋添加到類上,而實際上沒有訪問類本身的權限,而這顯然是我們無法控制的,因為這是Corda代碼庫中的一個對象。 另一個選擇是將其添加到我們之前討論的CordaModule ,但這是CordaModule 。
Vault.Update需要這種方法,是因為它擁有一個名為isEmpty的方法,該方法不能很好地與Jackson配合使用,后者感到困惑,并認為isEmpty與一個名為empty的布爾字段匹配。 因此,當將JSON反序列化回對象時,它將嘗試為該字段傳遞一個值。
MixinModule本身只是一個類,其構造函數將VaultUpdateMixin和SecureHashMixin添加到其自身中。 然后,映射器會像添加其他模塊一樣添加該模塊。 任務完成。
添加到VaultUpdateMixin的Jackson批注是@JsonIgnore ,這可以說明@JsonIgnore 。 序列化或反序列化時, isEmpty函數將被忽略。
接下來是SecureHashMixin :
從3.1移到3.2后,我已經添加了這個。 對我來說,似乎忘記了為SecureHash添加Mixin。 CordaModule包括用于SecureHash.SHA256序列化和反序列化,但不包括SecureHash 。 上面的代碼是從CordaModule復制和粘貼的, CordaModule一個類與Mixin綁定在一起。
包含此內容后,將解決3.1和3.2之間的差異。
我想我會為此提出一個問題!
定制序列化器和反序列器
要序列化Vault.Update僅AttachmentConstraint接口需要它自己的自定義序列化程序:
HashAttachmentConstraint因為只有HashAttachmentConstraint實際上有任何字段。 這與稍后反序列化器匹配,在反序列化器上讀取type JSON字段以確定創建哪個對象。
需要自定義反序列器的最后兩個類是ContractState和AttachmentContract (與之前的序列化程序匹配):
ContractStateDeserialiser是一個非常懶惰的實現,因為在本教程中僅使用一種狀態。 AttachmentConstraintDeserialiser使用序列化程序中定義的type字段來確定應將其轉換為AttachmentConstraint哪種實現。
WebFlux特定的配置
由于使用了WebFlux,本小節將介紹額外的必需配置。 您已經在MessageClient看到了一些配置,但是還需要做一些額外的工作:
客戶端需要這個bean能夠反序列化application/stream+json以及響應中返回的對象。
要使用配置中定義的Jackson2JsonDecoder ,必須指定WebClient的ExchangeStrategies 。 不幸的是,沒有編寫ExchangeStrategies類來拾取我們已經創建的Jackson2JsonDecoder 。 我希望這種配置在默認情況下可以工作,但是,哦,很好。 要添加ExchangeStrategies ,必須使用WebClient構建器。 一旦完成,我們終于到了。 完成打包響應的所有序列化以及從客戶端使用響應序列的反序列化已完成。
總結了我希望在本文中討論的所有與Spring相關的代碼。
快速了解Flow代碼
在結束之前,我將簡要展示為完成本教程的目的而編寫的流程:
這是一個非常簡單的流程,增加了一個ProgressTracker , /messages請求用于跟蹤流程的當前狀態。 簡而言之,該流程將MessageState傳遞給它,并將其發送給交易對手。 在流程中移動時, ProgressTracker將更新為相關步驟。 可以在Corda文檔中找到有關使用ProgressTracker更多文檔。
關閉時間
老實說,這比我想象的要長得多,而且花了我的時間比我希望的要長得多。
總之,Spring WebFlux提供了使用響應流在響應事件到達時進行處理的功能。 當與Corda一起使用時,可以跟蹤流程的進度,并可以保持持久的庫更新流,隨時準備在它們到達時采取行動。 為了充分利用帶有Corda的WebFlux,我們還必須研究確保對象由服務器正確序列化,然后由客戶端反序列化以便可以使用它們。 Lucky Corda確實提供了其中一些功能,但是缺少一兩個類或功能,我們需要確保使用提供的對象映射器使用它們。 不幸的是,使用Spring模塊時,WebFlux需要比我通常習慣的更多的配置,但是沒有什么不能解決的。
這篇文章的其余代碼可以在我的GitHub上找到
如果您喜歡這篇文章,可以在@LankyDanDev上的Twitter上關注我,我在其中發布新帖子的更新(盡管最近它們的速度有所放緩)。
翻譯自: https://www.javacodegeeks.com/2018/07/streaming-data-corda-node-spring-webflux.html
總結
以上是生活随笔為你收集整理的使用Spring WebFlux从Corda节点流式传输数据的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 树莓派电脑系统更新(树莓派最新系统)
- 下一篇: 华硕进入blos快捷键(华硕进入blos