響應(yīng)式異步非阻塞編程在服務(wù)端的應(yīng)用
作者 | 搜狐視頻 趙文浩
1、前言
對(duì)于服務(wù)端的開發(fā)者而言,我們總有一個(gè)共同的目標(biāo),那就是如何用更少的資源獲得足夠的性能來支持我們的服務(wù)!,我們不是在性能優(yōu)化中,就是在性能優(yōu)化的路上。作為Javaer我們,服務(wù)性能優(yōu)化的武器庫中,異步和并發(fā)是永遠(yuǎn)不會(huì)過時(shí)的兩個(gè)。
然而理想很美好,現(xiàn)實(shí)很骨感:
- 異步編程的思維方式同大腦的命令式思維是背道而馳的。在Java的世界中,直到目前Jdk17,也沒有async/await來幫我們解決回調(diào)地獄的問題,強(qiáng)行大量異步,將深陷回調(diào)地獄而不能解脫...
- 并發(fā)調(diào)用方面,大量編排異步線程任務(wù),不僅會(huì)造成資源的快速消耗,也會(huì)導(dǎo)致業(yè)務(wù)流程的實(shí)現(xiàn)難以理解,正所謂:寫這段代碼時(shí)能理解它的只有我和God,一個(gè)月后能理解它的就只有God了...。
在服務(wù)端引入響應(yīng)式編程,是解決如上問題的一個(gè)好的思路。
下面,我以搜狐視頻服務(wù)端PUGC團(tuán)隊(duì)在PUGC視頻對(duì)象創(chuàng)建接口的重構(gòu)工作的實(shí)踐為背景,介紹響應(yīng)式(基于RxJava)異步非阻塞編程在服務(wù)端的應(yīng)用在服務(wù)端的應(yīng)用。
2、問題概述
PUGC視頻對(duì)象創(chuàng)建接口,從業(yè)務(wù)角度看:用于為用戶上傳視頻數(shù)據(jù)前在服務(wù)端為其分配一個(gè)視頻對(duì)象記錄,該視頻對(duì)象記錄被用于描述當(dāng)前視頻的完整生命周期,從技術(shù)角度看:它是一個(gè)聚合接口,通過組合多個(gè)上游接口數(shù)據(jù),主業(yè)務(wù)過程涉及:帳號(hào)、內(nèi)容審核、視頻對(duì)象存儲(chǔ)、轉(zhuǎn)碼、CDN調(diào)度等,實(shí)現(xiàn)業(yè)務(wù)過程。
該接口代碼年代久遠(yuǎn),從提交記錄中可查到的最早的歷史在2013年,隨著業(yè)務(wù)的變化,開發(fā)人員的變更,代碼中充斥著各種各樣的味道。不論是從業(yè)務(wù)角度、性能角度亦或是日常維護(hù)角度看,都難以滿足需要。
- 業(yè)務(wù)流處理過程邏輯冗長,邏輯復(fù)雜,可讀性差,難于維護(hù)。
- 并發(fā)任務(wù)較多,但缺少合理的編排措施,是否需要異步或并發(fā)控制完全隨意。
- 幾乎沒有正確的異常處理。
- 充滿Ctrl+C/V的味道
- ......
為了解決以上的諸多問題,我們開始了重構(gòu)(重寫)之路。
3、讀懂業(yè)務(wù)流
重構(gòu)的原則是保證接口實(shí)現(xiàn)的業(yè)務(wù)規(guī)則的一致性,通過仔細(xì)研讀代碼,整理出接口中實(shí)現(xiàn)的諸多特性和業(yè)務(wù)規(guī)則。
視頻對(duì)象創(chuàng)建主流程如下圖所示:
注:時(shí)序圖中描述的是主流程中的關(guān)鍵點(diǎn),因篇幅所限并未列出每個(gè)調(diào)用的具體細(xì)節(jié)
從用戶請(qǐng)求到達(dá)服務(wù)端開始劃分業(yè)務(wù)執(zhí)行階段:
- 數(shù)據(jù)校驗(yàn)階段:帳號(hào)狀態(tài)/入?yún)⒑弦?guī)/內(nèi)容重復(fù)性
- 對(duì)象創(chuàng)建階段: 視頻對(duì)象存儲(chǔ)
- 關(guān)聯(lián)數(shù)據(jù)對(duì)象創(chuàng)建階段:
- 3.1 必要內(nèi)容:自媒體業(yè)務(wù)/視頻對(duì)象擴(kuò)展
- 3.2 可選內(nèi)容:視頻元數(shù)據(jù)/轉(zhuǎn)碼/其它
- 4 上傳準(zhǔn)備:從CDN調(diào)度視頻內(nèi)容存儲(chǔ)結(jié)點(diǎn)
- 5 結(jié)束
這其中每個(gè)階段的內(nèi)容都需要通過若干個(gè)上游接口調(diào)用協(xié)作來完成,因篇幅所限,并未完全描述每個(gè)階段的具體實(shí)現(xiàn)細(xì)節(jié)。
4、選擇合適的架構(gòu)
通過分析業(yè)務(wù)流的特點(diǎn):
- 業(yè)務(wù)流程鏈路較長
- 需要協(xié)作的上游接口較多
- IO操作為主
結(jié)合接口重構(gòu)核心目標(biāo):
- 代碼應(yīng)該準(zhǔn)確描述業(yè)務(wù)流
- 合理的并發(fā)任務(wù)編排
- 準(zhǔn)確異常處理
- 降低資源占用,提升接口性能
我們決定基于響應(yīng)式異步非阻塞架構(gòu)對(duì)接口進(jìn)行重構(gòu):
- 響應(yīng)式編程是面向數(shù)據(jù)流的,業(yè)務(wù)流的特點(diǎn)可知,它有明確的階段性,數(shù)據(jù)在每個(gè)階段變換和流動(dòng)
- 整個(gè)過程涉及較多的接口調(diào)用和異步任務(wù)編排,基于異步非阻塞可顯著減少異步線程的依賴
- 響應(yīng)式編程中提供了完善的異常處理機(jī)制,特別對(duì)異步環(huán)境下的異常處理非常友好
涉及的基礎(chǔ)組件
- 核心響應(yīng)式編程框架: ReactiveX/RxJava
- Http客戶端 Vertx WebClient
- CacheCloud響應(yīng)式客戶端 sohutv-basic / cachecloud-client
- Dubbo響應(yīng)式客戶端sohutv-basic / dubbo-reactive-consumer
同步阻塞模式適配響應(yīng)式異步非阻塞
因后續(xù)章節(jié)代碼示例中主要以dubbo服務(wù)接口調(diào)用為主,所以我以視頻服務(wù)端團(tuán)隊(duì)基于dubbo-2.6.5版本實(shí)現(xiàn)的dubbo響應(yīng)式客戶端dubbo-reactive-consumer為例,介紹將傳統(tǒng)的同步阻塞模式適配響應(yīng)式異步非阻塞的思路。
Dubbo是目前視頻服務(wù)端用使用的較多的RPC框架,目前最新的版本是Dubbo3.x(注:由于歷史原因,目前團(tuán)隊(duì)使用的版本還停留在2.6.5版本)。在Spring環(huán)境中,比較簡便的注入dubbo服務(wù)接口代理對(duì)象(簡稱接口對(duì)象)的方式是通過@com.alibaba.dubbo.config.annotation.Reference注解自動(dòng)注入,示例如下:
@Component
public class DubboSyncExample {
@Reference
private VideoInfoService videoInfoService;
@Nullable
public VideoInfo getVideoInfo(long vid) throws Exception {
return videoInfoService.getVideoInfo(vid);
}
}
默認(rèn)情況下,這種方式注入的是基于同步阻塞模式接口對(duì)象,由于dubbo客戶端基于Netty實(shí)現(xiàn),所以它天生是是異步的。一般情況下,如果希望通過異步方式使用dubbo客戶端可以按如下方式操作:
@Component
public class DubboAsyncExample {
/**
* 標(biāo)記為異步調(diào)用
*/
@Reference(async = true)
private VideoInfoService videoInfoService;
@NotNull
public Future<VideoInfo> getVideoInfo(long vid) throws Exception {
// 提交異步請(qǐng)求
videoInfoService.getVideoInfo(vid);
// 通過ThreadLocal保存了對(duì)當(dāng)前請(qǐng)求上下文的引用RpcContext
RpcContext rpcContext = RpcContext.getContext();
// 從RpcContext中獲取當(dāng)前請(qǐng)求的Future<T>
Future<VideoInfo> future = rpcContext.getFuture();
return future;
}
}
為支持響應(yīng)式框架,同時(shí)保持通過@com.alibaba.dubbo.config.annotation.Reference注解自動(dòng)注入接口對(duì)象的方式,我們的實(shí)現(xiàn)過程如下:
將接口對(duì)象包裝成響應(yīng)式引用類型:ReactiveReference<Service>
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import java.util.Optional;
public interface ReactiveReference<Service> {
@NotNull
<T> Maybe<T> maybe(@NotNull Function<Service, T> f);
@NotNull
<T> Single<Optional<T>> single(@NotNull Function<Service, T> f);
}
接口中提供了兩個(gè)方法用到了整個(gè)單值類型的響應(yīng)式流:
- Maybe<T>: 當(dāng)向下游Observer發(fā)射的對(duì)象為null時(shí),會(huì)結(jié)束整個(gè)流的生命周期
- Single<T>: 在整個(gè)流的生命周期中不允許發(fā)射null對(duì)象
實(shí)現(xiàn)響應(yīng)式引用實(shí)現(xiàn)ReactiveReferenceImpl<Service>
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import org.jetbrains.annotations.NotNull;
import java.util.Optional;
final class RxJavaReactiveReferenceImpl<Service> implements ReactiveReference<Service> {
@NotNull
final Service service;
RxJavaReactiveReferenceImpl(@NotNull Service service) {
this.service = service;
}
/**
* maybe
*/
@Override
@NotNull
public <T> Maybe<T> maybe(@NotNull Function<Service, T> f) {
// 將Single<Optional<T>>解包成Maybe<T>
return this.single(f).mapOptional($ -> $);
}
/**
* single
* @param actual 實(shí)際的調(diào)用
*/
@Override
@NotNull
public <T> Single<Optional<T>> single(@NotNull Function<Service, T> actual) {
// 延遲創(chuàng)建一個(gè)Single流
return Single.defer(() ->
Single.create(
emitter -> {
DubboSubscription<T> subscription =
new DubboSubscription<>(
() -> actual.apply(service), // 將實(shí)際調(diào)用包裝成 java.util.concurrent.Callable<T>
emitter::onSuccess, // 執(zhí)行成功向下游發(fā)射結(jié)果
emitter::onError // 執(zhí)行異常,向下游發(fā)射異常信息
);
emitter.setCancellable(subscription::cancel); // 支持流取消時(shí)的回調(diào)
subscription.request(1L);
}
)
);
}
}
DubboSubscription<T>用于同Dubbo框架橋接,實(shí)現(xiàn)如下:
import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Subscription;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
public class DubboSubscription<T> implements Subscription {
/**
* dubbo rpc調(diào)用的結(jié)果Future引用,用于維護(hù)生命周期
* 當(dāng)響應(yīng)式流被取消時(shí)需要取消這個(gè)Future
*/
@NotNull
private final AtomicReference<Future<T>> futureRef = new AtomicReference<>();
/**
* 實(shí)際的接口調(diào)用
*/
@NotNull
private final Callable<T> actual;
/**
* 當(dāng)rpc調(diào)用成功時(shí)執(zhí)行
*/
@NotNull
private final Consumer<Optional<T>> onNext;
/**
* 當(dāng)rpc調(diào)用異常時(shí)執(zhí)行
*/
@NotNull
private final Consumer<Throwable> onError;
public DubboSubscription(@NotNull Callable<T> actual, @NotNull Consumer<Optional<T>> onNext, @NotNull Consumer<Throwable> onError) {
this.actual = actual;
this.onNext = onNext;
this.onError = onError;
}
@Override
public void request(long n) {
try {
RpcContext rpcContext = RpcContext.getContext();
// 異步執(zhí)行dubbo rpc請(qǐng)求
this.actual.call();
// 取到當(dāng)前請(qǐng)求的Future<T>
Future<T> future = rpcContext.getFuture();
if (!(future instanceof FutureAdapter)) {
// Future<T>類型驗(yàn)證,要求是 com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter,
// 因?yàn)镕utureAdapter包裝了 com.alibaba.dubbo.remoting.exchange.ResponseFuture對(duì)象,它支持設(shè)置結(jié)果回調(diào)
throw new UnsupportedOperationException(String.format("The future is not a instance of [%s]", FutureAdapter.class.getName()));
}
futureRef.set(future); // 設(shè)置Future引用
FutureAdapter<T> futureAdapter = (FutureAdapter<T>) future;
futureAdapter
.getFuture()
.setCallback(
new ResponseCallback() {
@SuppressWarnings("unchecked")
@Override
public void done(Object response) {
try {
T t = (T) ((Result) response).recreate(); // 結(jié)果對(duì)象反序列化
DubboSubscription.this.onNext.accept(Optional.ofNullable(t)); // dubbo rpc調(diào)用成功
} catch (Throwable e) {
DubboSubscription.this.onError(e); // 一般為反序列化操作引起的異常
}
}
@Override
public void caught(Throwable e) {
// // dubbo rpc調(diào)用異常
DubboSubscription.this.onError(e);
}
}
);
} catch (Throwable e) {
// 其它異常
onError(e);
}
}
private void onError(Throwable e) {
// 執(zhí)行onError回調(diào)
DubboSubscription.this.onError.accept(e);
}
/**
* 響應(yīng)式流取消時(shí)回調(diào)
*/
@Override
public void cancel() {
Future<T> future = futureRef.get();
if (future != null && !future.isDone()) {
future.cancel(true);
}
}
}
于是基于我們的響應(yīng)式dubbo客戶端,服務(wù)接口的調(diào)用方式為:
@Component
public class DubboReactiveExample {
@Reference
private ReactiveReference<VideoInfoService> videoInfoService;
/**
* 響應(yīng)式請(qǐng)求
*/
@NotNull
public Maybe<VideoInfo> maybeVideoInfo(long vid) {
// 提交響應(yīng)式請(qǐng)求
return videoInfoService.maybe(service -> service.getVideoInfo(vid));
}
/**
* 響應(yīng)式請(qǐng)求
*/
@NotNull
public Single<Optional<VideoInfo>> singleVideoInfo(long vid) {
// 提交響應(yīng)式請(qǐng)求
return videoInfoService.single(service -> service.getVideoInfo(vid));
}
}
注:此處的實(shí)現(xiàn)為常規(guī)的Spring生命周期處理,與本文無關(guān),具體實(shí)現(xiàn)細(xì)節(jié)不再贅述
擴(kuò)展com.alibaba.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor使dubbo自動(dòng)注入框架支持ReactiveReference<Service>類型,自動(dòng)設(shè)置為異步模式,且向目標(biāo)Component中注入的實(shí)際類型為ReactiveReferenceImpl<Service>
5、實(shí)踐過程
我們?cè)谧x懂業(yè)務(wù)流一節(jié)中對(duì)視頻對(duì)象創(chuàng)建主流程做了說明,然后將主流程分成了5個(gè)階段,現(xiàn)在我們使用RxJava代碼來描述這5個(gè)階段(注:后續(xù)代碼僅用于演示主流程的實(shí)現(xiàn)過程,并不是完整的實(shí)現(xiàn)過程)。
輸入?yún)?shù)Args
- 為保證每個(gè)流處理結(jié)點(diǎn)的無狀態(tài)性,Args被設(shè)計(jì)為不可變
/**
* 入?yún)b
* 為保證每個(gè)流處理結(jié)點(diǎn)的無狀態(tài)性,Args被設(shè)計(jì)為不可變
*/
@Builder
@Getter
public class Args {
@NotNull
private final long accountId;
private final String title;
private final String coverImg;
private final String other;
/**
* 視頻內(nèi)容校驗(yàn)值
*/
private final String signature;
... // other fields
}
過程結(jié)果PassObj
- 為保證每個(gè)流處理結(jié)點(diǎn)的無狀態(tài)性,PassObj被設(shè)計(jì)為不可變
/**
* 響應(yīng)式流處理過程對(duì)象包裝
* 為保證每個(gè)流處理結(jié)點(diǎn)的無狀態(tài)性,PassObj被設(shè)計(jì)為不可變
*/
@Builder(toBuilder = true)
@Getter
public class PassObj {
private final Args args;
private final Account account;
private final boolean argsAccepted;
private final VideoInfo videoInfo;
private final VideoInfoExtend videoInfoExtend;
private final PugcInfo pugcInfo;
private final CDNNodeInfo cdnNodeInfo;
... // other fields
}
響應(yīng)式流主干
- 清晰地表達(dá)了業(yè)務(wù)流的執(zhí)行過程
@NotNull
public Single<PassObj> create(@NotNull Args inputArgs) {
return Single.fromCallable(() -> PassObj.builder().args(inputArgs).build())
.flatMap(this::checkStage) // 1 數(shù)據(jù)校驗(yàn)階段:帳號(hào)狀態(tài)/入?yún)⒑弦?guī)/內(nèi)容重復(fù)性
.flatMap(this::createVideoObjectStage)// 2 對(duì)象創(chuàng)建階段: 視頻對(duì)象存儲(chǔ)
.flatMap(this::createCorrelateObjectStage)// 3 關(guān)聯(lián)數(shù)據(jù)對(duì)象創(chuàng)建階段
.flatMap(this::cdnDispatchStage)// 4 上傳準(zhǔn)備:從CDN調(diào)度視頻內(nèi)容存儲(chǔ)結(jié)點(diǎn)\
.doOnSuccess(passObj -> { // 5 結(jié)束
// 視頻對(duì)象創(chuàng)建成功
})
.doOnError(e -> log.error(e.getMessage(), e))
;
}
每個(gè)階段的詳細(xì)實(shí)現(xiàn):
數(shù)據(jù)校驗(yàn)階段:帳號(hào)狀態(tài)/入?yún)⒑弦?guī)/內(nèi)容重復(fù)性
/**
* 1 數(shù)據(jù)校驗(yàn)階段:帳號(hào)狀態(tài)/入?yún)⒑弦?guī)/內(nèi)容重復(fù)性
*/
@NotNull
public Single<PassObj> checkStage(@NotNull PassObj passObj) {
return this.checkAndGetAccount(passObj) // 驗(yàn)證并獲取account對(duì)象
.map(account -> passObj.toBuilder().account(account).build())// 帳號(hào)狀態(tài)校驗(yàn)通過, 重建passObj,并裝配account
.flatMap(_passObj ->
this.checkInputArgs(_passObj) // 入?yún)⒑弦?guī)性校驗(yàn)
.map(argsAccepted -> _passObj.toBuilder().argsAccepted(argsAccepted).build())// 入?yún)⒑弦?guī)性校驗(yàn)通過, 重建passObj,并裝配校驗(yàn)結(jié)果
)
.flatMap(_passObj -> // 視頻元數(shù)據(jù)信息校驗(yàn)
this.videoMetaService.single(s -> s.getVideoMetaBySignature(passObj.getArgs().getSignature()))
.map($videoMeta -> {
if ($videoMeta.isPresent()) {
// 視頻元數(shù)據(jù)信息存在,說明視頻上傳是重復(fù)的,執(zhí)行相關(guān)的異常處理
throw new VideoDuplicatedException();
}
// 視頻沒有重復(fù),可以創(chuàng)建,向后傳遞passObj
return _passObj;
})
)
;
}
- 驗(yàn)證并獲取Account對(duì)象
/**
* 驗(yàn)證并獲取Account對(duì)象
*/
@NotNull
private Single<Account> checkAndGetAccount(@NotNull PassObj passObj) {
return Maybe.fromCallable(() -> passObj.getArgs().getAccountId())// 取accountId
.filter(accountId -> accountId > 0L) // 取基本有效的值
.flatMap(accountId -> this.accountService.maybe(s -> s.checkAccount(accountId)))// 驗(yàn)證帳號(hào)狀態(tài)
.defaultIfEmpty(false)
.doOnSuccess(accountAccepted -> {
if (!accountAccepted) {
// 帳號(hào)狀態(tài)校驗(yàn)失敗
// 通過明確的異常來處理失敗過程
throw new AccountRejectedException();
}
})
.flatMapMaybe(_accountAccepted -> this.accountService.maybe(s -> s.getAccount(passObj.getArgs().getAccountId())))
.switchIfEmpty(Single.error(AccountObjectNullPointerException::new))
;
}
- 入?yún)⒑弦?guī)性校驗(yàn)
/**
* 入?yún)⒑弦?guī)性校驗(yàn)
*/
private Single<Boolean> checkInputArgs(@NotNull PassObj passObj) { // 入?yún)⒑弦?guī)性校驗(yàn)可并發(fā)執(zhí)行
return Single.zip(
this.contentAuditService.maybe(s -> s.titleCheck(passObj.getArgs().getTitle())).switchIfEmpty(Single.error(TitleRejectedException::new)), // title
this.contentAuditService.maybe(s -> s.coverImgCheck(passObj.getArgs().getTitle())).switchIfEmpty(Single.error(CoverImgRejectedException::new)), // coverImage
this.contentAuditService.maybe(s -> s.otherCheck(passObj.getArgs().getTitle())).switchIfEmpty(Single.error(OtherRejectedException::new)), // others
(titleAccepted, coverImgAccepted, otherAccepted) -> titleAccepted && coverImgAccepted && otherAccepted
);
}
對(duì)象創(chuàng)建階段: 視頻對(duì)象存儲(chǔ)
/**
* 2 對(duì)象創(chuàng)建階段: 視頻對(duì)象存儲(chǔ)
*/
@NotNull
public Single<PassObj> createVideoObjectStage(@NotNull PassObj passObj) {
return Single.fromCallable(() ->
// 包裝VideoInfo對(duì)象
VideoInfo.builder()
.userId(passObj.getAccount().getId())
.title(passObj.getArgs().getTitle())
.coverImg(passObj.getArgs().getCoverImg())
.other(passObj.getArgs().getOther())
.build()
)
.flatMapMaybe(videoInfo ->
this.videoInfoService.maybe(s -> s.createVideoInfo(videoInfo)) // 保存視頻對(duì)象
.filter(createdVideoInfo -> createdVideoInfo.getId() > 0L) // 驗(yàn)證保存結(jié)果
)
.onErrorResumeNext(e -> Maybe.error(() -> new VideoInfoCreateException(e))) // 異常處理
.switchIfEmpty(Single.error(VideoInfoCreateException::new)) // 異常處理
.map(createdVideoInfo -> passObj.toBuilder().videoInfo(createdVideoInfo).build()) // 將保存結(jié)果裝配到passObj中
;
}
關(guān)聯(lián)數(shù)據(jù)對(duì)象創(chuàng)建階段
/**
* 3 關(guān)聯(lián)數(shù)據(jù)對(duì)象創(chuàng)建階段
*/
@NotNull
public Single<PassObj> createCorrelateObjectStage(@NotNull PassObj passObj) {
return Single.zip( // 必要內(nèi)容:自媒體業(yè)務(wù)/視頻對(duì)象擴(kuò)展 可并發(fā)執(zhí)行
Single.fromCallable(() -> // 視頻對(duì)象擴(kuò)展
VideoInfoExtend.builder()
.userId(passObj.getAccount().getId())
.videoInfoId(passObj.getVideoInfo().getId())
.build()
)
.flatMap(videoInfoExtend ->
this.videoInfoService.maybe(s -> s.createVideoInfoExtend(videoInfoExtend))
.onErrorResumeNext(e -> Maybe.error(() -> new VideoInfoExtendCreateException(e))) // 異常處理
.switchIfEmpty(Single.error(VideoInfoExtendCreateException::new)) // 異常處理
),
Single.fromCallable(() -> // 自媒體業(yè)務(wù)
PugcInfo.builder()
.userId(passObj.getAccount().getId())
.videoInfoId(passObj.getVideoInfo().getId())
.build()
)
.flatMap(pugcInfo ->
this.pugcService.maybe(s -> s.createPugcInfo(pugcInfo))
.onErrorResumeNext(e -> Maybe.error(() -> new PugcInfoCreateException(e))) // 異常處理
.switchIfEmpty(Single.error(PugcInfoCreateException::new)) // 異常處理
),
(createdVideoInfoExtend, createdPugcInfo) ->
passObj.toBuilder()
.videoInfoExtend(createdVideoInfoExtend)
.pugcInfo(createdPugcInfo)
.build()
)
.doOnSuccess(updatedPassObj -> { // 可選內(nèi)容:視頻元數(shù)據(jù)/轉(zhuǎn)碼/其它 不關(guān)注結(jié)果,異步執(zhí)行
Maybe.fromCallable(() ->
VideoMeta.builder()
.signature(updatedPassObj.getArgs().getSignature())
.build()
)
.flatMap(videoMeta -> this.videoMetaService.maybe(s -> s.createVideoMeta(videoMeta)))
.doOnError(e -> log.error(e.getMessage(), e))
.subscribe();
this.videoTranscodingService.maybe(s -> s.createVideoKeyFrame(updatedPassObj.getVideoInfo())).subscribe();
this.videoTranscodingService.maybe(s -> s.createVideoResolution(updatedPassObj.getVideoInfo())).subscribe();
this.otherOptionalService.maybe(s -> s.doSomething(updatedPassObj.getVideoInfo())).subscribe();
})
;
}
上傳準(zhǔn)備:從CDN調(diào)度視頻內(nèi)容存儲(chǔ)結(jié)點(diǎn)
/**
* 上傳準(zhǔn)備:從CDN調(diào)度視頻內(nèi)容存儲(chǔ)結(jié)點(diǎn)
*/
@NotNull
public Single<PassObj> cdnDispatchStage(@NotNull PassObj passObj) {
return cdnDispatchService.maybe(CDNDispatchService::dispatch) // 調(diào)度一個(gè)CDN上傳節(jié)點(diǎn)
.onErrorResumeNext(e -> Maybe.error(() -> new CDNDispatchException(e))) // 調(diào)用異常
.switchIfEmpty(Single.error(CDNDispatchException::new)) // 沒有取到節(jié)點(diǎn), 異常處理
.map(cdnNodeInfo ->
passObj.toBuilder()
.cdnNodeInfo(cdnNodeInfo)
.build()
)
;
}
6、總結(jié)
以上內(nèi)容是搜狐視頻服務(wù)端PUGC團(tuán)隊(duì),首次在核心業(yè)務(wù)接口中應(yīng)用響應(yīng)式異步非阻塞架構(gòu)的思考和實(shí)施過程,文中主要闡述了兩個(gè)內(nèi)容:
- Dubbo響應(yīng)式客戶端的實(shí)現(xiàn)過程
- 視頻對(duì)象創(chuàng)建接口的業(yè)務(wù)分析和實(shí)現(xiàn)過程
通過本次重構(gòu),我們獲得了如下收益:
- 重新梳理業(yè)務(wù)流,將業(yè)務(wù)流同響應(yīng)式流整合,基于響應(yīng)式編程規(guī)范業(yè)務(wù)流的實(shí)現(xiàn)過程
- 視頻對(duì)象創(chuàng)建接口平均響應(yīng)時(shí)間從256ms降低到146ms,性能提供顯著
結(jié)束,感謝閱讀!