自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

響應(yīng)式異步非阻塞編程在服務(wù)端的應(yīng)用

開發(fā)
對(duì)于服務(wù)端的開發(fā)者而言,我們總有一個(gè)共同的目標(biāo),那就是如何用更少的資源獲得足夠的性能來支持我們的服務(wù)!,我們不是在性能優(yōu)化中,就是在性能優(yōu)化的路上。

作者 | 搜狐視頻 趙文浩

1、前言

對(duì)于服務(wù)端的開發(fā)者而言,我們總有一個(gè)共同的目標(biāo),那就是如何用更少的資源獲得足夠的性能來支持我們的服務(wù)!,我們不是在性能優(yōu)化中,就是在性能優(yōu)化的路上。作為Javaer我們,服務(wù)性能優(yōu)化的武器庫中,異步和并發(fā)是永遠(yuǎn)不會(huì)過時(shí)的兩個(gè)。

然而理想很美好,現(xiàn)實(shí)很骨感:

  • 異步編程的思維方式同大腦的命令式思維是背道而馳的。在Java的世界中,直到目前Jdk17,也沒有async/await來幫我們解決回調(diào)地獄的問題,強(qiáng)行大量異步,將深陷回調(diào)地獄而不能解脫...
  • 并發(fā)調(diào)用方面,大量編排異步線程任務(wù),不僅會(huì)造成資源的快速消耗,也會(huì)導(dǎo)致業(yè)務(wù)流程的實(shí)現(xiàn)難以理解,正所謂:寫這段代碼時(shí)能理解它的只有我和God,一個(gè)月后能理解它的就只有God了...。

在服務(wù)端引入響應(yīng)式編程,是解決如上問題的一個(gè)好的思路。

下面,我以搜狐視頻服務(wù)端PUGC團(tuán)隊(duì)在PUGC視頻對(duì)象創(chuàng)建接口的重構(gòu)工作的實(shí)踐為背景,介紹響應(yīng)式(基于RxJava)異步非阻塞編程在服務(wù)端的應(yīng)用在服務(wù)端的應(yīng)用。

2、問題概述

PUGC視頻對(duì)象創(chuàng)建接口,從業(yè)務(wù)角度看:用于為用戶上傳視頻數(shù)據(jù)前在服務(wù)端為其分配一個(gè)視頻對(duì)象記錄,該視頻對(duì)象記錄被用于描述當(dāng)前視頻的完整生命周期,從技術(shù)角度看:它是一個(gè)聚合接口,通過組合多個(gè)上游接口數(shù)據(jù),主業(yè)務(wù)過程涉及:帳號(hào)、內(nèi)容審核、視頻對(duì)象存儲(chǔ)、轉(zhuǎn)碼、CDN調(diào)度等,實(shí)現(xiàn)業(yè)務(wù)過程。

該接口代碼年代久遠(yuǎn),從提交記錄中可查到的最早的歷史在2013年,隨著業(yè)務(wù)的變化,開發(fā)人員的變更,代碼中充斥著各種各樣的味道。不論是從業(yè)務(wù)角度、性能角度亦或是日常維護(hù)角度看,都難以滿足需要。

  • 業(yè)務(wù)流處理過程邏輯冗長,邏輯復(fù)雜,可讀性差,難于維護(hù)。
  • 并發(fā)任務(wù)較多,但缺少合理的編排措施,是否需要異步或并發(fā)控制完全隨意。
  • 幾乎沒有正確的異常處理。
  • 充滿Ctrl+C/V的味道
  • ......

為了解決以上的諸多問題,我們開始了重構(gòu)(重寫)之路。

3、讀懂業(yè)務(wù)流

重構(gòu)的原則是保證接口實(shí)現(xiàn)的業(yè)務(wù)規(guī)則的一致性,通過仔細(xì)研讀代碼,整理出接口中實(shí)現(xiàn)的諸多特性和業(yè)務(wù)規(guī)則。

視頻對(duì)象創(chuàng)建主流程如下圖所示:

圖片

 注:時(shí)序圖中描述的是主流程中的關(guān)鍵點(diǎn),因篇幅所限并未列出每個(gè)調(diào)用的具體細(xì)節(jié)

從用戶請(qǐng)求到達(dá)服務(wù)端開始劃分業(yè)務(wù)執(zhí)行階段:

  1. 數(shù)據(jù)校驗(yàn)階段:帳號(hào)狀態(tài)/入?yún)⒑弦?guī)/內(nèi)容重復(fù)性
  2. 對(duì)象創(chuàng)建階段: 視頻對(duì)象存儲(chǔ)
  3. 關(guān)聯(lián)數(shù)據(jù)對(duì)象創(chuàng)建階段:
  • 3.1 必要內(nèi)容:自媒體業(yè)務(wù)/視頻對(duì)象擴(kuò)展
  • 3.2 可選內(nèi)容:視頻元數(shù)據(jù)/轉(zhuǎn)碼/其它
  • 4 上傳準(zhǔn)備:從CDN調(diào)度視頻內(nèi)容存儲(chǔ)結(jié)點(diǎn)
  • 5 結(jié)束

這其中每個(gè)階段的內(nèi)容都需要通過若干個(gè)上游接口調(diào)用協(xié)作來完成,因篇幅所限,并未完全描述每個(gè)階段的具體實(shí)現(xiàn)細(xì)節(jié)。

4、選擇合適的架構(gòu)

通過分析業(yè)務(wù)流的特點(diǎn):

  • 業(yè)務(wù)流程鏈路較長
  • 需要協(xié)作的上游接口較多
  • IO操作為主

結(jié)合接口重構(gòu)核心目標(biāo):

  • 代碼應(yīng)該準(zhǔn)確描述業(yè)務(wù)流
  • 合理的并發(fā)任務(wù)編排
  • 準(zhǔn)確異常處理
  • 降低資源占用,提升接口性能

我們決定基于響應(yīng)式異步非阻塞架構(gòu)對(duì)接口進(jìn)行重構(gòu):

  • 響應(yīng)式編程是面向數(shù)據(jù)流的,業(yè)務(wù)流的特點(diǎn)可知,它有明確的階段性,數(shù)據(jù)在每個(gè)階段變換和流動(dòng)
  • 整個(gè)過程涉及較多的接口調(diào)用和異步任務(wù)編排,基于異步非阻塞可顯著減少異步線程的依賴
  • 響應(yīng)式編程中提供了完善的異常處理機(jī)制,特別對(duì)異步環(huán)境下的異常處理非常友好

涉及的基礎(chǔ)組件

  • 核心響應(yīng)式編程框架: ReactiveX/RxJava
  • Http客戶端 Vertx WebClient
  • CacheCloud響應(yīng)式客戶端 sohutv-basic / cachecloud-client
  • Dubbo響應(yīng)式客戶端sohutv-basic / dubbo-reactive-consumer

同步阻塞模式適配響應(yīng)式異步非阻塞

因后續(xù)章節(jié)代碼示例中主要以dubbo服務(wù)接口調(diào)用為主,所以我以視頻服務(wù)端團(tuán)隊(duì)基于dubbo-2.6.5版本實(shí)現(xiàn)的dubbo響應(yīng)式客戶端dubbo-reactive-consumer為例,介紹將傳統(tǒng)的同步阻塞模式適配響應(yīng)式異步非阻塞的思路。

Dubbo是目前視頻服務(wù)端用使用的較多的RPC框架,目前最新的版本是Dubbo3.x(注:由于歷史原因,目前團(tuán)隊(duì)使用的版本還停留在2.6.5版本)。在Spring環(huán)境中,比較簡便的注入dubbo服務(wù)接口代理對(duì)象(簡稱接口對(duì)象)的方式是通過@com.alibaba.dubbo.config.annotation.Reference注解自動(dòng)注入,示例如下:

@Component
public class DubboSyncExample {

@Reference
private VideoInfoService videoInfoService;

@Nullable
public VideoInfo getVideoInfo(long vid) throws Exception {
return videoInfoService.getVideoInfo(vid);
}
}

默認(rèn)情況下,這種方式注入的是基于同步阻塞模式接口對(duì)象,由于dubbo客戶端基于Netty實(shí)現(xiàn),所以它天生是是異步的。一般情況下,如果希望通過異步方式使用dubbo客戶端可以按如下方式操作:

@Component
public class DubboAsyncExample {

/**
* 標(biāo)記為異步調(diào)用
*/
@Reference(async = true)
private VideoInfoService videoInfoService;

@NotNull
public Future<VideoInfo> getVideoInfo(long vid) throws Exception {
// 提交異步請(qǐng)求
videoInfoService.getVideoInfo(vid);
// 通過ThreadLocal保存了對(duì)當(dāng)前請(qǐng)求上下文的引用RpcContext
RpcContext rpcContext = RpcContext.getContext();
// 從RpcContext中獲取當(dāng)前請(qǐng)求的Future<T>
Future<VideoInfo> future = rpcContext.getFuture();
return future;
}
}

為支持響應(yīng)式框架,同時(shí)保持通過@com.alibaba.dubbo.config.annotation.Reference注解自動(dòng)注入接口對(duì)象的方式,我們的實(shí)現(xiàn)過程如下:

將接口對(duì)象包裝成響應(yīng)式引用類型:ReactiveReference<Service>

import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;

import java.util.Optional;

public interface ReactiveReference<Service> {

@NotNull
<T> Maybe<T> maybe(@NotNull Function<Service, T> f);

@NotNull
<T> Single<Optional<T>> single(@NotNull Function<Service, T> f);
}

接口中提供了兩個(gè)方法用到了整個(gè)單值類型的響應(yīng)式流:

  • Maybe<T>: 當(dāng)向下游Observer發(fā)射的對(duì)象為null時(shí),會(huì)結(jié)束整個(gè)流的生命周期
  • Single<T>: 在整個(gè)流的生命周期中不允許發(fā)射null對(duì)象

實(shí)現(xiàn)響應(yīng)式引用實(shí)現(xiàn)ReactiveReferenceImpl<Service>

import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Single;
import org.jetbrains.annotations.NotNull;

import java.util.Optional;

final class RxJavaReactiveReferenceImpl<Service> implements ReactiveReference<Service> {

@NotNull
final Service service;

RxJavaReactiveReferenceImpl(@NotNull Service service) {
this.service = service;
}

/**
* maybe
*/
@Override
@NotNull
public <T> Maybe<T> maybe(@NotNull Function<Service, T> f) {
// 將Single<Optional<T>>解包成Maybe<T>
return this.single(f).mapOptional($ -> $);
}

/**
* single
* @param actual 實(shí)際的調(diào)用
*/
@Override
@NotNull
public <T> Single<Optional<T>> single(@NotNull Function<Service, T> actual) {
// 延遲創(chuàng)建一個(gè)Single流
return Single.defer(() ->
Single.create(
emitter -> {
DubboSubscription<T> subscription =
new DubboSubscription<>(
() -> actual.apply(service), // 將實(shí)際調(diào)用包裝成 java.util.concurrent.Callable<T>
emitter::onSuccess, // 執(zhí)行成功向下游發(fā)射結(jié)果
emitter::onError // 執(zhí)行異常,向下游發(fā)射異常信息
);
emitter.setCancellable(subscription::cancel); // 支持流取消時(shí)的回調(diào)
subscription.request(1L);
}
)
);
}
}

DubboSubscription<T>用于同Dubbo框架橋接,實(shí)現(xiàn)如下:

import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcContext;
import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
import org.jetbrains.annotations.NotNull;
import org.reactivestreams.Subscription;

import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class DubboSubscription<T> implements Subscription {

/**
* dubbo rpc調(diào)用的結(jié)果Future引用,用于維護(hù)生命周期
* 當(dāng)響應(yīng)式流被取消時(shí)需要取消這個(gè)Future
*/
@NotNull
private final AtomicReference<Future<T>> futureRef = new AtomicReference<>();

/**
* 實(shí)際的接口調(diào)用
*/
@NotNull
private final Callable<T> actual;
/**
* 當(dāng)rpc調(diào)用成功時(shí)執(zhí)行
*/
@NotNull
private final Consumer<Optional<T>> onNext;
/**
* 當(dāng)rpc調(diào)用異常時(shí)執(zhí)行
*/
@NotNull
private final Consumer<Throwable> onError;

public DubboSubscription(@NotNull Callable<T> actual, @NotNull Consumer<Optional<T>> onNext, @NotNull Consumer<Throwable> onError) {
this.actual = actual;
this.onNext = onNext;
this.onError = onError;
}

@Override
public void request(long n) {
try {
RpcContext rpcContext = RpcContext.getContext();
// 異步執(zhí)行dubbo rpc請(qǐng)求
this.actual.call();
// 取到當(dāng)前請(qǐng)求的Future<T>
Future<T> future = rpcContext.getFuture();
if (!(future instanceof FutureAdapter)) {
// Future<T>類型驗(yàn)證,要求是 com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter,
// 因?yàn)镕utureAdapter包裝了 com.alibaba.dubbo.remoting.exchange.ResponseFuture對(duì)象,它支持設(shè)置結(jié)果回調(diào)
throw new UnsupportedOperationException(String.format("The future is not a instance of [%s]", FutureAdapter.class.getName()));
}
futureRef.set(future); // 設(shè)置Future引用
FutureAdapter<T> futureAdapter = (FutureAdapter<T>) future;
futureAdapter
.getFuture()
.setCallback(
new ResponseCallback() {
@SuppressWarnings("unchecked")
@Override
public void done(Object response) {
try {
T t = (T) ((Result) response).recreate(); // 結(jié)果對(duì)象反序列化
DubboSubscription.this.onNext.accept(Optional.ofNullable(t)); // dubbo rpc調(diào)用成功
} catch (Throwable e) {
DubboSubscription.this.onError(e); // 一般為反序列化操作引起的異常
}
}

@Override
public void caught(Throwable e) {
// // dubbo rpc調(diào)用異常
DubboSubscription.this.onError(e);
}
}
);
} catch (Throwable e) {
// 其它異常
onError(e);
}
}

private void onError(Throwable e) {
// 執(zhí)行onError回調(diào)
DubboSubscription.this.onError.accept(e);
}

/**
* 響應(yīng)式流取消時(shí)回調(diào)
*/
@Override
public void cancel() {
Future<T> future = futureRef.get();
if (future != null && !future.isDone()) {
future.cancel(true);
}
}
}

于是基于我們的響應(yīng)式dubbo客戶端,服務(wù)接口的調(diào)用方式為:

@Component
public class DubboReactiveExample {

@Reference
private ReactiveReference<VideoInfoService> videoInfoService;

/**
* 響應(yīng)式請(qǐng)求
*/
@NotNull
public Maybe<VideoInfo> maybeVideoInfo(long vid) {
// 提交響應(yīng)式請(qǐng)求
return videoInfoService.maybe(service -> service.getVideoInfo(vid));
}

/**
* 響應(yīng)式請(qǐng)求
*/
@NotNull
public Single<Optional<VideoInfo>> singleVideoInfo(long vid) {
// 提交響應(yīng)式請(qǐng)求
return videoInfoService.single(service -> service.getVideoInfo(vid));
}
}

注:此處的實(shí)現(xiàn)為常規(guī)的Spring生命周期處理,與本文無關(guān),具體實(shí)現(xiàn)細(xì)節(jié)不再贅述

擴(kuò)展com.alibaba.dubbo.config.spring.beans.factory.annotation.ReferenceAnnotationBeanPostProcessor使dubbo自動(dòng)注入框架支持ReactiveReference<Service>類型,自動(dòng)設(shè)置為異步模式,且向目標(biāo)Component中注入的實(shí)際類型為ReactiveReferenceImpl<Service>

5、實(shí)踐過程

我們?cè)谧x懂業(yè)務(wù)流一節(jié)中對(duì)視頻對(duì)象創(chuàng)建主流程做了說明,然后將主流程分成了5個(gè)階段,現(xiàn)在我們使用RxJava代碼來描述這5個(gè)階段(注:后續(xù)代碼僅用于演示主流程的實(shí)現(xiàn)過程,并不是完整的實(shí)現(xiàn)過程)。

輸入?yún)?shù)Args

  • 為保證每個(gè)流處理結(jié)點(diǎn)的無狀態(tài)性,Args被設(shè)計(jì)為不可變
/**
* 入?yún)b
* 為保證每個(gè)流處理結(jié)點(diǎn)的無狀態(tài)性,Args被設(shè)計(jì)為不可變
*/
@Builder
@Getter
public class Args {
@NotNull
private final long accountId;

private final String title;
private final String coverImg;
private final String other;

/**
* 視頻內(nèi)容校驗(yàn)值
*/
private final String signature;
... // other fields
}

過程結(jié)果PassObj

  • 為保證每個(gè)流處理結(jié)點(diǎn)的無狀態(tài)性,PassObj被設(shè)計(jì)為不可變
/**
* 響應(yīng)式流處理過程對(duì)象包裝
* 為保證每個(gè)流處理結(jié)點(diǎn)的無狀態(tài)性,PassObj被設(shè)計(jì)為不可變
*/
@Builder(toBuilder = true)
@Getter
public class PassObj {
private final Args args;
private final Account account;
private final boolean argsAccepted;
private final VideoInfo videoInfo;
private final VideoInfoExtend videoInfoExtend;
private final PugcInfo pugcInfo;
private final CDNNodeInfo cdnNodeInfo;
... // other fields
}

響應(yīng)式流主干

  • 清晰地表達(dá)了業(yè)務(wù)流的執(zhí)行過程
@NotNull
public Single<PassObj> create(@NotNull Args inputArgs) {
return Single.fromCallable(() -> PassObj.builder().args(inputArgs).build())
.flatMap(this::checkStage) // 1 數(shù)據(jù)校驗(yàn)階段:帳號(hào)狀態(tài)/入?yún)⒑弦?guī)/內(nèi)容重復(fù)性
.flatMap(this::createVideoObjectStage)// 2 對(duì)象創(chuàng)建階段: 視頻對(duì)象存儲(chǔ)
.flatMap(this::createCorrelateObjectStage)// 3 關(guān)聯(lián)數(shù)據(jù)對(duì)象創(chuàng)建階段
.flatMap(this::cdnDispatchStage)// 4 上傳準(zhǔn)備:從CDN調(diào)度視頻內(nèi)容存儲(chǔ)結(jié)點(diǎn)\
.doOnSuccess(passObj -> { // 5 結(jié)束
// 視頻對(duì)象創(chuàng)建成功
})
.doOnError(e -> log.error(e.getMessage(), e))
;
}

每個(gè)階段的詳細(xì)實(shí)現(xiàn):

數(shù)據(jù)校驗(yàn)階段:帳號(hào)狀態(tài)/入?yún)⒑弦?guī)/內(nèi)容重復(fù)性

/**
* 1 數(shù)據(jù)校驗(yàn)階段:帳號(hào)狀態(tài)/入?yún)⒑弦?guī)/內(nèi)容重復(fù)性
*/
@NotNull
public Single<PassObj> checkStage(@NotNull PassObj passObj) {
return this.checkAndGetAccount(passObj) // 驗(yàn)證并獲取account對(duì)象
.map(account -> passObj.toBuilder().account(account).build())// 帳號(hào)狀態(tài)校驗(yàn)通過, 重建passObj,并裝配account
.flatMap(_passObj ->
this.checkInputArgs(_passObj) // 入?yún)⒑弦?guī)性校驗(yàn)
.map(argsAccepted -> _passObj.toBuilder().argsAccepted(argsAccepted).build())// 入?yún)⒑弦?guī)性校驗(yàn)通過, 重建passObj,并裝配校驗(yàn)結(jié)果
)
.flatMap(_passObj -> // 視頻元數(shù)據(jù)信息校驗(yàn)
this.videoMetaService.single(s -> s.getVideoMetaBySignature(passObj.getArgs().getSignature()))
.map($videoMeta -> {
if ($videoMeta.isPresent()) {
// 視頻元數(shù)據(jù)信息存在,說明視頻上傳是重復(fù)的,執(zhí)行相關(guān)的異常處理
throw new VideoDuplicatedException();
}
// 視頻沒有重復(fù),可以創(chuàng)建,向后傳遞passObj
return _passObj;
})
)
;
}
  • 驗(yàn)證并獲取Account對(duì)象
/**
* 驗(yàn)證并獲取Account對(duì)象
*/
@NotNull
private Single<Account> checkAndGetAccount(@NotNull PassObj passObj) {
return Maybe.fromCallable(() -> passObj.getArgs().getAccountId())// 取accountId
.filter(accountId -> accountId > 0L) // 取基本有效的值
.flatMap(accountId -> this.accountService.maybe(s -> s.checkAccount(accountId)))// 驗(yàn)證帳號(hào)狀態(tài)
.defaultIfEmpty(false)
.doOnSuccess(accountAccepted -> {
if (!accountAccepted) {
// 帳號(hào)狀態(tài)校驗(yàn)失敗
// 通過明確的異常來處理失敗過程
throw new AccountRejectedException();
}
})
.flatMapMaybe(_accountAccepted -> this.accountService.maybe(s -> s.getAccount(passObj.getArgs().getAccountId())))
.switchIfEmpty(Single.error(AccountObjectNullPointerException::new))
;
}
  • 入?yún)⒑弦?guī)性校驗(yàn)
/**
* 入?yún)⒑弦?guī)性校驗(yàn)
*/
private Single<Boolean> checkInputArgs(@NotNull PassObj passObj) { // 入?yún)⒑弦?guī)性校驗(yàn)可并發(fā)執(zhí)行
return Single.zip(
this.contentAuditService.maybe(s -> s.titleCheck(passObj.getArgs().getTitle())).switchIfEmpty(Single.error(TitleRejectedException::new)), // title
this.contentAuditService.maybe(s -> s.coverImgCheck(passObj.getArgs().getTitle())).switchIfEmpty(Single.error(CoverImgRejectedException::new)), // coverImage
this.contentAuditService.maybe(s -> s.otherCheck(passObj.getArgs().getTitle())).switchIfEmpty(Single.error(OtherRejectedException::new)), // others
(titleAccepted, coverImgAccepted, otherAccepted) -> titleAccepted && coverImgAccepted && otherAccepted
);
}

對(duì)象創(chuàng)建階段: 視頻對(duì)象存儲(chǔ)

/**
* 2 對(duì)象創(chuàng)建階段: 視頻對(duì)象存儲(chǔ)
*/
@NotNull
public Single<PassObj> createVideoObjectStage(@NotNull PassObj passObj) {
return Single.fromCallable(() ->
// 包裝VideoInfo對(duì)象
VideoInfo.builder()
.userId(passObj.getAccount().getId())
.title(passObj.getArgs().getTitle())
.coverImg(passObj.getArgs().getCoverImg())
.other(passObj.getArgs().getOther())
.build()
)
.flatMapMaybe(videoInfo ->
this.videoInfoService.maybe(s -> s.createVideoInfo(videoInfo)) // 保存視頻對(duì)象
.filter(createdVideoInfo -> createdVideoInfo.getId() > 0L) // 驗(yàn)證保存結(jié)果
)
.onErrorResumeNext(e -> Maybe.error(() -> new VideoInfoCreateException(e))) // 異常處理
.switchIfEmpty(Single.error(VideoInfoCreateException::new)) // 異常處理
.map(createdVideoInfo -> passObj.toBuilder().videoInfo(createdVideoInfo).build()) // 將保存結(jié)果裝配到passObj中
;
}

關(guān)聯(lián)數(shù)據(jù)對(duì)象創(chuàng)建階段

/**
* 3 關(guān)聯(lián)數(shù)據(jù)對(duì)象創(chuàng)建階段
*/
@NotNull
public Single<PassObj> createCorrelateObjectStage(@NotNull PassObj passObj) {
return Single.zip( // 必要內(nèi)容:自媒體業(yè)務(wù)/視頻對(duì)象擴(kuò)展 可并發(fā)執(zhí)行
Single.fromCallable(() -> // 視頻對(duì)象擴(kuò)展
VideoInfoExtend.builder()
.userId(passObj.getAccount().getId())
.videoInfoId(passObj.getVideoInfo().getId())
.build()
)
.flatMap(videoInfoExtend ->
this.videoInfoService.maybe(s -> s.createVideoInfoExtend(videoInfoExtend))
.onErrorResumeNext(e -> Maybe.error(() -> new VideoInfoExtendCreateException(e))) // 異常處理
.switchIfEmpty(Single.error(VideoInfoExtendCreateException::new)) // 異常處理
),
Single.fromCallable(() -> // 自媒體業(yè)務(wù)
PugcInfo.builder()
.userId(passObj.getAccount().getId())
.videoInfoId(passObj.getVideoInfo().getId())
.build()
)
.flatMap(pugcInfo ->
this.pugcService.maybe(s -> s.createPugcInfo(pugcInfo))
.onErrorResumeNext(e -> Maybe.error(() -> new PugcInfoCreateException(e))) // 異常處理
.switchIfEmpty(Single.error(PugcInfoCreateException::new)) // 異常處理
),
(createdVideoInfoExtend, createdPugcInfo) ->
passObj.toBuilder()
.videoInfoExtend(createdVideoInfoExtend)
.pugcInfo(createdPugcInfo)
.build()
)
.doOnSuccess(updatedPassObj -> { // 可選內(nèi)容:視頻元數(shù)據(jù)/轉(zhuǎn)碼/其它 不關(guān)注結(jié)果,異步執(zhí)行
Maybe.fromCallable(() ->
VideoMeta.builder()
.signature(updatedPassObj.getArgs().getSignature())
.build()
)
.flatMap(videoMeta -> this.videoMetaService.maybe(s -> s.createVideoMeta(videoMeta)))
.doOnError(e -> log.error(e.getMessage(), e))
.subscribe();
this.videoTranscodingService.maybe(s -> s.createVideoKeyFrame(updatedPassObj.getVideoInfo())).subscribe();
this.videoTranscodingService.maybe(s -> s.createVideoResolution(updatedPassObj.getVideoInfo())).subscribe();
this.otherOptionalService.maybe(s -> s.doSomething(updatedPassObj.getVideoInfo())).subscribe();
})
;
}

上傳準(zhǔn)備:從CDN調(diào)度視頻內(nèi)容存儲(chǔ)結(jié)點(diǎn)

/**
* 上傳準(zhǔn)備:從CDN調(diào)度視頻內(nèi)容存儲(chǔ)結(jié)點(diǎn)
*/
@NotNull
public Single<PassObj> cdnDispatchStage(@NotNull PassObj passObj) {
return cdnDispatchService.maybe(CDNDispatchService::dispatch) // 調(diào)度一個(gè)CDN上傳節(jié)點(diǎn)
.onErrorResumeNext(e -> Maybe.error(() -> new CDNDispatchException(e))) // 調(diào)用異常
.switchIfEmpty(Single.error(CDNDispatchException::new)) // 沒有取到節(jié)點(diǎn), 異常處理
.map(cdnNodeInfo ->
passObj.toBuilder()
.cdnNodeInfo(cdnNodeInfo)
.build()
)
;
}

6、總結(jié)

以上內(nèi)容是搜狐視頻服務(wù)端PUGC團(tuán)隊(duì),首次在核心業(yè)務(wù)接口中應(yīng)用響應(yīng)式異步非阻塞架構(gòu)的思考和實(shí)施過程,文中主要闡述了兩個(gè)內(nèi)容:

  • Dubbo響應(yīng)式客戶端的實(shí)現(xiàn)過程
  • 視頻對(duì)象創(chuàng)建接口的業(yè)務(wù)分析和實(shí)現(xiàn)過程

通過本次重構(gòu),我們獲得了如下收益:

  • 重新梳理業(yè)務(wù)流,將業(yè)務(wù)流同響應(yīng)式流整合,基于響應(yīng)式編程規(guī)范業(yè)務(wù)流的實(shí)現(xiàn)過程
  • 視頻對(duì)象創(chuàng)建接口平均響應(yīng)時(shí)間從256ms降低到146ms,性能提供顯著

結(jié)束,感謝閱讀!

責(zé)任編輯:未麗燕 來源: 搜狐技術(shù)產(chǎn)品
相關(guān)推薦

2021-02-27 16:08:17

Java異步非阻塞

2015-07-03 10:12:04

編程同步非阻塞

2024-09-05 09:41:57

2012-05-29 10:44:17

WebApp

2019-07-23 11:01:57

Python同步異步

2012-02-22 21:15:41

unixIO阻塞

2022-06-22 08:16:29

異步非阻塞框架

2021-05-25 08:20:37

編程技能開發(fā)

2021-03-04 08:34:55

同步阻塞非阻塞

2012-10-10 10:00:27

同步異步開發(fā)Java

2024-12-02 00:57:17

非阻塞異步編程

2023-07-12 08:16:54

JVM工具包Vert.x

2018-03-28 08:52:53

阻塞非阻塞I

2009-08-21 14:25:23

C#異步傳輸字符串

2025-02-17 13:23:34

Python同步阻塞MySQL

2022-09-01 08:00:00

響應(yīng)式編程集成

2023-12-06 07:28:47

阻塞IO異步IO

2024-09-23 17:15:28

Python并發(fā)并行

2024-12-10 08:09:15

2021-06-04 18:14:15

阻塞非阻塞tcp
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)