Corda服务的异步流调用
如何使流程更快? 如果您已經與Corda合作了一段時間,那么您很有可能已經考慮過這一點。 您可以通過以下幾方面進行合理的調整來提高性能:事務大小,優化查詢并減少整個Flow執行過程中所需的網絡躍點數。 在某種程度上,還有另一種可能也讓您無所適從。 多線程。
更具體地說,從已經執行的流程異步啟動流程/子流程。 這樣做有可能極大地改善您的CorDapps性能。
如果您嘗試此操作,則可能會遇到與我得到的類似的例外。 此外,到目前為止,Corda還不支持子流的線程化。 但是,仍然可以做到。 我們只需要對此保持聰明。 那就是Corda Services中多線程進入的地方。它們可以在Flow中調用,但不會妨礙Flow對其施加的嚴格規則,因為正在執行的Flow不會在服務中掛起或檢查點。
在本文中,我將重點介紹從服務內部以多線程方式啟動流程。 在Corda中還可以使用其他線程,但這是我想更深入研究的有趣領域。 另一方面,從服務啟動流程也充滿了一些陷阱。 這些需要考慮并遍歷。 否則,您將有一天醒來,想知道為什么一切都沒有明顯的原因停止了。
幸運的是,我在這里為您提供幫助。 對我來說,我必須直面這個問題。
對我來說幸運的是,R3能夠提供幫助。
作為參考,我將在本文中使用Corda Enterprise 3.1 。 要真正從本文的內容中受益,您將需要使用Enterprise。 這是由于Enterprise支持多個異步執行的流。 開源目前不允許這樣做。
我還建議您查看我以前的文章Corda Services 101,因為我們將在此基礎上建立基礎。
情境
讓我們從概述本帖子將要使用的場景開始。
- 隨著時間的推移,甲方向甲方發送一些消息。 每個消息來自一個流。
- 甲方回應所有發送給他們的消息。 每個消息都來自單個Flow,但是它們希望在單個位置執行該過程。
可以快速組合一系列流程來滿足此要求。 按順序進行此操作應該證明絕對是零問題(在解決了我們所有犯下的愚蠢錯誤之后)。
盡管這種情況對于需要性能的情況很不利,但是它很容易理解,因此我們可以專注于異步運行。
慢速同步解決方案
在研究異步解決方案之前,快速瀏覽一下將要使用的代碼將是有益的。 下面是ReplyToMessagesFlow的代碼。 我不想遍歷所有底層代碼,而只想專注于與此帖子相關的代碼:
@InitiatingFlow @StartableByRPC class ReplyToMessagesFlow : FlowLogic<List>() {@Suspendableoverride fun call(): List {return messages().map { reply(it) }}private fun messages() =repository().findAll(PageSpecification(1, 100)).states.filter { it.state.data.recipient == ourIdentity }private fun repository() = serviceHub.cordaService(MessageRepository::class.java)@Suspendableprivate fun reply(message: StateAndRef) = subFlow(SendMessageFlow(response(message), message))private fun response(message: StateAndRef): MessageState {val state = message.state.datareturn state.copy(contents = "Thanks for your message: ${state.contents}",recipient = state.sender,sender = state.recipient)} }如果您確實閱讀過Corda Services 101,那么您可能已經認識到此類。 正如我之前提到的,為提出的問題組合解決方案非常容易。 從Vault檢索MessageState ,然后啟動子subFlow以subFlow進行回復。
這段代碼將愉快地逐個傳遞消息。
那么,我們可以采用此代碼并使其更快嗎?
異步嘗試失敗
讓我們嘗試通過引入線程來使當前代碼更快! 我們將使用CompletableFutures來做到這一點:
@InitiatingFlow @StartableByRPC class ReplyToMessagesBrokenAsyncFlow : FlowLogic<List>() {@Suspendableoverride fun call(): List {return messages().map { CompletableFuture.supplyAsync { reply(it) }.join() }}// everything else is the same as before }大多數代碼與以前相同,因此已從示例中排除。
對代碼的唯一更改是添加了CompletableFuture及其supplyAsync方法(來自Java)。 它嘗試在單獨的線程上開始為每個消息執行reply功能。
那么為什么將本節命名為“一次失敗的嘗試”? 我指的是執行以上代碼時獲得的堆棧跟蹤:
java.util.concurrent.CompletionException: java.lang.IllegalArgumentException: Required value was null.at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592) ~[?:1.8.0_172]at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1582) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[?:1.8.0_172]at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[?:1.8.0_172] Caused by: java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201) ~[corda-node-3.1.jar:?]at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192) ~[corda-node-3.1.jar:?]at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271) ~[corda-node-3.1.jar:?]at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312) ~[corda-core-3.1.jar:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:57) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$poop$$inlined$map$lambda$1.get(ReplyToMessagesBrokenAsyncFlow.kt:46) ~[classes/:?]at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$poop$$inlined$map$lambda$1.get(ReplyToMessagesBrokenAsyncFlow.kt:19) ~[classes/:?]您將獲得它,以及Corda正在打印的一長串檢查點日志行。 此外,只是為了掩蓋我的屁股并向您證明這不是由于CompletableFuture的問題引起的,這是使用Executor線程池時出現的另一個錯誤:
Exception in thread "pool-29-thread-1" Exception in thread "pool-29-thread-2" java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201)at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192)at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271)at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:48)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$call$$inlined$map$lambda$1.run(ReplyToMessagesBrokenAsyncFlow.kt:29)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748) java.lang.IllegalArgumentException: Required value was null.at net.corda.node.services.statemachine.FlowStateMachineImpl.checkDbTransaction(FlowStateMachineImpl.kt:201)at net.corda.node.services.statemachine.FlowStateMachineImpl.processEventImmediately(FlowStateMachineImpl.kt:192)at net.corda.node.services.statemachine.FlowStateMachineImpl.subFlow(FlowStateMachineImpl.kt:271)at net.corda.core.flows.FlowLogic.subFlow(FlowLogic.kt:312)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.reply(ReplyToMessagesBrokenAsyncFlow.kt:48)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow.access$reply(ReplyToMessagesBrokenAsyncFlow.kt:19)at com.lankydanblog.tutorial.flows.ReplyToMessagesBrokenAsyncFlow$call$$inlined$map$lambda$1.run(ReplyToMessagesBrokenAsyncFlow.kt:29)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)希望您在這一點上相信我。 如果不是,請參考我一開始所說的內容。 Corda當前不支持從正在執行的流程異步啟動新流程。 我相信他們正在努力。 但是,截至目前。 不要使用此解決方案。
可行的異步解決方案
我們已經看到,在Flow內部執行線程是行不通的。 為了繼續追求性能,我們現在來看一下Corda服務中的線程。 這并不奇怪,因為標題和開頭的段落已經討論了這一點……
拋開諷刺的評論。 委派服務將需要對原始解決方案進行一些重做,但是大部分代碼將保持不變。 大部分內容將被復制并粘貼到另一個類中。 從流中獲取代碼并將其放入服務中。
以下是新的MessageService ,其中包含原始ReplyToMessagesFlow的代碼,但進行了一些更改和添加了線程代碼:
@CordaService class MessageService(private val serviceHub: AppServiceHub) : SingletonSerializeAsToken() {private companion object {val executor: Executor = Executors.newFixedThreadPool(8)!!}fun replyAll() {messages().map {executor.execute {reply(it)}}}private fun messages() =repository().findAll(PageSpecification(1, 100)).states.filter { it.state.data.recipient == serviceHub.myInfo.legalIdentities.first() }private fun repository() = serviceHub.cordaService(MessageRepository::class.java)private fun reply(message: StateAndRef) =serviceHub.startFlow(SendMessageFlow(response(message), message))private fun response(message: StateAndRef): MessageState {val state = message.state.datareturn state.copy(contents = "Thanks for your message: ${state.contents}",recipient = state.sender,sender = state.recipient)} }如您所見,大多數代碼與ReplyToMessagesFlow中的代碼相同。
我要強調的第一點是使用Executor線程池。 我這里沒有使用CompletableFutures ,原因是我們稍后將要討論的原因。
那么,這一切如何運作? replyAll函數在新的系統線程上對從Vault檢索到的每條消息執行reply 。 該新線程又調用startFlow 。 觸發將新的流程放入“流程工作器”隊列中。 這是所有樂趣發生的地方,一切開始變得混亂。
Flow Worker隊列負責執行Flow執行的順序,并將在Flow添加和完成時填充并為空。 該隊列對于協調節點內流的執行至關重要。 當涉及到多線程Flows本身時,它也是痛苦的根源。
下圖顯示了隊列的簡化視??圖:
流進入隊列并在處理后離開
我為什么要談論這個隊列? 好吧,我們需要格外小心,不要將無法完成的流程填滿隊列。
怎么會這樣 通過在正在執行的流程中啟動流程,然后流程等待其完成。 直到隊列的線程池中的所有線程都遇到這種情況,這才不會引起問題。 一旦發生,它將使隊列陷入僵局。 沒有流程可以完成,因為它們都依賴于許多排隊的流程來完成。
流留在隊列中,等待它們調用的流完成
這很可能在多次觸發相同流量的高吞吐量系統上發生。 現在,隊列中充滿了等待其他流完成的流的機會增加了。
這不是很好,使事情變得有些困難。 但是,只要我們意識到這一點,我們就可以適應它。
這也是Executor線程池而不是CompletableFuture的原因。 通過啟動新流程而不等待其完成,可以避免死鎖。 這也是該解決方案的缺點。 沒有新Flow的結果,其功能將極為有限。
話雖如此,如果您的用例適合上面顯示的結構,那么我絕對建議您使用此解決方案。
在下一節中,我將討論如何使用CompletableFuture 。
CompletableFutures的危險解決方案
這很危險的原因很簡單。 僵局。 我建議不要使用此解決方案。 除非您的節點有權訪問足夠的線程,否則要減少用無法完成的線程填充隊列的機會。 另一方面,這是一個更為理想的解決方案,因為您可以等待啟動的流程的結果并對其進行處理。 這使解決方案更加有用。
以下是帶有CompletableFutures的MessageService外觀:
@CordaService class MessageService(private val serviceHub: AppServiceHub) : SingletonSerializeAsToken() {fun replyAll(): List =messages().map { reply(it).returnValue.toCompletableFuture().join() }// everything else is the same as before }除了replyAll函數外,代碼replyAll 。 返回的CordaFuture提供的toCompletableFuture函數,調用join以等待所有期貨的結果并返回總體結果。
如前所述,此解決方案可能導致死鎖。 但是,對于您的情況,也許并非如此。 由您決定發生這種情況的可能性。 如果不利于您,最好走開。 選擇堅持使用類似于上一節中詳述的同步或異步解決方案。
我真的需要這樣做嗎?
現在,是的,我相信你會的。
展望未來,我懷疑您是否需要依靠我在本文中提出的解決方案。
我相信Corda正在努力消除從Flow內部啟動Flow時甚至不必考慮線程的需求。 取而代之的是,您可以簡單地調用帶有選項的subFlow來異步運行它。 這本可以使我們保留原始的同步解決方案,但可以選擇使每個subFlow在單獨的線程上運行。
將各部分結合在一起
總之,在Corda Enterprise 3中,可以在正在執行的流程中異步啟動新流程。 根據您的用例,這可以提供良好的性能優勢。 有缺點。 您不能等待異步流的結果,而不會用死鎖的威脅來威脅您的節點。 節點的基礎隊列無法處理它所處的情況。因此,在將線程引入到Flow調用中時,需要格外小心。 值得慶幸的是,隨著Corda的發展,您甚至不必擔心自己這樣做。 它甚至可能像添加布爾函數參數一樣簡單。 那是夢想!
這篇文章中使用的代碼可以在我的GitHub上找到 。
如果您認為這篇文章有幫助,可以在Twitter上@LankyDanDev關注我,以跟上我的新文章。
翻譯自: https://www.javacodegeeks.com/2018/09/asynchronous-flow-invocations-corda-services.html
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的Corda服务的异步流调用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 个人电脑cpu性价比推荐(个人电脑cpu
- 下一篇: jsf 后台参数到页面_JSF:直接从页