Java9異步編程-反應(yīng)式流使用
在本文中,主要研究下Java9+中的反應(yīng)流。簡單地說,就是使用Flow類,F(xiàn)low類包含了用于構(gòu)建響應(yīng)式流處理的主要模塊。
這里說的反應(yīng)流其實(shí)是一種非阻塞式背壓的異步流處理標(biāo)準(zhǔn)(有點(diǎn)繞口)。
該規(guī)范在響應(yīng)式宣言中定義,并且有各種各樣的實(shí)現(xiàn),例如RxJava或Akka-Streams。
Reactive API總覽
要構(gòu)建一個(gè)流,主要使用三個(gè)抽象,并將它們組合成異步處理邏輯。
每個(gè)流都需要處理由Publisher實(shí)例發(fā)布給它的事件;發(fā)布者有一個(gè)subscribe()的方法。
如果某個(gè)訂閱者希望接收發(fā)布者發(fā)布的事件,則需要使用subscribe()訂閱發(fā)布者。
消息的接收方需要實(shí)現(xiàn)訂閱者接口。一般情況下,接受者是每個(gè)Flow處理的結(jié)束,因?yàn)樗膶?shí)例不會(huì)進(jìn)一步發(fā)送消息。
可以將Subscriber看作Sink。有四個(gè)方法需要重寫onSubscribe(), onNext(), onError()和onComplete()。
如果希望轉(zhuǎn)換傳入的消息并將其進(jìn)一步傳遞給下一個(gè)訂閱服務(wù),則需要實(shí)現(xiàn)Processor接口。
它既充當(dāng)訂閱服務(wù)(因?yàn)樗邮障?,又充當(dāng)發(fā)布服務(wù)(因?yàn)樗幚磉@些消息并將它們發(fā)送以進(jìn)行進(jìn)一步處理)。
發(fā)布和消費(fèi)消息
假設(shè)想要?jiǎng)?chuàng)建一個(gè)簡單的流,其中有一個(gè)發(fā)布者發(fā)布消息,一個(gè)簡單的訂閱者在消息到達(dá)時(shí)使用消息。
先創(chuàng)建一個(gè)EndSubscriber類。需要實(shí)現(xiàn)訂閱服務(wù)接口。接下來,重寫所需的方法。
onSubscribe()方法在處理開始之前被調(diào)用。
訂閱的實(shí)例subscription作為參數(shù)傳遞。Subscription是控制訂閱服務(wù)和發(fā)布服務(wù)之間的消息流的類.
- 1public class EndSubscriber<T> implements Subscriber<T> {
- 2 // 多少消息需要消費(fèi)
- 3 private final AtomicInteger howMuchMessagesToConsume;
- 4 private Flow.Subscription subscription;
- 5 // 保存消費(fèi)過的消息
- 6 public List<T> consumedElements = new LinkedList<>();
- 7
- 8 public EndSubscriber(Integer howMuchMessagesToConsume) {
- 9 this.howMuchMessagesToConsume = new AtomicInteger(howMuchMessagesToConsume);
- 10 }
- 11
- 12 @Override
- 13 public void onSubscribe(Flow.Subscription subscription) {
- 14 this.subscription = subscription;
- 15 subscription.request(1);
- 16 }
- 17}
在這里還初始化了一個(gè)在測(cè)試中使用的消耗元素的空列表。
現(xiàn)在,需要從訂閱者接口實(shí)現(xiàn)其余的方法。這里的主要方法是onNext(),它在發(fā)布者發(fā)布新消息時(shí)被調(diào)用
- 1@Override
- 2public void onNext(T item) {
- 3 System.out.println("Got : " + item);
- 4 consumedElements.add(item);
- 5 subscription.request(1);
- 6}
這里需要注意的的是,當(dāng)在onSubscribe()方法中啟動(dòng)開始訂閱時(shí),以及當(dāng)處理消息時(shí)onNext(),需要調(diào)用subscription上的request()方法來通知當(dāng)前訂閱器準(zhǔn)備使用更多消息。
最后,需要實(shí)現(xiàn)onError(),它會(huì)在處理過程中拋出異常時(shí)被調(diào)用.
在發(fā)布者關(guān)閉時(shí)調(diào)用onComplete().
- 1@Override
- 2public void onError(Throwable t) {
- 3 t.printStackTrace();
- 4}
- 5
- 6@Override
- 7public void onComplete() {
- 8 System.out.println("Done");
- 9}
接下來為這個(gè)處理流編寫一個(gè)測(cè)試。將使用SubmissionPublisher類,這是java.util.concurrent中的一個(gè)類,它實(shí)現(xiàn)了Publisher接口。
測(cè)試中向發(fā)布者提交N個(gè)元素,我們的終端訂閱者會(huì)接收到這些元素。
- 1@Test
- 2public void whenSubscribeToIt_thenShouldConsumeAll()
- 3 throws InterruptedException {
- 4
- 5 // given
- 6 SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
- 7 EndSubscriber<String> subscriber = new EndSubscriber<>();
- 8 publisher.subscribe(subscriber);
- 9 List<String> items = List.of("1", "x", "2", "x", "3", "x");
- 10
- 11 // when
- 12 assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
- 13 items.forEach(publisher::submit);
- 14 publisher.close();
- 15
- 16 // then
- 17 await().atMost(1000, TimeUnit.MILLISECONDS)
- 18 .until(
- 19 () -> assertThat(subscriber.consumedElements)
- 20 .containsExactlyElementsOf(items)
- 21 );
- 22}
注意,在publisher實(shí)例上調(diào)用close()方法。它將在每個(gè)訂閱者上調(diào)用onComplete()。
程序輸出如下:
- 1Got : 1
- 2Got : x
- 3Got : 2
- 4Got : x
- 5Got : 3
- 6Got : x
- 7Done
消息的轉(zhuǎn)換
假設(shè)還希望在發(fā)布者和訂閱者之間做一些數(shù)據(jù)的轉(zhuǎn)換。
下面我創(chuàng)建一個(gè)TransformProcessor類,它實(shí)現(xiàn)了Processor并擴(kuò)展了SubmissionPublisher,因?yàn)樗瑫r(shí)包含Publisher和Subscriber。
并且將傳入一個(gè)Function將輸入轉(zhuǎn)換到輸出。
- 1import java.util.concurrent.Flow;
- 2import java.util.concurrent.SubmissionPublisher;
- 3import java.util.function.Function;
- 4
- 5public class TransformProcessor<T,R> extends SubmissionPublisher<R> implements Flow.Processor<T,R> {
- 6 private Function<T,R> function;
- 7 private Flow.Subscription subscription;
- 8
- 9 public TransformProcessor(Function<T, R> function) {
- 10 super();
- 11 this.function = function;
- 12 }
- 13
- 14 @Override
- 15 public void onSubscribe(Flow.Subscription subscription) {
- 16 this.subscription = subscription;
- 17 subscription.request(1);
- 18 }
- 19
- 20 @Override
- 21 public void onNext(T item) {
- 22 submit(function.apply(item));
- 23 subscription.request(1);
- 24 }
- 25
- 26 @Override
- 27 public void onError(Throwable t) {
- 28 t.printStackTrace();
- 29 }
- 30
- 31 @Override
- 32 public void onComplete() {
- 33 close();
- 34 }
- 35}
這里的TransformProcessor將把String轉(zhuǎn)換為兩個(gè)String,看下面我寫的測(cè)試用例。
- 1 @Test
- 2 public void whenSubscribeAndTransformElements_thenShouldConsumeAll() {
- 3 // given
- 4 SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
- 5 Function<String, String> dup = x -> x.concat(x);
- 6 TransformProcessor<String, String> transformProcessor
- 7 = new TransformProcessor<>(dup);
- 8 EndSubscriber<String> subscriber = new EndSubscriber<>(6);
- 9 List<String> items = List.of("1", "2", "3");
- 10 List<String> expectedResult = List.of("11", "22", "33");
- 11 // when
- 12 publisher.subscribe(transformProcessor);
- 13 transformProcessor.subscribe(subscriber);
- 14 items.forEach(publisher::submit);
- 15 publisher.close();
- 16
- 17 await().atMost(1000, TimeUnit.MILLISECONDS)
- 18 .untilAsserted(() -> assertTrue(subscriber.consumedElements.containsAll(expectedResult)));
- 19 }
使用訂閱控制消息需求
假設(shè)只想消費(fèi)第一個(gè)消息,應(yīng)用一些邏輯并完成處理??梢允褂胷equest()方法來實(shí)現(xiàn)這一點(diǎn)。
修改下代碼:
- 1public class EndSubscriber<T> implements Flow.Subscriber<T> {
- 2 // 多少消息需要消費(fèi)
- 3 private final AtomicInteger howMuchMessagesToConsume;
- 4 private Flow.Subscription subscription;
- 5 // 保存消費(fèi)過的消息
- 6 public List<T> consumedElements = new LinkedList<>();
- 7
- 8 public EndSubscriber(Integer howMuchMessagesToConsume) {
- 9 this.howMuchMessagesToConsume = new AtomicInteger(howMuchMessagesToConsume);
- 10 }
- 11
- 12 @Override
- 13 public void onSubscribe(Flow.Subscription subscription) {
- 14 this.subscription = subscription;
- 15 subscription.request(1);
- 16 }
- 17
- 18 @Override
- 19 public void onNext(T item) {
- 20 howMuchMessagesToConsume.decrementAndGet(); // 減一
- 21 System.out.println("Got : " + item);
- 22 consumedElements.add(item);
- 23 if (howMuchMessagesToConsume.get() > 0) {
- 24 subscription.request(1);
- 25 }
- 26 }
- 27
- 28 @Override
- 29 public void onError(Throwable t) {
- 30 t.printStackTrace();
- 31 }
- 32
- 33 @Override
- 34 public void onComplete() {
- 35 System.out.println("Done");
- 36 }
- 37}
測(cè)試
- 1@Test
- 2public void whenRequestForOnlyOneElement_thenShouldConsumeOne(){
- 3 // given
- 4 SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
- 5 EndSubscriber<String> subscriber = new EndSubscriber<>(1);
- 6 publisher.subscribe(subscriber);
- 7 List<String> items = List.of("1", "x", "2", "x", "3", "x");
- 8 List<String> expected = List.of("1");
- 9
- 10 // when
- 11 assertEquals(publisher.getNumberOfSubscribers(),1);
- 12 items.forEach(publisher::submit);
- 13 publisher.close();
- 14
- 15 // then
- 16 await().atMost(1000, TimeUnit.MILLISECONDS)
- 17 .untilAsserted(() ->
- 18 assertTrue(subscriber.consumedElements.containsAll(expected))
- 19 );
- 20}
盡管發(fā)布者發(fā)布了6個(gè)元素,但EndSubscriber將只使用一個(gè)元素,因?yàn)樗硎局恍枰幚磉@一個(gè)元素。
通過在Subscription上使用request()方法,我們可以實(shí)現(xiàn)更復(fù)雜的回壓機(jī)制來控制消息消費(fèi)的速度。