服務down機了,線程池中的數據如何保證不丟失?
前言
最近有位小伙伴在我的技術群里,問了我一個問題:服務down機了,線程池中如何保證不丟失數據?
這個問題挺有意思的,今天通過這篇文章,拿出來跟大家一起探討一下。
1 什么是線程池?
之前沒有線程池的時候,我們在代碼中,創(chuàng)建一個線程有兩種方式:
- 繼承Thread類
- 實現Runnable接口
雖說通過這兩種方式創(chuàng)建一個線程,非常方便。
但也帶來了下面的問題:
- 創(chuàng)建和銷毀一個線程,都是比較耗時,頻繁的創(chuàng)建和銷毀線程,非常影響系統(tǒng)的性能。
- 無限制的創(chuàng)建線程,會導致內存不足。
- 有新任務過來時,必須要先創(chuàng)建好線程才能執(zhí)行,不能直接復用線程。
為了解決上面的這些問題,Java中引入了:線程池。
它相當于一個存放線程的池子。
使用線程池帶來了下面3個好處:
- 降低資源消耗。通過重復利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。
- 提高響應速度。當任務到達時,可以直接使用已有空閑的線程,不需要的等到線程創(chuàng)建就能立即執(zhí)行。
- 提高線程的可管理性。線程是稀缺資源,如果無限制的創(chuàng)建,不僅會消耗系統(tǒng)資源,還會降低系統(tǒng)的穩(wěn)定性。而如果我們使用線程池,可以對線程進行統(tǒng)一的分配、管理和監(jiān)控。
2 線程池原理
先看看線程池的構造器:
public ThreadPoolExecutor(
int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
- corePoolSize:核心線程數,線程池維護的最少線程數。
- maximumPoolSize:最大線程數,線程池允許創(chuàng)建的最大線程數。
- keepAliveTime:線程存活時間,當線程數超過核心線程數時,多余的空閑線程的存活時間。
- unit:時間單位。
- workQueue:任務隊列,用于保存等待執(zhí)行的任務。
- threadFactory:線程工廠,用于創(chuàng)建新線程。
- handler:拒絕策略,當任務無法執(zhí)行時的處理策略。
線程池的核心流程圖如下:
圖片
線程池的工作過程如下:
- 線程池初始化:根據corePoolSize初始化核心線程。
- 任務提交:當任務提交到線程池時,根據當前線程數判斷:
- 若當前線程數小于corePoolSize,創(chuàng)建新的線程執(zhí)行任務。
- 若當前線程數大于或等于corePoolSize,任務被加入workQueue隊列。
- 任務處理:當有空閑線程時,從workQueue中取出任務執(zhí)行。
- 線程擴展:若隊列已滿且當前線程數小于maximumPoolSize,創(chuàng)建新的線程處理任務。
- 線程回收:當線程空閑時間超過keepAliveTime,多余的線程會被回收,直到線程數不超過corePoolSize。
- 拒絕策略:若隊列已滿且當前線程數達到maximumPoolSize,則根據拒絕策略處理新任務。
說白了在線程池中,多余的任務會被放到workQueue任務隊列中。
這個任務隊列的數據保存在內存中。
這樣就會出現一些問題。
接下來,看看線程池有哪些問題。
3 線程池有哪些問題?
在JDK中為了方便大家創(chuàng)建線程池,專門提供了Executors這個工具類。
3.1 隊列過大
Executors.newFixedThreadPool,它可以創(chuàng)建固定線程數量的線程池,任務隊列使用的是LinkedBlockingQueue,默認最大容量是Integer.MAX_VALUE。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
如果向newFixedThreadPool線程池中提交的任務太多,可能會導致LinkedBlockingQueue非常大,從而出現OOM問題。
3.2 線程太多
Executors.newCachedThreadPool,它可以創(chuàng)建可緩沖的線程池,最大線程數量是Integer.MAX_VALUE,任務隊列使用的是SynchronousQueue。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
如果向newCachedThreadPool線程池中提交的任務太多,可能會導致創(chuàng)建大量的線程,也會出現OOM問題。
3.3 數據丟失
如果線程池在執(zhí)行過程中,服務突然被重啟了,可能會導致線程池中的數據丟失。
上面的OOM問題,我們在日常開發(fā)中,可以通過自定義線程池的方式解決。
比如創(chuàng)建這樣的線程池:
new ThreadPoolExecutor(8,
10,
30L,
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(300),
threadFactory);
自定義了一個最大線程數量和任務隊列都在可控范圍內線程池。
這樣做基本上不會出現OOM問題。
但線程池的數據丟失問題,光靠自身的功能很難解決。
4 如何保證數據不丟失?
線程池中的數據,是保存到內存中的,一旦遇到服務器重啟了,數據就會丟失。
之前的系統(tǒng)流程是這樣的:
圖片
用戶請求過來之后,先處理業(yè)務邏輯1,它是系統(tǒng)的核心功能。
然后再將任務提交到線程池,由它處理業(yè)務邏輯2,它是系統(tǒng)的非核心功能。
但如果線程池在處理的過程中,服務down機了,此時,業(yè)務邏輯2的數據就會丟失。
那么,如何保證數據不丟失呢?
答:需要提前做持久化。
我們優(yōu)化的系統(tǒng)流程如下:
圖片
用戶請求過來之后,先處理業(yè)務邏輯1,緊接著向DB中寫入一條任務數據,狀態(tài)是:待執(zhí)行。
處理業(yè)務邏輯1和向DB寫任務數據,可以在同一個事務中,方便出現異常時回滾。
然后有一個專門的定時任務,每個一段時間,按添加時間升序,分頁查詢狀態(tài)是待執(zhí)行的任務。
最早的任務,最先被查出來。
然后將查出的任務提交到線程池中,由它處理業(yè)務邏輯2。
處理成功之后,修改任務的待執(zhí)行狀態(tài)為:已執(zhí)行。
需要注意的是:業(yè)務邏輯2的處理過程,要做冪等性設計,同一個請求允許被執(zhí)行多次,其結果不會有影響。
如果此時,線程池在處理的過程中,服務down機了,業(yè)務邏輯2的數據會丟失。
但此時DB中保存了任務的數據,并且丟失那些任務的狀態(tài)還是:待執(zhí)行。
在下一次定時任務周期開始執(zhí)行時,又會將那些任務數據重新查詢出來,重新提交到線程池中。
業(yè)務邏輯2丟失的數據,又自動回來了。
如果要考慮失敗的情況,還需要在任務表中增加一個失敗次數字段。
在定時任務的線程池中執(zhí)行業(yè)務邏輯2失敗了,在下定時任務執(zhí)行時可以自動重試。
但不可能無限制的一直重試下去。
當失敗超過了一定的次數,可以將任務狀態(tài)改成:失敗。
這樣后續(xù)可以人工處理。