方案設(shè)計(jì):基于庫表分段掃描和數(shù)據(jù)Redis預(yù)熱,優(yōu)化分布式延遲任務(wù)觸達(dá)時(shí)效性
本文轉(zhuǎn)載自微信公眾號(hào)「bugstack蟲洞?!?,作者小傅哥。轉(zhuǎn)載本文請(qǐng)聯(lián)系bugstack蟲洞棧公眾號(hào)。
一、前言
不卷了,能用就行!
哈哈哈,說好的不卷了,能湊活用就行了。但每次接到新需求時(shí)都手癢,想結(jié)合著上一次的架構(gòu)設(shè)計(jì)和落地經(jīng)驗(yàn),在這一次需求上在迭代更新,或者找到完全顛覆之前的更優(yōu)方案。卷完代碼的那一刻總是神清氣爽
其實(shí)大部分喜歡寫代碼的一類純粹碼農(nóng),都是比較卷的,就比如一個(gè)需求在實(shí)現(xiàn)上是能用大概是P5、如果這個(gè)做出來的功能不只是能用還非常好用是P6、除了好用還凝練共性需求開發(fā)成通用的組件服務(wù)是P7。每一個(gè)成長(zhǎng)過來的碼農(nóng),都是在造輪子的路上一次次驗(yàn)證自己的想法和加以實(shí)踐,絕對(duì)不是一篇篇的八股文就能累出來一個(gè)高級(jí)的技術(shù)大牛。
二、延遲任務(wù)場(chǎng)景
什么是延遲任務(wù)?
當(dāng)我們的實(shí)際業(yè)務(wù)需求場(chǎng)景中,有一些活動(dòng)開始前的狀態(tài)變更、訂單結(jié)算后的T+1對(duì)賬、貸款單息費(fèi)的產(chǎn)生,都是需要使用到延遲任務(wù)來進(jìn)行觸達(dá)。實(shí)際的操作一般會(huì)有 Quartz、Schedule 來對(duì)你的庫表數(shù)據(jù)進(jìn)行定時(shí)掃描和處理,當(dāng)條件滿足后做數(shù)據(jù)狀態(tài)的變更或者產(chǎn)生新的數(shù)據(jù)插入到表中。
這樣一個(gè)簡(jiǎn)單的需求就是延遲任務(wù)最初需求,如果需求前期內(nèi)容較少、使用方不多,可能在實(shí)際開發(fā)中就只是一個(gè)單臺(tái)機(jī)器直接對(duì)著表一頓輪訓(xùn)就完事了。但隨著業(yè)務(wù)需求的發(fā)展和功能的復(fù)雜度提升,往往反饋到研發(fā)設(shè)計(jì)和實(shí)現(xiàn),就不那么簡(jiǎn)單了,比如:你需要保障盡可能低延遲完成較大規(guī)模的數(shù)據(jù)量掃描處理,否則就像貸款單息費(fèi)的產(chǎn)生,已經(jīng)到了第二天用戶還沒看到自己的息費(fèi)信息或者是還款后的重新對(duì)賬,可能就這個(gè)時(shí)候就要產(chǎn)生客訴了。
那么,類似這樣的場(chǎng)景該如何設(shè)計(jì)呢?
三、延遲任務(wù)設(shè)計(jì)
通常的任務(wù)中心處理流程主要,主要是由定時(shí)任務(wù)掃描任務(wù)庫表,把即將達(dá)到超時(shí)時(shí)間的任務(wù)信息掃描到處理隊(duì)列(內(nèi)存/MQ消息),再由業(yè)務(wù)系統(tǒng)進(jìn)行處理任務(wù),處理完成后更新庫表中的任務(wù)狀態(tài)。
高延時(shí)任務(wù)調(diào)度
問題:
海量數(shù)據(jù)規(guī)模較大的任務(wù)列表數(shù)據(jù),在分庫分表下該需要快速掃描。
任務(wù)掃描服務(wù)與業(yè)務(wù)邏輯處理,耦合在一起,不具有通用性和復(fù)用性。
細(xì)分任務(wù)體系有些是需要低延遲處理的,不能等待過長(zhǎng)時(shí)間。
1. 任務(wù)表方式
除了一些較小的狀態(tài)變更場(chǎng)景,例如在各自業(yè)務(wù)的庫表中,就包含了一個(gè)狀態(tài)字段,這個(gè)字段一方面有程序邏輯處理變更的狀態(tài),也有到達(dá)指定到期時(shí)間后由任務(wù)服務(wù)自動(dòng)變更處理的操作,一般這類功能,直接設(shè)計(jì)到自己的庫表中即可。
那么還有一些較大也較為頻繁使用的場(chǎng)景,如果都是在每個(gè)系統(tǒng)的各自所需的N多個(gè)表中,都添加這樣的字段進(jìn)行維護(hù),就顯得非常冗余了,也不那么易于維護(hù)。所以針對(duì)這樣的場(chǎng)景就很適合做一個(gè)通用的任務(wù)延時(shí)系統(tǒng),各業(yè)務(wù)系統(tǒng)把需要被延時(shí)執(zhí)行的動(dòng)作提交到延時(shí)系統(tǒng)中,再有延時(shí)系統(tǒng)在指定時(shí)間下進(jìn)行回調(diào),回調(diào)的動(dòng)作可以是接口或者M(jìn)Q消息進(jìn)行觸達(dá)。例如可以設(shè)計(jì)這樣一個(gè)任務(wù)調(diào)度表:
任務(wù)調(diào)度庫表設(shè)計(jì)
抽取的任務(wù)調(diào)度表,主要是拿到什么任務(wù),在什么時(shí)間發(fā)起動(dòng)作,具體的動(dòng)作處理仍交給業(yè)務(wù)工程處理。
大批量的各自業(yè)務(wù)的任務(wù)進(jìn)行集中處理,則需要設(shè)計(jì)一個(gè)分庫分表,滿足于后續(xù)業(yè)務(wù)體量的增長(zhǎng)。
門牌號(hào)設(shè)計(jì),針對(duì)一張表的掃描,如果數(shù)據(jù)量較大,又不希望只是一個(gè)任務(wù)掃描一個(gè)表,可以多個(gè)任務(wù)掃描一個(gè)表,加到掃描的體量。這個(gè)時(shí)候就需要一個(gè)門牌號(hào)來隔離不同任務(wù)掃描的范圍,避免掃描出重復(fù)的任務(wù)數(shù)據(jù)。
2. 低延遲方式
低延遲處理方案,是在任務(wù)表方式的基礎(chǔ)上,新增加的時(shí)間把控處理。它可以把即將到期的前一段時(shí)間的任務(wù),放置到 Redis 集群隊(duì)里中,在消費(fèi)的時(shí)候再從隊(duì)列中 pop 出來,這樣可以更快的接近任務(wù)的處理時(shí)效,避免因?yàn)閽邘扉g隔較大延遲任務(wù)執(zhí)行。
任務(wù)處理流程
- 在接收業(yè)務(wù)系統(tǒng)提交進(jìn)來的延遲任務(wù)時(shí),按照?qǐng)?zhí)行時(shí)間的長(zhǎng)短放置到任務(wù)庫或者也同步到 Redis 集群中,一些執(zhí)行時(shí)間較晚的任務(wù)則可以先放到任務(wù)庫,再通過掃描的方式添加到超時(shí)任務(wù)執(zhí)行隊(duì)列中。
- 那么關(guān)于這塊的設(shè)計(jì)核心在于 Redis 隊(duì)列的使用,以及為了保證消費(fèi)的可靠性需要引入二階段消費(fèi)、注冊(cè) ZK 注冊(cè)中心至少保證一次消費(fèi)的處理。本文重點(diǎn)主要放在 Redis 隊(duì)列的設(shè)計(jì),其他更多的邏輯處理,可以按照業(yè)務(wù)需求進(jìn)行擴(kuò)展和完善
Redis 消費(fèi)隊(duì)列
Redis 消費(fèi)隊(duì)列
- 按照消息體計(jì)算對(duì)應(yīng)數(shù)據(jù)所屬的槽位 index = CRC32 & 7
- StoreQueue 采用 Slot 按照 SlotKey = #{topic}_#{index} 和 Sorted Set 的數(shù)據(jù)結(jié)構(gòu)按執(zhí)行任務(wù)分?jǐn)?shù)排序,存放任務(wù)執(zhí)行信息。定時(shí)消息將時(shí)間戳作為分?jǐn)?shù),消費(fèi)時(shí)每次彈出分?jǐn)?shù)小于當(dāng)前時(shí)間戳的一個(gè)消息
- 為了保障每條消息至少可消費(fèi)一次,消費(fèi)者不是直接 pop 有序集合中的元素,而是將元素從 StoreQueue 移動(dòng)到 PrepareQueue 并返回消息給消費(fèi)者。消費(fèi)成功后再從 PrepareQueue 從刪除,如果消費(fèi)失敗則從PreapreQueue 重新移動(dòng)到 StoreQueue,這樣二階段消費(fèi)的方式進(jìn)行處理。
- 參考文檔:2021 阿里技術(shù)人的百寶黑皮書PDF文,低延遲的超時(shí)中心實(shí)現(xiàn)方式
簡(jiǎn)單案例
@Test
public void test_delay_queue() throws InterruptedException {
RBlockingQueue<Object> blockingQueue = redissonClient.getBlockingQueue("TASK");
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
new Thread(() -> {
try {
while (true){
Object take = blockingQueue.take();
System.out.println(take);
Thread.sleep(10);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
int i = 0;
while (true){
delayedQueue.offerAsync("測(cè)試" + ++i, 100L, TimeUnit.MILLISECONDS);
Thread.sleep(1000L);
}
}
測(cè)試數(shù)據(jù)
2022-02-13 WARN 204760 --- [ Finalizer] i.l.c.resource.DefaultClientResources : io.lettuce.core.resource.DefaultClientResources was not shut down properly, shutdown() was not called before it's garbage-collected. Call shutdown() or shutdown(long,long,TimeUnit)
測(cè)試1
測(cè)試2
測(cè)試3
測(cè)試4
測(cè)試5
Process finished with exit code -1
- 源碼:https://github.com/fuzhengwei/TimeOutCenter
- 描述:使用 redisson 中的 DelayedQueue 作為消息隊(duì)列,寫入后等待消費(fèi)時(shí)間進(jìn)行 POP 消費(fèi)。
四、總結(jié)
- 調(diào)度任務(wù)的使用在實(shí)際的場(chǎng)景中非常頻繁,例如我們經(jīng)常使用 xxl-job,也有一些大廠自研的分布式任務(wù)調(diào)度組件,這些可能原本都是很小很簡(jiǎn)單的功能,但經(jīng)過抽象、整合、提煉,變成了一個(gè)個(gè)核心通用的中間件服務(wù)。
- 當(dāng)我們?cè)诳紤]使用任務(wù)調(diào)度的時(shí)候,無論哪種方式的設(shè)計(jì)和實(shí)現(xiàn),都需要考慮這個(gè)功能使用時(shí)候的以為迭代和維護(hù)性,如果僅僅是一個(gè)非常小的場(chǎng)景,又沒多少人使用的話,那么在自己機(jī)器上折騰就可以。過渡的設(shè)計(jì)和使用有時(shí)候也會(huì)把研發(fā)資源代入泥潭
- 其實(shí)各項(xiàng)技術(shù)的知識(shí)點(diǎn),都像是一個(gè)個(gè)工具,刀槍棍棒斧鉞鉤,那能怎么結(jié)合各自的特點(diǎn),把這些兵器用起來,才是一個(gè)程序員不斷成長(zhǎng)的過程。如果你希望了解更多此類有深度的技術(shù)內(nèi)容,可以加入 Lottery 分布式抽獎(jiǎng)秒殺系統(tǒng) 學(xué)習(xí)更有價(jià)值的更抗用的實(shí)戰(zhàn)手段。