javascript
艿艿连肝了几个周末,写了一篇贼长的 Spring 响应式 Web 框架 WebFlux!市面第二完整~
本文在提供完整代碼示例,可見 https://github.com/YunaiV/SpringBoot-Labs 的 lab-27 目錄。
原創(chuàng)不易,給點個 Star 嘿,一起沖鴨!
1. 概述
友情提示:Reactive Programming ,翻譯為反應式編程,又稱為響應式編程。本文,我們統(tǒng)一使用響應式。不過,比較正確的叫法還是反應式。
Spring Framework?5?在 2017 年 9 月份,發(fā)布了 GA 通用版本。既然是一個新的大版本,必然帶來了非常多的改進,其中比較重要的一點,就是將響應式編程帶入了 Spring 生態(tài)。又或者說,將響應式編程“真正”帶入了 Java 生態(tài)之中。
在此之前,相信絕大多數(shù) Java 開發(fā)者,對響應式編程的概念是非常模糊的。甚至說,截止到目前 2019 年 11 月份,對于國內(nèi)的 Java 開發(fā)者,也是知之甚少。
對于我們來說,最早看到的就是 Spring5?提供了一個新的 Web 框架,基于響應式編程的 Spring WebFlux 。至此,SpringMVC 在“干掉” Struts 之后,難道要開始進入 Spring 自己的兩個 Web 框架的雙雄爭霸?
實際上,WebFlux 在出來的兩年時間里,據(jù)艿艿所了解到的情況,鮮有項目從采用 SpringMVC 遷移到 WebFlux ,又或者新項目直接采用 WebFlux 。這又是為什么呢?
艿艿:V2EX 上還有這樣一個討論 《現(xiàn)在有公司在使用 Spring Boot 2.0 的 WebFlux 嗎?》 。
響應式編程,對我們現(xiàn)有的編程方式,是一場顛覆,對于框架也是。
-
在 Spring 提供的框架中,實際并沒有全部實現(xiàn)好對響應式編程的支持。例如說,Spring Transaction 事務組件,在 Spring 5.2 M2 版本,才提供了支持響應式編程的 ReactiveTransactionManager 事務管理器。
-
更不要說,Java 生態(tài)常用的框架,例如說 MyBatis、Jedis 等等,都暫未提供響應式編程的支持。
所以,WebFlux 想要能夠真正普及到我們的項目中,不僅僅需要 Spring 自己體系中的框架提供對響應式編程的很好的支持,也需要 Java 生態(tài)中的框架也要做到如此。例如說:
艿艿:😈 Java 框架存在大量基于 ThreadLocal 線程變量,實現(xiàn)參數(shù)的透傳,改造的成本,實際是不小的。
當然,即使如此,這也并不妨礙我們來對 WebFlux 進行一個小小的入門。畢竟,響應式編程這把火,終將熊熊燃起,燒死那些異性戀。哈哈哈~
艿艿:下面的會涉及比較多的概念,不想看的胖友,直接跳到 「2. 快速入門」 小節(jié),直接開始 WebFlux 的入門。
1.1 響應式編程
我們先簡單來了解下響應式編程的相關(guān)姿勢,以保證能夠看懂 WebFlux 入門的代碼示例,哈哈哈~
維基百科對響應式編程定義如下:
FROM https://en.wikipedia.org/wiki/Reactive_programming
Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).
反應式編程是一種異步編程范式,它關(guān)注數(shù)據(jù)流和變化的傳播。這意味著可以通過使用編程語言輕松地表示靜態(tài)(如數(shù)組)或動態(tài)(如事件發(fā)射器)數(shù)據(jù)流。
Spring 官方文檔對響應式編程定義如下:
FROM https://docs.spring.io/spring-framework/docs/5.0.0.BUILD-SNAPSHOT/spring-framework-reference/html/web-reactive.html#web-reactive-programming
In plain terms reactive programming is about non-blocking applications that are asynchronous and event-driven and require a small number of threads to scale vertically (i.e. within the JVM) rather than horizontally (i.e. through clustering).
簡單地說,響應式編程是關(guān)于非阻塞應用程序的,這些應用程序是異步的、事件驅(qū)動的,并且需要少量的線程來垂直伸縮(即在 JVM 中),而不是水平伸縮(即通過集群)。
😈 兩個看起來都不很易懂。不過如果胖友看過 Netty 框架的介紹,會發(fā)現(xiàn)跟 Spring 的描述非常相像。定義如下:
FROM https://www.oschina.net/p/netty
Netty 是一個 Java 開源框架。Netty 提供異步的、事件驅(qū)動的網(wǎng)絡應用程序框架和工具,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡服務器和客戶端程序。
是不是都看到了異步 + 事件驅(qū)動。本質(zhì)上,Netty 也是有基于響應式編程的思想。所以在下文中,我們會看到,可以使用 Netty 作為 WebFlux 的服務器。
嗶嗶了這么多,艿艿來用簡單但不完全精準的語言嘗試下。以后端 API 請求的處理來舉例子。
-
在現(xiàn)在主流的編程模型中,請求是被同步阻塞處理完成,返回結(jié)果給前端。
-
在響應式的編程模型中,請求是被作為一個事件丟到線程池中執(zhí)行,等到執(zhí)行完畢,異步回調(diào)結(jié)果給主線程,最后返回給前端。
通過這樣的方式,主線程(實際是多個,這里只是方便描述哈)不斷接收請求,不負責直接同步阻塞處理,從而避免自身被阻塞。
1.2 Reactor 框架
在 Java 生態(tài)中,提供響應式編程的框架主要有 Reactor、RxJava、JDK9 Flow API 。
那么,Spring 會選擇哪個框架作為其響應式編程的基礎呢?
-
首先,可以排除 JDK9 Flow API ,因為 Spring5 要支持 JDK8 版本開始。
-
其次,Reactor 是 Spring?母公司 Pivotal(咳咳咳,2019 年竟然被 VMWare 收購了)是開源的框架,所以必然是“強強聯(lián)合”,嘿嘿。
如果胖友想要了解 Reactor 和 RxJava 的對比,可以看看 《八個層面比較 Java 8, RxJava, Reactor》 文章,挺詳細的。
讓我們一起來看看 Reactor 官方對自己的介紹:
FROM https://projectreactor.io/
Reactor is a fourth-generation Reactive library for building non-blocking applications on the JVM based on the Reactive Streams Specification
Reactor 是一個第四代響應式編程框架,用于構(gòu)建非阻塞 JVM 應用程序,基于 Reactive Streams Specification 來實現(xiàn)。
Reactor Operators and Schedulers can sustain high throughput rates on the order of 10's of millions of messages per second.
Reactor 的操作和調(diào)度可以提供每秒千萬條消息的高吞吐量。
Plus its low memory footprint should go under most of the radars.
再加上它的低內(nèi)存占用,應該在大多數(shù)雷達(radars)之下??瓤瓤?#xff0c;這個 radars 怎么翻譯。
簡單來說,Reactor 說是一個響應式編程框架,又快又不占用內(nèi)存的那種。😈
關(guān)于 Reactor 的使用,這里艿艿就不過多介紹,感興趣的胖友,可以看看 《使用 Reactor 進行反應式編程》 文章。如下是對其中的一段內(nèi)容的節(jié)選并修改:
Reactor 有兩個非常重要的基本概念:
-
Flux ,表示的是包含 0 到 N 個元素的異步序列。當消息通知產(chǎn)生時,訂閱者(Subscriber)中對應的方法?#onNext(t),?#onComplete(t)?和?#onError(t)?會被調(diào)用。
-
Mono 表示的是包含 0 或者 1 個元素的異步序列。該序列中同樣可以包含與 Flux 相同的三種類型的消息通知。
-
同時,Flux 和 Mono 之間可以進行轉(zhuǎn)換。例如:
-
對一個 Flux 序列進行計數(shù)操作,得到的結(jié)果是一個?Mono<Long>?對象。
-
把兩個 Mono 序列合并在一起,得到的是一個 Flux 對象。
-
😈 其實,可以先暫時簡單把 Mono 理解成 Object ,Flux 理解成 List 。嘿嘿~
1.3 Spring WebFlux
Spring 官方文檔對 Spring WebFlux 介紹如下:
FROM https://docs.spring.io/spring-framework/docs/5.0.0.BUILD-SNAPSHOT/spring-framework-reference/html/web-reactive.html
Spring Framework 5 includes a new spring-webflux module. The module contains support for reactive HTTP and WebSocket clients as well as for reactive server web applications including REST, HTML browser, and WebSocket style interactions.
Spring Framework 5 提供了一個新的?spring-webflux?模塊。該模塊包含了:
-
對響應式支持的 HTTP 和 WebSocket 客戶端。
-
對響應式支持的 Web 服務器,包括 Rest API、HTML 瀏覽器、WebSocket 等交互方式。
On the server-side WebFlux supports 2 distinct programming models:
-
Annotation-based with @Controller and the other > annotations supported also with Spring MVC
-
Functional, Java 8 lambda style routing and handling
在服務端方面,WebFlux 提供了 2 種編程模型(翻譯成使用方式,可能更易懂):
-
方式一,基于 Annotated Controller 方式實現(xiàn):基于?@Controller?和 SpringMVC 使用的其它注解。😈 也就是說,我們大體上可以像使用 SpringMVC 的方式,使用 WebFlux 。
-
方式二,基于函數(shù)式編程方式:函數(shù)式,Java 8 lambda 表達式風格的路由和處理。😈 可能有點晦澀,晚點我們看了示例就會明白。
Both programming models are executed on the same reactive foundation that adapts non-blocking HTTP runtimes to the Reactive Streams API.
這兩個編程模型,都是在同一個響應式基礎(foundation)上執(zhí)行的,該基礎將非阻塞 HTTP 運行時(runtime)適配成響應式 API 。😈 簡單來說,就是將原有的 API ,使用 Reactor 封裝成響應式 API ,讓我們開發(fā)者使用更加便捷。
The diagram below shows the server-side stack including traditional, Servlet-based Spring MVC on the left from the spring-webmvc module and also the reactive stack on the right from the spring-webflux module.
下圖顯示了服務端的技術(shù)棧,左側(cè)是?spring-webmvc?模塊中傳統(tǒng)的、基于 Servlet 的 Spring MVC ,右側(cè)是?spring-webflux?模塊中的響應式技術(shù)棧。
webflux-overview
-
😈 仔細看第一層的兩個框框,分別是上面提到的 WebFlux 的兩種編程模型。表達的是 SpringMVC 不支持 Router Functions 方式,而 WebFlux 支持。
WebFlux can run on Servlet containers with support for the Servlet 3.1 Non-Blocking IO API as well as on other async runtimes such as Netty and Undertow.
WebFlux 可以運行在:
-
支持 Servlet 3.1 非阻塞 IO API 的 Servlet 容器上
-
也可以運行在支持異步運行時的,例如說 Netty 或者 Undertow 上
Each runtime is adapted to a reactive ServerHttpRequest and ServerHttpResponse exposing the body of the request and response as Flux, rather than InputStream and OutputStream, with reactive backpressure.
每一個運行時(runtime)適用于將響應式的 ServerHttpRequest 和 ServerHttpResponse 中 request 和 response 的 body 暴露成?Flux<DataBuffer>?對象,而不是 InputStream ?和 InputStream ?對象,可用于響應式中的背壓(backpressure)。😈 這段有點晦澀,簡單來說:
-
對于 Servlet 來說,?ServletRequest#getInputStream()?方法,獲得請求的主體內(nèi)容返回的是 InputStream 對象。
-
對于 WebFlux 來說,ServerHttpRequest#getBody()?方法,獲得請求的主體內(nèi)容返回的是?Flux<DataBuffer>?對象。
REST-style JSON and XML serialization and deserialization is supported on top as a?Flux<Object>, and so is HTML view rendering and Server-Sent Events.
REST 風格 API 使用到的 JSON 和 XML 序列化和反序列化,需要提供對?Flux<Object>?的支持。對于 HTML 渲染,和 SSE 也要提供對?Flux<Object>?的支持。
😈 咳咳咳,看完了這一大段,是不是突然有點想捶死艿艿,說的什么 XX 玩樣啊!其實,在我們初學 SpringMVC 的時候,也是一臉懵逼的學完。隨著我們對 SpringMVC 的日趨熟練,逐步對其提供的組件、原理、源碼慢慢熟悉。所以,對于我們來說,WebFlux 乃至響應式編程來說,都是足夠新穎的知識,我們要抱著空杯心態(tài),「Stay Hungry, Stay Foolish」 。
如果胖友的時間比較充分,可以選擇把 《Spring 文檔 —— Web on Reactive Stack》 仔細看看,詳盡的介紹了 Spring 在 Web 方面,響應式相關(guān)的技術(shù)棧。
雖然說上面我們在介紹 WebFlux ,把它搞的很復雜,實際在快速入門使用它,還是非常簡單的。下面,開始讓我們開始愉快的快速入門下~
艿艿:考慮到艿艿之前已經(jīng)寫了 《芋道 Spring Boot SpringMVC 入門》 文章,所以本文我們提供的示例,盡量覆蓋到在 SpringMVC 提到的內(nèi)容。
當然,很多相似的概念,艿艿也不重復介紹,不然顯得我老啰嗦了。
2. 快速入門
示例代碼對應倉庫:lab-27-webflux-01 。
本小節(jié),我們會使用?spring-boot-starter-webflux?實現(xiàn) WebFlux 的自動化配置。然后實現(xiàn)用戶的增刪改查接口。接口列表如下:
| GET | /users/list | 查詢用戶列表 |
| GET | /users/get | 獲得指定用戶編號的用戶 |
| POST | /users/add | 添加用戶 |
| POST | /users/update | 更新指定用戶編號的用戶 |
| POST | /users/delete | 刪除指定用戶編號的用戶 |
下面,開始遨游~
2.1 引入依賴
在?pom.xml?文件中,引入相關(guān)依賴。
<?xml?version="1.0"?encoding="UTF-8"?> <project?xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0?http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.1.RELEASE</version><relativePath/>?<!--?lookup?parent?from?repository?--></parent><modelVersion>4.0.0</modelVersion><artifactId>lab-27-webflux-01</artifactId><dependencies><!--?實現(xiàn)對?Spring?WebFlux?的自動化配置?--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-webflux</artifactId></dependency><!--?方便等會寫單元測試?--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies></project>-
具體每個依賴的作用,胖友自己認真看下艿艿添加的所有注釋噢。
我們使用 IDEA Maven 插件 ,查看下?spring-boot-starter-webflux?依賴中,所引入的依賴。如下圖所示:
-
引入?reactor-core?依賴,使用 Reactor 作為 WebFlux 的響應式框架的基礎。
-
引入?spring-boot-starter-reactor-netty?依賴,使用 Netty 構(gòu)建 WebFlux 的 Web 服務器。其中 RxNetty 庫,是基于 Reactor 的響應式框架的基礎之上,提供出 Netty 的響應式 API 。
當然,我們除了使用可以使用其它作為 WebFlux 的 Web 服務器,如下表格:
| Netty | Netty API | Reactor Netty |
| Undertow | Undertow API | spring-web: Undertow to Reactive Streams bridge |
| Tomcat | Servlet 3.1 non-blocking I/O; Tomcat API to read and write ByteBuffers vs byte[] | spring-web: Servlet 3.1 non-blocking I/O to Reactive Streams bridge |
| Jetty | Servlet 3.1 non-blocking I/O; Jetty API to write ByteBuffers vs byte[] | spring-web: Servlet 3.1 non-blocking I/O to Reactive Streams bridge |
| Servlet 3.1 container | Servlet 3.1 non-blocking I/O | spring-web: Servlet 3.1 non-blocking I/O to Reactive Streams bridge |
-
當然,也需要基于 Reactor 的響應式框架的基礎之上,封裝相應的響應式 API 。
可能胖友會有疑惑,為什么 WebFlux 運行在 Servlet 容器上時,需要?Servlet 3.1+?以上的容器呢?在 Servlet 3.1 規(guī)范發(fā)布時,它定義了非常重要的特性,Non-blocking I/O 非阻塞 IO ,提供了異步處理請求的支持。我們來詳細展開下:
-
在 Servlet 3.1 規(guī)范之前的版本,請求是只能被 Servlet 同步阻塞處理完成,返回結(jié)果給前端。
-
在 Servlet 3.1 規(guī)范開始的版本,請求是允許被 Servlet 丟到線程池中處執(zhí)行,等到執(zhí)行完畢,異步回調(diào)結(jié)果給 Servlet ,最后返回給前端。
艿艿:推薦胖友在閱讀完本文之后,可以看看 《Servlet 3.0/3.1 中的異步處理》 文章,可以對 WebFlux 有更好的理解。
2.2 Application
創(chuàng)建?Application.java?類,配置?@SpringBootApplication?注解即可。代碼如下:
//?Application.java@SpringBootApplication public?class?Application?{public?static?void?main(String[]?args)?{SpringApplication.run(Application.class,?args);}}-
先暫時不啟動項目。等我們添加好 API 接口。
在 「1.3 Spring WebFlux」 小節(jié)中,我們提到了 WebFlux 有兩種編程模型,分別是:
-
方式一,基于 Annotated Controller 方式實現(xiàn)
-
方式二,基于函數(shù)式編程方式
我們分別在下面兩個小節(jié)來看看。
2.3 基于 Annotated Controller 方式實現(xiàn)
在?cn.iocoder.springboot.lab27.springwebflux.controller?包路徑下,創(chuàng)建 UserController 類。代碼如下:
//?UserController.java@RestController @RequestMapping("/users") public?class?UserController?{/***?查詢用戶列表**?@return?用戶列表*/@GetMapping("/list")public?Flux<UserVO>?list()?{//?查詢列表List<UserVO>?result?=?new?ArrayList<>();result.add(new?UserVO().setId(1).setUsername("yudaoyuanma"));result.add(new?UserVO().setId(2).setUsername("woshiyutou"));result.add(new?UserVO().setId(3).setUsername("chifanshuijiao"));//?返回列表return?Flux.fromIterable(result);}/***?獲得指定用戶編號的用戶**?@param?id?用戶編號*?@return?用戶*/@GetMapping("/get")public?Mono<UserVO>?get(@RequestParam("id")?Integer?id)?{//?查詢用戶UserVO?user?=?new?UserVO().setId(id).setUsername("username:"?+?id);//?返回return?Mono.just(user);}/***?添加用戶**?@param?addDTO?添加用戶信息?DTO*?@return?添加成功的用戶編號*/@PostMapping("add")public?Mono<Integer>?add(@RequestBody?Publisher<UserAddDTO>?addDTO)?{//?插入用戶記錄,返回編號Integer?returnId?=?1;//?返回用戶編號return?Mono.just(returnId);}/***?更新指定用戶編號的用戶**?@param?updateDTO?更新用戶信息?DTO*?@return?是否修改成功*/@PostMapping("/update")public?Mono<Boolean>?update(@RequestBody?Publisher<UserUpdateDTO>?updateDTO)?{//?更新用戶記錄Boolean?success?=?true;//?返回更新是否成功return?Mono.just(success);}/***?刪除指定用戶編號的用戶**?@param?id?用戶編號*?@return?是否刪除成功*/@PostMapping("/delete")?//?URL?修改成?/delete?,RequestMethod?改成?DELETEpublic?Mono<Boolean>?delete(@RequestParam("id")?Integer?id)?{//?刪除用戶記錄Boolean?success?=?false;//?返回是否更新成功return?Mono.just(success);}}-
在類和方法上,我們添加了?@Controller?和 SpringMVC 在使用的?@GetMapping?和?PostMapping?等注解,提供 API 接口,這個和我們在使用 SpringMVC 是一模一樣的。
-
在?dto?和?vo?包下,有 API 使用到的 DTO 和 VO 類。
-
😈 因為是入門示例,我們會發(fā)現(xiàn)代碼十分簡單,保持淡定。在后文中,我們會提供和 Spring Data JPA、Spring Data MongoDB、Spring Data Redis 等等整合的示例。
-
#list()?方法,我們最終調(diào)用?Flux#fromIterable(Iterable<? extends T> it)?方法,將 List 包裝成 Flux 對象返回。
-
#get(Integer id)?方法,我們最終調(diào)用?Mono#just(T data)?方法,將 UserVO 包裝成 Mono 對象返回。
-
#add(Publisher<UserAddDTO> addDTO)?方法,參數(shù)為 Publisher 類型,泛型為 UserAddDTO 類型,并且添加了?@RequestBody?注解,從 request 的 Body 中讀取參數(shù)。注意,此時提交參數(shù)需要使用?"application/json"?等 Content-Type 內(nèi)容類型。
- #add(...)?方法,也可以使用?application/x-www-form-urlencoded?或?multipart/form-data?這兩個 Content-Type 內(nèi)容類型,通過 request 的 Form Data 或 Multipart Data 傳遞參數(shù)。代碼如下: //?UserController.java/***?添加用戶**?@param?addDTO?添加用戶信息?DTO*?@return?添加成功的用戶編號*/
@PostMapping("add2")
public?Mono<Integer>?add(Mono<UserAddDTO>?addDTO)?{//?插入用戶記錄,返回編號Integer?returnId?=?UUID.randomUUID().hashCode();//?返回用戶編號return?Mono.just(returnId);
}
-
此時,參數(shù)為 Mono 類型,泛型為 UserAddDTO 類型。
-
當然,我們也可以直接使用參數(shù)為 UserAddDTO 類型。如果后續(xù)需要使用到 Reactor API ,則我們自己主動調(diào)用?Mono#just(T data)?方法,封裝出 Publisher 對象。😈 注意,Flux 和 Mono 都實現(xiàn)了 Publisher 接口。
-
可能有胖友不了解 request Form Data、Multipart Data 和 request Body 的差異,可以看看 《HTTP 請求中 request payload 和 formData 區(qū)別?》 文章。
-
WebFlux 對于 Form Data ,在 《Web on Reactive Stack —— Spring WebFlux —— Form Data》 有簡短說明。
-
WebFlux 對于 Multipart Data ?,在 《Web on Reactive Stack —— Spring WebFlux —— Multipart Data 》 有簡短說明。
-
-
#update(Publisher<UserUpdateDTO> updateDTO)?方法,和?#add(Publisher<UserAddDTO> addDTO)?方法一致,就不重復贅述。
-
#delete(Integer id)?方法,和?#get(Integer id)?方法一致,就不重復贅述。
2.4 基于函數(shù)式編程方式
在?cn.iocoder.springboot.lab27.springwebflux.controller?包路徑下,創(chuàng)建 UserRouter 類。代碼如下:
//?UserRouter.java@Configuration public?class?UserRouter?{@Beanpublic?RouterFunction<ServerResponse>?userListRouterFunction()?{return?RouterFunctions.route(RequestPredicates.GET("/users2/list"),new?HandlerFunction<ServerResponse>()?{@Overridepublic?Mono<ServerResponse>?handle(ServerRequest?request)?{//?查詢列表List<UserVO>?result?=?new?ArrayList<>();result.add(new?UserVO().setId(1).setUsername("yudaoyuanma"));result.add(new?UserVO().setId(2).setUsername("woshiyutou"));result.add(new?UserVO().setId(3).setUsername("chifanshuijiao"));//?返回列表return?ServerResponse.ok().bodyValue(result);}});}@Beanpublic?RouterFunction<ServerResponse>?userGetRouterFunction()?{return?RouterFunctions.route(RequestPredicates.GET("/users2/get"),new?HandlerFunction<ServerResponse>()?{@Overridepublic?Mono<ServerResponse>?handle(ServerRequest?request)?{//?獲得編號Integer?id?=?request.queryParam("id").map(s?->?StringUtils.isEmpty(s)???null?:?Integer.valueOf(s)).get();//?查詢用戶UserVO?user?=?new?UserVO().setId(id).setUsername(UUID.randomUUID().toString());//?返回列表return?ServerResponse.ok().bodyValue(user);}});}@Beanpublic?RouterFunction<ServerResponse>?demoRouterFunction()?{return?route(GET("/users2/demo"),?request?->?ok().bodyValue("demo"));}}-
在類上,添加?@Configuration?注解,保證該類中的 Bean 們,都被掃描到。
-
在每個方法中,我們都通弄?RouterFunctions#route(RequestPredicate predicate, HandlerFunction<T> handlerFunction)?方法,定義了一條路由。
-
第一個參數(shù)?predicate?參數(shù),是 RequestPredicate 類型,請求謂語,用于匹配請求??梢酝ㄟ^ RequestPredicates 來構(gòu)建各種條件。
-
第二個參數(shù)?handlerFunction?參數(shù),是 RouterFunction 類型,處理器函數(shù)。
-
-
每個方法定義的路由,胖友自己看下代碼,一眼能看的明白。一般來說,采用第三個方法的寫法,更加簡潔。注意,需要使用?static import?靜態(tài)引入,代碼如下:
import?static?org.springframework.web.reactive.function.server.RequestPredicates.*; import?static?org.springframework.web.reactive.function.server.RouterFunctions.*; import?static?org.springframework.web.reactive.function.server.ServerResponse.*;
一般來說,艿艿更加推薦基于 Annotated Controller 方式實現(xiàn)的編程方式,更符合我們現(xiàn)在的開發(fā)習慣,學習成本也相對低一些。同時,和 API 接口文檔工具 Swagger 也更容易集成。
😈 有沒覺得每個 HandlerFunction 函數(shù),和每個 Servlet 有點像。
更多基于函數(shù)式編程方式的示例,可以看看如下兩篇文章:
-
《Introduction to the Functional Web Framework in Spring 5》
-
《Spring Boot RouterFunction tutorial》
3. 測試接口
示例代碼對應倉庫:lab-27-webflux-01 。
在開發(fā)完接口,我們會進行接口的自測。一般情況下,我們先啟動項目,然后使用 Postman、curl、瀏覽器,手工模擬請求后端 API 接口。
實際上,WebFlux 提供了 Web 測試客戶端 WebTestClient 類,方便我們快速測試接口。下面,我們對 UserController 提供的接口,進行下單元測試。也就是說,本小節(jié),我們會繼續(xù)在 lab-27-webflux-01 示例的基礎上修改。
MockMvc 提供了集成測試和單元測試的能力,我們分成 「3.1 集成測試」 和 「3.2 單元測試」 來看。如果胖友對測試這塊不太了解,可以看看如下兩篇文章:
-
《小談 Java 單元測試》
-
《談談單元測試》
3.1 集成測試
創(chuàng)建 UserControllerTest 測試類,我們來測試一下簡單的 UserController 的每個操作。核心代碼如下:
//?UserControllerTest.java@RunWith(SpringRunner.class) @SpringBootTest(classes?=?Application.class) @AutoConfigureWebFlux @AutoConfigureWebTestClient public?class?UserControllerTest?{@Autowiredprivate?WebTestClient?webClient;@Testpublic?void?testList()?{webClient.get().uri("/users/list").exchange()?//?執(zhí)行請求.expectStatus().isOk()?//?響應狀態(tài)碼?200.expectBody().json("[\n"?+"????{\n"?+"????????\"id\":?1,\n"?+"????????\"username\":?\"yudaoyuanma\"\n"?+"????},\n"?+"????{\n"?+"????????\"id\":?2,\n"?+"????????\"username\":?\"woshiyutou\"\n"?+"????},\n"?+"????{\n"?+"????????\"id\":?3,\n"?+"????????\"username\":?\"chifanshuijiao\"\n"?+"????}\n"?+"]");?//?響應結(jié)果}@Testpublic?void?testGet()?{//?獲得指定用戶編號的用戶webClient.get().uri("/users/get?id=1").exchange()?//?執(zhí)行請求.expectStatus().isOk()?//?響應狀態(tài)碼?200.expectBody().json("{\n"?+"????\"id\":?1,\n"?+"????\"username\":?\"username:1\"\n"?+"}");?//?響應結(jié)果}@Testpublic?void?testGet2()?{//?獲得指定用戶編號的用戶webClient.get().uri("/users/v2/get?id=1").exchange()?//?執(zhí)行請求.expectStatus().isOk()?//?響應狀態(tài)碼?200.expectBody().json("{\n"?+"????\"id\":?1,\n"?+"????\"username\":?\"test\"\n"?+"}");?//?響應結(jié)果}@Testpublic?void?testAdd()?{Map<String,?Object>?params?=?new?HashMap<>();params.put("username",?"yudaoyuanma");params.put("password",?"nicai");//?添加用戶webClient.post().uri("/users/add").bodyValue(params).exchange()?//?執(zhí)行請求.expectStatus().isOk()?//?響應狀態(tài)碼?200.expectBody().json("1");?//?響應結(jié)果。因為沒有提供 content 的比較,所以只好使用 json 來比較。竟然能通過}@Testpublic?void?testAdd2()?{?//?發(fā)送文件的測試,可以參考?https://dev.to/shavz/sending-multipart-form-data-using-spring-webtestclient-2gb7?文章BodyInserters.FormInserter<String>?formData?=?//?Form?Data?數(shù)據(jù),需要這么拼湊BodyInserters.fromFormData("username",?"yudaoyuanma").with("password",?"nicai");//?添加用戶webClient.post().uri("/users/add2").body(formData).exchange()?//?執(zhí)行請求.expectStatus().isOk()?//?響應狀態(tài)碼?200.expectBody().json("1");?//?響應結(jié)果。因為沒有提供 content 的比較,所以只好使用 json 來比較。竟然能通過}@Testpublic?void?testUpdate()?{Map<String,?Object>?params?=?new?HashMap<>();params.put("id",?1);params.put("username",?"yudaoyuanma");//?修改用戶webClient.post().uri("/users/update").bodyValue(params).exchange()?//?執(zhí)行請求.expectStatus().isOk()?//?響應狀態(tài)碼?200.expectBody(Boolean.class)?//?期望返回值類型是?Boolean.consumeWith((Consumer<EntityExchangeResult<Boolean>>)?result?->?//?通過消費結(jié)果,判斷符合是 true 。Assert.assertTrue("返回結(jié)果需要為?true",?result.getResponseBody()));}@Testpublic?void?testDelete()?{//?刪除用戶webClient.post().uri("/users/delete?id=1").exchange()?//?執(zhí)行請求.expectStatus().isOk()?//?響應狀態(tài)碼?200.expectBody(Boolean.class)?//?期望返回值類型是?Boolean.isEqualTo(true);?//?這樣更加簡潔一些 //??????????????? .consumeWith((Consumer<EntityExchangeResult<Boolean>>) result ->?//?通過消費結(jié)果,判斷符合是 true 。 //????????????????????????Assert.assertTrue("返回結(jié)果需要為?true",?result.getResponseBody()));}}-
在類上,我們添加了?@AutoConfigureWebTestClient?注解,用于自動化配置我們稍后注入的 WebTestClient Bean 對象?webClient?。在后續(xù)的測試中,我們會看到都是通過?webClient?調(diào)用后端 API 接口。而每一次調(diào)用后端 API 接口,都會執(zhí)行真正的后端邏輯。因此,整個邏輯,走的是集成測試,會啟動一個真實的 Spring 環(huán)境。
-
每次 API 接口的請求,都通過 RequestHeadersSpec 來構(gòu)建。構(gòu)建完成后,通過?RequestHeadersSpec#exchange()?方法來執(zhí)行請求,返回 ResponseSpec 結(jié)果。
-
WebTestClient 的?#get()、#head()、#delete()、#options()?方法,返回的是 RequestHeadersUriSpec 對象。
-
WebTestClient 的?#post()、#put()、#delete()、#patch()?方法,返回的是 RequestBodyUriSpec 對象。
-
RequestHeadersUriSpec 和 RequestBodyUriSpec 都繼承了 RequestHeadersSpec 接口。
-
-
執(zhí)行完請求后,通過調(diào)用 RequestBodyUriSpec 的各種斷言方法,添加對結(jié)果的預期,相當于做斷言。如果不符合預期,則會拋出異常,測試不通過。
3.2 單元測試
為了更好的展示 WebFlux 單元測試的示例,我們需要改寫 UserController 的代碼,讓其會依賴 UserService 。修改點如下:
-
在?cn.iocoder.springboot.lab27.springwebflux.service?包路徑下,創(chuàng)建 UserService 類。代碼如下:
//?UserService.java@Service public?class?UserService?{public?UserVO?get(Integer?id)?{return?new?UserVO().setId(id).setUsername("test");}} -
在 UserController 類中,增加?GET /users/v2/get?接口,獲得指定用戶編號的用戶。代碼如下:
//?UserController.java@Autowired private?UserService?userService;/***?獲得指定用戶編號的用戶**?@param?id?用戶編號*?@return?用戶*/ @GetMapping("/v2/get") public?Mono<UserVO>?get2(@RequestParam("id")?Integer?id)?{//?查詢用戶UserVO?user?=?userService.get(id);//?返回return?Mono.just(user); }-
在代碼中,我們注入了 UserService Bean 對象?userService?,然后在新增的接口方法中,會調(diào)用?UserService#get(Integer id)?方法,獲得指定用戶編號的用戶。
-
創(chuàng)建 UserControllerTest2 測試類,我們來測試一下簡單的 UserController 的新增的這個 API 操作。代碼如下:
//?UserControllerTest2.java@RunWith(SpringRunner.class) @WebFluxTest(UserController.class) public?class?UserControllerTest2?{@Autowiredprivate?WebTestClient?webClient;@MockBeanprivate?UserService?userService;@Testpublic?void?testGet2()?throws?Exception?{//?Mock?UserService?的?get?方法System.out.println("before?mock:"?+?userService.get(1));?//?<1.1>Mockito.when(userService.get(1)).thenReturn(new?UserVO().setId(1).setUsername("username:1"));?//?<1.2>System.out.println("after?mock:"?+?userService.get(1));?//?<1.3>//?查詢用戶列表webClient.get().uri("/users/v2/get?id=1").exchange()?//?執(zhí)行請求.expectStatus().isOk()?//?響應狀態(tài)碼?200.expectBody().json("{\n"?+"????\"id\":?1,\n"?+"????\"username\":?\"username:1\"\n"?+"}");?//?響應結(jié)果}}-
在類上添加?@WebFluxTest?注解,并且傳入的是 UserController 類,表示我們要對 UserController 進行單元測試。
-
同時,@WebFluxTest?注解,是包含了?@UserController?的組合注解,所以它會自動化配置我們稍后注入的 WebTestClient Bean 對象?mvc?。在后續(xù)的測試中,我們會看到都是通過?webClient?調(diào)用后端 API 接口。但是!每一次調(diào)用后端 API 接口,并不會執(zhí)行真正的后端邏輯,而是走的 Mock 邏輯。也就是說,整個邏輯,走的是單元測試,只會啟動一個?Mock?的 Spring 環(huán)境。
艿艿:注意上面每個加粗的地方!
-
userService?屬性,我們添加了?@MockBean?注解,實際這里注入的是一個使用 Mockito 創(chuàng)建的 UserService Mock 代理對象。如下圖所示:
-
打印的就是我們 Mock 返回的 UserVO 對象。
-
結(jié)果竟然返回的是?null?空。理論來說,此時應該返回一個?id = 1?的 UserVO 對象。實際上,因為此時的?userService?是通過 Mockito 來 Mock 出來的對象,其所有調(diào)用它的方法,返回的都是空。
-
UserController 中,也會注入一個 UserService 屬性,此時注入的就是該 Mock 出來的 UserService Bean 對象。
-
默認情況下,
-
<1.1>?處,我們調(diào)用?UserService#get(Integer id)?方法,然后打印返回結(jié)果。執(zhí)行結(jié)果如下:
before?mock:null -
<1.2>?處,通過 Mockito 進行 Mock?userService?的?#get(Integer id)?方法,當傳入的?id = 1?方法參數(shù)時,返回?id = 1?并且?username = "username:1"?的 UserVO 對象。
-
<1.3>?處,再次調(diào)用?UserService#get(Integer id)?方法,然后打印返回結(jié)果。執(zhí)行結(jié)果如下:
after?cn.iocoder.springboot.lab27.springwebflux.vo.UserVO@23202c31
-
-
后續(xù),使用?webClient?完成一次后端 API 調(diào)用,并進行斷言結(jié)果是否正確。執(zhí)行成功,單元測試通過。
可能胖友對單元測試不是很了解,這里在額外推薦一本書 《有效的單元測試》 。很薄,周末抽幾個小時就能讀完。
如果覺得本小節(jié)還不夠,可以看看 《SpringBoot WebFlux Test – @WebFluxTest》 文章,寫的還是不錯的。
4. 全局統(tǒng)一返回
示例代碼對應倉庫:lab-27-webflux-02 。
在我們提供后端 API 給前端時,我們需要告前端,這個 API 調(diào)用結(jié)果是否成功:
-
如果成功,成功的數(shù)據(jù)是什么。后續(xù),前端會取數(shù)據(jù)渲染到頁面上。
-
如果失敗,失敗的原因是什么。一般,前端會將原因彈出提示給用戶。
這樣,我們就需要有統(tǒng)一的返回結(jié)果,而不能是每個接口自己定義自己的風格。一般來說,統(tǒng)一的全局返回信息如下:
-
成功時,返回成功的狀態(tài)碼?+?數(shù)據(jù)。
-
失敗時,返回失敗的狀態(tài)碼?+?錯誤提示。
在標準的 RESTful API 的定義,是推薦使用 HTTP 響應狀態(tài)碼 返回狀態(tài)碼。一般來說,我們實踐很少這么去做,主要有如下原因:
-
業(yè)務返回的錯誤狀態(tài)碼很多,HTTP 響應狀態(tài)碼無法很好的映射。例如說,活動還未開始、訂單已取消等等。
-
國內(nèi)開發(fā)者對 HTTP 響應狀態(tài)碼不是很了解,可能只知道 200、403、404、500 幾種常見的。這樣,反倒增加學習成本。
所以,實際項目在實踐時,我們會將狀態(tài)碼放在 Response Body?響應內(nèi)容中返回。
在全局統(tǒng)一返回里,我們至少需要定義三個字段:
-
code:狀態(tài)碼。無論是否成功,必須返回。
關(guān)于這一塊,也有團隊實踐時,增加了?success?字段,通過?true?和?false?表示成功還是失敗。這個看每個團隊的習慣吧。艿艿的話,還是偏好基于約定,返回 0 時表示成功。
-
成功時,狀態(tài)碼為 0 。
-
失敗時,對應業(yè)務的錯誤碼。
-
-
data:數(shù)據(jù)。成功時,返回該字段。
-
message:錯誤提示。失敗時,返回該字段。
那么,讓我們來看兩個示例:
//?成功響應 {code:?0,data:?{id:?1,username:?"yudaoyuanma"} }//?失敗響應 {code:?233666,message:?"徐媽太丑了" }下面,我們來看一個示例。
艿艿:考慮到不破壞 「2. 快速入門」 和 「3. 測試接口」 提供的示例,我們需要重新弄搭建一個。
4.1 引入依賴
在 「2.2 引入依賴」 一致。
4.2 Application
在 「2.3 Application」 一致。
4.3 CommonResult
在?cn.iocoder.springboot.lab27.springwebflux.core.vo?包路徑,創(chuàng)建 CommonResult 類,用于全局統(tǒng)一返回。代碼如下:
//?CommonResult.javapublic?class?CommonResult<T>?implements?Serializable?{public?static?Integer?CODE_SUCCESS?=?0;/***?錯誤碼*/private?Integer?code;/***?錯誤提示*/private?String?message;/***?返回數(shù)據(jù)*/private?T?data;/***?將傳入的?result?對象,轉(zhuǎn)換成另外一個泛型結(jié)果的對象**?因為 A 方法返回的 CommonResult 對象,不滿足調(diào)用其的 B 方法的返回,所以需要進行轉(zhuǎn)換。**?@param?result?傳入的?result?對象*?@param?<T>?返回的泛型*?@return?新的?CommonResult?對象*/public?static?<T>?CommonResult<T>?error(CommonResult<?>?result)?{return?error(result.getCode(),?result.getMessage());}public?static?<T>?CommonResult<T>?error(Integer?code,?String?message)?{Assert.isTrue(!CODE_SUCCESS.equals(code),?"code 必須是錯誤的!");CommonResult<T>?result?=?new?CommonResult<>();result.code?=?code;result.message?=?message;return?result;}public?static?<T>?CommonResult<T>?success(T?data)?{CommonResult<T>?result?=?new?CommonResult<>();result.code?=?CODE_SUCCESS;result.data?=?data;result.message?=?"";return?result;}@JsonIgnore?//?忽略,避免?jackson?序列化給前端public?boolean?isSuccess()?{?//?方便判斷是否成功return?CODE_SUCCESS.equals(code);}@JsonIgnore?//?忽略,避免?jackson?序列化給前端public?boolean?isError()?{?//?方便判斷是否失敗return?!isSuccess();}//?...?省略?setting/getting/toString?方法}-
每個字段,胖友自己看相應的注釋。
4.4 GlobalResponseBodyHandler
在?cn.iocoder.springboot.lab27.springwebflux.core.web?包路徑,創(chuàng)建 GlobalResponseBodyHandler 類,全局統(tǒng)一返回的處理器。代碼如下:
//?GlobalResponseBodyHandler.javapublic?class?GlobalResponseBodyHandler?extends?ResponseBodyResultHandler?{private?static?Logger?LOGGER?=?LoggerFactory.getLogger(GlobalResponseBodyHandler.class);private?static?MethodParameter?METHOD_PARAMETER_MONO_COMMON_RESULT;private?static?final?CommonResult?COMMON_RESULT_SUCCESS?=?CommonResult.success(null);static?{try?{//?<1>?獲得 METHOD_PARAMETER_MONO_COMMON_RESULT 。其中?-1 表示?`#methodForParams()`?方法的返回值METHOD_PARAMETER_MONO_COMMON_RESULT?=?new?MethodParameter(GlobalResponseBodyHandler.class.getDeclaredMethod("methodForParams"),?-1);}?catch?(NoSuchMethodException?e)?{LOGGER.error("[static][獲取?METHOD_PARAMETER_MONO_COMMON_RESULT?時,找不都方法");throw?new?RuntimeException(e);}}public?GlobalResponseBodyHandler(List<HttpMessageWriter<?>>?writers,?RequestedContentTypeResolver?resolver)?{super(writers,?resolver);}public?GlobalResponseBodyHandler(List<HttpMessageWriter<?>>?writers,?RequestedContentTypeResolver?resolver,?ReactiveAdapterRegistry?registry)?{super(writers,?resolver,?registry);}@Override@SuppressWarnings("unchecked")public?Mono<Void>?handleResult(ServerWebExchange?exchange,?HandlerResult?result)?{Object?returnValue?=?result.getReturnValue();Object?body;//?<1.1>?處理返回結(jié)果為?Mono?的情況if?(returnValue?instanceof?Mono)?{body?=?((Mono<Object>)?result.getReturnValue()).map((Function<Object,?Object>)?GlobalResponseBodyHandler::wrapCommonResult).defaultIfEmpty(COMMON_RESULT_SUCCESS);//?<1.2>?處理返回結(jié)果為?Flux?的情況}?else?if?(returnValue?instanceof?Flux)?{body?=?((Flux<Object>)?result.getReturnValue()).collectList().map((Function<Object,?Object>)?GlobalResponseBodyHandler::wrapCommonResult).defaultIfEmpty(COMMON_RESULT_SUCCESS);//?<1.3>?處理結(jié)果為其它類型}?else?{body?=?wrapCommonResult(returnValue);}//?<2>return?writeBody(body,?METHOD_PARAMETER_MONO_COMMON_RESULT,?exchange);}private?static?Mono<CommonResult>?methodForParams()?{return?null;}private?static?CommonResult<?>?wrapCommonResult(Object?body)?{//?如果已經(jīng)是?CommonResult?類型,則直接返回if?(body?instanceof?CommonResult)?{return?(CommonResult<?>)?body;}//?如果不是,則包裝成?CommonResult?類型return?CommonResult.success(body);}}-
繼承 WebFlux 的 ResponseBodyResultHandler 類,因為該類將 Response 的 body 寫回給前端。所以,我們通過重寫該類的?#handleResult(ServerWebExchange exchange, HandlerResult result)?方法,將返回結(jié)果進行使用 CommonResult 包裝。
-
<1>?處,獲得?METHOD_PARAMETER_MONO_COMMON_RESULT?。其中?-1?表示?#methodForParams()?方法的返回值類型?Mono<CommonResult>?。后續(xù)我們在#handleResult(ServerWebExchange exchange, HandlerResult result)?方法中,會使用到?METHOD_PARAMETER_MONO_COMMON_RESULT?。
-
重寫?#handleResult(ServerWebExchange exchange, HandlerResult result)?方法,將返回結(jié)果進行使用 CommonResult 包裝。
-
<1.1>?處,處理返回結(jié)果為 Mono 的情況。通過調(diào)用?Mono#map(Function<? super T, ? extends R> mapper)?方法,將原返回結(jié)果,進行包裝成?CommonResult<?>?。
-
<1.2>?處,處理返回結(jié)果為 Flux 的情況。先通過調(diào)用?Flux#collectList()?方法,將其轉(zhuǎn)換成?Mono<List<T>>?對象,后續(xù)就是和?<1.1>?相同的邏輯。
-
<1.3>?處,處理結(jié)果為其它類型的情況,直接進行包裝成?CommonResult<?>?。
-
-
<2>?處,調(diào)用父類方法?#writeBody(Object body, MethodParameter bodyParameter, ServerWebExchange exchange)?方法,實現(xiàn)將結(jié)果寫回給前端。
在思路上,和 SpringMVC 使用 ResponseBodyAdvice +?@ControllerAdvice?注解,是一致的。只是說,WebFlux 暫時沒有提供這樣的方式,所以咱只好通過繼承 ResponseBodyResultHandler 類,重寫其?#handleResult(ServerWebExchange exchange, HandlerResult result)?方法,將返回結(jié)果進行使用 CommonResult 包裝。
4.5 WebFluxConfiguration
在?cn.iocoder.springboot.lab27.springwebflux.config?包路徑下,創(chuàng)建 WebFluxConfiguration 配置類。代碼如下:
//?WebFluxConfiguration.java@Configuration public?class?WebFluxConfiguration?{@Beanpublic?GlobalResponseBodyHandler?responseWrapper(ServerCodecConfigurer?serverCodecConfigurer,RequestedContentTypeResolver?requestedContentTypeResolver)?{return?new?GlobalResponseBodyHandler(serverCodecConfigurer.getWriters(),?requestedContentTypeResolver);}}-
在?#responseWrapper(serverCodecConfigurer, requestedContentTypeResolver)?方法中,我們創(chuàng)建了 4.4 GlobalResponseBodyHandler Bean 對象,實現(xiàn)對返回結(jié)果的包裝。
4.6 UserController
在?cn.iocoder.springboot.lab27.springwebflux.controller?包路徑下,創(chuàng)建 UserController 類。代碼如下:
//?UserController.java@RestController @RequestMapping("/users") public?class?UserController?{/***?查詢用戶列表**?@return?用戶列表*/@GetMapping("/list")public?Flux<UserVO>?list()?{//?查詢列表List<UserVO>?result?=?new?ArrayList<>();result.add(new?UserVO().setId(1).setUsername("yudaoyuanma"));result.add(new?UserVO().setId(2).setUsername("woshiyutou"));result.add(new?UserVO().setId(3).setUsername("chifanshuijiao"));//?返回列表return?Flux.fromIterable(result);}/***?獲得指定用戶編號的用戶**?@param?id?用戶編號*?@return?用戶*/@GetMapping("/get")public?Mono<UserVO>?get(@RequestParam("id")?Integer?id)?{//?查詢用戶UserVO?user?=?new?UserVO().setId(id).setUsername("username:"?+?id);//?返回return?Mono.just(user);}/***?獲得指定用戶編號的用戶**?@param?id?用戶編號*?@return?用戶*/@GetMapping("/get2")public?Mono<CommonResult<UserVO>>?get2(@RequestParam("id")?Integer?id)?{//?查詢用戶UserVO?user?=?new?UserVO().setId(id).setUsername("username:"?+?id);//?返回return?Mono.just(CommonResult.success(user));}/***?獲得指定用戶編號的用戶**?@param?id?用戶編號*?@return?用戶*/@GetMapping("/get3")public?UserVO?get3(@RequestParam("id")?Integer?id)?{//?查詢用戶UserVO?user?=?new?UserVO().setId(id).setUsername("username:"?+?id);//?返回return?user;}/***?獲得指定用戶編號的用戶**?@param?id?用戶編號*?@return?用戶*/@GetMapping("/get4")public?CommonResult<UserVO>?get4(@RequestParam("id")?Integer?id)?{//?查詢用戶UserVO?user?=?new?UserVO().setId(id).setUsername("username:"?+?id);//?返回return?CommonResult.success(user);}}-
API 接口雖然比較多,但是我們可以先根據(jù)返回結(jié)果的類型,分成 Flux 和 Mono 兩類。然后,艿艿這里又創(chuàng)建了 Mono 分類的四種情況的接口,就是?/users/get、/users/get2、/users/get3、/users/get4?四個。胖友看下這四個接口的返回結(jié)果的類型,很容易就明白了。
-
在?#get(Integer id)?方法,返回的結(jié)果是 UserVO 類型。這樣,結(jié)果會被 GlobalResponseBodyHandler 攔截,包裝成 CommonResult 類型返回。請求結(jié)果如下:
{"code":?0,"message":?"","data":?{"id":?10,"username":?"username:10"} }-
會有?"message": ""?的返回的原因是,我們使用 SpringMVC 提供的 Jackson 序列化,對于 CommonResult 此時的?message = null?的情況下,會序列化它成?"message": ""?返回。實際情況下,不會影響前端處理。
-
-
在?# get2(Integer id)?方法,返回的結(jié)果是?Mono<Common<UserVO>>?類型。結(jié)果雖然也會被 GlobalResponseBodyHandler 處理,但是不會二次再重復包裝成 CommonResult 類型返回。
5. 全局異常處理
示例代碼對應倉庫:lab-27-webflux-02 。
在 「4. 全局統(tǒng)一返回」 中,我們已經(jīng)定義了使用 CommonResult 全局統(tǒng)一返回,并且看到了成功返回的示例與代碼。這一小節(jié),我們主要是來全局異常處理,最終能也是通過 CommonResult 返回。
那么,我們就不嗶嗶,直接看著示例代碼,遨游起來。
友情提示:該示例,基于 「4. 全局統(tǒng)一返回」 的 lab-27-webflux-02 的基礎上,繼續(xù)改造。
5.1 ServiceExceptionEnum
在?cn.iocoder.springboot.lab27.springwebflux.constants?包路徑,創(chuàng)建 ServiceExceptionEnum 枚舉類,枚舉項目中的錯誤碼。代碼如下:
//?ServiceExceptionEnum.javapublic?enum?ServiceExceptionEnum?{//?==========?系統(tǒng)級別?==========SUCCESS(0,?"成功"),SYS_ERROR(2001001000,?"服務端發(fā)生異常"),MISSING_REQUEST_PARAM_ERROR(2001001001,?"參數(shù)缺失"),//?==========?用戶模塊?==========USER_NOT_FOUND(1001002000,?"用戶不存在"),//?==========?訂單模塊?==========//?==========?商品模塊?==========;/***?錯誤碼*/private?int?code;/***?錯誤提示*/private?String?message;ServiceExceptionEnum(int?code,?String?message)?{this.code?=?code;this.message?=?message;}//?...?省略?getting?方法}-
因為錯誤碼是全局的,最好按照模塊來拆分。如下是艿艿在 onemall 項目的實踐:
/***?服務異常**?參考?https://www.kancloud.cn/onebase/ob/484204?文章**?一共?10?位,分成四段**?第一段,1?位,類型*??????1?-?業(yè)務級別異常*??????2?-?系統(tǒng)級別異常*?第二段,3?位,系統(tǒng)類型*??????001?-?用戶系統(tǒng)*??????002?-?商品系統(tǒng)*??????003?-?訂單系統(tǒng)*??????004?-?支付系統(tǒng)*??????005?-?優(yōu)惠劵系統(tǒng)*??????...?-?...*?第三段,3?位,模塊*??????不限制規(guī)則。*??????一般建議,每個系統(tǒng)里面,可能有多個模塊,可以再去做分段。以用戶系統(tǒng)為例子:*??????????001?-?OAuth2?模塊*??????????002?-?User?模塊*??????????003?-?MobileCode?模塊*?第四段,3?位,錯誤碼*???????不限制規(guī)則。*???????一般建議,每個模塊自增。*/
5.2 ServiceException
我們在一起討論下 Service 邏輯異常的時候,如何進行返回。這里的邏輯異常,我們指的是,例如說用戶名已經(jīng)存在,商品庫存不足等。一般來說,常用的方案選擇,有兩種:
-
封裝統(tǒng)一的業(yè)務異常類 ServiceException ,里面有錯誤碼和錯誤提示,然后進行?throws?拋出。
-
封裝通用的返回類 CommonResult ,里面有錯誤碼和錯誤提示,然后進行?return?返回。
一開始,我們選擇了 CommonResult ,結(jié)果發(fā)現(xiàn)如下情況:
-
因為 Spring?@Transactional?聲明式事務,是基于異常進行回滾的,如果使用 CommonResult 返回,則事務回滾會非常麻煩。
-
當調(diào)用別的方法時,如果別人返回的是 CommonResult 對象,還需要不斷的進行判斷,寫起來挺麻煩的。
所以,后來我們采用了拋出業(yè)務異常 ServiceException 的方式。
在?cn.iocoder.springboot.lab27.springwebflux.core.exception?包路徑,創(chuàng)建 ServiceException 異常類,繼承 RuntimeException 異常類,用于定義業(yè)務異常。代碼如下:
//?ServiceException.javapublic?final?class?ServiceException?extends?RuntimeException?{/***?錯誤碼*/private?final?Integer?code;public?ServiceException(ServiceExceptionEnum?serviceExceptionEnum)?{//?使用父類的?message?字段super(serviceExceptionEnum.getMessage());//?設置錯誤碼this.code?=?serviceExceptionEnum.getCode();}//?...?省略?getting?方法}-
提供傳入?serviceExceptionEnum?參數(shù)的構(gòu)造方法。具體的處理,看下代碼和注釋。
5.3 GlobalExceptionHandler
在?cn.iocoder.springboot.lab27.springwebflux.core.web?包路徑,創(chuàng)建 GlobalExceptionHandler 類,全局統(tǒng)一返回的處理器。代碼如下:
//?GlobalExceptionHandler.java@ControllerAdvice(basePackages?=?"cn.iocoder.springboot.lab27.springwebflux.controller") public?class?GlobalExceptionHandler?{private?Logger?logger?=?LoggerFactory.getLogger(getClass());/***?處理?ServiceException?異常*/@ResponseBody@ExceptionHandler(value?=?ServiceException.class)public?CommonResult?serviceExceptionHandler(ServiceException?ex)?{logger.debug("[serviceExceptionHandler]",?ex);//?包裝?CommonResult?結(jié)果return?CommonResult.error(ex.getCode(),?ex.getMessage());}/***?處理?ServerWebInputException?異常**?WebFlux?參數(shù)不正確*/@ResponseBody@ExceptionHandler(value?=?ServerWebInputException.class)public?CommonResult?serverWebInputExceptionHandler(ServerWebInputException?ex)?{logger.debug("[ServerWebInputExceptionHandler]",?ex);//?包裝?CommonResult?結(jié)果return?CommonResult.error(ServiceExceptionEnum.MISSING_REQUEST_PARAM_ERROR.getCode(),ServiceExceptionEnum.MISSING_REQUEST_PARAM_ERROR.getMessage());}/***?處理其它?Exception?異常*/@ResponseBody@ExceptionHandler(value?=?Exception.class)public?CommonResult?exceptionHandler(Exception?e)?{//?記錄異常日志logger.error("[exceptionHandler]",?e);//?返回?ERROR?CommonResultreturn?CommonResult.error(ServiceExceptionEnum.SYS_ERROR.getCode(),ServiceExceptionEnum.SYS_ERROR.getMessage());}}-
在 WebFlux 中,可以使用通過實現(xiàn) ResponseBodyAdvice 接口,并添加?@ControllerAdvice?接口,攔截 Controller 的返回結(jié)果。注意,我們這里?@ControllerAdvice?注解,設置了?basePackages?屬性,只攔截?"cn.iocoder.springboot.lab27.springwebflux.controller"?包,也就是我們定義的 Controller 。為什么呢?因為在項目中,我們可能會引入 Swagger 等庫,也使用 Controller 提供 API 接口,那么我們顯然不應該讓 GlobalResponseBodyHandler 去攔截這些接口,畢竟它們并不需要我們?nèi)ヌ嫠鼈冏鋈纸y(tǒng)一的返回。
-
我們定義了三個方法,通過添加?@ExceptionHandler?注解,定義每個方法對應處理的異常。并且,也添加了?@ResponseBody?注解,標記直接使用返回結(jié)果作為 API 的響應。
-
#serviceExceptionHandler(...)?方法,攔截處理 ServiceException 業(yè)務異常,直接使用該異常的?code?+?message?屬性,構(gòu)建出 CommonResult 對象返回。
-
#serverWebInputExceptionHandler(...)?方法,攔截處理 ServerWebInputException 請求參數(shù)異常,構(gòu)建出錯誤碼為?ServiceExceptionEnum.MISSING_REQUEST_PARAM_ERROR?的 CommonResult 對象返回。
-
#exceptionHandler(...)?方法,攔截處理 Exception 異常,構(gòu)建出錯誤碼為?ServiceExceptionEnum.SYS_ERROR?的 CommonResult 對象返回。這是一個兜底的異常處理,避免有一些其它異常,我們沒有在 GlobalExceptionHandler 中,提供自定義的處理方式。
注意,在?#exceptionHandler(...)?方法中,我們還多使用?logger?打印了錯誤日志,方便我們接入 ELK 等日志服務,發(fā)起告警,通知我們?nèi)ヅ挪榻鉀Q。如果胖友的系統(tǒng)里暫時沒有日志服務,可以記錄錯誤日志到數(shù)據(jù)庫中,也是不錯的選擇。而其它兩個方法,因為是更偏業(yè)務的,相對正常的異常,所以無需記錄錯誤日志。
5.4 UserController
在 UserController 類中,我們添加兩個 API 接口,拋出異常,方便我們測試全局異常處理的效果。代碼如下:
//?UserController.java/***?測試拋出?NullPointerException?異常*/ @GetMapping("/exception-01") public?UserVO?exception01()?{throw?new?NullPointerException("沒有粗面魚丸"); }/***?測試拋出?ServiceException?異常*/ @GetMapping("/exception-02") public?UserVO?exception02()?{throw?new?ServiceException(ServiceExceptionEnum.USER_NOT_FOUND); }-
在?#exception01()?方法,拋出 NullPointerException 異常。這樣,異常會被?GlobalExceptionHandler#exceptionHandler(...)?方法來攔截,包裝成 CommonResult 類型返回。請求結(jié)果如下:
{"code":?2001001000,"message":?"服務端發(fā)生異常","data":?null } -
在?#exception02()?方法,拋出 ServiceException 異常。這樣,異常會被?GlobalExceptionHandler#serviceExceptionHandler(...)?方法來攔截,包裝成 CommonResult 類型返回。請求結(jié)果如下:
{"code":?1001002000,"message":?"用戶不存在","data":?null }
5.5 簡單小結(jié)
采用?ControllerAdvice?+?@ExceptionHandler?注解的方式,可以很方便的實現(xiàn) WebFlux 的全局異常處理。不過這種方案存在一個弊端,不支持 WebFlux 的基于函數(shù)式編程方式。不過考慮到,絕大多數(shù)情況下,我們并不會采用基于函數(shù)式編程方式,所以這種方案還是沒問題的。看了下 WebFlux 的官方文檔,也是推薦這種方案,詳細可見 《Web on Reactive Stack —— Spring WebFlux —— Managing Exceptions》 。
如果胖友真的需要支持 WebFlux 的基于函數(shù)式編程方式,可以看看 《Handling Errors in Spring WebFlux》 文章,通過繼承?org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler?抽象類,實現(xiàn)自定義的全局異常處理器。
6. WebFilter 過濾器
示例代碼對應倉庫:lab-27-webflux-02 。
在 SpringMVC 中,我們可以通過實現(xiàn) HandlerInterceptor 接口,攔截 SpringMVC 處理請求的過程,自定義前置和處理的邏輯。不了解這塊的胖友,可以看看 《芋道 Spring Boot SpringMVC 入門》 的 「6. HandlerInterceptor 攔截器」 小節(jié)。
在 WebFlux 中,我們可以通過實現(xiàn) WebFilter 接口,過濾 WebFlux 處理請求的過程,自定義前置和處理的邏輯。該接口代碼如下:
//?DemoWebFilterWebFilter.java/***?Contract?for?interception-style,?chained?processing?of?Web?requests?that?may*?be?used?to?implement?cross-cutting,?application-agnostic?requirements?such*?as?security,?timeouts,?and?others.**?@author?Rossen?Stoyanchev*?@since?5.0*/ public?interface?WebFilter?{/***?Process?the?Web?request?and?(optionally)?delegate?to?the?next*?{@code?WebFilter}?through?the?given?{@link?WebFilterChain}.*?@param?exchange?the?current?server?exchange*?@param?chain?provides?a?way?to?delegate?to?the?next?filter*?@return?{@code?Mono<Void>}?to?indicate?when?request?processing?is?complete*/Mono<Void>?filter(ServerWebExchange?exchange,?WebFilterChain?chain);}-
因為 WebFilterChain 的?#filter(ServerWebExchange exchange)?方法,返回的是?Mono<Void>?對象,所以可以進行各種 Reactor 的操作??瓤瓤?#xff0c;當然需要胖友比較了解 Reactor 的使用,我們才能實現(xiàn)出的 WebFilter ,否則會覺得挺難用的。
-
另外,WebFilterChain 是由多個 WebFilter 過濾器組成的鏈,其默認的實現(xiàn)為 DefaultWebFilterChain 。
-
總體來說,從形態(tài)上和我們在 Servlet 看到的 FilterChain 和 Filter 是比較相似的,只是因為結(jié)合了 Reactor 響應式編程,所以編寫時,差異蠻大的。
6.1 DemoWebFilter
下面,讓我們來編寫一個簡單的 WebFilter 示例。
在?cn.iocoder.springboot.lab27.springwebflux.core.filter?包路徑,創(chuàng)建 DemoWebFilter 類,一個簡單的 WebFilter 示例。代碼如下:
//?DemoWebFilter.java@Component @Order(1) public?class?DemoWebFilter?implements?WebFilter?{private?Logger?logger?=?LoggerFactory.getLogger(getClass());@Overridepublic?Mono<Void>?filter(ServerWebExchange?serverWebExchange,?WebFilterChain?webFilterChain)?{//?<1>?繼續(xù)執(zhí)行請求return?webFilterChain.filter(serverWebExchange).doOnSuccess(new?Consumer<Void>()?{?//?<2>?執(zhí)行成功后回調(diào)@Overridepublic?void?accept(Void?aVoid)?{logger.info("[accept][執(zhí)行成功]");}});}}-
在類上,添加?@Component?注解,創(chuàng)建 DemoWebFilter Bean 對象。這樣,該過濾器就已經(jīng)加入了 WebFlux 的過濾器鏈中。目前,暫未內(nèi)置支持根據(jù)請求路徑 uri 等條件來配置是否過濾,需要我們自己在實現(xiàn)?#filter(serverWebExchange, webFilterChain)?方法來完成。
-
在類上,添加?@Order?注解,設置過濾器的順序。
-
實現(xiàn)?#filter(serverWebExchange, webFilterChain)?方法,實現(xiàn)在請求執(zhí)行完成后,打印一條執(zhí)行成功的日志。
-
<1>?處,調(diào)用?WebFilterChain#filter(exchange)?方法,交給過濾器鏈中的下一個過濾器,繼續(xù)進行過濾處理,并返回?Mono<Void>?對象。
-
<2>?處,調(diào)用?Mono#doOnSuccess(Consumer<? super T> onSuccess)?方法,實現(xiàn)在請求執(zhí)行完成后,打印一條執(zhí)行成功的日志。這里,我們可以參考 《Reactor 文檔 —— Mono》 ,實現(xiàn)各種其它操作。
-
😈 在后面的小節(jié)中,我們會看一個實現(xiàn)處理 Cors 跨域的 CorsWebFilter ,對理解 WebFilter 有一定的幫助。
6.2 Filtering Handler Functions
在基于函數(shù)式編程方式中,可以使用如下的方式,實現(xiàn)對每個路由的過濾處理。代碼如下:
//?UserRouter.java@Bean public?RouterFunction<ServerResponse>?demo2RouterFunction()?{return?route(GET("/users2/demo2"),?request?->?ok().bodyValue("demo")).filter(new?HandlerFilterFunction<ServerResponse,?ServerResponse>()?{@Overridepublic?Mono<ServerResponse>?filter(ServerRequest?request,?HandlerFunction<ServerResponse>?next)?{return?next.handle(request).doOnSuccess(new?Consumer<ServerResponse>()?{?//?執(zhí)行成功后回調(diào)@Overridepublic?void?accept(ServerResponse?serverResponse)?{logger.info("[accept][執(zhí)行成功]");}});}}); }因為實際場景下,使用到基于函數(shù)式編程方式比較少,這里就不擴展開來講。感興趣的胖友,可以看看 《Web on Reactive Stack —— Spring WebFlux —— Filtering Handler Functions》 文檔。
7. Servlet、Filter、Listener
目前測試下來,java.servlet?提供的 Servlet、Filter、Listener 組件,無法在 WebFlux 中使用。測試的示例,可見 lab-27-webflux-03 。
艿艿翻了下 Spring Security 對 WebFlux 的支持,也是通過實現(xiàn) WebFlux 接口的 WebFilterChainProxy 過濾器,即在 「6. WebFilter 過濾器」 中看到的內(nèi)容。
8. Cors 跨域
超過微信文章長度限制,請訪問 http://www.iocoder.cn/Spring-Boot/WebFlux/
9. 集成響應式的 MongoDB
超過微信文章長度限制,請訪問 http://www.iocoder.cn/Spring-Boot/WebFlux/
10. 集成響應式的 Redis
超過微信文章長度限制,請訪問 http://www.iocoder.cn/Spring-Boot/WebFlux/
11. 集成響應式的 Elasticsearch
超過微信文章長度限制,請訪問 http://www.iocoder.cn/Spring-Boot/WebFlux/
12. 整合響應式的 JPA
超過微信文章長度限制,請訪問 http://www.iocoder.cn/Spring-Boot/WebFlux/
13. 整合響應式的 R2DBC 和事務
超過微信文章長度限制,請訪問 http://www.iocoder.cn/Spring-Boot/WebFlux/
14. 其他內(nèi)容
超過微信文章長度限制,請訪問 http://www.iocoder.cn/Spring-Boot/WebFlux/
666. 彩蛋
至此,我們已經(jīng)完成了 Spring WebFlux 的簡單入門。如果用一句簡單的話來概括 WebFlux 的話,那就是:
-
WebFlux 在 Spring Framework 5 推出的,以 Reactor 庫為基礎,基于異步和事件驅(qū)動,實現(xiàn)的響應式 Web 開發(fā)框架。
-
WebFlux 能夠充分利用多核 CPU 的硬件資源,處理大量的并發(fā)請求。因此,可以在不擴充硬件的資源的情況下,提升系統(tǒng)的吞吐性和伸縮性。
注意,這里我們提到的是吞吐性和伸縮性,而不是提升每個請求的性能。我們來回想下整個 WebFlux 的執(zhí)行過程:請求是被作為一個事件丟到線程池中執(zhí)行,等到執(zhí)行完畢,異步回調(diào)結(jié)果給主線程,最后返回給前端。
那么整個過程,相比 SpringMVC 的執(zhí)行過程來說,至少多了一次線程的上下文切換。我們都知道,線程的切換是有成本的。所以,單看一個請求的處理,SpringMVC 的性能是優(yōu)于 WebFlux 的。
我們上文提到的主線程,一般來說就是 IO 線程。
但是,由于 WebFlux 的 IO 線程是非阻塞的,可以不斷解析請求,丟到線程池中執(zhí)行。而 SpringMVC 的 IO 線程是阻塞的,需要等到請求被處理完畢,才能解析下一個請求并進行處理。這樣,隨著每個請求的被處理時間越長、并發(fā)請求的量級越大,WebFlux 相比 SpringMVC 的整體吞吐量高的越多,平均的請求響應時間越短。如下圖所示:
性能對比從圖中,我們可以看到,隨著并發(fā)請求量的增大,WebFlux 的響應時間平穩(wěn)在 100ms 左右,而 SpringMVC 的響應式時間從 3000 并發(fā)量開始,響應時間直線上升。😈 感興趣的胖友,可以參考如下文章,自己做一波性能的基準測試:
-
《性能測試 —— SpringMVC、Webflux 基準測試》
-
《性能測試 —— Spring Cloud Gateway、Zuul 基準測試》
-
《WebFlux 性能測試》
-
《WebFlux 性能問題和適用場景》
那么什么場景下的服務,適合使用 WebFlux 呢?我們可以把任務分成 IO 密集型和 CPU 密集型,而服務本質(zhì)上,是執(zhí)行一個又一個的任務,所以也可以這么分。😈 不了解 IO 密集型和 CPU 密集型的胖友,可以先看下 《計算密集型和 IO 密集型》 文章。
而我們業(yè)務中編寫的代碼,都無一幸免需要跟 MySQL、MongoDB、Elasticsearch 等數(shù)據(jù)庫打交道,又或者跟 Redis、Memcached 等緩存服務打交道,還或者需要跟 RocketMQ、RabbitMQ、Kafka 等消息隊列打交道。無論這些中間件做的多牛逼,性能多么掉渣天,我們都無法避免會經(jīng)過網(wǎng)絡 IO 和磁盤 IO 。所以,我們提供的服務,大多數(shù)都是 IO 密集型。很少會存在,直接從內(nèi)存讀取數(shù)據(jù),直接返回的情況。
**因此,我們業(yè)務中編寫的代碼,絕大多多多數(shù)都是 IO 密集型,都是適合使用 WebFlux 的。**但是,響應式編程對開發(fā)人員的編碼能力要求會比較高,一旦腦子一抽,在 IO 線程中編寫了阻塞代碼,反倒出現(xiàn)性能下滑。具體可以看看艿艿在 《性能測試 —— SpringMVC、Webflux 基準測試》 提供的測試示例,明明白白的。
艿艿建議的話,如果考慮使用 WebFlux 的話,一定要把 Reactor 好好學習下,不然真的是做廝大發(fā)好。同時,每次上線之前,對使用 WebFlux 編寫的服務,做下性能測試,可以發(fā)現(xiàn)編寫不正確的地方,找到阻塞 IO 線程的邏輯。
目前,暫時找不到大規(guī)模使用 WebFlux 的業(yè)務開源項目,最大使用 WebFlux 構(gòu)建的開源項目,就是 Spring Cloud 開源的網(wǎng)關(guān) Spring Cloud Gateway 。😈 可能,WebFlux 或者響應式編程最好的歸宿,暫時是中間件。如果胖友有看過 Dubbo 的線程模型,就會發(fā)現(xiàn)和 WebFlux 是異曲同工之妙。
OK ,嗶嗶結(jié)束~如果胖友想要進一步了解 WebFlux 的話,不煩看看 Spring Cloud Gateway 的源碼,可以看看艿艿寫的 《芋道 Spring Cloud Gateway 源碼解析》 。
總結(jié)
以上是生活随笔為你收集整理的艿艿连肝了几个周末,写了一篇贼长的 Spring 响应式 Web 框架 WebFlux!市面第二完整~的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 做成任何事情的方法
- 下一篇: 系统学习Lambda表达式