自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Java9異步編程-反應(yīng)式流使用

開發(fā) 后端
在本文中,主要研究下Java9+中的反應(yīng)流。簡單地說,就是使用Flow類,F(xiàn)low類包含了用于構(gòu)建響應(yīng)式流處理的主要模塊。

[[438594]]

 在本文中,主要研究下Java9+中的反應(yīng)流。簡單地說,就是使用Flow類,F(xiàn)low類包含了用于構(gòu)建響應(yīng)式流處理的主要模塊。

這里說的反應(yīng)流其實(shí)是一種非阻塞式背壓的異步流處理標(biāo)準(zhǔn)(有點(diǎn)繞口)。

該規(guī)范在響應(yīng)式宣言中定義,并且有各種各樣的實(shí)現(xiàn),例如RxJava或Akka-Streams。

Reactive API總覽

要構(gòu)建一個(gè)流,主要使用三個(gè)抽象,并將它們組合成異步處理邏輯。

每個(gè)流都需要處理由Publisher實(shí)例發(fā)布給它的事件;發(fā)布者有一個(gè)subscribe()的方法。

如果某個(gè)訂閱者希望接收發(fā)布者發(fā)布的事件,則需要使用subscribe()訂閱發(fā)布者。

消息的接收方需要實(shí)現(xiàn)訂閱者接口。一般情況下,接受者是每個(gè)Flow處理的結(jié)束,因?yàn)樗膶?shí)例不會(huì)進(jìn)一步發(fā)送消息。

可以將Subscriber看作Sink。有四個(gè)方法需要重寫onSubscribe(), onNext(), onError()和onComplete()。

如果希望轉(zhuǎn)換傳入的消息并將其進(jìn)一步傳遞給下一個(gè)訂閱服務(wù),則需要實(shí)現(xiàn)Processor接口。

它既充當(dāng)訂閱服務(wù)(因?yàn)樗邮障?,又充當(dāng)發(fā)布服務(wù)(因?yàn)樗幚磉@些消息并將它們發(fā)送以進(jìn)行進(jìn)一步處理)。

發(fā)布和消費(fèi)消息

假設(shè)想要?jiǎng)?chuàng)建一個(gè)簡單的流,其中有一個(gè)發(fā)布者發(fā)布消息,一個(gè)簡單的訂閱者在消息到達(dá)時(shí)使用消息。

先創(chuàng)建一個(gè)EndSubscriber類。需要實(shí)現(xiàn)訂閱服務(wù)接口。接下來,重寫所需的方法。

onSubscribe()方法在處理開始之前被調(diào)用。

訂閱的實(shí)例subscription作為參數(shù)傳遞。Subscription是控制訂閱服務(wù)和發(fā)布服務(wù)之間的消息流的類.

  1.  1public class EndSubscriber<T> implements Subscriber<T> { 
  2.  2       // 多少消息需要消費(fèi) 
  3.  3    private final AtomicInteger howMuchMessagesToConsume; 
  4.  4    private Flow.Subscription subscription; 
  5.  5    // 保存消費(fèi)過的消息 
  6.  6    public List<T> consumedElements = new LinkedList<>(); 
  7.  7 
  8.  8    public EndSubscriber(Integer howMuchMessagesToConsume) { 
  9.  9        this.howMuchMessagesToConsume = new AtomicInteger(howMuchMessagesToConsume); 
  10. 10    } 
  11. 11 
  12. 12    @Override 
  13. 13    public void onSubscribe(Flow.Subscription subscription) { 
  14. 14        this.subscription = subscription; 
  15. 15        subscription.request(1); 
  16. 16    } 
  17. 17} 

在這里還初始化了一個(gè)在測(cè)試中使用的消耗元素的空列表。

現(xiàn)在,需要從訂閱者接口實(shí)現(xiàn)其余的方法。這里的主要方法是onNext(),它在發(fā)布者發(fā)布新消息時(shí)被調(diào)用

  1. 1@Override 
  2. 2public void onNext(T item) { 
  3. 3    System.out.println("Got : " + item); 
  4. 4    consumedElements.add(item); 
  5. 5    subscription.request(1); 
  6. 6} 

這里需要注意的的是,當(dāng)在onSubscribe()方法中啟動(dòng)開始訂閱時(shí),以及當(dāng)處理消息時(shí)onNext(),需要調(diào)用subscription上的request()方法來通知當(dāng)前訂閱器準(zhǔn)備使用更多消息。

最后,需要實(shí)現(xiàn)onError(),它會(huì)在處理過程中拋出異常時(shí)被調(diào)用.

在發(fā)布者關(guān)閉時(shí)調(diào)用onComplete().

  1. 1@Override 
  2. 2public void onError(Throwable t) { 
  3. 3    t.printStackTrace(); 
  4. 4} 
  5. 6@Override 
  6. 7public void onComplete() { 
  7. 8    System.out.println("Done"); 
  8. 9} 

接下來為這個(gè)處理流編寫一個(gè)測(cè)試。將使用SubmissionPublisher類,這是java.util.concurrent中的一個(gè)類,它實(shí)現(xiàn)了Publisher接口。

測(cè)試中向發(fā)布者提交N個(gè)元素,我們的終端訂閱者會(huì)接收到這些元素。

  1.  1@Test 
  2.  2public void whenSubscribeToIt_thenShouldConsumeAll()  
  3.  3  throws InterruptedException { 
  4.  4 
  5.  5    // given 
  6.  6    SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 
  7.  7    EndSubscriber<String> subscriber = new EndSubscriber<>(); 
  8.  8    publisher.subscribe(subscriber); 
  9.  9    List<String> items = List.of("1""x""2""x""3""x"); 
  10. 10 
  11. 11    // when 
  12. 12    assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); 
  13. 13    items.forEach(publisher::submit); 
  14. 14    publisher.close(); 
  15. 15 
  16. 16    // then 
  17. 17     await().atMost(1000, TimeUnit.MILLISECONDS) 
  18. 18       .until( 
  19. 19         () -> assertThat(subscriber.consumedElements) 
  20. 20         .containsExactlyElementsOf(items) 
  21. 21     ); 
  22. 22} 

注意,在publisher實(shí)例上調(diào)用close()方法。它將在每個(gè)訂閱者上調(diào)用onComplete()。

程序輸出如下:

  1. 1Got : 1 
  2. 2Got : x 
  3. 3Got : 2 
  4. 4Got : x 
  5. 5Got : 3 
  6. 6Got : x 
  7. 7Done 

消息的轉(zhuǎn)換

假設(shè)還希望在發(fā)布者和訂閱者之間做一些數(shù)據(jù)的轉(zhuǎn)換。

下面我創(chuàng)建一個(gè)TransformProcessor類,它實(shí)現(xiàn)了Processor并擴(kuò)展了SubmissionPublisher,因?yàn)樗瑫r(shí)包含Publisher和Subscriber。

并且將傳入一個(gè)Function將輸入轉(zhuǎn)換到輸出。

  1.  1import java.util.concurrent.Flow; 
  2.  2import java.util.concurrent.SubmissionPublisher; 
  3.  3import java.util.function.Function
  4.  4 
  5.  5public class TransformProcessor<T,R> extends SubmissionPublisher<R> implements Flow.Processor<T,R> { 
  6.  6    private Function<T,R> function
  7.  7    private Flow.Subscription subscription; 
  8.  8 
  9.  9    public TransformProcessor(Function<T, R> function) { 
  10. 10        super(); 
  11. 11        this.function = function
  12. 12    } 
  13. 13 
  14. 14    @Override 
  15. 15    public void onSubscribe(Flow.Subscription subscription) { 
  16. 16        this.subscription = subscription; 
  17. 17        subscription.request(1); 
  18. 18    } 
  19. 19 
  20. 20    @Override 
  21. 21    public void onNext(T item) { 
  22. 22        submit(function.apply(item)); 
  23. 23        subscription.request(1); 
  24. 24    } 
  25. 25 
  26. 26    @Override 
  27. 27    public void onError(Throwable t) { 
  28. 28        t.printStackTrace(); 
  29. 29    } 
  30. 30 
  31. 31    @Override 
  32. 32    public void onComplete() { 
  33. 33        close(); 
  34. 34    } 
  35. 35} 

這里的TransformProcessor將把String轉(zhuǎn)換為兩個(gè)String,看下面我寫的測(cè)試用例。

  1.  1 @Test 
  2.  2    public void whenSubscribeAndTransformElements_thenShouldConsumeAll() { 
  3.  3        // given 
  4.  4        SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 
  5.  5        Function<String, String> dup = x -> x.concat(x); 
  6.  6        TransformProcessor<String, String> transformProcessor 
  7.  7                = new TransformProcessor<>(dup); 
  8.  8        EndSubscriber<String> subscriber = new EndSubscriber<>(6); 
  9.  9        List<String> items = List.of("1""2""3"); 
  10. 10        List<String> expectedResult = List.of("11""22""33"); 
  11. 11        // when 
  12. 12        publisher.subscribe(transformProcessor); 
  13. 13        transformProcessor.subscribe(subscriber); 
  14. 14        items.forEach(publisher::submit); 
  15. 15        publisher.close(); 
  16. 16 
  17. 17        await().atMost(1000, TimeUnit.MILLISECONDS) 
  18. 18                .untilAsserted(() -> assertTrue(subscriber.consumedElements.containsAll(expectedResult))); 
  19. 19    } 

使用訂閱控制消息需求

假設(shè)只想消費(fèi)第一個(gè)消息,應(yīng)用一些邏輯并完成處理??梢允褂胷equest()方法來實(shí)現(xiàn)這一點(diǎn)。

修改下代碼:

  1.  1public class EndSubscriber<T> implements Flow.Subscriber<T> { 
  2.  2    // 多少消息需要消費(fèi) 
  3.  3    private final AtomicInteger howMuchMessagesToConsume; 
  4.  4    private Flow.Subscription subscription; 
  5.  5    // 保存消費(fèi)過的消息 
  6.  6    public List<T> consumedElements = new LinkedList<>(); 
  7.  7 
  8.  8    public EndSubscriber(Integer howMuchMessagesToConsume) { 
  9.  9        this.howMuchMessagesToConsume = new AtomicInteger(howMuchMessagesToConsume); 
  10. 10    } 
  11. 11 
  12. 12    @Override 
  13. 13    public void onSubscribe(Flow.Subscription subscription) { 
  14. 14        this.subscription = subscription; 
  15. 15        subscription.request(1); 
  16. 16    } 
  17. 17 
  18. 18    @Override 
  19. 19    public void onNext(T item) { 
  20. 20        howMuchMessagesToConsume.decrementAndGet(); // 減一 
  21. 21        System.out.println("Got : " + item); 
  22. 22        consumedElements.add(item); 
  23. 23        if (howMuchMessagesToConsume.get() > 0) { 
  24. 24            subscription.request(1); 
  25. 25        } 
  26. 26    } 
  27. 27 
  28. 28    @Override 
  29. 29    public void onError(Throwable t) { 
  30. 30        t.printStackTrace(); 
  31. 31    } 
  32. 32 
  33. 33    @Override 
  34. 34    public void onComplete() { 
  35. 35        System.out.println("Done"); 
  36. 36    } 
  37. 37} 

測(cè)試

  1.  1@Test 
  2.  2public void whenRequestForOnlyOneElement_thenShouldConsumeOne(){ 
  3.  3    // given 
  4.  4    SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 
  5.  5    EndSubscriber<String> subscriber = new EndSubscriber<>(1); 
  6.  6    publisher.subscribe(subscriber); 
  7.  7    List<String> items = List.of("1""x""2""x""3""x"); 
  8.  8    List<String> expected = List.of("1"); 
  9.  9 
  10. 10    // when 
  11. 11    assertEquals(publisher.getNumberOfSubscribers(),1); 
  12. 12    items.forEach(publisher::submit); 
  13. 13    publisher.close(); 
  14. 14 
  15. 15    // then 
  16. 16    await().atMost(1000, TimeUnit.MILLISECONDS) 
  17. 17            .untilAsserted(() -> 
  18. 18                    assertTrue(subscriber.consumedElements.containsAll(expected)) 
  19. 19            ); 
  20. 20} 

盡管發(fā)布者發(fā)布了6個(gè)元素,但EndSubscriber將只使用一個(gè)元素,因?yàn)樗硎局恍枰幚磉@一個(gè)元素。

通過在Subscription上使用request()方法,我們可以實(shí)現(xiàn)更復(fù)雜的回壓機(jī)制來控制消息消費(fèi)的速度。

責(zé)任編輯:武曉燕 來源: 碼小菜
相關(guān)推薦

2023-08-31 16:47:05

反應(yīng)式編程數(shù)據(jù)流

2022-08-15 09:00:00

JavaScript前端架構(gòu)

2022-03-29 07:32:38

R2DBC數(shù)據(jù)庫反應(yīng)式

2023-12-26 08:15:11

反應(yīng)式遠(yuǎn)程接口

2023-09-21 08:01:27

SpringR2DBC實(shí)現(xiàn)數(shù)據(jù)庫

2021-03-22 08:45:30

異步編程Java

2021-05-07 16:19:36

異步編程Java線程

2024-01-31 08:26:44

2015-07-30 10:05:37

Java9JShell

2023-04-10 07:44:04

java9java21java

2017-12-06 16:28:59

JDK 9JDK 8開發(fā)者

2020-02-06 19:12:36

Java函數(shù)式編程編程語言

2014-09-12 10:46:35

Java9

2013-04-01 15:38:54

異步編程異步編程模型

2015-09-16 15:11:58

C#異步編程

2011-07-27 14:10:43

javascript

2023-01-12 11:23:11

Promise異步編程

2022-09-22 08:19:26

WebFlux函數(shù)式編程

2015-07-16 09:52:40

Java9新特性軟件開發(fā)

2021-08-02 11:13:28

人工智能機(jī)器學(xué)習(xí)技術(shù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)