Spring WebFlux入門實例并整合數(shù)據(jù)庫實現(xiàn)基本的增刪改查
環(huán)境:Springboot2.4.11
概述
為什么創(chuàng)建了SpringWebFlux?
部分答案是需要一個非阻塞web堆棧來處理具有少量線程的并發(fā)性,并使用較少的硬件資源進行擴展。Servlet 3.1確實為非阻塞I/O提供了一個API。但是,使用它會導致Servlet API的其他部分出現(xiàn)偏差。這是一個新的公共API在任何非阻塞運行時作為基礎的動機。這一點很重要,因為服務器(如Netty)在異步、非阻塞空間中建立良好。
答案的另一部分是函數(shù)式編程。正如Java 5中添加注釋創(chuàng)造了機會(如帶注釋的REST控制器或單元測試),Java 8中添加lambda表達式也為Java中的函數(shù)API創(chuàng)造了機會。這對于允許異步邏輯的聲明性組合的非阻塞應用程序和延續(xù)式API(CompletableFuture和ReactiveX推廣)是一個福音。在編程模型級別,Java8支持SpringWebFlux提供功能性web端點和帶注釋的控制器。
什么是反應式編程?
“反應式”指的是圍繞對變化作出反應而構建的編程模型 — 響應I/O事件的網(wǎng)絡組件、響應鼠標事件的UI控制器以及其他組件。非阻塞是反應性的,因為我們現(xiàn)在不是被阻塞,而是在操作完成或數(shù)據(jù)可用時對通知作出反應。還有另一個重要的機制,我們Spring團隊將其與“反應性”聯(lián)系起來,那就是無阻塞背壓。在同步命令式代碼中,阻塞調用作為一種自然形式的背壓,迫使調用方等待。在非阻塞代碼中,控制事件的速率變得非常重要,以便快速生產者不會壓倒其目的地。
Reactive Streams 是一個小規(guī)范(在Java9中也采用),它定義了具有背壓的異步組件之間的交互。例如,數(shù)據(jù)存儲庫(充當發(fā)布者)可以生成HTTP服務器(充當訂戶)可以寫入響應的數(shù)據(jù)。反應流的主要目的是讓訂閱者控制發(fā)布者生成數(shù)據(jù)的速度。
Reactive Streams 在互操作性方面起著重要作用。它對庫和基礎結構組件很感興趣,但作為應用程序API不太有用,因為它的級別太低。應用程序需要更高級別、更豐富、功能更強大的API來組成異步邏輯 — 與Java8流API類似,但不僅僅針對集合。這就是反應式庫所扮演的角色。
Reactor是Spring WebFlux的首選反應庫。它提供了Mono和Flux API類型,通過與ReactiveX操作符詞匯表對齊的一組豐富的操作符來處理0..1(Mono)和0..N(Flux)的數(shù)據(jù)序列。反應器是一個反應流庫,因此,其所有操作員都支持非阻塞背壓。Reactor非常關注服務器端Java。它是與Spring密切合作開發(fā)的。
WebFlux需要Reactor作為核心依賴項,但它可以通過反應流與其他反應庫進行互操作。作為一般規(guī)則,WebFlux API接受普通發(fā)布服務器作為輸入,在內部將其調整為反應器類型,使用該類型,并返回Flux或Mono作為輸出。
編程模型
Spring Web模塊包含了Spring WebFlux的基礎,包括HTTP抽象、支持服務器的反應流適配器、編解碼器和與Servlet API相媲美的核心WebHandler-API,但是具有非阻塞契約。
在此基礎上,Spring WebFlux提供了兩種編程模型的選擇:
- Annotated Controllers:與SpringMVC一致,并基于SpringWeb模塊中相同的注釋。SpringMVC和WebFlux控制器都支持反應式(Reactor和RxJava)返回類型,因此很難區(qū)分它們。一個顯著的區(qū)別是WebFlux還支持反應式@RequestBody參數(shù)。
- Functional Endpoints: 基于Lambda的輕量級功能性編程模型。您可以將其視為一個小型庫或一組應用程序可用于路由和處理請求的實用程序。帶注釋控制器的最大區(qū)別在于,應用程序從頭到尾負責請求處理,而不是通過注釋聲明意圖并被回調。
適用性
Spring MVC or WebFlux如何選擇?
它們可以并排使用,并且來自各方的反饋對雙方都有利。下圖顯示了兩者之間的關系、它們的共同點以及各自唯一支持的內容:

我們建議你考慮以下幾點:
- 如果你有一個運行良好的SpringMVC應用程序,則無需進行更改。命令式編程是編寫、理解和調試代碼的最簡單方法。您可以選擇最多的庫,因為從歷史上看,大多數(shù)庫都是阻塞的。
- 如果你已經在購買非阻塞web堆棧,Spring WebFlux提供了與此領域其他產品相同的執(zhí)行模型優(yōu)勢,還提供了服務器選擇(Netty、Tomcat、Jetty、Undertow和Servlet 3.1+容器)、編程模型選擇(annotated controllers and functional web endpoints),以及可選擇的反應庫(Reactor、RxJava或其他)。
- 如果你對用于Java8 Lambdas或Kotlin的輕量級功能性web框架感興趣,可以使用SpringWebFlux功能性web端點。對于需求不太復雜的小型應用程序或微服務來說,這也是一個不錯的選擇,因為它們可以從更高的透明度和控制中獲益。
- 在微服務體系結構中,可以混合使用具有SpringMVC或SpringWebFlux控制器或SpringWebFlux功能端點的應用程序。在兩個框架中都支持相同的基于注釋的編程模型,這使得重用知識更加容易,同時也為正確的工作選擇了正確的工具。
- 評估應用程序的一種簡單方法是檢查其依賴性。如果你有塊持久性API(JPA、JDBC)或網(wǎng)絡API可供使用,SpringMVC至少是通用體系結構的最佳選擇。對于Reactor和RxJava來說,在單獨的線程上執(zhí)行阻塞調用在技術上是可行的,但是你不會充分利用非阻塞web堆棧。
- 如果你有一個SpringMVC應用程序,可以調用遠程服務,請嘗試使用反應式WebClient。你可以直接從SpringMVC控制器方法返回反應類型(Reactor、RxJava或其他)。每次調用的延遲或調用之間的相互依賴性越大,好處就越顯著。SpringMVC控制器也可以調用其他反應組件。
- 如果你有一個龐大的團隊,請記住,在向非阻塞、函數(shù)式和聲明式編程轉變的過程中,學習曲線很陡峭。在沒有完全轉換的情況下啟動的一種實用方法是使用反應式WebClient。除此之外,從小事做起,衡量效益。我們預計,對于廣泛的應用,這種轉變是不必要的。如果您不確定要尋找哪些好處,請從了解非阻塞I/O的工作原理(例如,單線程Node.js上的并發(fā))及其效果開始。
應用服務
Spring WebFlux在Tomcat、Jetty、Servlet3.1+容器以及Netty和Undertow等非Servlet運行時上受支持。所有服務器都適用于低級別的通用API,以便跨服務器支持更高級別的編程模型。
Spring WebFlux沒有啟動或停止服務器的內置支持。然而,從Spring配置和WebFlux基礎設施組裝應用程序并用幾行代碼運行它是很容易的。
Spring Boot有一個WebFlux啟動器,可以自動執(zhí)行這些步驟。默認情況下,初學者使用Netty,但通過更改Maven或Gradle依賴項,可以很容易地切換到Tomcat、Jetty或Undertow。springboot默認為Netty,因為它更廣泛地用于異步、無阻塞,并允許客戶端和服務器共享資源。
Tomcat和Jetty可以與Spring MVC和WebFlux一起使用。但是,請記住,它們的使用方式是非常不同的。SpringMVC依賴于Servlet阻塞I/O,并允許應用程序在需要時直接使用Servlet API。Spring WebFlux依賴于Servlet3.1非阻塞I/O,并在低級適配器后面使用ServletAPI。它不暴露直接使用。
對于Undertow,Spring WebFlux直接使用Undertow API,而不使用ServletAPI。
性能
性能有許多特點和意義。響應式和非阻塞通常不會使應用程序運行得更快。在某些情況下,它們可以(例如,如果使用WebClient并行運行遠程調用)??偟膩碚f,它需要更多的工作來完成非阻塞方式的事情,這可以稍微增加所需的處理時間。
反應式和非阻塞的主要預期好處是能夠用少量固定數(shù)量的線程和更少的內存進行擴展。這使得應用程序在負載下更具彈性,因為它們以更可預測的方式擴展。然而,為了觀察這些好處,你需要有一些延遲(包括緩慢和不可預測的網(wǎng)絡I/O混合)。這就是反應堆棧開始顯示其優(yōu)勢的地方,而差異可能是巨大的。
線程模型
在SpringMVC(以及一般的servlet應用程序)中,假定應用程序可以阻止當前線程(例如,遠程調用)。由于這個原因,servlet容器使用一個大的線程池來吸收請求處理期間的潛在阻塞。
在Spring WebFlux(以及一般的非阻塞服務器)中,假定應用程序不阻塞。因此,非阻塞服務器使用一個小的、固定大小的線程池(事件循環(huán)工作者)來處理請求。
- 調用阻塞API
如果你確實需要使用阻塞庫怎么辦?Reactor和RxJava都提供publishOn操作符,以便在不同的線程上繼續(xù)處理。這意味著有一個容易逃生的艙口。但是,請記住,阻塞API并不適合此并發(fā)模型。
- 易變狀態(tài)
在Reactor和RxJava中,通過操作符聲明邏輯。在運行時,會形成一個反應式管道,在該管道中,數(shù)據(jù)會在不同的階段按順序處理。這樣做的一個關鍵好處是,它使應用程序不必保護可變狀態(tài),因為該管道中的應用程序代碼永遠不會被并發(fā)調用。
- 線程模型
在運行Spring WebFlux的服務器上,你希望看到哪些線程?
在“普通”Spring WebFlux服務器上(例如,沒有數(shù)據(jù)訪問或其他可選依賴項),您可以期望服務器有一個線程,其他幾個線程用于請求處理(通常與CPU核數(shù)相同)。然而,Servlet容器可以從更多線程開始(例如,Tomcat上的10個線程),以支持Servlet(阻塞)I/O和Servlet 3.1(非阻塞)I/O使用。
反應式WebClient以事件循環(huán)方式運行。因此,您可以看到與此相關的少量固定數(shù)量的處理線程(例如,reactor http nio-帶有reactor Netty連接器)。但是,如果Reactor Netty同時用于客戶端和服務器,則默認情況下,這兩個服務器共享事件循環(huán)資源。
Reactor和RxJava提供了線程池抽象,稱為調度程序,與publishOn操作符一起使用,publishOn操作符用于將處理切換到不同的線程池。調度程序的名稱表示特定的并發(fā)策略 — 例如,“并行”(用于CPU綁定的、線程數(shù)量有限的工作)或“彈性”(用于I/O綁定的、線程數(shù)量較多的工作)。如果你看到這樣的線程,則意味著某些代碼正在使用特定的線程池調度程序策略。
數(shù)據(jù)訪問庫和其他第三方依賴項也可以創(chuàng)建和使用自己的線程。
引入依賴
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-webflux</artifactId>
- </dependency>
- <!--R2DBC是基于Reactive Streams標準來設計的。通過使用R2DBC,你可以使用reactive API來操作數(shù)據(jù)。同時R2DBC只是一個開放的標準,而各個具體的數(shù)據(jù)庫連接實現(xiàn),需要實現(xiàn)這個標準。-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-data-r2dbc</artifactId>
- </dependency>
- <!--響應式編程傳統(tǒng)的jdbc操作是阻塞式的,所以不能再用以前的mysql驅動了-->
- <dependency>
- <groupId>dev.miku</groupId>
- <artifactId>r2dbc-mysql</artifactId>
- </dependency>
相關配置
- spring:
- r2dbc:
- #連接數(shù)據(jù)庫的url,前綴不再是jdbc而是換成r2dbc
- #這里可以配置連接池相關的其它屬性,這里為了簡潔不配置
- url: r2dbc:mysql://localhost:3306/testjpa
- username: root
- password: 123123
PO定義
- @Table(value = "reactive_users")
- public class Users implements Serializable {
- @Id
- private Long id ;
- private Integer age ;
- private String name ;
- }
DAO層定義
- // 在使用JPA時經常用到的是JpaRepository等;在反應式編程中不能在使用了,只能用如下的接口
- public interface UsersRepository extends ReactiveCrudRepository<Users, Long>, ReactiveSortingRepository<Users, Long>{
- // 這里的方法名定義還是與使用data-jpa時一樣的定義
- public Mono<Users> findByName(String name) ;
- }
Service層定義
- @Service
- public class UsersService {
- @Resource
- private UsersRepository usersRepository ;
- @Transactional
- public Mono<Users> save(Users users) {
- return usersRepository.save(users) ;
- }
- public Mono<Users> getUsers(Long id) {
- return usersRepository.findById(id) ;
- }
- public Flux<Users> list() {
- return usersRepository.findAll() ;
- }
- public Mono<Users> getUsersByName(String name) {
- return usersRepository.findByName(name) ;
- }
- }
Service中定義了CURD操作。
Controller接口定義
- @RestController
- @RequestMapping("/users")
- public class UsersController {
- @Resource
- private UsersService usersService ;
- @PostMapping("/save")
- public Mono<Long> save(@RequestBody Users user) {
- Mono<Users> res = usersService.save(user) ;
- return res.flatMap(new Function<Users, Mono<Long>>() {
- @Override
- public Mono<Long> apply(Users t) {
- return Mono.just(t.getId()) ;
- }
- }) ;
- }
- @GetMapping("/{id}")
- public Mono<Users> getUsers(@PathVariable("id") Long id) {
- return usersService.getUsers(id) ;
- }
- @GetMapping("/lists")
- public Flux<Users> list() {
- return usersService.list() ;
- }
- @GetMapping("/name")
- public Mono<Users> name(String name) {
- return usersService.getUsersByName(name) ;
- }
- }
Controller的定義還是與傳統(tǒng)的定義方式差不多,只是返回值要么是Mono(單一值),要么是Flux(集合)對象。