電商真實(shí)對(duì)賬系統(tǒng)是如何設(shè)計(jì)并優(yōu)化的
前言
往期文章在熱點(diǎn)數(shù)據(jù)如何更新的一篇文章中有提到對(duì)賬系統(tǒng)。其實(shí)我在實(shí)際業(yè)務(wù)場(chǎng)景中是有遇到過(guò)類似對(duì)賬的優(yōu)化問(wèn)題的。說(shuō)優(yōu)化之前要掌握一點(diǎn)就是一定要掌握J(rèn)ava并發(fā)包的相關(guān)特性。本章節(jié)對(duì)此有很大依賴。
- 熱點(diǎn)數(shù)據(jù)高效更新文章:
inventory hint,解決熱點(diǎn)數(shù)據(jù)如何高效更新
Java并發(fā)包簡(jiǎn)說(shuō)
CountDownLatch和CyclicBarrier
區(qū)別
CountDownLatch:
- 不可以重復(fù)使用,計(jì)數(shù)器無(wú)法被重置
經(jīng)典案例比如門(mén)衛(wèi)休息休要等所有人下班才可以關(guān)門(mén)休息 CyclicBarrier:
- 可以重復(fù)使用,是一個(gè)同步輔助類,允許一組線程相互等待,直到到達(dá)某個(gè)公共屏障點(diǎn)(common barrier point)。假設(shè)設(shè)計(jì)一組固定大小的線程的程序中,這些線程必須不是的互相等待,此時(shí)就可以使用CyclicBarrier。因?yàn)樵揵arrier在釋放等待線程后可以重用,從而稱之為循環(huán)的barrier。
經(jīng)典案例:比如運(yùn)動(dòng)員跑步,需要所有人準(zhǔn)備好之后裁判才可以發(fā)令讓大家在同一時(shí)刻去跑。有依賴關(guān)系
案例
有一天,老大匆忙趕來(lái),提到對(duì)賬系統(tǒng)最近變得越來(lái)越緩慢,希望能迅速進(jìn)行優(yōu)化。經(jīng)過(guò)了解對(duì)賬系統(tǒng)的業(yè)務(wù)流程,發(fā)現(xiàn)其實(shí)相當(dāng)簡(jiǎn)單:用戶通過(guò)在線商城下單會(huì)生成電子訂單并存儲(chǔ)在訂單數(shù)據(jù)庫(kù)中;隨后物流會(huì)生成派送單用于發(fā)貨,派送單則保存在派送單庫(kù)中。為了避免漏發(fā)或重復(fù)派送,對(duì)賬系統(tǒng)每天會(huì)核查是否存在異常訂單。
目前對(duì)賬系統(tǒng)的處理邏輯很簡(jiǎn)單:首先查詢訂單,然后查詢派送單,接著比對(duì)訂單和派送單,將差異記錄寫(xiě)入差異庫(kù)。對(duì)賬系統(tǒng)的核心代碼經(jīng)過(guò)抽象后,也并不復(fù)雜,主要是在單線程中循環(huán)執(zhí)行訂單和派送單的查詢,進(jìn)行對(duì)賬操作,最后將結(jié)果寫(xiě)入差異庫(kù)。
偽代碼
while(存在未對(duì)賬訂單){
// 查詢未對(duì)賬訂單
pos = getPOrders();
// 查詢派送單
dos = getDOrders();
// 執(zhí)行對(duì)賬操作
diff = check(pos, dos);
// 差異寫(xiě)入差異庫(kù)
save(diff);
}
利用Java并行優(yōu)化對(duì)賬系統(tǒng)
首先要找出對(duì)賬系統(tǒng)的瓶頸所在。目前,由于訂單量和派送單量龐大,導(dǎo)致查詢未對(duì)賬訂單 getPOrders() 和查詢派送單 getDOrders() 的速度較慢。是否有一種快速優(yōu)化的方法呢?目前對(duì)賬系統(tǒng)是單線程執(zhí)行的。對(duì)于這樣的串行系統(tǒng),優(yōu)化性能的第一個(gè)想法是能否利用多線程并行處理。
因此,我們可以看出對(duì)賬系統(tǒng)的瓶頸在哪里:查詢未對(duì)賬訂單 getPOrders() 和查詢派送單 getDOrders() 是否能夠并行處理呢?很顯然,這兩個(gè)操作之間并沒(méi)有依賴關(guān)系。將這兩個(gè)耗時(shí)操作并行化后,與單線程執(zhí)行相比,您會(huì)發(fā)現(xiàn)在相同時(shí)間段內(nèi),并行執(zhí)行的吞吐量接近單線程的兩倍,優(yōu)化效果頗為明顯。
有了這個(gè)思路,接下來(lái)我們看看如何用代碼實(shí)現(xiàn)。在下面的代碼中,我們創(chuàng)建了兩個(gè)線程 T1 和 T2,分別并行執(zhí)行查詢未對(duì)賬訂單 getPOrders() 和查詢派送單 getDOrders() 的操作。主線程則負(fù)責(zé)執(zhí)行對(duì)賬操作 check()和將差異寫(xiě)入 save() 的操作。值得注意的是:主線程需要等待線程 T1 和 T2 執(zhí)行完畢后才能執(zhí)行 check() 和 save() 這兩個(gè)操作。為此,我們通過(guò)調(diào)用 T1.join() 和 T2.join() 實(shí)現(xiàn)等待,當(dāng)線程 T1 和 T2 結(jié)束時(shí),調(diào)用了 T1.join() 和 T2.join() 的主線程將從阻塞狀態(tài)中解除,隨后執(zhí)行后續(xù)的 check() 和 save() 操作。
偽代碼
while(存在未對(duì)賬訂單){
// 查詢未對(duì)賬訂單
Thread T1 = new Thread(()->{
pos = getPOrders();
});
T1.start();
// 查詢派送單
Thread T2 = new Thread(()->{
dos = getDOrders();
});
T2.start();
// 等待 T1、T2 結(jié)束
T1.join();
T2.join();
// 執(zhí)行對(duì)賬操作
diff = check(pos, dos);
// 差異寫(xiě)入差異庫(kù)
save(diff);
}
用 CountDownLatch 實(shí)現(xiàn)線程等待
經(jīng)過(guò)上述優(yōu)化,基本上可以向老板報(bào)告工作完成了,但仍有一些遺憾之處。我相信您也已經(jīng)注意到了,在 while 循環(huán)中每次都會(huì)創(chuàng)建新的線程,而創(chuàng)建線程是一個(gè)耗時(shí)的操作。因此,最好能夠重復(fù)利用已創(chuàng)建的線程。您想到了線程池,確實(shí),線程池能夠解決這個(gè)問(wèn)題。
通過(guò)線程池進(jìn)行優(yōu)化后:我們首先創(chuàng)建了一個(gè)固定大小為2的線程池,并在 while 循環(huán)中重復(fù)利用這些線程。一切看起來(lái)都進(jìn)行得很順利,但似乎有一個(gè)問(wèn)題無(wú)法解決,即主線程如何知道 getPOrders() 和 getDOrders() 這兩個(gè)操作何時(shí)執(zhí)行完成。在前面的方案中,主線程通過(guò)調(diào)用線程 T1 和 T2 的 join() 方法來(lái)等待它們退出,但是在線程池方案中,線程根本就不會(huì)退出,因此 join() 方法失效了。
那么,如何解決這個(gè)問(wèn)題呢?您可以想出許多方法,其中最直接的方法是使用一個(gè)計(jì)數(shù)器。將其初始值設(shè)為2,執(zhí)行完 pos = getPOrders(); 后減 1,執(zhí)行完 dos = getDOrders(); 后也減 1。主線程在這之后等待計(jì)數(shù)器等于0;當(dāng)計(jì)數(shù)器等于0時(shí),說(shuō)明這兩個(gè)查詢操作已執(zhí)行完畢。等待計(jì)數(shù)器為0實(shí)際上是一種條件變量,使用管程實(shí)現(xiàn)起來(lái)也并不復(fù)雜。
然而,我并不建議在實(shí)際項(xiàng)目中實(shí)施上述方案,因?yàn)镴ava并發(fā)包中已經(jīng)提供了類似功能的工具類:CountDownLatch,我們直接使用即可。在下面的代碼示例中,我們?cè)?nbsp;while 循環(huán)中首先創(chuàng)建了一個(gè)CountDownLatch,計(jì)數(shù)器的初始值為2。在 pos = getPOrders(); 和 dos = getDOrders(); 兩個(gè)語(yǔ)句后,通過(guò)調(diào)用 latch.countDown(); 實(shí)現(xiàn)對(duì)計(jì)數(shù)器的減1操作。在主線程中,通過(guò)調(diào)用 latch.await(); 實(shí)現(xiàn)對(duì)計(jì)數(shù)器等于0的等待。
偽代碼
// 創(chuàng)建 2 個(gè)線程的線程池
Executor executor =
Executors.newFixedThreadPool(2);
while(存在未對(duì)賬訂單){
// 計(jì)數(shù)器初始化為 2
CountDownLatch latch =
new CountDownLatch(2);
// 查詢未對(duì)賬訂單
executor.execute(()-> {
pos = getPOrders();
latch.countDown();
});
// 查詢派送單
executor.execute(()-> {
dos = getDOrders();
latch.countDown();
});
// 等待兩個(gè)查詢操作結(jié)束
latch.await();
// 執(zhí)行對(duì)賬操作
diff = check(pos, dos);
// 差異寫(xiě)入差異庫(kù)
save(diff);
}
進(jìn)一步優(yōu)化性能
經(jīng)過(guò)以上一系列的優(yōu)化,終于可以松一口氣,準(zhǔn)備交付項(xiàng)目。然而,在交付之前,再次審視一番是值得的,或許還存在優(yōu)化的空間。
前面我們已經(jīng)實(shí)現(xiàn)了將 getPOrders() 和 getDOrders() 這兩個(gè)查詢操作并行化,但是這兩個(gè)查詢操作與對(duì)賬操作 check() 和 save() 之間仍然是串行執(zhí)行的。很顯然,這兩個(gè)查詢操作與對(duì)賬操作也可以并行執(zhí)行,即在執(zhí)行對(duì)賬操作的同時(shí)可以進(jìn)行下一輪的查詢操作。
接下來(lái),我們?cè)偎伎既绾螌?shí)現(xiàn)這一優(yōu)化。兩次查詢操作能夠與對(duì)賬操作并行執(zhí)行,而對(duì)賬操作又依賴于查詢操作的結(jié)果,這明顯具有生產(chǎn)者-消費(fèi)者模型的特征。兩次查詢操作充當(dāng)生產(chǎn)者,對(duì)賬操作為消費(fèi)者。為了實(shí)現(xiàn)這種模型,我們需要一個(gè)隊(duì)列來(lái)存儲(chǔ)生產(chǎn)者產(chǎn)生的數(shù)據(jù),消費(fèi)者則從隊(duì)列中取出數(shù)據(jù)執(zhí)行相應(yīng)操作。
針對(duì)這個(gè)對(duì)賬項(xiàng)目,我設(shè)計(jì)了兩個(gè)隊(duì)列,其元素之間存在對(duì)應(yīng)關(guān)系。具體來(lái)說(shuō),訂單查詢操作將訂單查詢結(jié)果插入訂單隊(duì)列,派送單查詢操作將派送單插入派送單隊(duì)列,這兩個(gè)隊(duì)列的元素之間是一一對(duì)應(yīng)的。使用兩個(gè)隊(duì)列的好處在于,對(duì)賬操作可以每次從訂單隊(duì)列取出一個(gè)元素和派送單隊(duì)列中取出一個(gè)元素,然后執(zhí)行對(duì)賬操作,確保數(shù)據(jù)的一致性。
接下來(lái),讓我們看看如何通過(guò)雙隊(duì)列實(shí)現(xiàn)完全并行化。一個(gè)直接的思路是:一個(gè)線程 T1 執(zhí)行訂單查詢工作,另一個(gè)線程 T2 執(zhí)行派送單查詢工作。當(dāng)線程 T1 和 T2 都各自生產(chǎn)完一條數(shù)據(jù)時(shí),通知線程 T3 執(zhí)行對(duì)賬操作。這一想法看似簡(jiǎn)單,實(shí)際上仍然存在一個(gè)條件:T1 和 T2 的工作節(jié)奏必須一致,保持同步,否則一個(gè)快一個(gè)慢將影響各自生產(chǎn)數(shù)據(jù)并通知 T3 的過(guò)程。
只有在T1和T2各自生產(chǎn)完一條數(shù)據(jù)時(shí)才能繼續(xù)執(zhí)行,也就是說(shuō),T1和T2需要相互等待,保持步調(diào)一致。同時(shí),當(dāng)T1和T2都生產(chǎn)完一條數(shù)據(jù)時(shí),還需能夠通知T3執(zhí)行對(duì)賬操作。
用 CyclicBarrier 實(shí)現(xiàn)線程同步
接下來(lái)我們將實(shí)現(xiàn)上述方案中提到的方法。該方案的難點(diǎn)在于兩個(gè)方面:一是確保線程 T1 和 T2 的步調(diào)一致,二是能夠有效通知線程 T3。
在解決這兩個(gè)難點(diǎn)的過(guò)程中,仍然可以利用一個(gè)計(jì)數(shù)器。將計(jì)數(shù)器初始化為2,每當(dāng)線程 T1 和 T2 生產(chǎn)完一條數(shù)據(jù)時(shí),都將計(jì)數(shù)器減1。若計(jì)數(shù)器大于0,則線程 T1 或 T2需要等待。當(dāng)計(jì)數(shù)器等于0時(shí),通知線程 T3,喚醒等待的線程 T1 或 T2,并將計(jì)數(shù)器重置為2。如此,線程 T1 和 T2 在生產(chǎn)下一條數(shù)據(jù)時(shí),可以繼續(xù)使用這個(gè)計(jì)數(shù)器。
建議不要在實(shí)際項(xiàng)目中直接實(shí)現(xiàn)這一邏輯,因?yàn)镴ava并發(fā)包中已經(jīng)提供了相關(guān)的工具類:CyclicBarrier。在下面的代碼中,我們首先創(chuàng)建了一個(gè)初始值為2的CyclicBarrier計(jì)數(shù)器。需要注意的是,在創(chuàng)建CyclicBarrier時(shí),傳入了一個(gè)回調(diào)函數(shù)。當(dāng)計(jì)數(shù)器減至0時(shí),該回調(diào)函數(shù)會(huì)被調(diào)用。
線程 T1 負(fù)責(zé)查詢訂單,每查到一條數(shù)據(jù),調(diào)用barrier.await()將計(jì)數(shù)器減1,并等待計(jì)數(shù)器變?yōu)?。線程 T2 負(fù)責(zé)查詢派送單,處理方式與線程 T1 類似。當(dāng) T1 和 T2 都調(diào)用barrier.await()時(shí),計(jì)數(shù)器會(huì)減至0,此時(shí) T1 和 T2可以繼續(xù)執(zhí)行下一步操作,并調(diào)用barrier的回調(diào)函數(shù)執(zhí)行對(duì)賬操作。
值得一提的是,CyclicBarrier的計(jì)數(shù)器具有自動(dòng)重置功能。當(dāng)計(jì)數(shù)器減至0時(shí),會(huì)自動(dòng)重新設(shè)定為您設(shè)置的初始值。這一特性確實(shí)方便實(shí)用。
偽代碼
// 訂單隊(duì)列
Vector<P> pos;
// 派送單隊(duì)列
Vector<D> dos;
// 執(zhí)行回調(diào)的線程池
Executor executor =
Executors.newFixedThreadPool(1);
final CyclicBarrier barrier =
new CyclicBarrier(2, ()->{
executor.execute(()->check());
});
void check(){
P p = pos.remove(0);
D d = dos.remove(0);
// 執(zhí)行對(duì)賬操作
diff = check(p, d);
// 差異寫(xiě)入差異庫(kù)
save(diff);
}
void checkAll(){
// 循環(huán)查詢訂單庫(kù)
Thread T1 = new Thread(()->{
while(存在未對(duì)賬訂單){
// 查詢訂單庫(kù)
pos.add(getPOrders());
// 等待
barrier.await();
}
});
T1.start();
// 循環(huán)查詢運(yùn)單庫(kù)
Thread T2 = new Thread(()->{
while(存在未對(duì)賬訂單){
// 查詢運(yùn)單庫(kù)
dos.add(getDOrders());
// 等待
barrier.await();
}
});
T2.start();
}
CountDownLatch 和 CyclicBarrier 是Java并發(fā)包提供的兩個(gè)非常便捷的線程同步工具類。在這里,有必要再次強(qiáng)調(diào)它們之間的區(qū)別:CountDownLatch 主要用于解決一個(gè)線程等待多個(gè)線程的情況,可以類比于旅游團(tuán)團(tuán)長(zhǎng)必須等待所有游客齊集后才能繼續(xù)前行;而CyclicBarrier 則是一組線程相互等待,有點(diǎn)像幾個(gè)驢友之間的互助合作。此外,CountDownLatch 的計(jì)數(shù)器不支持重復(fù)利用,即一旦計(jì)數(shù)器降至0,后續(xù)調(diào)用await()的線程將直接通過(guò)。相比之下,CyclicBarrier 的計(jì)數(shù)器可以循環(huán)利用,同時(shí)具有自動(dòng)重置功能,一旦計(jì)數(shù)器減至0,將會(huì)自動(dòng)重置為設(shè)定的初始值。此外,CyclicBarrier 還支持設(shè)置回調(diào)函數(shù),功能更加豐富。