自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

如何用Spring WebFlux構建Reactive REST API

譯文
開發(fā) 后端
在本文中,我們將討論如何使用Spring WebFlux來構建響應式REST API,Reactive API的實現,以及響應式規(guī)范的優(yōu)勢。

【51CTO.com快譯】

在本文中,我們將討論如何使用Spring WebFlux來構建響應式REST API。在正式討論之前,讓我們首先來看看系統(tǒng)的開發(fā),傳統(tǒng)REST在實現中遇到的問題,以及當前API的普遍需求。

下圖簡要地羅列了傳統(tǒng)應用和現代應用系統(tǒng)的主要特點。如今的系統(tǒng)講求的是:分布式應用、云原生、高可用性和可擴展性。因此,有效地利用系統(tǒng)現有的資源是至關重要的。

 

應用程序API需求的演變

那么傳統(tǒng)的REST API請求處理又是如何工作的呢?

 

傳統(tǒng)REST API模型

如上圖所示,傳統(tǒng)REST API會帶來如下問題:

  •  阻塞和同步 → 通常,請求線程會去等待各種阻塞的I/O直至結束之后,才能被釋放,進而將響應返回給調用方。
  •  每個請求的線程數 → Web容器會用到基于請求的線程(thread-per-request)模型。該模型限制了待處理的并發(fā)請求數量。也就是說,容器會對請求進行排隊,進而最終影響到API的性能。
  • 處理高并發(fā)用戶的限制 → 正是由于Web容器使用了基于請求的線程模型,因此我們無法去處理那些高并發(fā)量的請求。
  • 無法更好地利用系統(tǒng)資源 → 阻塞的I/O會造成線程處于空閑狀態(tài),進而導致Web容器無法接受更多的請求,我們也就無法有效地利用現有的系統(tǒng)資源。
  •  沒有背壓(backpressure)支持 → 由于我們無法從客戶端或服務器處施加背壓,因此應用程序在負載量大時,無法維持正常運行。也就是說,倘若應用突然面臨大量的請求,則服務器或客戶端可能會由于中斷,而無法訪問到該應用。

下面,讓我們來看看響應式API的優(yōu)勢,以及如何使用響應式編程,來解決上述問題。

  • 異步和無阻塞 → 響應式編程為編寫異步和非阻塞應用程序提供了靈活性。
  • 事件/消息驅動 → 系統(tǒng)能夠為任何活動生成對應的事件或消息。例如,那些來自數據庫的數據會被視為事件流。
  • 支持背壓 → 我們可以通過施加背壓,來優(yōu)雅地”處理從一個系統(tǒng)到另一個系統(tǒng)的壓力,從而避免了拒絕服務的出現。
  • 可預測的應用響應時間 → 由于線程是異步且非阻塞的,因此我們可以預測負載下的應用響應時間。
  • 更好地利用系統(tǒng)資源 → 同樣由于線程是異步且非阻塞的,因此各種線程不會被I/O所占用,它們能夠支持更多的用戶請求。
  • 基于負載的擴容方式
  • 擺脫基于請求的線程 → 借助響應式API,并得益于異步且非阻塞的線程,我們可以擺脫基于請求的線程模型。在請求被產生后,模型會與服務器一起創(chuàng)建事件,并通過請求線程,去處理其他的請求。

那么,響應式編程的具體流程是怎樣的呢?如下圖所示,一旦應用程序調用了從某個數據源獲取數據的操作,那么就會立即返回一個線程,并且讓來自該數據源的數據作為數據/事件流出現。在此,應用程序是訂閱者(subscriber),數據源是發(fā)布者(publisher)。一旦數據流完成后,onComplete事件就會被觸發(fā)。 

 

數據流工作流程

如下圖所示,如果發(fā)生了任何異常情況,發(fā)布者將會觸發(fā)onError事件。 

數據流工作流程

在某些情況下,例如:從數據庫中刪除一個條目,發(fā)布者只會立即觸發(fā)onComplete/onError事件,而不會調用onNext事件,畢竟沒有任何數據可以返回。 

數據流工作流程

下面,我們進一步討論:什么是背壓,以及如何將背壓應用于響應流。例如,我們有一個客戶端應用正在向另一個服務請求數據。該服務能夠以1000 TPS(吞吐量)的速率發(fā)布事件,而客戶端應用只能以200 TPS的速率處理事件。

那么在這種情況下,客戶端應用程序需要通過緩沖數據來進行處理。而在隨后的調用中,客戶端應用程序可能會緩沖更多的數據,以致最終耗盡內存。顯然,這對于那些依賴該客戶端應用的其他程序,會造成級聯(lián)效應。為了避免此類情況,客戶端應用可以要求服務在事件的末尾進行緩沖,并以客戶端應用的速率去推送各種事件。這就是所謂的背壓,具體流程請見下圖。 

背壓示例

下面,我們將介紹響應流的規(guī)范(請參見--https://www.reactive-streams.org/),以及一個實現案例--Project Reactor(請參見--https://projectreactor.io/)。通常,響應流的規(guī)范定義了如下接口類型:

  • 發(fā)布者(Publisher) → 發(fā)布者是那些具有無限數量順序元素的提供者。它可以按照訂閱者的要求進行發(fā)布。其Java代碼如下所示:  
  1. public interface Publisher<T> {    
  2.      public void subscribe(Subscriber<? super T> s); 
  3.  
  •  訂閱者(Subscriber) → 訂閱者恰好是那些具有無限數量順序元素的使用者。其Java代碼如下所示:  
  1. public interface Subscriber<T> { 
  2.     public void onSubscribe(Subscription s); 
  3.      public void onNext(T t); 
  4.      public void onError(Throwable t); 
  5.      public void onComplete(); 
  6.  
  •  訂閱(Subscription) → 表示訂閱者向發(fā)布者訂閱的某個一對一的周期。其Java代碼如下所示:
  1. public interface Subscription { 
  2.     public void request(long n); 
  3.     public void cancel(); 
  4. }
  •  處理器(Processor) → 表示一個處理階段,即訂閱者和發(fā)布者之間根據約定進行處理。

下面是響應流規(guī)范的類圖: 

響應流規(guī)范

其實,響應流規(guī)范具有許多種實現方式,上述Project Reactor只是其中的一種。Reactor可以完全實現無阻塞、且有效的請求管理。它能夠提供兩個響應式和可組合的API,即:Flux [N](請參見-- https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html)和Mono [0|1](請參見--https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html)。它們廣泛地實現了響應式擴展(Reactive Extensions)。ReactorHTTP(包括Websocket)提供了非阻塞的背壓式網絡引擎、TCPUDP。它也非常適合于微服務的架構。

  •  Flux→ 它是發(fā)布者帶有各種rx運算符的響應流(Reactive Streams),它會發(fā)出0N個元素,然后輸出成功、或帶有某個錯誤的完成結果。其流程圖如下所示: 

圖片來源:https://projectreactor.io

  •  Mono → 它也是發(fā)布者具有各種基本rx運算符的響應流,能夠通過發(fā)出01個元素,輸出成功、或帶有某個錯誤的完成結果。其流程圖如下所示: 

圖片來源:https://projectreactor.io

由于Reactor的實施往往涉及到Spring 5.x,因此,我們可以使用帶有Spring servlet棧的命令式編程,來構建REST API。下圖展示了Spring如何支持響應式和servlet棧的實現。 

 

圖片來源:spring.io

下面是一個公布了響應式REST API的應用。在該應用中,我們使用到了:

  •  帶有WebFluxSpring Boot
  •  具有響應式支持的Spring數據
  •  Cassandra數據庫

下圖是該應用的整體架構: 

下面是build.gradle文件的Groovy代碼,它包含了與Spring WebFlux協(xié)同使用的各種依賴項。

  1. plugins { 
  2.      id 'org.springframework.boot' version '2.2.6.RELEASE' 
  3.      id 'io.spring.dependency-management' version '1.0.9.RELEASE' 
  4.      id 'java' 
  5. group = 'org.smarttechie' 
  6. version = '0.0.1-SNAPSHOT' 
  7. sourceCompatibility = '1.8' 
  8. repositories { 
  9.     mavenCentral() 
  10. }  
  11. dependencies { 
  12.    implementation 'org.springframework.boot:spring-boot-starter-data-cassandra-reactive' 
  13.    implementation 'org.springframework.boot:spring-boot-starter-webflux' 
  14.    testImplementation('org.springframework.boot:spring-boot-starter-test') { 
  15.    exclude group'org.junit.vintage', module: 'junit-vintage-engine' 
  16.    } 
  17.    testImplementation 'io.projectreactor:reactor-test' 
  18.  } 
  19. test { 
  20.   useJUnitPlatform() 
  21.  

在此應用程序中,我公布了如下API。您可以通過GitHub的相關鏈接--https://github.com/2013techsmarts/Spring-Reactive-Examples,下載源代碼。  

在構建響應式API時,我們可以使用功能性樣式編程模型來構建API,而無需使用RestController。當然,您需要具有如下的routerhandler組件:

Router 

  1. package org.smarttechie.router; 
  2. import org.smarttechie.handler.ProductHandler; 
  3. import org.springframework.context.annotation.Bean; 
  4. import org.springframework.context.annotation.Configuration; 
  5. import org.springframework.http.MediaType; 
  6. import org.springframework.web.reactive.function.server.RouterFunction; 
  7. import org.springframework.web.reactive.function.server.RouterFunctions; 
  8. import org.springframework.web.reactive.function.server.ServerResponse; 
  9. import static org.springframework.web.reactive.function.server.RequestPredicates.*; 
  10. @Configuration 
  11. public class ProductRouter { 
  12.     /** 
  13.      * The router configuration for the product handler. 
  14.      * @param productHandler 
  15.      * @return 
  16.      */ 
  17.     @Bean 
  18. public RouterFunction<ServerResponse>    productsRoute(ProductHandler productHandler){ 
  19.     return RouterFunctions.route(GET("/products").and(accept(MediaType.APPLICATION_JSON)) ,productHandler::getAllProducts).andRoute(POST("/product").and(accept(MediaType.APPLICATION_JSON)),productHandler::createProduct).andRoute(DELETE("/product/{id}").and(accept(MediaType.APPLICATION_JSON)) ,productHandler::deleteProduct).andRoute(PUT("/product/{id}").and(accept(MediaType.APPLICATION_JSON)),productHandler::updateProduct); 
  20.  } 
  21. }

 Handler 

  1. package org.smarttechie.handler; 
  2.   import org.smarttechie.model.Product; 
  3.   import org.smarttechie.service.ProductService; 
  4.   import org.springframework.beans.factory.annotation.Autowired; 
  5.   import org.springframework.http.MediaType; 
  6.   import org.springframework.stereotype.Component;    
  7.   import org.springframework.web.reactive.function.server.ServerRequest; 
  8.    import org.springframework.web.reactive.function.server.ServerResponse; 
  9.   import reactor.core.publisher.Mono; 
  10.    import static org.springframework.web.reactive.function.BodyInserters.fromObject;     
  11.    @Component 
  12.    public class ProductHandler { 
  13.     @Autowired 
  14.     private ProductService productService; 
  15.      static Mono<ServerResponse> notFound = ServerResponse.notFound().build(); 
  16.    /** 
  17.      * The handler to get all the available products. 
  18.      * @param serverRequest 
  19.      * @return - all the products info as part of ServerResponse 
  20.      */ 
  21.     public Mono<ServerResponse> getAllProducts(ServerRequest serverRequest) { 
  22.          return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(productService.getAllProducts(), Product.class); 
  23.   } 
  24.  
  25.     /** 
  26.       * The handler to create a product 
  27.       * @param serverRequest 
  28.       * @return - return the created product as part of ServerResponse 
  29.      */ 
  30.     public Mono<ServerResponse> createProduct(ServerRequest serverRequest) { 
  31.         Mono<Product> productToSave = serverRequest.bodyToMono(Product.class); 
  32.         return productToSave.flatMap(product -> 
  33.                 ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(productService.save(product), Product.class));   
  34.  
  35.    }    
  36.  
  37.     /** 
  38.      * The handler to delete a product based on the product id.          
  39.      * @param serverRequest 
  40.      * @return - return the deleted product as part of ServerResponse 
  41.      */    
  42.     public Mono<ServerResponse> deleteProduct(ServerRequest serverRequest) { 
  43.         String id = serverRequest.pathVariable("id");  
  44.         Mono<Void> deleteItem = productService.deleteProduct(Integer.parseInt(id)); 
  45.          return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(deleteItem, Void.class);   
  46.    } 
  47.     /** 
  48.       * The handler to update a product. 
  49.       * @param serverRequest   
  50.       * @return - The updated product as part of ServerResponse 
  51.      */ 
  52.     public Mono<ServerResponse> updateProduct(ServerRequest serverRequest) { 
  53.       return productService.update(serverRequest.bodyToMono(Product.class)).flatMap(product ->ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(fromObject(product))) .switchIfEmpty(notFound);  
  54.     } 

至止,我們已經對如何公布響應式REST API有所了解。針對上述實現,我們使用了Gatling(譯者注:是一款功能強大的負載測試工具),在響應式API和非響應式API(使用Spring RestController構建非響應式API)上,進行了簡單的基準化測試。其結果比較如下圖所示。具體的Gatling負載測試腳本,請參考GitHub上的鏈接:https://github.com/2013techsmarts/Spring-Reactive-Examples。 

負載測試結果比較 

原標題:Build Reactive REST APIs With Spring WebFlux ,作者:Siva Prasad Rao Janapati

【51CTO譯稿,合作站點轉載請注明原文譯者和出處為51CTO.com】

 

責任編輯:龐桂玉 來源: Linux中國
相關推薦

2022-05-31 07:40:41

ArctypeFeather.jsSQLite

2023-05-11 12:40:00

Spring控制器HTTP

2025-03-28 09:33:11

2022-02-09 14:36:25

GoMongoDBFiber

2022-01-07 15:11:27

項目Go 框架

2023-09-21 11:20:46

2025-03-31 09:30:52

2023-09-04 11:52:53

SpringMVC性能

2020-05-25 07:00:00

雙因素認證身份認證密碼

2023-12-06 07:13:16

RESTAPI客戶端

2023-04-18 15:18:10

2024-10-15 09:34:57

2022-07-04 09:15:10

Spring請求處理流程

2023-02-09 08:01:12

核心組件非阻塞

2024-09-26 08:03:37

2023-10-18 15:55:14

REST APISpring MVC

2022-03-29 09:00:00

Angular框架REST API

2022-11-04 08:39:46

SpringWebFlux

2024-01-09 09:09:45

RESTGraphQL

2021-11-16 14:25:38

JavaScript前端
點贊
收藏

51CTO技術棧公眾號