基于 Netty 源碼學習那些并發(fā)技巧
減小鎖的粒度
總說周知,使用synchronized 修飾的代碼塊在單位時間內只允許一個線程操作臨界資源,這使得其他無法操作臨界資源的線程都會阻塞在synchronized所修飾的臨界資源的所持有的ObjectMonitor的_WaitSet 中:
所以通過減小鎖的粒度,可以保證高并發(fā)場景下等待的線程可以盡可能快的得到臨界資源,從而提升程序整體執(zhí)行的并發(fā)度:
這一點Netty中內存池的源碼PoolArena的計算活躍分配數量的方法numActiveAllocations的處理就做的非常出色,可以看到它在計算val時并沒有鎖住整個方法,而是在需要進行臨界計算的部分加一個synchronized 關鍵字:
@Override
public long numActiveAllocations() {
//運算
long val = allocationsSmall.value() + allocationsHuge.value()
- deallocationsHuge.value();
//必要的部分上鎖
synchronized (this) {
val += allocationsNormal - (deallocationsSmall + deallocationsNormal);
}
return max(val, 0);
}
減小空間占用
netty通過totalPendingSize計算待發(fā)送的數據大小,單位為long,為了保證并發(fā)計算的準確性,netty將其設置為volatile保證可見性,再通過AtomicLongFieldUpdater將其封裝為原子類:
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =
AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");
@SuppressWarnings("UnusedDeclaration")
private volatile long totalPendingSize;
那么問題來了?為什么不直接使用AtomicLong類型呢? 總說周知,包裝類內部包含markword等對象頭字段,整體大小遠大于基類型,所以netty為了保證CPU緩存能夠一次性加載totalPendingSize就將其設置為基類型,并自封裝為原子類進行操作,在保證線程安全的同時,又能保證CPU緩存加載和執(zhí)行的效率:
提高鎖的效率
對于需要進行計數但不經常查看結果的變量,Netty使用LongAdder 而不是AtomicLong:
@SuppressJava6Requirement(reason = "Usage guarded by java version check")
final class LongAdderCounter extends LongAdder implements LongCounter {
@Override
public long value() {
return longValue();
}
}
這里筆者也簡單介紹一下原因,前者進行并發(fā)的時候會讓線程計算隨機運算得到LongAdder內部計數數組的某個位置,LongAdder會根據當前計數線程競爭情況對數組進行動態(tài)擴容,最終在計算結果的時候,需要將數組中的所有元素結合起來。 所以這種數據結構對于并發(fā)累加操作性能表現比較好,但是對于統(tǒng)計就表現的差一點,這一點對于LongAdderCounter 這種不定時統(tǒng)計來說再合適不過:
選用合適的并發(fā)技巧
NioEventLoop采用無鎖串行化的設計思路,通過整體并發(fā),局部串行的方式替代多線程消費單個阻塞隊列的方案,這種方案結合了netty中多連接少量線程處理的場景,采用mpsc這種多消費者單生產者隊列讓并發(fā)的線程提交移步任務交給少量的nio線程處理,在整體并行的同時,通過優(yōu)化eventLoop線程邏輯又能讓eventLoop線程能夠高效的串行處理:
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
//提交任務到mpsc隊列中
addTask(task);
//如果當前提交任務的不是eventLoop線程,則從eventLoopGroup中啟動一個線程
//......
if (!addTaskWakesUp && immediate) {
wakeup(inEventLoop);
}
}
然后NIO線程每次循環(huán)時調用runAllTasks有序執(zhí)行:
protected boolean runAllTasks(long timeoutNanos) {
//......
for (;;) {
//執(zhí)行任務
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
//從mpsc中有序獲取
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
小結
本文基于源碼的角度分析了netty中的并發(fā)技巧,希望對你有幫助。