自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

不可思議!億級(jí)數(shù)據(jù)竟然如此輕松同步至ES!

云計(jì)算 云原生
文件源指的是數(shù)據(jù)源來源于文本文件,適合中等數(shù)據(jù)的同步。ECP和對象存儲(chǔ)進(jìn)行了對接,用戶可以上傳文件至對象存儲(chǔ),在任務(wù)執(zhí)行時(shí),ECP會(huì)讀取對象存儲(chǔ)中的文本數(shù)據(jù)。

1 這是一個(gè)背景

最近接了一個(gè)需求,要提供一個(gè)隨意組合多個(gè)條件來查詢訂單數(shù)據(jù)的功能,看著數(shù)據(jù)庫里過億的訂單量,頭發(fā)不爭氣的又脫落了兩根代表這個(gè)需求不簡單。

圖片圖片

脫落的兩根頭發(fā),不是技術(shù)實(shí)現(xiàn)上很難,其實(shí)技術(shù)實(shí)現(xiàn)上清晰明了,就是通過數(shù)據(jù)異構(gòu),將數(shù)據(jù)同步到ES,利用ES的倒排索引、緩存等能力,提供多條件復(fù)雜查詢的能力,而ES集群我們已經(jīng)有了。

但有些數(shù)據(jù),在目前的ES索引中是不存在的,也就是說,我需要將過億的訂單數(shù)據(jù)從訂單數(shù)據(jù)庫重新刷一遍到ES中,而這一頓操作下來得需要一周的時(shí)間!

什么?你不信,那咱們來捋一捋。

2 捋一捋訂單數(shù)據(jù)同步到ES中的復(fù)雜度

2.1 數(shù)據(jù)同步ES索引流程

圖片圖片

如上圖所示,就是將數(shù)據(jù)同步到ES索引的過程。

首先需要從訂單數(shù)據(jù)庫查詢所有的訂單數(shù)據(jù),然后根據(jù)訂單數(shù)據(jù)上保存的用戶ID,商品ID等信息從用戶服務(wù),商品服務(wù)查詢相關(guān)信息,經(jīng)過處理與組裝后落到ES集群中。

之所以要查詢用戶信息和商品信息,是因?yàn)楫悩?gòu)在ES索引中的訂單數(shù)據(jù),并不會(huì)與mysql中的數(shù)據(jù)一一對應(yīng),有很多根據(jù)商品類目,用戶信息等查詢訂單信息的訴求存在,因此在這里就需要查詢很多的上游服務(wù)來組裝信息。

2.2 來梳理下是否有難點(diǎn)?

  1. 從數(shù)據(jù)庫把上億的訂單數(shù)據(jù)讀取出來。這個(gè)操作不能影響到線上業(yè)務(wù),因此查詢的訂單數(shù)據(jù)庫一般是從庫,OK,配置多數(shù)據(jù)源來讀取數(shù)據(jù)吧,而且上億的訂單一般采用的都是分庫分表來存儲(chǔ)的,我們是分了16個(gè)庫,每個(gè)庫16個(gè)表,總共256張表,嘿嘿。
  2. 上億的訂單數(shù)據(jù)不能一次性全部讀取到內(nèi)存吧,不然內(nèi)存冒煙都存不下啊。所以得考慮分頁,分頁直接limit也不好,隨著數(shù)據(jù)量越大,速度越慢,所以得考慮一個(gè)游標(biāo),嗯,選一個(gè)字段當(dāng)游標(biāo)吧,游標(biāo)最好唯一且遞增。
  3. 從多個(gè)服務(wù)獲取數(shù)據(jù),這些數(shù)據(jù)所在的服務(wù)一般都屬于公司的其它部門,讀取數(shù)據(jù)的時(shí)候也不能影響到人家的服務(wù)吧,你這里查詢的是嘎嘎猛,一看人家的服務(wù)都崩了,這個(gè)黑鍋就飛來了。所以這里得考慮限流吧,得考慮隔離吧?不說全鏈路隔離,成本太高,起碼關(guān)鍵服務(wù)得隔離一下。
  4. 數(shù)據(jù)同步一段時(shí)間,產(chǎn)品來問,同步多久了啊,大概還有多久能完成啊,數(shù)據(jù)量大概是多少啊,一臉懵,不知道啊。
  5. 如果中途同步失敗了,咋處理啊,是不是得重試,咋重試,重試策略是啥?失敗有沒有報(bào)警,能不能及時(shí)感知并處理???如果同步一段時(shí)間中斷了咋整啊?有沒有記錄從哪中斷的?能否從中斷處繼續(xù)同步啊,不然從頭開始又得N天,哭了。
  6. 同步了一部分,發(fā)現(xiàn)有問題需要暫停一會(huì),咋整?
  7. 如果只想同步部分?jǐn)?shù)據(jù)不一致的訂單數(shù)據(jù),可能就2,3個(gè)訂單,咋整,是不是還得提供按照手動(dòng)輸入訂單ID同步ES數(shù)據(jù)的能力?
  8. 同步過程是咋樣的?開始時(shí)間?結(jié)束時(shí)間?共耗時(shí)多久?操作人是誰?這些統(tǒng)計(jì)數(shù)據(jù)從哪來?
  9. 想夜深人靜的時(shí)候同步數(shù)據(jù),這有時(shí)候?qū)I(yè)務(wù)的影響小,定個(gè)鬧鐘晚上起?
  10. 現(xiàn)在不單需要同步訂單的數(shù)據(jù)了,還需要同步商品ES集群的數(shù)據(jù),這些邏輯還得重新寫一遍?

啊啊啊啊,想想都頭疼?。?/p>

所以,一些事情看著簡單,其實(shí)并沒有那么簡單。

3 神奇的服務(wù)

為了讓頭發(fā)更有歸屬感,針對上述的難點(diǎn)開發(fā)了一款神奇的服務(wù),那就是ECP。它可以將整個(gè)流程自動(dòng)化、可視化的處理,降低數(shù)據(jù)異構(gòu)到ES的成本任務(wù)界面如下所示:

圖片圖片

3.1 ECP的簡單運(yùn)行流程

簡單來說,ECP的作用就是將數(shù)據(jù)從數(shù)據(jù)源讀取出來,然后推送給ES寫服務(wù)。因?yàn)閿?shù)據(jù)處理的邏輯因不同的業(yè)務(wù)而異,ES寫服務(wù)由各個(gè)對接方來實(shí)現(xiàn),因此一個(gè)簡單的流程如下圖:

圖片圖片

這里面涉及到一些技術(shù)細(xì)節(jié),比如如何進(jìn)行多數(shù)據(jù)源數(shù)據(jù)讀取,數(shù)據(jù)源配置,sql校驗(yàn),動(dòng)態(tài)限流、SPI機(jī)制、重試策略與故障感知、探活與故障恢復(fù),環(huán)境隔離等等。

下面一一介紹下:

3.2 多數(shù)據(jù)源數(shù)據(jù)讀取

ECP支持目前支持三個(gè)數(shù)據(jù)源數(shù)據(jù)的讀取,分別為ID源,文本源、以及腳本源。

3.2.1 ID源

有個(gè)文本框用來輸入ID。這種場景適用于小數(shù)據(jù)的數(shù)據(jù)同步,比如發(fā)現(xiàn)一些數(shù)據(jù)庫和ES的數(shù)據(jù)不一致了,就簡單的刷一下數(shù)據(jù)。

圖片圖片

3.2.2 文件源

文件源指的是數(shù)據(jù)源來源于文本文件,適合中等數(shù)據(jù)的同步。ECP和對象存儲(chǔ)進(jìn)行了對接,用戶可以上傳文件至對象存儲(chǔ),在任務(wù)執(zhí)行時(shí),ECP會(huì)讀取對象存儲(chǔ)中的文本數(shù)據(jù)。

這種情況需要注意的是,用戶上傳的文件有可能會(huì)比較大,直接都讀取到內(nèi)存再處理不現(xiàn)實(shí),因此這里采用的是流的方式進(jìn)行讀取,讀取一批處理一批,再釋放一批,不會(huì)造成OOM。

圖片圖片

簡化的處理方式如下:

try (Response response = OK_HTTP_CLIENT.newCall(request).execute()) {
            if (!response.isSuccessful()) {
                throw new IOException("Unexpected code " + response);
            }

  // 以流的方式讀取文件數(shù)據(jù)
  InputStream inputStream = response.body().byteStream();
  BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));

}

3.2.3 腳本源

腳本源適用于大數(shù)據(jù)量的數(shù)據(jù)同步。

腳本本質(zhì)上就是SQL和數(shù)據(jù)源的結(jié)合。

用戶在ECP中配置數(shù)據(jù)庫的連接信息,然后配置SQL。ECP會(huì)執(zhí)行該SQL,將數(shù)據(jù)從配置的數(shù)據(jù)庫中讀取出來,推送到ES寫服務(wù)中。

腳本源可以支持上億數(shù)據(jù)的讀取與推送,如下圖為訂單庫(分庫分表)配置的腳本信息:

圖片圖片

3.2.4 腳本源大數(shù)據(jù)讀取的實(shí)現(xiàn)

將幾億數(shù)據(jù)讀取到內(nèi)存中來處理顯然不可能,因此采用局部數(shù)據(jù)的讀取與處理才是正道。

在業(yè)務(wù)中,經(jīng)常使用的是分頁,但分頁如果僅是使用limit offset,size,待offset的值比較大時(shí),性能會(huì)急劇下降,形成慢SQL,甚至拖累整個(gè)數(shù)據(jù)庫的性能。

因此在分頁數(shù)量比較大時(shí),需要指定一個(gè)有索引的字段作為游標(biāo),該游標(biāo)可以提高分頁的性能,如在訂單表中,若在訂單ID是遞增的且有設(shè)置了索引,SQL就可以這么寫:select * from t_order where order_id > xxx order by order_id desc limit 10; 利用order_id值的變化就可以起到分頁的效果。

這種方式雖好,但讓用戶選定游標(biāo)索引無疑增加了使用的門檻,因此ECP沒有采用上述分頁的形式來讀取大數(shù)據(jù),而是采用JDBC游標(biāo)查詢的方式,如下所示:

// 建立連接
       conn = DriverManager.getConnection(url, param.getDsUsername(), param.getDsPassword());
       // 創(chuàng)建查詢
       stmt = conn.createStatement(java.sql.ResultSet.TYPE_FORWARD_ONLY, java.sql.ResultSet.CONCUR_READ_ONLY);
       stmt.setFetchSize(param.getFetchSize());

游標(biāo)查詢每次讀取fetchSize大小的數(shù)據(jù)量,可以很好的避免讀取大數(shù)據(jù)量導(dǎo)致的OOM問題。

3.3 SQL的解析與校驗(yàn)

用戶配置SQL腳本,ECP需要對該SQL腳本進(jìn)行校驗(yàn)與修改,傳統(tǒng)的字符串處理(比如正則)雖然在一定情況下可以滿足需求,但是容易出錯(cuò)。因此ECP采用的是Druid的SQL解析工具包,可以將SQL解析成AST語法樹,以便對SQL進(jìn)行各種處理。如下圖所示:

圖片圖片

ECP提供的數(shù)據(jù)樣例查詢,會(huì)對SQL自動(dòng)拼接上limit 1。

圖片圖片

圖片圖片

3.4 動(dòng)態(tài)限流的實(shí)現(xiàn)

限流分集群限流和單機(jī)限流,經(jīng)過評(píng)估,在能簡單就簡單的原則下,我們采用的是單機(jī)限流,限流組件使用的是guava的RateLimiter。

圖片圖片

當(dāng)在頁面上修改QPS的值時(shí),會(huì)將該值同步到數(shù)據(jù)庫中,有個(gè)調(diào)度任務(wù)會(huì)不斷地掃描該值的變動(dòng),將變動(dòng)的值同步到RateLimiter組件中。

當(dāng)然,也可以采用數(shù)據(jù)監(jiān)聽的策略(比如廣播MQ),讓變動(dòng)值同步到RateLimiter更及時(shí),但這種方式還需引入其它組件,復(fù)雜度嗷嗷上升,不符合我們簡單實(shí)現(xiàn)的策略。

動(dòng)態(tài)限流的實(shí)現(xiàn)流程如下:

圖片圖片

如下圖是在不同的時(shí)間點(diǎn)修改了限流值后的QPS變化圖:

圖片圖片

3.5 重試策略與故障感知

ES中和DB中的數(shù)據(jù)要盡可能的保證實(shí)時(shí)一致性,但最終一致性是必須要保證的,所以數(shù)據(jù)推送、處理失敗的時(shí)候要進(jìn)行重試,如何重試?

首先需要了解下失敗的類型,制定合適的重試策略,知彼知己,百戰(zhàn)不殆嘛!

一、網(wǎng)絡(luò)抖動(dòng)導(dǎo)致的接口調(diào)用超時(shí)。在調(diào)用微服務(wù)RPC接口的時(shí)候,由于網(wǎng)絡(luò)抖動(dòng)等情況,會(huì)導(dǎo)致接口調(diào)用超時(shí),但很快就會(huì)恢復(fù),通常情況下也就偶爾一次,下一次調(diào)用就會(huì)正常。

二、數(shù)據(jù)處理邏輯異常。這種情況下,異常沒辦法自恢復(fù),只能人工介入。

三、上游服務(wù)異常。如上游服務(wù)壓力過大導(dǎo)致接口調(diào)用失敗,這時(shí)候就需要我們緩一緩再繼續(xù)處理,不能一個(gè)勁的調(diào)用導(dǎo)致上游服務(wù)崩潰掉。

結(jié)合上面的失敗類型的特點(diǎn),斐波那契數(shù)列的重試策略就非常適合 斐波那契數(shù)列的特點(diǎn)是:1,1,2,3,5,8,13,21,34,55,89…

當(dāng)?shù)谝淮问〉臅r(shí)候,延時(shí)1秒后就重試,如果此時(shí)是網(wǎng)絡(luò)抖動(dòng)導(dǎo)致的超時(shí),重試就成功了,不影響數(shù)據(jù)處理的速度 若失敗的次數(shù)越多,重試的間隔時(shí)間就會(huì)越長,這也會(huì)兼顧到上述二、三的失敗類型。

重試組件使用的是Guava Retry,簡單的偽代碼如下:

// 重試組件配置
private final Retryer<Boolean> RETRYER = RetryerBuilder.<Boolean>newBuilder()
            // 對中斷類的異常不重試
            .retryIfException(input -> !isPauseException(input))
            // 1,1,2,3,5,8,13,21,33...
            .withWaitStrategy(WaitStrategies.fibonacciWait(1000, 30, TimeUnit.SECONDS))
           // 重試次數(shù)達(dá)到一定的次數(shù)后,不再重試
            .withStopStrategy(StopStrategies.stopAfterAttempt(MAX_RETRY_TIMES))
            .withRetryListener(new RetryListener() {
                @Override
                public <V> void onRetry(Attempt<V> attempt) {
                    if (attempt.hasException()) {
                        log.error("act=【DataFlushRpcCallRetry】desc=【重試】重試次數(shù)=【{}】重試異常=【{}】", attempt.getAttemptNumber(), attempt.getExceptionCause());
                        // 重試超過閾值進(jìn)行報(bào)警提醒
                        alarmIfExceedThreshold(attempt);
                    }
                }
            })
            .build();

// 將執(zhí)行邏輯抽象為Runnable,對外暴露該方法
public void execute(Runnable runnable) {
    innerExecute(runnable,RETRYER);
}


private void innerExecute(Runnable runnable, Retryer<Boolean> retryer) {
    try {
    retryer.call(() -> {
        runnable.run();
        return true;
       });
    } catch (Exception e) {
       log.error("act=【DataFlushRpcCallRetry】desc=【重試異?!縠rror=【{}】", e);
       throw new IllegalStateException(e);
    }
}

若重試到一定次數(shù)之后依然是失敗的話,則會(huì)將錯(cuò)誤信息發(fā)送到報(bào)警群。根據(jù)推送的信息,可以明確知道錯(cuò)誤的類型,重試的次數(shù),以及任務(wù)的創(chuàng)建人等等信息,無需查看日志,即可定位大部分的問題。如下圖:

圖片圖片

3.6 將數(shù)據(jù)推送給哪個(gè)服務(wù)來處理?-SPI機(jī)制

ECP是個(gè)通用的服務(wù),因此需要將共性功能收攏在一起做成成品,將非共性的功能抽象一下,交給各個(gè)對接方去實(shí)現(xiàn)。

從簡單實(shí)現(xiàn)的角度來看,若有某個(gè)服務(wù)想要對接ECP,我們在ECP上開發(fā)一下,調(diào)用該服務(wù)的接口,將數(shù)據(jù)推送給該服務(wù),思路雖清晰明了,但對接及維護(hù)成本極高,且沒有一個(gè)統(tǒng)一的規(guī)范,因此不可取,其流程如下圖:

圖片圖片

Java上有個(gè)很好的思想可以解決這個(gè)問題,那就是SPI。因此由ECP提供一個(gè)接口,制定一個(gè)規(guī)范,具體的ES索引數(shù)據(jù)的組裝邏輯由各個(gè)對接方去實(shí)現(xiàn)。

這樣,若有一個(gè)新的對接方接入,只要實(shí)現(xiàn)接口即可,ECP無需做任何改動(dòng)。

圖片圖片

至于服務(wù)發(fā)現(xiàn),ECP采用的配置的方式,也就是在新建任務(wù)的時(shí)候,選擇數(shù)據(jù)推送的消費(fèi)方服務(wù),如下圖:

圖片圖片

對于實(shí)現(xiàn)方式,得益于公司內(nèi)部自研的RPC框架,提供了動(dòng)態(tài)指定調(diào)用服務(wù)的方式,偽代碼如下:

Reference<IEsIndexFlushAPI> reference = new Reference<>();
// 設(shè)置調(diào)用的服務(wù)名
reference.setServiceName(serviceName);
// 設(shè)置接口名
reference.setInterfaceClass(IEsIndexFlushAPI.class);
// 設(shè)置上下文
reference.setApplicationConfig(applicationConfig);
// 獲取接口實(shí)例
IEsIndexFlushAPI iEsIndexFlushAPI = ES_INDEX_FLUSH_API_MAP.computeIfAbsent(serviceName, s -> reference.refer());
// 接口調(diào)用
log.info("act=【EsIndexFlushApiInvoker】desc=【請求值】serviceName=【{}】dataListSize=【{}】indexNameList=【{}】tag=【{}】", serviceName,request.getDataList().size(),request.getIndexNameList(),request.getTag() );
EMApiResult<FlushResponse> result = iEsIndexFlushAPI.flush(request);

3.7 環(huán)境隔離

同步數(shù)據(jù)是個(gè)比較重的操作,這個(gè)操作不應(yīng)該影響到線上業(yè)務(wù) 因此,同步數(shù)據(jù)的服務(wù)應(yīng)當(dāng)與線上服務(wù)隔離開 ECP整合了架構(gòu)組提供的標(biāo)簽路由功能,可以在整個(gè)請求鏈路中調(diào)用指定標(biāo)簽的服務(wù),實(shí)現(xiàn)環(huán)境隔離。

ECP標(biāo)簽路由配置圖:

圖片圖片

如下圖,若在ECP上配置任務(wù)的標(biāo)簽路由為FLUSH,則在同步任務(wù)執(zhí)行過程中,會(huì)自動(dòng)調(diào)用鏈路中綁定了FLUSH標(biāo)簽的服務(wù)分組。

圖片圖片

若某些服務(wù)沒有配置為FLUSH標(biāo)簽的分組,這時(shí)就會(huì)自動(dòng)請求該服務(wù)的線上正常環(huán)境。這樣,就可以做到一定程度上的環(huán)境隔離。

圖片圖片

3.8 探活與任務(wù)故障恢復(fù)機(jī)制

在推送數(shù)據(jù)的過程中,若發(fā)生了不可描述的事情導(dǎo)致任務(wù)中斷,咋整?

到了需求DeadLine,發(fā)現(xiàn)任務(wù)在某年某月某日進(jìn)度為1%的時(shí)候停了,哭了。

而且工作時(shí)間緊,任務(wù)重,總不能一定盯著任務(wù),看有沒有中斷吧?這不適合,也不禮貌。

當(dāng)然,這種情況在ECP是不會(huì)發(fā)生的,因?yàn)镋CP是有“自救包”的。下面聊下ECP的任務(wù)探活和中斷恢復(fù)機(jī)制。

如下圖,在ECP中有探活和任務(wù)故障恢復(fù)兩大組件 探活組件負(fù)責(zé)監(jiān)控當(dāng)前任務(wù)線程的執(zhí)行狀態(tài),若任務(wù)線程正在執(zhí)行,則對該任務(wù)的存活時(shí)間進(jìn)行續(xù)期 任務(wù)故障恢復(fù)組件負(fù)責(zé)掃描當(dāng)前未完成的任務(wù),若任務(wù)上次存活時(shí)間大于指定的閾值時(shí),則拉取該任務(wù)恢復(fù)執(zhí)行。

圖片圖片

續(xù)期的偽代碼如下:

@Scheduled(fixedDelay = ScheduleTimeConstants.KEEP_ALIVE_MILLS)
    public void renewal(){
        futureMap.forEach((taskId,future)->{
            if (!future.isDone()){
                log.info("act=【renewal】desc=【任務(wù)續(xù)期】taskId=【{}】續(xù)期時(shí)間=【{}】",taskId, DateUtils.dateToString(new Date(),DateUtils.PATTERN));
                contextService.renewal(taskId);
            }else {
                log.info("act=【renewal】desc=【任務(wù)結(jié)束】taskId=【{}】",taskId);
                futureMap.remove(taskId);
            }
        });
    }

任務(wù)故障恢復(fù)的偽代碼如下:

@Scheduled(fixedDelay = ScheduleTimeConstants.RESTART_TASK_MILLS)
    public void restartTask(){

     // 1.查詢當(dāng)前未完成的任務(wù)
        List<TaskFlushExecuteContextPO> contextPOS = contextService.queryRunningTask();

        for (TaskFlushExecuteContextPO contextPO : contextPOS) {
            // 2.計(jì)算上次存活到當(dāng)前的時(shí)間
            Integer durationMin = calculateTimeSinceLastAlive();

      // 3.若時(shí)間大于指定閾值 則對任務(wù)重新拉起
            if (durationMin >= MAX_DURATION_MIN){
                log.info("act=【restartTask】desc=【任務(wù)重新拉起】taskId=【{}】",contextPO.getTaskId());
                // 4.更新alive_time進(jìn)行鎖定 防止并發(fā)執(zhí)行
                int i = contextExtMapper.casUpdateAliveTime();
                if (i >0){
                    // 5.重新拉起任務(wù)
                    restart0(contextPO, aliveTime);
                }
            }
        }
    }

3.9 平滑遷移的實(shí)現(xiàn)

將數(shù)據(jù)同步到ES,通常有兩種方式:

  1. 直接把數(shù)據(jù)同步到原索引上。
  2. 新建一個(gè)索引,利用雙寫以及切換別名的方式實(shí)現(xiàn)流量的平滑遷移。

對于新建一個(gè)索引的場景,往往是索引Mapping的改變,或者是為了不影響原索引,保證操作可回滾。

針對這種場景,ECP分析了歷來大家手動(dòng)操作刷ES索引的步驟,將流程進(jìn)行抽象,歸納了以下幾個(gè)步驟,如下圖:

圖片圖片

ECP提供了平滑遷移組件,其內(nèi)部整合了Apollo配置中心實(shí)現(xiàn)推送能力,其簡要的實(shí)現(xiàn)流程如下圖:

圖片圖片

3.10 優(yōu)雅的日志記錄

如下圖所示展示了該任務(wù)操作的日志,原則上日志記錄為非核心業(yè)務(wù),需要與核心業(yè)務(wù)代碼進(jìn)行剝離,因此使用注解式流水記錄是個(gè)很好的選擇。

圖片圖片

但注解式流水記錄有個(gè)問題,就是在很多的場景下,流水里面的值需要?jiǎng)討B(tài)獲取,利用注解可以實(shí)現(xiàn)嗎? 答案是可以的,在上圖所示中,任務(wù)ID、數(shù)據(jù)來源都是動(dòng)態(tài)數(shù)據(jù),那如何實(shí)現(xiàn)的呢?看下面代碼:

@Flow(subjectIdEp = "#taskPO.id",subjectType = SubjectTypeEnum.TASK,operateFlowType = OperateFlowTypeEnum.CREATE_TASK,content = "'創(chuàng)建任務(wù),任務(wù)ID:' + #taskPO.id ")
    public void saveTaskWithUser(TaskPO taskPO) {
        String name = LoginUserContext.get().getName();
        taskPO.setCreator(name);
        taskPO.setModifier(name);
        taskMapper.insertSelective(taskPO);
    }

subjectIdEp為流水主題ID,#taskPo.id為一個(gè)表達(dá)式,可用動(dòng)態(tài)獲取參數(shù)taskPo中的id值,這里利用了springEl表達(dá)式的能力。

content = "'創(chuàng)建任務(wù),任務(wù)ID:' + #taskPO.id " 為流水信息,同樣利用了springEL表達(dá)式,動(dòng)態(tài)獲取請求參數(shù)taskPo中的id信息。

但有些信息需要一系列的計(jì)算才可以獲取到,而不是單純的從對象中取值,這也是可以實(shí)現(xiàn)的。如下:

@Flow(subjectIdEp = "#contextPO.taskId",
            subjectType = SubjectTypeEnum.TASK,
            operateFlowType = OperateFlowTypeEnum.DATA_FLUSH,
            content = "'【數(shù)據(jù)同步】異常中斷任務(wù)恢復(fù)執(zhí)行,中斷時(shí)間:' + T(com.zhuanzhuan.esmanage.utils.DateUtils).dateToStringSimple(#aliveTime)")
    @Transactional(rollbackFor = Exception.class,isolation = Isolation.REPEATABLE_READ)
    public void restart0(TaskFlushExecuteContextPO contextPO, Date aliveTime) {
        log.info("act=【restartTask】desc=【任務(wù)重新拉起】taskId=【{}】原aliveTime=【{}】", contextPO.getTaskId(), aliveTime);
        dsProcessorExecutor.executeAndKeepAliveMonitor(contextPO.getTaskId());
    }

其中T(com.zhuanzhuan.esmanage.utils.DateUtils).dateToStringSimple(#aliveTime) 代表執(zhí)行的是DateUtils.dateToStringSimple 方法,也就是說表達(dá)式是可以調(diào)用方法的,包括從spring容器中獲取對象,調(diào)用對象的方法均可。

這種注解式流水的實(shí)現(xiàn)原理,就是利用SPEL表達(dá)式和Spring Aop的特性,寫一個(gè)切面,攔截自定義的flow注解即可,偽代碼如下:

// 定義切面,攔截FLOW注解
@Around("@annotation(com.zhuanzhuan.esmanage.entity.annotation.Flow)")
public Object around(ProceedingJoinPoint point) throws Throwable {

    // 調(diào)用目標(biāo)方法
    Object result = null;
    try {
        result = point.proceed();
        recordFlow(point,result);
        return result;
    } catch (Throwable e) {
        recordException(point,e);
        throw e;
    }
}


// 流水記錄的實(shí)現(xiàn)
private void recordFlow(ProceedingJoinPoint point, Object result) {
    // try catch 防止影響主邏輯
    //TODO 看是否需要寫在一個(gè)事務(wù)中,主要評(píng)估流水的重要性
    try {
        MethodSignature signature = (MethodSignature) point.getSignature();
        Flow flowAnnotation = getFlowAnnotation(signature);

        // 組裝參數(shù)上下文
        EvaluationContext evaluationContext = buildContext(point, signature);

        evaluationContext.setVariable("result",result);

        // ID表達(dá)式
        String subjectIdEp = flowAnnotation.subjectIdEp();

        // content表達(dá)式
        String content = getContent(flowAnnotation, evaluationContext);

    // SPEL解析表達(dá)式
        Expression expression = PARSER.parseExpression(subjectIdEp);
        Integer subjectId = (Integer)expression.getValue(evaluationContext);
        record(flowAnnotation, subjectId, content);
    } catch (Exception e) {
        log.error("記錄操作流水失敗", e);
    }
}

4 總結(jié)

總得來說,ECP的實(shí)現(xiàn)中有很多的技術(shù)細(xì)節(jié)需要考慮,技術(shù)難度一般。 

實(shí)際上,在我們大部分的項(xiàng)目中,考驗(yàn)的就是對細(xì)節(jié)的把控。

ps:感謝ChatGPT對本文名稱的大力支持

關(guān)于作者

閆展,轉(zhuǎn)轉(zhuǎn)交易中臺(tái)研發(fā)工程師

責(zé)任編輯:武曉燕 來源: 轉(zhuǎn)轉(zhuǎn)技術(shù)
相關(guān)推薦

2013-10-10 13:07:25

方物

2021-11-10 06:38:01

Python鏈?zhǔn)?/a>操作

2023-04-06 09:44:00

ChatGPT行業(yè)質(zhì)量

2017-03-21 08:52:20

神經(jīng)網(wǎng)絡(luò)聲譽(yù)

2020-07-02 15:40:11

Spring BootJar包Java

2011-02-23 08:50:22

C#.NETdynamic

2010-07-15 16:21:03

不可思議的服務(wù)器

2022-01-24 15:57:34

Python返回功能代碼

2011-07-18 13:35:14

HTML 5

2014-07-26 22:18:51

2013-07-31 15:06:58

未來的WebWebGLWeb

2012-02-13 11:01:27

N9Android 4.0

2017-11-08 14:07:45

數(shù)據(jù)庫MySQL慢查分析

2014-01-14 10:33:42

開源硬件開源

2012-05-16 17:28:32

智能手機(jī)

2019-05-27 09:56:00

數(shù)據(jù)庫高可用架構(gòu)

2024-08-22 14:16:08

2021-03-03 07:12:47

Windows10操作系統(tǒng)微軟

2016-07-06 11:56:52

思科漢堡光纖骨干網(wǎng)

2023-04-04 22:31:11

GPT-5人工智能
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)