基于TTL 解決線程池中 ThreadLocal 線程無法共享的問題
在Java的并發(fā)編程領(lǐng)域中,ThreadLocal被廣泛運用來解決線程安全困境,它巧妙地為每個線程提供獨立的變量副本,有效規(guī)避了線程間數(shù)據(jù)共享的問題。
不過,在使用線程池時,傳遞線程局部變量在父子線程之間并非易事。這是因為ThreadLocal的設(shè)計初衷僅在于線程內(nèi)的數(shù)據(jù)隔離,無法支持跨線程間的數(shù)據(jù)傳遞。
背景
在基于Java的應(yīng)用開發(fā)領(lǐng)域,尤其是在利用Spring框架、異步處理和微服務(wù)架構(gòu)構(gòu)建系統(tǒng)時,常常需要在不同線程或服務(wù)之間傳遞用戶會話、數(shù)據(jù)庫事務(wù)或其他上下文信息。
舉例來說,在處理用戶請求的Web服務(wù)中,記錄日志是必不可少的一環(huán)。這些日志需包含請求的獨特標(biāo)識(如請求ID),這個ID在請求進(jìn)入服務(wù)時生成,并會貫穿整個處理流程,包括可能并發(fā)執(zhí)行的多個子任務(wù)或被分配到線程池中不同線程上執(zhí)行。(在分布式場景中通常會稱之為traceId)
在這種情況下,使用ThreadLocal來存儲請求ID會帶來問題:并發(fā)執(zhí)行的子任務(wù)無法訪問父線程ThreadLocal中存儲的請求ID,而且在使用線程池時,線程的重用可能導(dǎo)致請求ID被錯誤地共享或丟失。
偽代碼:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadLocalExample {
private static ThreadLocal<String> requestId = new ThreadLocal<>();
public static void main(String[] args) {
requestId.set("12345"); // 設(shè)置請求ID
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> {
System.out.println("Child task running in a separate thread: " + requestId.get());
});
executor.shutdown();
}
}
在這個示例中,父線程設(shè)置了請求ID為"12345",但是當(dāng)子任務(wù)在另一個線程中執(zhí)行時,無法訪問到父線程中的ThreadLocal變量requestId,因此子任務(wù)無法獲取到請求ID,可能會輸出null或者""。
偽代碼:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadLocalThreadPoolExample {
private static ThreadLocal<String> requestId = new ThreadLocal<>();
public static void main(String[] args) {
requestId.set("12345"); // 設(shè)置請求ID
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.submit(() -> {
System.out.println("Child task running in a thread pool: " + requestId.get());
});
// 另一個任務(wù)復(fù)用線程
executor.submit(() -> {
System.out.println("Another child task running in the same thread: " + requestId.get());
});
executor.shutdown();
}
}
在這個示例中,如果線程池中的兩個任務(wù)在同一個線程中執(zhí)行,且沒有正確處理ThreadLocal變量,可能會導(dǎo)致第二個任務(wù)獲取到了第一個任務(wù)的請求ID,導(dǎo)致請求ID的錯誤共享。
技術(shù)選型
為了應(yīng)對這一難題,可以采用TransmittableThreadLocal(TTL)這一阿里巴巴開源工具庫,專為解決在使用線程池等會重用線程的情況下,ThreadLocal無法正確管理線程上下文的問題而設(shè)計。
GitHub開源地址:https://github.com/alibaba/transmittable-thread-local
TransmittableThreadLocal基于ThreadLocal進(jìn)行擴展,提供了跨線程傳遞數(shù)據(jù)的能力,確保父線程傳遞值給子線程,并支持線程池等場景下的線程數(shù)據(jù)隔離。
此外,還有JDK自帶的InheritableThreadLocal,用于主子線程間參數(shù)傳遞。然而,這種方式存在一個限制:必須在主線程手動創(chuàng)建子線程才可使用,而在線程池中則難以實現(xiàn)此種傳遞機制。
具體實現(xiàn)
依賴引入
首先,需在項目中引入TransmittableThreadLocal的依賴。若為Maven項目,可添加以下依賴:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version><!-- 使用最新版本 --></version>
</dependency>
使用TransmittableThreadLocal存儲請求ID
public class RequestContext {
// 使用TransmittableThreadLocal來存儲請求ID
private static final ThreadLocal<String> requestIdTL = new TransmittableThreadLocal<>();
public static void setRequestId(String requestId) {
requestIdTL.set(requestId);
}
public static String getRequestId() {
return requestIdTL.get();
}
public static void clear() {
requestIdTL.remove();
}
}
創(chuàng)建一個線程池,并使用TTL提供的工具類確保線程池兼容TransmittableThreadLocal
import com.alibaba.ttl.threadpool.TtlExecutors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolUtil {
private static final ExecutorService pool = Executors.newFixedThreadPool(10);
// 使用TtlExecutors工具類包裝原始的線程池,使其兼容TransmittableThreadLocal
public static final ExecutorService ttlExecutorService = TtlExecutors.getTtlExecutorService(pool);
public static ExecutorService getExecutorService() {
return ttlExecutorService;
}
}
TtlExecutors是TransmittableThreadLocal(TTL)庫中的一款實用工具類,其機制在于對Java標(biāo)準(zhǔn)庫中的ExecutorService、ScheduledExecutorService等線程池接口的實例進(jìn)行包裝。
通過這種封裝,確保在使用線程池時,能夠正確地傳遞TransmittableThreadLocal中存儲的上下文數(shù)據(jù),即使任務(wù)在不同線程中執(zhí)行。這對于解決在使用線程池時ThreadLocal變量值傳遞的問題至關(guān)重要。
執(zhí)行并行任務(wù),并在任務(wù)中使用RequestContext來訪問請求ID
import java.util.stream.IntStream;
public class Application {
public static void main(String[] args) {
// 模擬Web應(yīng)用中為每個請求設(shè)置唯一的請求ID
String requestId = "REQ-" + System.nanoTime();
RequestContext.setRequestId(requestId);
try {
ExecutorService executorService = ThreadPoolUtil.getExecutorService();
IntStream.range(0, 5).forEach(i ->
executorService.submit(() -> {
// 在子線程中獲取并打印請求ID
System.out.println("Task " + i + " running in thread " + Thread.currentThread().getName() + " with Request ID: " + RequestContext.getRequestId());
})
);
} finally {
// 清理資源
RequestContext.clear();
ThreadPoolUtil.getExecutorService().shutdown();
}
}
}