感悟優(yōu)化——Netty對(duì)JDK緩沖區(qū)的內(nèi)存池零拷貝改造
NIO中緩沖區(qū)是數(shù)據(jù)傳輸?shù)幕A(chǔ),JDK通過ByteBuffer實(shí)現(xiàn),Netty框架中并未采用JDK原生的ByteBuffer,而是構(gòu)造了ByteBuf。
ByteBuf對(duì)ByteBuffer做了大量的優(yōu)化,比如說內(nèi)存池,零拷貝,引用計(jì)數(shù)(不依賴GC),本文主要是分析這些優(yōu)化,學(xué)習(xí)這些優(yōu)化思想,學(xué)以致用,在實(shí)際工程中,借鑒這些優(yōu)化方案和思想。
直接內(nèi)存和堆內(nèi)存
首先先講一下這里面需要用的基礎(chǔ)知識(shí),在JVM中 內(nèi)存可分為兩大塊,一個(gè)是堆內(nèi)存,一個(gè)是直接內(nèi)存。這里簡(jiǎn)單介紹一下
堆內(nèi)存:
堆內(nèi)存是Jvm所管理的內(nèi)存,相比方法區(qū),棧內(nèi)存,堆內(nèi)存是***的一塊。所有的對(duì)象實(shí)例實(shí)例以及數(shù)組都要在堆上分配。
Java的垃圾收集器是可以在堆上回收垃圾。
直接內(nèi)存:
JVM使用Native函數(shù)在堆外分配內(nèi)存,之后通過Java堆中的DirectByteBuffer對(duì)象作為這塊內(nèi)存的引用進(jìn)行操作。直接內(nèi)存不會(huì)受到Java堆的限制,只受本機(jī)內(nèi)存影響。
Java的GC只會(huì)在老年區(qū)滿了觸發(fā)Full GC時(shí),才會(huì)去順便清理直接內(nèi)存的廢棄對(duì)象。
JDK原生緩沖區(qū)ByteBuffer
在NIO中,所有數(shù)據(jù)都是用緩沖區(qū)處理的。讀寫數(shù)據(jù),都是在緩沖區(qū)中進(jìn)行的。緩存區(qū)實(shí)質(zhì)是是一個(gè)數(shù)組,通常使用字節(jié)緩沖區(qū)——ByteBuffer。
屬性:
使用方式:
ByteBuffer可以申請(qǐng)兩種方式的內(nèi)存,分別為堆內(nèi)存和直接內(nèi)存,首先看申請(qǐng)堆內(nèi)存。
- // 申請(qǐng)堆內(nèi)存
- ByteBuffer HeapbyteBuffer = ByteBuffer.allocate(1024);
很簡(jiǎn)單,就一行代碼,再看看allocate方法。
- public static ByteBuffer allocate(int capacity) { if (capacity < 0) throw new IllegalArgumentException(); return new HeapByteBuffer(capacity, capacity);
- }
其實(shí)就是new一個(gè)HeapByteBuffer對(duì)象。這個(gè) HeapByteBuffer繼承自ByteBuffer,構(gòu)造器采用了父類的構(gòu)造器,如下所示:
- HeapByteBuffer(int cap, int lim) { // package-private
- super(-1, 0, lim, cap, new byte[cap], 0); /*
- hb = new byte[cap];
- offset = 0;
- */
- }//ByteBuffer構(gòu)造器
- ByteBuffer(int mark, int pos, int lim, int cap, // package-private
- byte[] hb, int offset)
- {
- super(mark, pos, lim, cap);
- this.hb = hb;
- this.offset = offset;
- }
結(jié)合ByteBuffer的四個(gè)屬性,初始化的時(shí)候就可以賦值capaticy,limit,position,mark,至于byte[] hb, int offsef這兩個(gè)屬性,JDK文檔給出的解釋是 backing array , and array offset 。它是一個(gè)回滾數(shù)組,offset是數(shù)組的偏移值。
申請(qǐng)直接內(nèi)存:
- // 申請(qǐng)直接內(nèi)存
- ByteBuffer DirectbyteBuffer = ByteBuffer.allocateDirect(1024);
allocateDirect()實(shí)際上就是new的一個(gè)DirectByteBuffer對(duì)象,不過這個(gè)new 一個(gè)普通對(duì)象不一樣。這里使用了Native函數(shù)來申請(qǐng)內(nèi)存,在Java中就是調(diào)用unsafe對(duì)象
- public static ByteBuffer allocateDirect(int capacity) { return new DirectByteBuffer(capacity);
- }
- DirectByteBuffer(int cap) { // package-private
- super(-1, 0, cap, cap);
- boolean pa = VM.isDirectMemoryPageAligned(); int ps = Bits.pageSize(); long size = Math.max(1L, (long)cap + (pa ? ps : 0));
- Bits.reserveMemory(size, cap); long base = 0; try { base = unsafe.allocateMemory(size);
- } catch (OutOfMemoryError x) {
- Bits.unreserveMemory(size, cap); throw x;
- } unsafe.setMemory(base, size, (byte) 0); if (pa && (base % ps != 0)) { // Round up to page boundary
- address = base + ps - (base & (ps - 1));
- } else {
- address = base;
- }
- cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
- att = null;
- }
- View Code
申請(qǐng)方法不同的內(nèi)存有不同的用法。接下來看一看ByteBuffer的常用方法與如何使用
ByteBuffer的常用方法與使用方式
Bytebuf的讀和寫是使用put()和get()方法實(shí)現(xiàn)的
- // 讀操作public byte get() { return hb[ix(nextGetIndex())];
- }final int nextGetIndex() { if (position >= limit) throw new BufferUnderflowException(); return position++;
- }// 寫操作public ByteBuffer put(byte x) {
- hb[ix(nextPutIndex())] = x; return this;
- }final int nextPutIndex() { if (position >= limit) throw new BufferOverflowException(); return position++;
- }
從代碼中可以看出,讀和寫操作都會(huì)改變ByteBuffer的position屬性,這兩個(gè)操作是共用的position屬性。這樣就會(huì)帶來一個(gè)問題,讀寫操作會(huì)導(dǎo)致數(shù)據(jù)出錯(cuò)啊,數(shù)據(jù)位置出錯(cuò)。
ByteBuffer提供了flip()方法,讀寫模式切換,切換的時(shí)候會(huì)改變position和limit的位置??纯磃lip()怎么實(shí)現(xiàn)的:
- public final Buffer flip() { // 1. 設(shè)置 limit 為當(dāng)前位置
- limit = position; // 2. 設(shè)置 position 為0
- position = 0;
- mark = -1; return this;
- }
這里就不重點(diǎn)介紹了,有些細(xì)節(jié)可以自己去深究。
Netty的ByteBuf
Netty使用的自身的ByteBuf對(duì)象來進(jìn)行數(shù)據(jù)傳輸,本質(zhì)上使用了外觀模式對(duì)JDK的ByteBuffer進(jìn)行封裝。
相較于原生的ByteBuffer,Netty的ByteBuf做了很多優(yōu)化,零拷貝,內(nèi)存池加速,讀寫索引。
為什么要使用內(nèi)存池?
首先要明白一點(diǎn),Netty的內(nèi)存池是不依賴于JVM本身的GC的。
回顧一下直接內(nèi)存的GC:
上文提到Java的GC只會(huì)在老年區(qū)滿了觸發(fā)Full GC時(shí),才會(huì)去順便清理直接內(nèi)存的廢棄對(duì)象。
JVM中的直接內(nèi)存,存在堆內(nèi)存中其實(shí)就是DirectByteBuffer類,它本身其實(shí)很小,真的內(nèi)存是在堆外,這里是映射關(guān)系。
每次申請(qǐng)直接內(nèi)存,都先看看是否超限 —— 直接內(nèi)存的限額默認(rèn)(可用 -XX:MaxDirectMemorySize 重新設(shè)定)。
如果超過限額,就會(huì)主動(dòng)執(zhí)行System.gc(),這樣會(huì)帶來一個(gè)影響,系統(tǒng)會(huì)中斷100ms。如果沒有成功回收直接內(nèi)存,并且還是超過直接內(nèi)存的限額,就會(huì)拋出OOM——內(nèi)存溢出。
繼續(xù)從GC角度分析,DirectByteBuffer熬過了幾次young gc之后,會(huì)進(jìn)入老年代。當(dāng)老年代滿了之后,會(huì)觸發(fā)Full GC。
因?yàn)楸旧砗苄?,很難占滿老年代,因此基本不會(huì)觸發(fā)Full GC,帶來的后果是大量堆外內(nèi)存一直占著不放,無法進(jìn)行內(nèi)存回收。
還有***一個(gè)辦法,就是依靠申請(qǐng)額度超限時(shí)觸發(fā)的system.gc(),但是前面提到,它會(huì)中斷進(jìn)程100ms,如果在這100ms的之間,系統(tǒng)未完成GC,仍會(huì)拋出OOM。
所以這個(gè)***一個(gè)辦法也不是完全保險(xiǎn)的。
Netty使用了引用計(jì)數(shù)的方式,主動(dòng)回收內(nèi)存?;厥盏膶?duì)象包括非池直接內(nèi)存,和內(nèi)存池中的內(nèi)存。
內(nèi)存池的內(nèi)存泄露檢測(cè)?
Netty中使用引用計(jì)數(shù)機(jī)制來管理資源,ByteBuf實(shí)際上是實(shí)現(xiàn)了ReferenceCounted接口,當(dāng)實(shí)例化ByteBuf對(duì)象時(shí),引用計(jì)數(shù)加1。
當(dāng)應(yīng)用代碼保持一個(gè)對(duì)象引用時(shí),會(huì)調(diào)用retain方法將計(jì)數(shù)增加1,對(duì)象使用完畢進(jìn)行釋放,調(diào)用release將計(jì)數(shù)器減1.
當(dāng)引用計(jì)數(shù)變?yōu)?時(shí),對(duì)象將釋放所有的資源,返回內(nèi)存池。
Netty內(nèi)存泄漏檢測(cè)級(jí)別:
禁用(DISABLED) - 完全禁止泄露檢測(cè)。不推薦。
簡(jiǎn)單(SIMPLE) - 告訴我們?nèi)拥?%的緩沖是否發(fā)生了泄露。默認(rèn)。
高級(jí)(ADVANCED) - 告訴我們?nèi)拥?%的緩沖發(fā)生泄露的地方
偏執(zhí)(PARANOID) - 跟高級(jí)選項(xiàng)類似,但此選項(xiàng)檢測(cè)所有緩沖,而不僅僅是取樣的那1%。此選項(xiàng)在自動(dòng)測(cè)試階段很有用。如果構(gòu)建(build)輸出包含了LEAK,可認(rèn)為構(gòu)建失敗也可以使用JVM的-Dio.netty.leakDetectionLevel選項(xiàng)來指定泄漏檢測(cè)級(jí)別。
內(nèi)存跟蹤
在內(nèi)存池中分配內(nèi)存,得到的ByteBuf對(duì)象都是經(jīng)過 toLeakAwareBuffer()方法封裝的,該方法作用就是對(duì)ByteBuf對(duì)象進(jìn)行引用計(jì)數(shù),使用 SimpleLeakAwareByteBuf或者 AdvancedLeakAwareByteBuf 來包裝ByteBuf。此外該方法只對(duì)非池內(nèi)存中的直接內(nèi)存和內(nèi)存池中的內(nèi)存進(jìn)行內(nèi)存泄露檢測(cè)。
- //裝飾器模式,用SimpleLeakAwareByteBuf或AdvancedLeakAwareByteBuf來包裝原始的ByteBufprotected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
- ResourceLeakTracker<ByteBuf> leak;
- //根據(jù)設(shè)置的Level來選擇使用何種裝飾器
- switch (ResourceLeakDetector.getLevel()) { case SIMPLE: //創(chuàng)建用于跟蹤和表示內(nèi)容泄露的ResourcLeak對(duì)象
- leak = AbstractByteBuf.leakDetector.track(buf); if (leak != null) {
- //只在ByteBuf.order方法中調(diào)用ResourceLeak.record
- buf = new SimpleLeakAwareByteBuf(buf, leak);
- } break; case ADVANCED:
- case PARANOID:
- leak = AbstractByteBuf.leakDetector.track(buf); if (leak != null) {
- //只在ByteBuf.order方法中調(diào)用ResourceLeak.record
- buf = new AdvancedLeakAwareByteBuf(buf, leak);
- } break; default:
- break;
- } return buf;
- }
實(shí)際上,內(nèi)存泄露檢測(cè)是在 AbstractByteBuf.leakDetector.track(buf)進(jìn)行的,來看看track方法的具體實(shí)現(xiàn)。
- /**
- * Creates a new {@link ResourceLeakTracker} which is expected to be closed via
- * {@link ResourceLeakTracker#close(Object)} when the related resource is deallocated.
- *
- * @return the {@link ResourceLeakTracker} or {@code null}
- */
- @SuppressWarnings("unchecked")
- public final ResourceLeakTracker<T> track(T obj) { return track0(obj);
- } @SuppressWarnings("unchecked")
- private DefaultResourceLeak track0(T obj) {
- Level level = ResourceLeakDetector.level;
- // 不進(jìn)行內(nèi)存跟蹤
- if (level == Level.DISABLED) { return null;
- } if (level.ordinal() < Level.PARANOID.ordinal()) { //如果監(jiān)控級(jí)別低于PARANOID,在一定的采樣頻率下報(bào)告內(nèi)存泄露
- if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
- reportLeak(); return new DefaultResourceLeak(obj, refQueue, allLeaks);
- } return null;
- } //每次需要分配 ByteBuf 時(shí),報(bào)告內(nèi)存泄露情況
- reportLeak(); return new DefaultResourceLeak(obj, refQueue, allLeaks);
- }
再來看看返回對(duì)象——DefaultResourceLeak,他的實(shí)現(xiàn)方式如下:
- private static final class DefaultResourceLeak<T> extends WeakReference<Object> implements ResourceLeakTracker<T>, ResourceLeak {
它繼承了虛引用WeakReference,虛引用完全不影響目標(biāo)對(duì)象的垃圾回收,但是會(huì)在目標(biāo)對(duì)象被VM垃圾回收時(shí)加入到引用隊(duì)列,
正常情況下ResourceLeak對(duì)象,會(huì)將監(jiān)控的資源的引用計(jì)數(shù)為0時(shí)被清理掉。
但是當(dāng)資源的引用計(jì)數(shù)失常,ResourceLeak對(duì)象也會(huì)被加入到引用隊(duì)列.
存在著這樣一種情況:沒有成對(duì)調(diào)用ByteBuf的retain和relaease方法,導(dǎo)致ByteBuf沒有被正常釋放,當(dāng) ResourceLeak(引用隊(duì)列) 中存在元素時(shí),即表明有內(nèi)存泄露。
Netty中的 reportLeak()方法來報(bào)告內(nèi)存泄露情況,通過檢查引用隊(duì)列來判斷是否有內(nèi)存泄露,并報(bào)告跟蹤情況.
方法代碼如下:
- View Code
Handler中的內(nèi)存處理機(jī)制
Netty中有handler鏈,消息有本Handler傳到下一個(gè)Handler。所以Netty引入了一個(gè)規(guī)則,誰是***使用者,誰負(fù)責(zé)釋放。
根據(jù)誰***使用誰負(fù)責(zé)釋放的原則,每個(gè)Handler對(duì)消息可能有三種處理方式
對(duì)原消息不做處理,調(diào)用 ctx.fireChannelRead(msg)把原消息往下傳,那不用做什么釋放。
將原消息轉(zhuǎn)化為新的消息并調(diào)用 ctx.fireChannelRead(newMsg)往下傳,那必須把原消息release掉。
如果已經(jīng)不再調(diào)用ctx.fireChannelRead(msg)傳遞任何消息,那更要把原消息release掉。
假設(shè)每一個(gè)Handler都把消息往下傳,Handler并也不知道誰是啟動(dòng)Netty時(shí)所設(shè)定的Handler鏈的***一員,所以Netty在Handler鏈的最末補(bǔ)了一個(gè)TailHandler,如果此時(shí)消息仍然是ReferenceCounted類型就會(huì)被release掉。
總結(jié):
1.Netty在不同的內(nèi)存泄漏檢測(cè)級(jí)別情況下,采樣概率是不一樣的,在Simple情況下出現(xiàn)了Leak,要設(shè)置“-Dio.netty.leakDetectionLevel=advanced”再跑一次代碼,找到創(chuàng)建和訪問的地方。
2.Netty中的內(nèi)存泄露檢測(cè)是通過對(duì)ByteBuf對(duì)象進(jìn)行裝飾,利用虛引用和引用計(jì)數(shù)來對(duì)非池中的直接內(nèi)存和內(nèi)存池中內(nèi)存進(jìn)行跟蹤,判斷是否發(fā)生內(nèi)存泄露。
3.計(jì)數(shù)器基于 AtomicIntegerFieldUpdater,因?yàn)锽yteBuf對(duì)象很多,如果都把int包一層AtomicInteger花銷較大,而AtomicIntegerFieldUpdater只需要一個(gè)全局的靜態(tài)變量。
Netty中的內(nèi)存單位
Netty中將內(nèi)存池分為五種不同的形態(tài):Arena,ChunkList,Chunk,Page,SubPage.
首先來看Netty***的內(nèi)存單位PoolArena——連續(xù)的內(nèi)存塊。它是由多個(gè)PoolChunkList和兩個(gè)SubPagePools(一個(gè)是tinySubPagePool,一個(gè)是smallSubPagePool)組成的。如下圖所示:
1.PoolChunkList是一個(gè)雙向的鏈表,PoolChunkList負(fù)責(zé)管理多個(gè)PoolChunk的生命周期。
2.PoolChunk中包含多個(gè)Page,Page的大小默認(rèn)是8192字節(jié),也可以設(shè)置系統(tǒng)變量io.netty.allocator.pageSize來改變頁的大小。自定義頁大小有如下限制:1.必須大于4096字節(jié),2.必須是2的整次數(shù)冪。
3.塊(PoolChunk)的大小是由頁的大小和maxOrder算出來的,計(jì)算公式是: chunkSize = 2^{maxOrder} * pageSize。 maxOrder的默認(rèn)值是11,也可以通過io.netty.allocator.maxOrder系統(tǒng)變量設(shè)置,只能是0-14的范圍,所以chunksize的默認(rèn)大小為:(2^11)*8192=16MB
Page中包含多個(gè)SubPage。
PoolChunk內(nèi)部維護(hù)了一個(gè)平衡二叉樹,如下圖所示:
PoolSubPage
通常一個(gè)頁(page)的大小就達(dá)到了10^13(8192字節(jié)),通常一次申請(qǐng)分配內(nèi)存沒有這么大,可能很小。
于是Netty將頁(page)劃分成更小的片段——SubPage
Netty定義這樣的內(nèi)存單元是為了更好的分配內(nèi)存,接下來看一下一個(gè)ByteBuf是如何在內(nèi)存池中申請(qǐng)內(nèi)存的。
Netty如何分配內(nèi)存池中的內(nèi)存?
分配原則:
內(nèi)存池中的內(nèi)存分配是在PoolArea中進(jìn)行的。
- 申請(qǐng)小于PageSize(默認(rèn)8192字節(jié))的內(nèi)存,會(huì)在SubPagePools中進(jìn)行分配,如果申請(qǐng)內(nèi)存小于512字節(jié),則會(huì)在tingSubPagePools中進(jìn)行分配,如果大于512小于PageSize字節(jié),則會(huì)在smallSubPagePools進(jìn)行分配。
- 申請(qǐng)大于PageSize的內(nèi)存,則會(huì)在PoolChunkList中進(jìn)行分配。
- 申請(qǐng)大于ChunkSize的內(nèi)存,則不會(huì)在內(nèi)存池中申請(qǐng),而且也不會(huì)重用該內(nèi)存。
應(yīng)用中在內(nèi)存池中申請(qǐng)內(nèi)存的方法:
- // 在內(nèi)存池中申請(qǐng) 直接內(nèi)存
- ByteBuf directByteBuf = ByteBufAllocator.DEFAULT.directBuffer(1024); // 在內(nèi)存池中申請(qǐng) 堆內(nèi)存
- ByteBuf heapByteBuf = ByteBufAllocator.DEFAULT.heapBuffer(1024);
接下來,一層一層的看下來,在Netty中申請(qǐng)內(nèi)存是如何實(shí)現(xiàn)的。就拿申請(qǐng)直接內(nèi)存舉例,首先看directBuffer方法。
- // directBuffer方法實(shí)現(xiàn)
- @Override
- public ByteBuf directBuffer(int initialCapacity) { return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
- }
- // 校驗(yàn)申請(qǐng)大小,返回申請(qǐng)的直接內(nèi)存
- @Override
- public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { if (initialCapacity == 0 && maxCapacity == 0) { return emptyBuf;
- }
- validate(initialCapacity, maxCapacity); return newDirectBuffer(initialCapacity, maxCapacity);
- } //PooledByteBufAllocator類中的 newDirectBuffer方法的實(shí)現(xiàn)
- @Override
- protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) { // Netty避免每個(gè)線程對(duì)內(nèi)存池的競(jìng)爭(zhēng),在每個(gè)線程都提供了PoolThreadCache線程內(nèi)的內(nèi)存池
- PoolThreadCache cache = threadCache.get();
- PoolArena<ByteBuffer> directArena = cache.directArena; // 如果緩存存在,則分配內(nèi)存
- final ByteBuf buf; if (directArena != null) {
- buf = directArena.allocate(cache, initialCapacity, maxCapacity);
- } else { // 緩存不存在,則分配非池內(nèi)存
- buf = PlatformDependent.hasUnsafe() ?
- UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) : new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
- }
- // 通過toLeakAwareBuffer包裝成內(nèi)存泄漏檢測(cè)的buffer
- return toLeakAwareBuffer(buf);
- }
一般情況下,內(nèi)存都是在buf = directArena.allocate(cache, initialCapacity, maxCapacity)這行代碼進(jìn)行內(nèi)存分配的,也就是說在內(nèi)存的連續(xù)塊PoolArena中進(jìn)行的內(nèi)存分配。
接下來,我們根據(jù)內(nèi)存分配原則來進(jìn)行內(nèi)存研讀PoolArena中的allocate方法。
- 1 PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { 2 PooledByteBuf<T> buf = newByteBuf(maxCapacity); 3 allocate(cache, buf, reqCapacity); 4 return buf; 5 } 6
- 7 private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { 8 final int normCapacity = normalizeCapacity(reqCapacity); 9 if (isTinyOrSmall(normCapacity)) { // capacity < pageSize10 int tableIdx;11 PoolSubpage<T>[] table;12 boolean tiny = isTiny(normCapacity);13 if (tiny) { // < 51214 15 // 如果申請(qǐng)內(nèi)存小于512字節(jié),則會(huì)在tingSubPagePools中進(jìn)行分配16 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {17 // was able to allocate out of the cache so move on18 return;19 }20 tableIdx = tinyIdx(normCapacity);21 table = tinySubpagePools;22 } else {23 // 如果大于512小于PageSize字節(jié),則會(huì)在smallSubPagePools進(jìn)行分配24 if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {25 // was able to allocate out of the cache so move on26 return;27 }28 tableIdx = smallIdx(normCapacity);29 table = smallSubpagePools;30 }31 32 final PoolSubpage<T> head = table[tableIdx];33 34 /**
- 35 * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
- 36 * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
- 37 */38 synchronized (head) {39 final PoolSubpage<T> s = head.next;40 if (s != head) {41 assert s.doNotDestroy && s.elemSize == normCapacity;42 long handle = s.allocate();43 assert handle >= 0;44 s.chunk.initBufWithSubpage(buf, handle, reqCapacity);45 incTinySmallAllocation(tiny);46 return;47 }48 }49 synchronized (this) {50 allocateNormal(buf, reqCapacity, normCapacity);51 }52 53 incTinySmallAllocation(tiny);54 return;55 }56 if (normCapacity <= chunkSize) {57 if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {58 // was able to allocate out of the cache so move on59 return;60 }61 synchronized (this) {62 allocateNormal(buf, reqCapacity, normCapacity);63 ++allocationsNormal;64 }65 } else {66 // Huge allocations are never served via the cache so just call allocateHuge67 allocateHuge(buf, reqCapacity);68 }69 }
如何使用內(nèi)存池?
底層IO處理線程的緩沖區(qū)使用堆外直接緩沖區(qū),減少一次IO復(fù)制。業(yè)務(wù)消息的編解碼使用堆緩沖區(qū),分配效率更高,而且不涉及到內(nèi)核緩沖區(qū)的復(fù)制問題。
Netty默認(rèn)不使用內(nèi)存池,需要在創(chuàng)建服務(wù)端或者客戶端的時(shí)候進(jìn)行配置。
- //Boss線程池內(nèi)存池配置.
- .option(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT) //Work線程池內(nèi)存池配置.
- .childOption(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT);
本人的想法是:
1.I/O處理線程使內(nèi)存池中的直接內(nèi)存,開啟以上配置
2.在handler處理業(yè)務(wù)的時(shí)候,使用內(nèi)存池中的堆內(nèi)存
還有一點(diǎn)值得注意的是:在使用完內(nèi)存池中的ByteBuf,一定要記得釋放,即調(diào)用release():
- // 在內(nèi)存池中申請(qǐng) 直接內(nèi)存
- ByteBuf directByteBuf = ByteBufAllocator.DEFAULT.directBuffer(1024); // 歸還到內(nèi)存池
- directByteBuf.release();
如果handler繼承了SimpleChannelInboundHandler,那么它將會(huì)自動(dòng)釋放Bytefuf.詳情可見:
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { boolean release = true; try { if (acceptInboundMessage(msg)) { @SuppressWarnings("unchecked")
- I imsg = (I) msg;
- channelRead0(ctx, imsg);
- } else {
- release = false;
- ctx.fireChannelRead(msg);
- }
- } finally {
- // autoRelease默認(rèn)為true
- if (autoRelease && release) {
- // 釋放Bytebuf,歸還到內(nèi)存池
- ReferenceCountUtil.release(msg);
- }
- }
- }
零拷貝:
該部分是重點(diǎn)介紹的部分,首先將它與傳統(tǒng)的I/O read和write操作作對(duì)比,看看有什么不同,首先需要理解一下用戶態(tài)和內(nèi)存態(tài)的概念
用戶態(tài)(User Mode)和內(nèi)核態(tài)(Kernel Mode),也可以叫用戶空間和內(nèi)核
用戶態(tài):受限的訪問內(nèi)存,并且不允許訪問硬件設(shè)備。
內(nèi)核態(tài):本質(zhì)上是一個(gè)軟件,可以控制計(jì)算機(jī)的硬件資源(如網(wǎng)卡,硬盤),可以訪問內(nèi)存所有數(shù)據(jù)。
用戶程序都是運(yùn)行在用戶態(tài)中的,比如JVM,就是用戶程序,所以它運(yùn)行在用戶態(tài)中。
用戶態(tài)是不能直接訪問硬件設(shè)備的,如果需要一次I/O操作,那就必須利用系統(tǒng)調(diào)用機(jī)制切換到內(nèi)核態(tài)(用戶態(tài)與內(nèi)核態(tài)之間的轉(zhuǎn)換稱為上下文切換),進(jìn)行硬盤讀寫。
比如說一次傳統(tǒng)網(wǎng)絡(luò)I/O:
***步,從用戶態(tài)切換到內(nèi)核態(tài),將用戶緩沖區(qū)的數(shù)據(jù)拷貝到內(nèi)核緩沖區(qū),執(zhí)行send操作。
第二步,數(shù)據(jù)發(fā)送由底層的操作系統(tǒng)進(jìn)行,此時(shí)從內(nèi)核態(tài)切換到用戶態(tài),將內(nèi)核緩存區(qū)的數(shù)據(jù)拷貝到網(wǎng)卡的緩沖區(qū)
總結(jié):也就是一次普通的網(wǎng)絡(luò)I/O,至少經(jīng)過兩次上下文切換,和兩次內(nèi)存拷貝。
什么是零拷貝?
當(dāng)需要傳輸?shù)臄?shù)據(jù)遠(yuǎn)大于內(nèi)核緩沖區(qū)的大小時(shí),內(nèi)核緩沖區(qū)就成為I/O的性能瓶頸。零拷貝就是杜絕了內(nèi)核緩沖區(qū)與用戶緩沖區(qū)的的數(shù)據(jù)拷貝。
所以零拷貝適合大數(shù)據(jù)量的傳輸。
拿傳統(tǒng)的網(wǎng)絡(luò)I/O做對(duì)比,零拷貝I/O是怎樣的一個(gè)過程:
用戶程序執(zhí)行transferTo(),將用戶緩沖區(qū)待發(fā)送的數(shù)據(jù)拷貝到網(wǎng)卡緩沖區(qū)。
很簡(jiǎn)單,一步完成,中間少了用戶態(tài)到內(nèi)存態(tài)的拷貝。
Netty中零拷貝如何實(shí)現(xiàn)
Netty的中零拷貝與上述零拷貝是不一樣的,它并不是系統(tǒng)層面上的零拷貝,只是相對(duì)于ByteBuf而言的。
Netty中的零拷貝:
1.CompositeByteBuf,將多個(gè)ByteBuf合并為一個(gè)邏輯上的ByteBuf,避免了各個(gè)ByteBuf之間的拷貝。
使用方式:
- CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer();
- compositeByteBuf.addComponents(true, ByteBuf1, ByteBuf1);
注意: addComponents***個(gè)參數(shù)必須為true,那么writeIndex才不為0,才能從compositeByteBuf中讀到數(shù)據(jù)。
2.wrapedBuffer()方法,將byte[]數(shù)組包裝成ByteBuf對(duì)象。
- byte[] bytes = data.getBytes();ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes);
Unpooled.wrappedBuffer(bytes)就是進(jìn)行了byte[]數(shù)組的包裝工作,過程中不存在內(nèi)存拷貝。
即包裝出來的ByteBuf和byte[]數(shù)組指向了同一個(gè)存儲(chǔ)空間。因?yàn)橹狄?,所以bytes修改也會(huì)影響 byteBuf 的值。
3.ByteBuf的分割,slice()方法。將一個(gè)ByteBuf對(duì)象切分成多個(gè)ByteBuf對(duì)象。
- ByteBuf directByteBuf = ByteBufAllocator.DEFAULT.directBuffer(1024);ByteBuf header = directByteBuf.slice(0,50);ByteBuf body = directByteBuf.slice(51,1024);
header和body兩個(gè)ByteBuf對(duì)象實(shí)際上還是指向directByteBuf的存儲(chǔ)空間。
總結(jié):
本文很長(zhǎng)很長(zhǎng),博主陸陸續(xù)續(xù)寫了有一個(gè)月的時(shí)間。但是只是窺探Netty內(nèi)存池中的冰山一角,更多是要在實(shí)際項(xiàng)目中進(jìn)行驗(yàn)證才能起到效果。