如何用Spring WebFlux構建Reactive REST API
譯文【51CTO.com快譯】
在本文中,我們將討論如何使用Spring WebFlux來構建響應式REST API。在正式討論之前,讓我們首先來看看系統(tǒng)的開發(fā),傳統(tǒng)REST在實現中遇到的問題,以及當前API的普遍需求。
下圖簡要地羅列了傳統(tǒng)應用和現代應用系統(tǒng)的主要特點。如今的系統(tǒng)講求的是:分布式應用、云原生、高可用性和可擴展性。因此,有效地利用系統(tǒng)現有的資源是至關重要的。
應用程序API需求的演變
那么傳統(tǒng)的REST API請求處理又是如何工作的呢?
傳統(tǒng)REST API模型
如上圖所示,傳統(tǒng)REST API會帶來如下問題:
- 阻塞和同步 → 通常,請求線程會去等待各種阻塞的I/O直至結束之后,才能被釋放,進而將響應返回給調用方。
- 每個請求的線程數 → Web容器會用到基于請求的線程(thread-per-request)模型。該模型限制了待處理的并發(fā)請求數量。也就是說,容器會對請求進行排隊,進而最終影響到API的性能。
- 處理高并發(fā)用戶的限制 → 正是由于Web容器使用了基于請求的線程模型,因此我們無法去處理那些高并發(fā)量的請求。
- 無法更好地利用系統(tǒng)資源 → 阻塞的I/O會造成線程處于空閑狀態(tài),進而導致Web容器無法接受更多的請求,我們也就無法有效地利用現有的系統(tǒng)資源。
- 沒有背壓(backpressure)支持 → 由于我們無法從客戶端或服務器處施加背壓,因此應用程序在負載量大時,無法維持正常運行。也就是說,倘若應用突然面臨大量的請求,則服務器或客戶端可能會由于中斷,而無法訪問到該應用。
下面,讓我們來看看響應式API的優(yōu)勢,以及如何使用響應式編程,來解決上述問題。
- 異步和無阻塞 → 響應式編程為編寫異步和非阻塞應用程序提供了靈活性。
- 事件/消息驅動 → 系統(tǒng)能夠為任何活動生成對應的事件或消息。例如,那些來自數據庫的數據會被視為事件流。
- 支持背壓 → 我們可以通過施加背壓,來“優(yōu)雅地”處理從一個系統(tǒng)到另一個系統(tǒng)的壓力,從而避免了拒絕服務的出現。
- 可預測的應用響應時間 → 由于線程是異步且非阻塞的,因此我們可以預測負載下的應用響應時間。
- 更好地利用系統(tǒng)資源 → 同樣由于線程是異步且非阻塞的,因此各種線程不會被I/O所占用,它們能夠支持更多的用戶請求。
- 基于負載的擴容方式
- 擺脫基于請求的線程 → 借助響應式API,并得益于異步且非阻塞的線程,我們可以擺脫基于請求的線程模型。在請求被產生后,模型會與服務器一起創(chuàng)建事件,并通過請求線程,去處理其他的請求。
那么,響應式編程的具體流程是怎樣的呢?如下圖所示,一旦應用程序調用了從某個數據源獲取數據的操作,那么就會立即返回一個線程,并且讓來自該數據源的數據作為數據/事件流出現。在此,應用程序是訂閱者(subscriber),數據源是發(fā)布者(publisher)。一旦數據流完成后,onComplete事件就會被觸發(fā)。
數據流工作流程
如下圖所示,如果發(fā)生了任何異常情況,發(fā)布者將會觸發(fā)onError事件。
數據流工作流程
在某些情況下,例如:從數據庫中刪除一個條目,發(fā)布者只會立即觸發(fā)onComplete/onError事件,而不會調用onNext事件,畢竟沒有任何數據可以返回。
數據流工作流程
下面,我們進一步討論:什么是背壓,以及如何將背壓應用于響應流。例如,我們有一個客戶端應用正在向另一個服務請求數據。該服務能夠以1000 TPS(吞吐量)的速率發(fā)布事件,而客戶端應用只能以200 TPS的速率處理事件。
那么在這種情況下,客戶端應用程序需要通過緩沖數據來進行處理。而在隨后的調用中,客戶端應用程序可能會緩沖更多的數據,以致最終耗盡內存。顯然,這對于那些依賴該客戶端應用的其他程序,會造成級聯(lián)效應。為了避免此類情況,客戶端應用可以要求服務在事件的末尾進行緩沖,并以客戶端應用的速率去推送各種事件。這就是所謂的背壓,具體流程請見下圖。
背壓示例
下面,我們將介紹響應流的規(guī)范(請參見--https://www.reactive-streams.org/),以及一個實現案例--Project Reactor(請參見--https://projectreactor.io/)。通常,響應流的規(guī)范定義了如下接口類型:
- 發(fā)布者(Publisher) → 發(fā)布者是那些具有無限數量順序元素的提供者。它可以按照訂閱者的要求進行發(fā)布。其Java代碼如下所示:
- public interface Publisher<T> {
- public void subscribe(Subscriber<? super T> s);
- }
- 訂閱者(Subscriber) → 訂閱者恰好是那些具有無限數量順序元素的使用者。其Java代碼如下所示:
- public interface Subscriber<T> {
- public void onSubscribe(Subscription s);
- public void onNext(T t);
- public void onError(Throwable t);
- public void onComplete();
- }
- 訂閱(Subscription) → 表示訂閱者向發(fā)布者訂閱的某個一對一的周期。其Java代碼如下所示:
- public interface Subscription {
- public void request(long n);
- public void cancel();
- }
- 處理器(Processor) → 表示一個處理階段,即訂閱者和發(fā)布者之間根據約定進行處理。
下面是響應流規(guī)范的類圖:
響應流規(guī)范
其實,響應流規(guī)范具有許多種實現方式,上述Project Reactor只是其中的一種。Reactor可以完全實現無阻塞、且有效的請求管理。它能夠提供兩個響應式和可組合的API,即:Flux [N](請參見-- https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html)和Mono [0|1](請參見--https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html)。它們廣泛地實現了響應式擴展(Reactive Extensions)。Reactor為HTTP(包括Websocket)提供了非阻塞的背壓式網絡引擎、TCP和UDP。它也非常適合于微服務的架構。
- Flux→ 它是發(fā)布者帶有各種rx運算符的響應流(Reactive Streams),它會發(fā)出0到N個元素,然后輸出成功、或帶有某個錯誤的完成結果。其流程圖如下所示:
圖片來源:https://projectreactor.io
- Mono → 它也是發(fā)布者具有各種基本rx運算符的響應流,能夠通過發(fā)出0到1個元素,輸出成功、或帶有某個錯誤的完成結果。其流程圖如下所示:
圖片來源:https://projectreactor.io
由于Reactor的實施往往涉及到Spring 5.x,因此,我們可以使用帶有Spring servlet棧的命令式編程,來構建REST API。下圖展示了Spring如何支持響應式和servlet棧的實現。
圖片來源:spring.io
下面是一個公布了響應式REST API的應用。在該應用中,我們使用到了:
- 帶有WebFlux的Spring Boot
- 具有響應式支持的Spring數據
- Cassandra數據庫
下圖是該應用的整體架構:
下面是build.gradle文件的Groovy代碼,它包含了與Spring WebFlux協(xié)同使用的各種依賴項。
- plugins {
- id 'org.springframework.boot' version '2.2.6.RELEASE'
- id 'io.spring.dependency-management' version '1.0.9.RELEASE'
- id 'java'
- }
- group = 'org.smarttechie'
- version = '0.0.1-SNAPSHOT'
- sourceCompatibility = '1.8'
- repositories {
- mavenCentral()
- }
- dependencies {
- implementation 'org.springframework.boot:spring-boot-starter-data-cassandra-reactive'
- implementation 'org.springframework.boot:spring-boot-starter-webflux'
- testImplementation('org.springframework.boot:spring-boot-starter-test') {
- exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
- }
- testImplementation 'io.projectreactor:reactor-test'
- }
- test {
- useJUnitPlatform()
- }
在此應用程序中,我公布了如下API。您可以通過GitHub的相關鏈接--https://github.com/2013techsmarts/Spring-Reactive-Examples,下載源代碼。
在構建響應式API時,我們可以使用功能性樣式編程模型來構建API,而無需使用RestController。當然,您需要具有如下的router和handler組件:
Router:
- package org.smarttechie.router;
- import org.smarttechie.handler.ProductHandler;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
- import org.springframework.http.MediaType;
- import org.springframework.web.reactive.function.server.RouterFunction;
- import org.springframework.web.reactive.function.server.RouterFunctions;
- import org.springframework.web.reactive.function.server.ServerResponse;
- import static org.springframework.web.reactive.function.server.RequestPredicates.*;
- @Configuration
- public class ProductRouter {
- /**
- * The router configuration for the product handler.
- * @param productHandler
- * @return
- */
- @Bean
- public RouterFunction<ServerResponse> productsRoute(ProductHandler productHandler){
- return RouterFunctions.route(GET("/products").and(accept(MediaType.APPLICATION_JSON)) ,productHandler::getAllProducts).andRoute(POST("/product").and(accept(MediaType.APPLICATION_JSON)),productHandler::createProduct).andRoute(DELETE("/product/{id}").and(accept(MediaType.APPLICATION_JSON)) ,productHandler::deleteProduct).andRoute(PUT("/product/{id}").and(accept(MediaType.APPLICATION_JSON)),productHandler::updateProduct);
- }
- }
Handler:
- package org.smarttechie.handler;
- import org.smarttechie.model.Product;
- import org.smarttechie.service.ProductService;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.http.MediaType;
- import org.springframework.stereotype.Component;
- import org.springframework.web.reactive.function.server.ServerRequest;
- import org.springframework.web.reactive.function.server.ServerResponse;
- import reactor.core.publisher.Mono;
- import static org.springframework.web.reactive.function.BodyInserters.fromObject;
- @Component
- public class ProductHandler {
- @Autowired
- private ProductService productService;
- static Mono<ServerResponse> notFound = ServerResponse.notFound().build();
- /**
- * The handler to get all the available products.
- * @param serverRequest
- * @return - all the products info as part of ServerResponse
- */
- public Mono<ServerResponse> getAllProducts(ServerRequest serverRequest) {
- return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(productService.getAllProducts(), Product.class);
- }
- /**
- * The handler to create a product
- * @param serverRequest
- * @return - return the created product as part of ServerResponse
- */
- public Mono<ServerResponse> createProduct(ServerRequest serverRequest) {
- Mono<Product> productToSave = serverRequest.bodyToMono(Product.class);
- return productToSave.flatMap(product ->
- ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(productService.save(product), Product.class));
- }
- /**
- * The handler to delete a product based on the product id.
- * @param serverRequest
- * @return - return the deleted product as part of ServerResponse
- */
- public Mono<ServerResponse> deleteProduct(ServerRequest serverRequest) {
- String id = serverRequest.pathVariable("id");
- Mono<Void> deleteItem = productService.deleteProduct(Integer.parseInt(id));
- return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(deleteItem, Void.class);
- }
- /**
- * The handler to update a product.
- * @param serverRequest
- * @return - The updated product as part of ServerResponse
- */
- public Mono<ServerResponse> updateProduct(ServerRequest serverRequest) {
- return productService.update(serverRequest.bodyToMono(Product.class)).flatMap(product ->ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(fromObject(product))) .switchIfEmpty(notFound);
- }
- }
至止,我們已經對如何公布響應式REST API有所了解。針對上述實現,我們使用了Gatling(譯者注:是一款功能強大的負載測試工具),在響應式API和非響應式API(使用Spring RestController構建非響應式API)上,進行了簡單的基準化測試。其結果比較如下圖所示。具體的Gatling負載測試腳本,請參考GitHub上的鏈接:https://github.com/2013techsmarts/Spring-Reactive-Examples。
負載測試結果比較
原標題:Build Reactive REST APIs With Spring WebFlux ,作者:Siva Prasad Rao Janapati
【51CTO譯稿,合作站點轉載請注明原文譯者和出處為51CTO.com】