自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Java響應(yīng)式編程實(shí)踐與原理解析

開發(fā) 前端
在傳統(tǒng)的命令式編程模式下,程序都是按照人工編寫的指令一條條順序的同步執(zhí)行,也就是說,只有當(dāng)前指令運(yùn)行完畢,下一條指令才開始執(zhí)行。那么傳統(tǒng)的命令式編程有有些線程處理模型呢?

背景

在傳統(tǒng)的命令式編程模式下,程序都是按照人工編寫的指令一條條順序的同步執(zhí)行,也就是說,只有當(dāng)前指令運(yùn)行完畢,下一條指令才開始執(zhí)行。那么傳統(tǒng)的命令式編程有有些線程處理模型呢?

首先是同步阻塞式,在這種模型下,只有阻塞操作完成后,程序才能夠繼續(xù)執(zhí)行。而且阻塞會(huì)浪費(fèi)資源,比如等待網(wǎng)絡(luò)連接(數(shù)據(jù)庫(kù)請(qǐng)求,其他服務(wù)請(qǐng)求),就會(huì)導(dǎo)致執(zhí)行線程處于空閑狀態(tài)。

第二種就是異步阻塞式,在這種方式下一般會(huì)通過線程池,創(chuàng)建很多線程,然后針對(duì)請(qǐng)求,分配空閑的線程來處理。每個(gè)處理線程當(dāng)遇到阻塞操作時(shí),還是會(huì)中斷等待操作完成,不過相對(duì)于同步阻塞的模式,減少了任務(wù)的響應(yīng)時(shí)間。通過增加并行度,提升了資源利用率。

第三種是異步非阻塞,通過回調(diào)方法來摒棄阻塞操作帶來的資源浪費(fèi)。不過回調(diào)函數(shù)會(huì)層層嵌套,導(dǎo)致回調(diào)噩夢(mèng)(callback hell),讓可讀性變得很差。

為了利用第三種模型的優(yōu)勢(shì),同時(shí)又讓代碼維護(hù)性更高,spring社區(qū)推出了spring flux響應(yīng)式非阻塞編程。它默認(rèn)的實(shí)現(xiàn)叫projectreactor。projectreactor是JVM的完全非阻塞響應(yīng)式編程基礎(chǔ),具有高效的需求管理(以管理“背壓”的形式).它提供了可組合的異步序列API Flux(用于[0…N]元素)和 Mono(用于[0 | 1]元素),廣泛地實(shí)現(xiàn)了Reactive Extensions規(guī)范。

特點(diǎn)

響應(yīng)式編程的特點(diǎn)包括以下幾點(diǎn)。待會(huì)會(huì)通過例子給大家詳細(xì)展示下。

  • 可組合性&可讀性
  • 直到訂閱才會(huì)發(fā)生任何事情
  • 采用背壓或通過其他方式消費(fèi)者向生產(chǎn)者發(fā)出排放率過高的信號(hào)的能力
  • 具體豐富的數(shù)據(jù)流運(yùn)算符
  • 高水平但高價(jià)值的抽象是并發(fā)性不可知的

實(shí)現(xiàn)

projectreactor引入了可組合的反應(yīng)類型,它們實(shí)現(xiàn)Publisher同時(shí)也提供了豐富的操作符,尤其是 Flux 和Mono 。

Flux 表示一個(gè)0..N項(xiàng)的反應(yīng)序列,可以有 完成信號(hào)、錯(cuò)誤信息來結(jié)束整個(gè)流程。所以傳輸?shù)臄?shù)據(jù)為一個(gè)普通值、一個(gè)完成信號(hào)、一個(gè)錯(cuò)誤信號(hào)。對(duì)應(yīng)的方法為onNext()、onComplete()、onError()。

而一個(gè)Mono對(duì)象表示一個(gè)單值或空的(0..1)結(jié)果,可以認(rèn)為是一種特殊的 Flux,最多可以發(fā)出一個(gè)普通值,同樣包含onComplete()、onError()。

示例

  • 靜態(tài)數(shù)據(jù)創(chuàng)建

直接調(diào)用just()方法進(jìn)行創(chuàng)建,也可以通過一個(gè)Stream或者一個(gè)Iterable對(duì)象(比如List)。還有通過Flux靜態(tài)方法來生成,range方法(這個(gè)方法生成的是一個(gè) Integer 序列,第一個(gè)參數(shù)表示起始數(shù)字,第二個(gè)參數(shù)表示,生成的個(gè)數(shù),這里生成的數(shù)據(jù)就為1、2、3),empty() 方法就是生成一個(gè)空的序列。

Flux<String> flux1 = Flux.just("one", "two", "three");
Flux<String> flux2 = Flux.fromStream(Stream.of("one", "two", "three"));
List<String> iterable = Arrays.asList("one", "two", "three");
Flux<String> flux3 = Flux.fromIterable(iterable);
Flux<Integer> flux4 = Flux.range(1, 3);
//或者通過 #empty() 生成空數(shù)據(jù)
Flux<String> fluxEmpty = Flux.empty()

Mono 也有類似的創(chuàng)建方法,只是對(duì)于的 just() 方法是對(duì)應(yīng)只是一個(gè)參數(shù)。而 justOrEmpty() 方法會(huì)對(duì)空值進(jìn)行校驗(yàn),選擇調(diào)用 just() 或者 empty()。

//Mono 也是類型
Mono<String> monoEmpty = Mono.empty();

Mono<String> mono1 = Mono.just("one");

//justOrEmpty 可以保證傳入?yún)?shù)為空時(shí)也不會(huì)報(bào)錯(cuò)
Mono<String> mono2 = Mono.justOrEmpty(null);

  • 動(dòng)態(tài)數(shù)據(jù)創(chuàng)建

動(dòng)態(tài)數(shù)據(jù)創(chuàng)建方法主要有g(shù)enerate與create兩種方法。

對(duì)于generate 方法,在Flux中有3個(gè)重載方法,不管是哪個(gè)方法都是會(huì)包含一個(gè)循環(huán)構(gòu)造函數(shù)。在每個(gè)循環(huán)中,sink.next()方法最多被調(diào)用一次。比如在 flux_generate1 這個(gè)實(shí)例對(duì)應(yīng)的方法。循環(huán)生成1~10的序列,當(dāng)atomicInteger大于10的時(shí)候就調(diào)用complete()方法,發(fā)出信息通知訂閱者。flux_generate2 實(shí)例對(duì)應(yīng)的方法則將atomicInteger作為一個(gè)對(duì)象,在方法中進(jìn)行傳遞,并且在最后打印在控制臺(tái)上。

// generate 生成,調(diào)用 next 即生成數(shù)據(jù),complete 則是完成了整個(gè)流程
// 一個(gè)循環(huán)中只允許調(diào)用 next 方式一次
AtomicInteger atomicInteger = new AtomicInteger();
Flux<Integer> flux_generate1 = Flux.generate(sink -> {
if(atomicInteger.incrementAndGet() > 10){
sink.complete();
}
sink.next(atomicInteger.get());
});

Flux<Integer> flux_generate2 = Flux.generate(() -> 0, (integer, sink) -> {
if (++integer > 10) {
sink.complete();
}
sink.next(integer);
return integer;
}, integer -> {
System.out.println("last integer value is " + integer);
});

執(zhí)行過程解析

為了更好的理解flux的底層實(shí)現(xiàn)邏輯和編程思想,我們下面會(huì)給大家詳細(xì)的演示下flux.create方法的執(zhí)行。尤其是前面提到的直到訂閱才會(huì)發(fā)生任何事情,這句話的真實(shí)含義。

flux.create((t) -> {
t.next("create");
t.next("create1");

}).subscribe(st->{
System.out.println(st);
});

上面是我們要執(zhí)行的一段代碼。通過debug我們可以看到如下的執(zhí)行過程。

  • Flux.create方法接受一個(gè)函數(shù)式接口Consumer作為輸入?yún)?shù),在我們這個(gè)例子中就是。
(t) -> {
t.next("create");
t.next("create1");
},
public static <T> Flux<T> create(Consumer<? super FluxSink<T>> emitter) {
return create(emitter, OverflowStrategy.BUFFER);
}

  • 我們一路追蹤下去,發(fā)現(xiàn)它把我們的函數(shù)式接口賦值給了Fluxcreate對(duì)象的一個(gè)屬性source,然后就返回了。并沒有執(zhí)行這個(gè)函數(shù)式接口的邏輯。
FluxCreate(Consumer<? super FluxSink<T>> source, OverflowStrategy backpressure, FluxCreate.CreateMode createMode) {
this.source = (Consumer)Objects.requireNonNull(source, "source");
this.backpressure = (OverflowStrategy)Objects.requireNonNull(backpressure, "backpressure");
this.createMode = createMode;
}

  • 那么什么時(shí)候執(zhí)行我們的代碼邏輯呢,接著向下看。subscribe方法也是接收了一個(gè)函數(shù)式接口。
(st->{
System.out.println(st);
}
public final Disposable subscribe(Consumer<? super T> consumer) {
Objects.requireNonNull(consumer, "consumer");
return this.subscribe(consumer, (Consumer)null, (Runnable)null);
}

  • 下面我們看看調(diào)用subscribe后發(fā)生了什么。

調(diào)用subscribe函數(shù)之1

調(diào)用subscribe函數(shù)之2

調(diào)用subscribe函數(shù)之3

沒錯(cuò)就是通過subscribe出發(fā)了Flux.create里面的執(zhí)行代碼,而這個(gè)里面的每次next調(diào)用,又觸發(fā)了后面的subscriber的執(zhí)行,最終將結(jié)果打印出來。

Connected to the target VM, address: '127.0.0.1:53984', transport: 'socket'
create
create1
Disconnected from the target VM, address: '127.0.0.1:53984', transport: 'socket'
Process finished with exit code 0


責(zé)任編輯:華軒 來源: 今日頭條
相關(guān)推薦

2025-03-07 10:23:46

2019-07-01 13:34:22

vue系統(tǒng)數(shù)據(jù)

2022-06-16 13:08:30

Combine響應(yīng)式編程訂閱

2022-12-28 10:50:34

AI訓(xùn)練深度學(xué)習(xí)

2021-07-14 13:12:51

2021-07-28 20:13:04

響應(yīng)式編程

2024-12-31 08:00:32

2025-02-06 08:24:25

AQS開發(fā)Java

2016-10-21 11:04:07

JavaScript異步編程原理解析

2020-08-13 11:24:45

Java技術(shù)開發(fā)

2018-08-07 16:17:35

JavaMySQL數(shù)據(jù)庫(kù)

2023-11-29 09:00:55

ReactuseMemo

2020-12-08 08:53:53

編程ThreadPoolE線程池

2017-05-04 16:35:45

2020-08-31 07:19:57

MonoFlux Reactor

2022-08-28 09:05:34

分布式存儲(chǔ)Ceph

2023-02-28 09:07:18

ChatGPTAI

2024-03-20 10:48:09

Java 8內(nèi)存管理

2024-05-23 08:02:23

2024-09-02 16:10:19

vue2前端
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)