javascript
使用Spring Boot和Project Reactor处理SQS消息-第2部分
這是我關于使用Spring Boot和Project Reactor有效處理SQS消息的博客文章的后續文章
我在第一部分中列出了一些方法上的差距。
1.處理SQS客戶端調用中的失敗
2.該方法一次只能處理來自SQS的一條消息,如何并行化 3.它不處理錯誤,管道中的任何錯誤都會中斷整個過程并停止從隊列中讀取更新的消息。
概括
回顧一下,上一篇文章演示了如何使用出色的Project Reactor創建管道來處理來自AWS SQS隊列的消息
該練習的最終結果是一個管道,如下所示:
有了這個管道,讓我現在討論如何彌合差距:
處理SQS客戶端故障
此功能生成從SQS讀取的消息流。
Flux.generate { sink: SynchronousSink<List<Message>> -> val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() sink.next(messages) } .flatMapIterable(Function.identity())現在考慮上述“ sqsClient”存在連接問題的情況, Flux的行為是在發生錯誤的情況下終止了流。 當然,只要服務正在運行,這對于服務于處理消息的服務就不會起作用。
解決方法是在出現錯誤的情況下僅重試處理流程。
Flux.generate { sink: SynchronousSink<List<Message>> -> val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() sink.next(messages) } .flatMapIterable(Function.identity()) .retry()如果出現任何錯誤,這將導致Flux重新建立消息流。
并行處理消息
Project Reactor提供了幾種并行化處理管道的方式。 我第一次嘗試并行處理是在處理鏈中添加“ subscribeOn”方法。
Flux.generate { sink: SynchronousSink<List<Message>> -> val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() sink.next(messages) } .flatMapIterable(Function.identity()) .retry() .subscribeOn(Schedulers.newElastic( "sub" ))但是,這不是“ subscribeOn”的工作方式。 當我向該管道發送一些消息時,輸出如下:
2020 - 04 - 07 20 : 52 : 53.241 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 07 20 : 52 : 53.434 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 07 20 : 52 : 53.493 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 07 20 : 52 : 53.538 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 07 20 : 52 : 53.609 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 07 20 : 52 : 53.700 INFO 1137 --- [ sub- 3 ] sample.msg.MessageListenerRunner : Processed Message hello上面的“ sub-3”是處理消息的線程的名稱,看起來所有消息都在“ sub-3”線程上進行處理,而沒有其他線程在處理!
subscriptionOn只是通過從此調度程序池中借用“線程”來更改執行上下文,而不使用池本身中的所有線程。
那么如何使處理并行化呢? 這個StackOverflow答案提供了我在這里使用的一種非常好的方法,本質上是使用
flatMap運算符,然后在“ flatMap”運算符內添加“ subscribeOn”運算符。
該運算符急切地訂閱其內部發布者,然后將結果展平,其訣竅是可以為內部訂閱者提供他們自己的調度程序,并且對于每個訂閱,最終將使用調度程序池中的線程。 這些并發訂閱者的數量可以使用傳遞給flatMap運算符的“并發”參數來控制。
Flux.generate { sink: SynchronousSink<List<Message>> -> val receiveMessageRequest: ReceiveMessageRequest = ReceiveMessageRequest.builder() .queueUrl(queueUrl) .maxNumberOfMessages( 5 ) .waitTimeSeconds( 10 ) .build() val messages: List<Message> = sqsClient.receiveMessage(receiveMessageRequest).messages() sink.next(messages) } .flatMapIterable(Function.identity()) .retry() .flatMap({ (message: String, deleteHandle: () -> Unit) -> task(message) .then(Mono.fromSupplier { Try.of { deleteHandle() } }) .then() .subscribeOn(taskScheduler) }, concurrency)處理多個消息時的輸出如下所示–
2020 - 04 - 08 21 : 03 : 24.582 INFO 17541 --- [ taskHandler- 4 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 08 21 : 03 : 24.815 INFO 17541 --- [ taskHandler- 4 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 08 21 : 03 : 24.816 INFO 17541 --- [ taskHandler- 5 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 08 21 : 03 : 24.816 INFO 17541 --- [ taskHandler- 6 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 08 21 : 03 : 24.816 INFO 17541 --- [ taskHandler- 7 ] sample.msg.MessageListenerRunner : Processed Message hello 2020 - 04 - 08 21 : 03 : 24.817 INFO 17541 --- [ taskHandler- 8 ] sample.msg.MessageListenerRunner : Processed Message hello現在查看日志中,除了線程名(taskHandler- *)之外還有更多!
處理下游錯誤
我以前使用“重試”運算符進行的修復之一是關于使用sqsClient連接處理上游錯誤。 但是,有可能在管道中處理消息并且任何步驟引發錯誤時,整個管道都會失敗。 因此,重要的是要防止每一步失敗。 我一直致力于確保錯誤不會傳播的一種巧妙方法是使用出色的vavr庫及其“嘗試”類型 。 嘗試類型具有兩個結果–一個成功(成功)或一個異常(失敗)。 這使其余的管道可以按可衡量的方式對上一步的結果進行操作:
.flatMap({ (message: String, deleteHandle: () -> Unit) -> task(message) .then(Mono.fromSupplier { Try.of { deleteHandle() } }) .doOnNext { t -> t.onFailure { e -> LOGGER.error(e.message, e) } } .then() .subscribeOn(taskScheduler) }, concurrency)上面的代碼段演示了一種方法,在該方法中,我知道負責刪除消息的“ deleteHandle”會引發異常,Try捕獲了此異常,如果有錯誤記錄了異常,則該異常不會縮短消息流。
結論
我最初的想法是,因為我已經采取了一種被動的方式來處理消息,所以我將在我的sqs消息處理管道中獲得巨大的推動,但是,我的學習是,就像其他所有事情一樣,需要對基于Project的反應堆進行仔細的了解和調整流以有效地處理消息。 我敢肯定,還有更多課程可供我學習,我將像我一樣記錄下來。
整個示例可在我的github存儲庫中找到 -https://github.com/bijukunjummen/boot-with-sns-sqs
翻譯自: https://www.javacodegeeks.com/2020/04/processing-sqs-messages-using-spring-boot-and-project-reactor-part-2.html
總結
以上是生活随笔為你收集整理的使用Spring Boot和Project Reactor处理SQS消息-第2部分的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 二恶英是什么东西(简述二恶英的危害以及主
- 下一篇: 纬度最高的是什么大洲 你知道吗