Java高進(jìn)進(jìn)階之FastThreadLocal源碼詳解(修復(fù)ThreadLocal的缺陷)
前言
ThreadLocal被ThreadLocalMap中的entry的key弱引用,如果出現(xiàn)GC的情況時(shí),
沒(méi)有被其他對(duì)象引用,會(huì)被回收,但是ThreadLocal對(duì)應(yīng)的value卻不會(huì)回收,容易造成內(nèi)存泄漏,這也間接導(dǎo)致了內(nèi)存溢出以及數(shù)據(jù)假丟失;
那么問(wèn)題來(lái)了,有沒(méi)有更高效的ThreadLocal有;
今天我們就來(lái)分析一波FastThreadLocalThread
一、FastThreadLocalThread源碼分析
Netty為了在某些場(chǎng)景下提高性能,改進(jìn)了jdk ThreadLocal,Netty實(shí)現(xiàn)的FastThreadLocal 優(yōu)化了Java 原生 ThreadLocal 的訪(fǎng)問(wèn)速度,存儲(chǔ)速度。避免了檢測(cè)弱引用帶來(lái)的 value 回收難問(wèn)題,和數(shù)組位置沖突帶來(lái)的線(xiàn)性查找問(wèn)題,解決這些問(wèn)題并不是沒(méi)有代價(jià);
Netty實(shí)現(xiàn)的 FastThreadLocal 底層也是通過(guò)數(shù)組存儲(chǔ) value 對(duì)象,與Java原生ThreadLocal使用自身作為Entry的key不同,F(xiàn)astThreadLocal通過(guò)保存數(shù)組的全局唯一下標(biāo),實(shí)現(xiàn)了對(duì)value的快速訪(fǎng)問(wèn)。同時(shí)FastThreadLocal 也實(shí)現(xiàn)了清理對(duì)象的方法;
1、FastThreadLocalThread
在Netty中,要使用 FastThreadLocal 實(shí)現(xiàn)線(xiàn)程本地變量需要將線(xiàn)程包裝成 FastThreadLocalThread ,如果不是 FastThreadLocalThread ,會(huì)使用 slowThreadLocalMap的 ThreadLocal 來(lái)存儲(chǔ)變量副本;
- io.netty.util.concurrent.DefaultThreadFactory
- @Override
- public Thread newThread(Runnable r) {
- Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
- // 一般daemon為false,意思是不設(shè)置為守護(hù)線(xiàn)程
- if (t.isDaemon() != daemon) {
- t.setDaemon(daemon);
- }
- // 優(yōu)先級(jí) 默認(rèn)為5
- if (t.getPriority() != priority) {
- t.setPriority(priority);
- }
- return t;
- }
- protected Thread newThread(Runnable r, String name) {
- return new FastThreadLocalThread(threadGroup, r, name);
- }
FastThreadLocalThread 繼承自Thread類(lèi),有如下成員變量:
- io.netty.util.concurrent.FastThreadLocalThread
- // 任務(wù)執(zhí)行完,是否清除FastThreadLocal的標(biāo)記
- private final boolean cleanupFastThreadLocals;
- // 類(lèi)似于Thread類(lèi)中ThreadLocalMap,為了實(shí)現(xiàn)FastThreadLocal
- private InternalThreadLocalMap threadLocalMap;
2、 InternalThreadLocalMap
FastThreadLocalThread.threadLocalMap 是 InternalThreadLocalMap 對(duì)象實(shí)例。在第一次獲取FTL數(shù)據(jù)時(shí),會(huì)初始化FastThreadLocalThread.threadLocalMap,調(diào)用的構(gòu)造函數(shù)如下:
- private InternalThreadLocalMap() {
- //為了簡(jiǎn)便,InternalThreadLocalMap父類(lèi)
- //UnpaddedInternalThreadLocalMap不展開(kāi)介紹
- super(newIndexedVariableTable());
- }
- //默認(rèn)的數(shù)組大小為32,且使用UNSET對(duì)象填充數(shù)組
- //如果下標(biāo)處數(shù)據(jù)為UNSET,則表示沒(méi)有數(shù)據(jù)
- private static Object[] newIndexedVariableTable() {
- Object[] array = new Object[32];
- Arrays.fill(array, UNSET);
- return array;
- }
為了避免寫(xiě)時(shí)候影響同一cpu緩沖行的其他數(shù)據(jù)并發(fā)訪(fǎng)問(wèn),其使用了緩存行填充技術(shù) (cpu 緩沖行填充),在類(lèi)定義中聲明了如下long字段進(jìn)行填充;
- //InternalThreadLocalMap
- // Cache line padding (must be public)
- // With CompressedOops enabled, an instance of this class should occupy at least 128 bytes.
- public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8, rp9;
FTL使用的數(shù)組下標(biāo)是InternalThreadLocalMap中的靜態(tài)變量nextIndex統(tǒng)一遞增生成的:
- static final AtomicInteger nextIndex = new AtomicInteger();
- public static int nextVariableIndex() {
- //Netty中所有FTL數(shù)組下標(biāo)都是通過(guò)遞增這個(gè)靜態(tài)變量實(shí)現(xiàn)的
- //采用靜態(tài)變量生成所有FTL元素在數(shù)組中的下標(biāo)會(huì)造成一個(gè)問(wèn)題,
- //會(huì)造成InternalThreadLocalMap中數(shù)組不必要的自動(dòng)擴(kuò)容
- int index = nextIndex.getAndIncrement();
- if (index < 0) {
- nextIndex.decrementAndGet();
- throw new IllegalStateException("too many thread-local indexed variables");
- }
- return index;
- }
InternalThreadLocalMap.nextVariableIndex()方法獲取FTL在該FastThreadLocalThread.threadLocalMap數(shù)組下標(biāo),因?yàn)镮nternalThreadLocalMap.nextVariableIndex() 使用靜態(tài)域 nextIndex 遞增維護(hù)所有FTL的下標(biāo),會(huì)造成后面實(shí)例化的 FTL 下標(biāo)過(guò)大,如果FTL下標(biāo)大于其對(duì)應(yīng) FastThreadLocalThread.threadLocalMap 數(shù)組的長(zhǎng)度,會(huì)進(jìn)行數(shù)組的自動(dòng)擴(kuò)容,如下:
- private void expandIndexedVariableTableAndSet(int index, Object value) {
- Object[] oldArray = indexedVariables;
- final int oldCapacity = oldArray.length;
- //下面復(fù)雜的實(shí)現(xiàn)是為了將newCapacity規(guī)范為最接近的一個(gè)2的指數(shù),
- //這段代碼在早期的 jdk HashMap 中見(jiàn)過(guò)
- int newCapacity = index;
- newCapacity |= newCapacity >>> 1;
- newCapacity |= newCapacity >>> 2;
- newCapacity |= newCapacity >>> 4;
- newCapacity |= newCapacity >>> 8;
- newCapacity |= newCapacity >>> 16;
- newCapacity ++;
- Object[] newArray = Arrays.copyOf(oldArray, newCapacity);
- Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);
- newArray[index] = value;
- indexedVariables = newArray;
- }
3、FastThreadLocal
構(gòu)造函數(shù):
有兩個(gè)重要的下標(biāo)域,F(xiàn)TL不僅在FastThreadLocalThread.threadLocalMap中保存了用戶(hù)實(shí)際使用的value(在數(shù)組中的下標(biāo)為index),還在數(shù)組中保存為了實(shí)現(xiàn)清理記錄的相關(guān)數(shù)據(jù),也即下標(biāo)variablesToRemoveIndex,一般情況 variablesToRemoveIndex = 0;因?yàn)関ariablesToRemoveIndex 是靜態(tài)變量,所以全局唯一;
- //如果在該FTL中放入了數(shù)據(jù),也就實(shí)際調(diào)用了其set或get函數(shù),會(huì)在
- //該FastThreadLocalThread.threadLocalMap數(shù)組的
- // variablesToRemoveIndex下標(biāo)處放置一個(gè)IdentityHashMap,
- //并將該FTL放入IdentityHashMap中,在后續(xù)清理時(shí)會(huì)取出
- //variablesToRemoveIndex下標(biāo)處的IdentityHashMap進(jìn)行清理
- private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();
- //在threadLocalMap數(shù)組中存放實(shí)際數(shù)據(jù)的下標(biāo)
- private final int index;
- public FastThreadLocal() {
- index = InternalThreadLocalMap.nextVariableIndex();
- }
用戶(hù)可擴(kuò)展的函數(shù):
- //初始化 value 函數(shù)
- protected V initialValue() throws Exception {
- return null;
- }
- //讓使用者在該FTL被移除時(shí)可以有機(jī)會(huì)做些操作。
- protected void onRemoval(@SuppressWarnings("UnusedParameters") V value) throws Exception { }
FastThreadLocalThread
cleanupFastThreadLocals 字段在 4.1 的最新版本中已經(jīng)沒(méi)有在用到了
- /**
- * true,表示FTL會(huì)在線(xiàn)程結(jié)束時(shí)被主動(dòng)清理 見(jiàn) FastThreadLocalRunnable 類(lèi)
- * false,需要將FTL放入后臺(tái)清理線(xiàn)程的隊(duì)列中
- */
- // This will be set to true if we have a chance to wrap the Runnable.
- //這個(gè)字段則用于標(biāo)識(shí)該線(xiàn)程在結(jié)束時(shí)是否會(huì)主動(dòng)清理FTL
- private final boolean cleanupFastThreadLocals;
- //次對(duì)象將在 第一次 FastThreadLocal.get 和 FastThreadLocal.set 時(shí)候創(chuàng)建
- private InternalThreadLocalMap threadLocalMap;
- public FastThreadLocalThread(Runnable target) {
- super(FastThreadLocalRunnable.wrap(target));
- cleanupFastThreadLocals = true;
- }
4、 set 方法
- public final void set(V value) {
- //判斷設(shè)置的 value 值是否是缺省值
- if (value != InternalThreadLocalMap.UNSET) {
- //獲取當(dāng)前線(xiàn)程的 InternalThreadLocalMap , 如果當(dāng)前線(xiàn)程為FastThreadLocalThread,那么直接通過(guò)threadLocalMap引用獲取
- //否則通過(guò) jdk 原生的 threadLocal 獲取
- InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
- //FastThreadLocal 對(duì)應(yīng)的 index 下標(biāo)的 value 替換成新的 value
- setKnownNotUnset(threadLocalMap, value);
- } else {
- //如果放置的對(duì)象為UNSET,則表示清理,會(huì)對(duì)該FTL進(jìn)行清理,類(lèi)似毒丸對(duì)象
- remove();
- }
- }
這里擴(kuò)容方會(huì)調(diào)用 InternalThreadLocalMap.expandIndexedVariableTableAndSet
- private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
- //在數(shù)組下標(biāo)index處放置實(shí)際對(duì)象,如果index大于數(shù)組length,會(huì)進(jìn)行數(shù)組擴(kuò)容.
- if (threadLocalMap.setIndexedVariable(index, value)) {
- //放置成功之后,將該FTL加入到 variablesToRemoveIndex 下標(biāo)的
- //IdentityHashMap,等待后續(xù)清理
- addToVariablesToRemove(threadLocalMap, this);
- }
- }
- /**
- * 該FTL加入到variablesToRemoveIndex下標(biāo)的IdentityHashMap
- * IdentityHashMap的特性可以保證同一個(gè)實(shí)例不會(huì)被多次加入到該位置
- */
- @SuppressWarnings("unchecked")
- private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
- //獲取 variablesToRemoveIndex下標(biāo)處的 IdentityHashMap
- Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
- Set<FastThreadLocal<?>> variablesToRemove;
- //如果是第一次獲取,則 variablesToRemoveIndex下標(biāo)處的值為 UNSET
- if (v == InternalThreadLocalMap.UNSET || v == null) {
- //新建一個(gè)新的 IdentityHashMap 并
- variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
- //放入到下標(biāo)variablesToRemoveIndex處
- threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
- } else {
- variablesToRemove = (Set<FastThreadLocal<?>>) v;
- }
- //將該FTL放入該IdentityHashMap中
- variablesToRemove.add(variable);
- }
下面看InternalThreadLocalMap.get()實(shí)現(xiàn):
- public static InternalThreadLocalMap get() {
- Thread thread = Thread.currentThread();
- //首先看當(dāng)前 thread 是否為FastThreadLocalThread實(shí)例
- //如果是的話(huà),可以快速通過(guò)引用,獲取到其 threadLocalMap
- if (thread instanceof FastThreadLocalThread) {
- return fastGet((FastThreadLocalThread) thread);
- } else {
- //如果不是,則 jdk 原生慢速獲取到其 threadLocalMap
- return slowGet();
- }
- }
5、 get 方法
get方法極為簡(jiǎn)單,實(shí)現(xiàn)如下:
- ===========================FastThreadLocal==========================
- public final V get() {
- InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
- Object v = threadLocalMap.indexedVariable(index);
- if (v != InternalThreadLocalMap.UNSET) {
- return (V) v;
- }
- return initialize(threadLocalMap);
- }
首先獲取當(dāng)前線(xiàn)程的map,然后根據(jù) FastThreadLocal的index 獲取value,然后返回,如果是空對(duì)象,則通過(guò) initialize 返回,initialize 方法會(huì)將返回值設(shè)置到 map 的槽位中,并放進(jìn) Set 中;
- initialize
- ============================FastThreadLocal==========================
- private V initialize(InternalThreadLocalMap threadLocalMap) {
- V v = null;
- try {
- //1、獲取初始值
- v = initialValue();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- // 2、設(shè)置value到InternalThreadLocalMap中
- threadLocalMap.setIndexedVariables(index, v);
- // 3、添加當(dāng)前的FastThreadLocal到InternalThreadLocalMap的Set<FastThreadLocal<?>>中
- addToVariablesToRemove(threadLocalMap, this);
- return v;
- }
- //初始化參數(shù):由子類(lèi)復(fù)寫(xiě)
- protected V initialValue() throws Exception {
- return null;
- }
- 獲取 ThreadLocalMap
- 直接通過(guò)索引取出對(duì)象
- 如果為空那么調(diào)用初始化方法初始化
6、ftl的資源回收機(jī)制
netty中ftl的兩種回收機(jī)制回收機(jī)制:
自動(dòng):使用ftlt執(zhí)行一個(gè)被FastThreadLocalRunnable wrap的Runnable任務(wù),在任務(wù)執(zhí)行完畢后會(huì)自動(dòng)進(jìn)行ftl的清理;
手動(dòng):ftl和InternalThreadLocalMap都提供了remove方法,在合適的時(shí)候用戶(hù)可以(有的時(shí)候也是必須,例如普通線(xiàn)程的線(xiàn)程池使用ftl)手動(dòng)進(jìn)行調(diào)用,進(jìn)行顯示刪除;
- FastThreadLocalRunnable
- final class FastThreadLocalRunnable implements Runnable {
- private final Runnable runnable;
- @Override
- public void run() {
- try {
- runnable.run();
- } finally {
- FastThreadLocal.removeAll();
- }
- }
- static Runnable wrap(Runnable runnable) {
- return runnable instanceof FastThreadLocalRunnable
- ? runnable : new FastThreadLocalRunnable(runnable);
- }
- }
如果將線(xiàn)程執(zhí)行的任務(wù)包裝成 FastThreadLocalRunnable,那么在任務(wù)執(zhí)行完后自動(dòng)刪除ftl的資源。
- ===============================FastThreadLocal===========================
- public static void removeAll() {
- // 獲取到map
- InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
- if (threadLocalMap == null) {
- return;
- }
- try {
- // 獲取到Set<FastThreadLocal>集合
- Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
- if (v != null && v != InternalThreadLocalMap.UNSET) {
- @SuppressWarnings("unchecked")
- Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
- // 將Set轉(zhuǎn)換為數(shù)組
- FastThreadLocal<?>[] variablesToRemoveArray =
- variablesToRemove.toArray(new FastThreadLocal[variablesToRemove.size()]);
- // 遍歷數(shù)組,刪除每一個(gè)FastThreadLocal對(duì)應(yīng)的value
- for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
- tlv.remove(threadLocalMap);
- }
- }
- } finally {
- // 刪除當(dāng)前線(xiàn)程的InternalThreadLocalMap
- InternalThreadLocalMap.remove();
- }
- }
- public static void remove() {
- Thread thread = Thread.currentThread();
- if (thread instanceof FastThreadLocalThread) {
- // 將FastThreadLocalThread 內(nèi)部的map置位null
- ((FastThreadLocalThread) thread).setThreadLocalMap(null);
- } else {
- // 將 ThreadLocal內(nèi)部ThreadLocalMap 中的value置位null
- slowThreadLocalMap.remove();
- }
- }
remove方法:
- ===============================FastThreadLocal==========================
- private void remove() {
- remove(InternalThreadLocalMap.getIfSet());
- }
- private void remove(InternalThreadLocalMap threadLocalMap) {
- if (threadLocalMap == null) {
- return;
- }
- // 從 InternalThreadLocalMap 中刪除當(dāng)前的FastThreadLocal對(duì)應(yīng)的value并設(shè)UNSET
- Object v = threadLocalMap.removeIndexedVariable(index);
- // 從 InternalThreadLocalMap 中的Set<FastThreadLocal<?>>中刪除當(dāng)前的FastThreadLocal對(duì)象
- removeFromVariablesToRemove(threadLocalMap, this);
- // 如果刪除的是有效值,則進(jìn)行onRemove方法的回調(diào)
- if (v != InternalThreadLocalMap.UNSET) {
- try {
- // 回調(diào)子類(lèi)復(fù)寫(xiě)的onRemoved方法,默認(rèn)為空實(shí)現(xiàn)
- onRemoved((V) v);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- }
總結(jié)
只有不斷的學(xué)習(xí),不斷的找到自己的缺點(diǎn)才可以進(jìn)步;
一起加油;