自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

感悟優(yōu)化——Netty對(duì)JDK緩沖區(qū)的內(nèi)存池零拷貝改造

存儲(chǔ) 存儲(chǔ)軟件
ByteBuf對(duì)ByteBuffer做了大量的優(yōu)化,比如說內(nèi)存池,零拷貝,引用計(jì)數(shù)(不依賴GC),本文主要是分析這些優(yōu)化,學(xué)習(xí)這些優(yōu)化思想,學(xué)以致用,在實(shí)際工程中,借鑒這些優(yōu)化方案和思想。

NIO中緩沖區(qū)是數(shù)據(jù)傳輸?shù)幕A(chǔ),JDK通過ByteBuffer實(shí)現(xiàn),Netty框架中并未采用JDK原生的ByteBuffer,而是構(gòu)造了ByteBuf。

ByteBuf對(duì)ByteBuffer做了大量的優(yōu)化,比如說內(nèi)存池,零拷貝,引用計(jì)數(shù)(不依賴GC),本文主要是分析這些優(yōu)化,學(xué)習(xí)這些優(yōu)化思想,學(xué)以致用,在實(shí)際工程中,借鑒這些優(yōu)化方案和思想。

[[240161]]

直接內(nèi)存和堆內(nèi)存

首先先講一下這里面需要用的基礎(chǔ)知識(shí),在JVM中 內(nèi)存可分為兩大塊,一個(gè)是堆內(nèi)存,一個(gè)是直接內(nèi)存。這里簡(jiǎn)單介紹一下

堆內(nèi)存:

堆內(nèi)存是Jvm所管理的內(nèi)存,相比方法區(qū),棧內(nèi)存,堆內(nèi)存是***的一塊。所有的對(duì)象實(shí)例實(shí)例以及數(shù)組都要在堆上分配。

Java的垃圾收集器是可以在堆上回收垃圾。

直接內(nèi)存:

JVM使用Native函數(shù)在堆外分配內(nèi)存,之后通過Java堆中的DirectByteBuffer對(duì)象作為這塊內(nèi)存的引用進(jìn)行操作。直接內(nèi)存不會(huì)受到Java堆的限制,只受本機(jī)內(nèi)存影響。

Java的GC只會(huì)在老年區(qū)滿了觸發(fā)Full GC時(shí),才會(huì)去順便清理直接內(nèi)存的廢棄對(duì)象。

JDK原生緩沖區(qū)ByteBuffer

在NIO中,所有數(shù)據(jù)都是用緩沖區(qū)處理的。讀寫數(shù)據(jù),都是在緩沖區(qū)中進(jìn)行的。緩存區(qū)實(shí)質(zhì)是是一個(gè)數(shù)組,通常使用字節(jié)緩沖區(qū)——ByteBuffer。

屬性:

使用方式:

ByteBuffer可以申請(qǐng)兩種方式的內(nèi)存,分別為堆內(nèi)存和直接內(nèi)存,首先看申請(qǐng)堆內(nèi)存。

  1. // 申請(qǐng)堆內(nèi)存  
  2. ByteBuffer HeapbyteBuffer = ByteBuffer.allocate(1024); 

很簡(jiǎn)單,就一行代碼,再看看allocate方法。

  1. public static ByteBuffer allocate(int capacity) {        if (capacity < 0)            throw new IllegalArgumentException();        return new HeapByteBuffer(capacity, capacity);  
  2.     } 

其實(shí)就是new一個(gè)HeapByteBuffer對(duì)象。這個(gè) HeapByteBuffer繼承自ByteBuffer,構(gòu)造器采用了父類的構(gòu)造器,如下所示:

  1. HeapByteBuffer(int cap, int lim) {            // package-private 
  2.  
  3.         super(-1, 0, lim, cap, new byte[cap], 0);        /* 
  4.         hb = new byte[cap]; 
  5.         offset = 0; 
  6.         */ 
  7.     }//ByteBuffer構(gòu)造器 
  8.   ByteBuffer(int mark, int pos, int lim, int cap,   // package-private 
  9.                  byte[] hb, int offset) 
  10.     { 
  11.         super(mark, pos, lim, cap); 
  12.         this.hb = hb; 
  13.         this.offset = offset; 
  14.     } 

結(jié)合ByteBuffer的四個(gè)屬性,初始化的時(shí)候就可以賦值capaticy,limit,position,mark,至于byte[] hb, int offsef這兩個(gè)屬性,JDK文檔給出的解釋是 backing array , and array offset 。它是一個(gè)回滾數(shù)組,offset是數(shù)組的偏移值。

申請(qǐng)直接內(nèi)存:

  1. // 申請(qǐng)直接內(nèi)存  
  2.  ByteBuffer DirectbyteBuffer = ByteBuffer.allocateDirect(1024); 

allocateDirect()實(shí)際上就是new的一個(gè)DirectByteBuffer對(duì)象,不過這個(gè)new 一個(gè)普通對(duì)象不一樣。這里使用了Native函數(shù)來申請(qǐng)內(nèi)存,在Java中就是調(diào)用unsafe對(duì)象

  1. public static ByteBuffer allocateDirect(int capacity) {        return new DirectByteBuffer(capacity); 
  2.     } 
  3.  
  4.  DirectByteBuffer(int cap) {                   // package-private 
  5.  
  6.         super(-1, 0, cap, cap); 
  7.         boolean pa = VM.isDirectMemoryPageAligned();        int ps = Bits.pageSize();        long size = Math.max(1L, (long)cap + (pa ? ps : 0)); 
  8.         Bits.reserveMemory(size, cap);        long base = 0;        try {            base = unsafe.allocateMemory(size); 
  9.         } catch (OutOfMemoryError x) { 
  10.             Bits.unreserveMemory(size, cap);            throw x; 
  11.         }        unsafe.setMemory(base, size, (byte) 0);        if (pa && (base % ps != 0)) {            // Round up to page boundary 
  12.             address = base + ps - (base & (ps - 1)); 
  13.         } else { 
  14.             address = base; 
  15.         } 
  16.         cleaner = Cleaner.create(this, new Deallocator(base, size, cap)); 
  17.         att = null
  18.  
  19.     } 
  20. View Code

申請(qǐng)方法不同的內(nèi)存有不同的用法。接下來看一看ByteBuffer的常用方法與如何使用

ByteBuffer的常用方法與使用方式

Bytebuf的讀和寫是使用put()和get()方法實(shí)現(xiàn)的

  1. // 讀操作public byte get() {    return hb[ix(nextGetIndex())]; 
  2. }final int nextGetIndex() {    if (position >= limit)        throw new BufferUnderflowException();    return position++; 
  3. }// 寫操作public ByteBuffer put(byte x) { 
  4.     hb[ix(nextPutIndex())] = x;    return this; 
  5. }final int nextPutIndex() {    if (position >= limit)        throw new BufferOverflowException();    return position++; 

從代碼中可以看出,讀和寫操作都會(huì)改變ByteBuffer的position屬性,這兩個(gè)操作是共用的position屬性。這樣就會(huì)帶來一個(gè)問題,讀寫操作會(huì)導(dǎo)致數(shù)據(jù)出錯(cuò)啊,數(shù)據(jù)位置出錯(cuò)。

ByteBuffer提供了flip()方法,讀寫模式切換,切換的時(shí)候會(huì)改變position和limit的位置??纯磃lip()怎么實(shí)現(xiàn)的:

  1. public final Buffer flip() {    // 1. 設(shè)置 limit 為當(dāng)前位置 
  2.     limit = position;    // 2. 設(shè)置 position 為0 
  3.     position = 0; 
  4.     mark = -1;    return this; 

這里就不重點(diǎn)介紹了,有些細(xì)節(jié)可以自己去深究。

Netty的ByteBuf

Netty使用的自身的ByteBuf對(duì)象來進(jìn)行數(shù)據(jù)傳輸,本質(zhì)上使用了外觀模式對(duì)JDK的ByteBuffer進(jìn)行封裝。

相較于原生的ByteBuffer,Netty的ByteBuf做了很多優(yōu)化,零拷貝,內(nèi)存池加速,讀寫索引。

為什么要使用內(nèi)存池?

首先要明白一點(diǎn),Netty的內(nèi)存池是不依賴于JVM本身的GC的。

回顧一下直接內(nèi)存的GC:

上文提到Java的GC只會(huì)在老年區(qū)滿了觸發(fā)Full GC時(shí),才會(huì)去順便清理直接內(nèi)存的廢棄對(duì)象。

JVM中的直接內(nèi)存,存在堆內(nèi)存中其實(shí)就是DirectByteBuffer類,它本身其實(shí)很小,真的內(nèi)存是在堆外,這里是映射關(guān)系。

每次申請(qǐng)直接內(nèi)存,都先看看是否超限 —— 直接內(nèi)存的限額默認(rèn)(可用 -XX:MaxDirectMemorySize 重新設(shè)定)。

如果超過限額,就會(huì)主動(dòng)執(zhí)行System.gc(),這樣會(huì)帶來一個(gè)影響,系統(tǒng)會(huì)中斷100ms。如果沒有成功回收直接內(nèi)存,并且還是超過直接內(nèi)存的限額,就會(huì)拋出OOM——內(nèi)存溢出。

繼續(xù)從GC角度分析,DirectByteBuffer熬過了幾次young gc之后,會(huì)進(jìn)入老年代。當(dāng)老年代滿了之后,會(huì)觸發(fā)Full GC。

因?yàn)楸旧砗苄?,很難占滿老年代,因此基本不會(huì)觸發(fā)Full GC,帶來的后果是大量堆外內(nèi)存一直占著不放,無法進(jìn)行內(nèi)存回收。

還有***一個(gè)辦法,就是依靠申請(qǐng)額度超限時(shí)觸發(fā)的system.gc(),但是前面提到,它會(huì)中斷進(jìn)程100ms,如果在這100ms的之間,系統(tǒng)未完成GC,仍會(huì)拋出OOM。

所以這個(gè)***一個(gè)辦法也不是完全保險(xiǎn)的。

Netty使用了引用計(jì)數(shù)的方式,主動(dòng)回收內(nèi)存?;厥盏膶?duì)象包括非池直接內(nèi)存,和內(nèi)存池中的內(nèi)存。

內(nèi)存池的內(nèi)存泄露檢測(cè)?

Netty中使用引用計(jì)數(shù)機(jī)制來管理資源,ByteBuf實(shí)際上是實(shí)現(xiàn)了ReferenceCounted接口,當(dāng)實(shí)例化ByteBuf對(duì)象時(shí),引用計(jì)數(shù)加1。

當(dāng)應(yīng)用代碼保持一個(gè)對(duì)象引用時(shí),會(huì)調(diào)用retain方法將計(jì)數(shù)增加1,對(duì)象使用完畢進(jìn)行釋放,調(diào)用release將計(jì)數(shù)器減1.

當(dāng)引用計(jì)數(shù)變?yōu)?時(shí),對(duì)象將釋放所有的資源,返回內(nèi)存池。

Netty內(nèi)存泄漏檢測(cè)級(jí)別:

    禁用(DISABLED)   - 完全禁止泄露檢測(cè)。不推薦。

    簡(jiǎn)單(SIMPLE)     - 告訴我們?nèi)拥?%的緩沖是否發(fā)生了泄露。默認(rèn)。

    高級(jí)(ADVANCED)   - 告訴我們?nèi)拥?%的緩沖發(fā)生泄露的地方

    偏執(zhí)(PARANOID)   - 跟高級(jí)選項(xiàng)類似,但此選項(xiàng)檢測(cè)所有緩沖,而不僅僅是取樣的那1%。此選項(xiàng)在自動(dòng)測(cè)試階段很有用。如果構(gòu)建(build)輸出包含了LEAK,可認(rèn)為構(gòu)建失敗也可以使用JVM的-Dio.netty.leakDetectionLevel選項(xiàng)來指定泄漏檢測(cè)級(jí)別。

內(nèi)存跟蹤

在內(nèi)存池中分配內(nèi)存,得到的ByteBuf對(duì)象都是經(jīng)過 toLeakAwareBuffer()方法封裝的,該方法作用就是對(duì)ByteBuf對(duì)象進(jìn)行引用計(jì)數(shù),使用 SimpleLeakAwareByteBuf或者 AdvancedLeakAwareByteBuf 來包裝ByteBuf。此外該方法只對(duì)非池內(nèi)存中的直接內(nèi)存和內(nèi)存池中的內(nèi)存進(jìn)行內(nèi)存泄露檢測(cè)。

  1. //裝飾器模式,用SimpleLeakAwareByteBuf或AdvancedLeakAwareByteBuf來包裝原始的ByteBufprotected static ByteBuf toLeakAwareBuffer(ByteBuf buf) { 
  2.         ResourceLeakTracker<ByteBuf> leak; 
  3.     //根據(jù)設(shè)置的Level來選擇使用何種裝飾器 
  4.         switch (ResourceLeakDetector.getLevel()) {            case SIMPLE:          //創(chuàng)建用于跟蹤和表示內(nèi)容泄露的ResourcLeak對(duì)象 
  5.                 leak = AbstractByteBuf.leakDetector.track(buf);                if (leak != null) { 
  6.           //只在ByteBuf.order方法中調(diào)用ResourceLeak.record 
  7.                     buf = new SimpleLeakAwareByteBuf(buf, leak); 
  8.                 }                break;            case ADVANCED: 
  9.             case PARANOID: 
  10.                 leak = AbstractByteBuf.leakDetector.track(buf);                if (leak != null) { 
  11.           //只在ByteBuf.order方法中調(diào)用ResourceLeak.record 
  12.                     buf = new AdvancedLeakAwareByteBuf(buf, leak); 
  13.                 }                break;            default
  14.                 break; 
  15.         }        return buf; 
  16.     } 

實(shí)際上,內(nèi)存泄露檢測(cè)是在 AbstractByteBuf.leakDetector.track(buf)進(jìn)行的,來看看track方法的具體實(shí)現(xiàn)。

  1. /** 
  2.      * Creates a new {@link ResourceLeakTracker} which is expected to be closed via 
  3.      * {@link ResourceLeakTracker#close(Object)} when the related resource is deallocated. 
  4.      * 
  5.      * @return the {@link ResourceLeakTracker} or {@code null
  6.      */ 
  7.     @SuppressWarnings("unchecked"
  8.     public final ResourceLeakTracker<T> track(T obj) {        return track0(obj); 
  9.     }    @SuppressWarnings("unchecked"
  10.     private DefaultResourceLeak track0(T obj) { 
  11.         Level level = ResourceLeakDetector.level
  12.       // 不進(jìn)行內(nèi)存跟蹤 
  13.         if (level == Level.DISABLED) {            return null
  14.         }        if (level.ordinal() < Level.PARANOID.ordinal()) {         //如果監(jiān)控級(jí)別低于PARANOID,在一定的采樣頻率下報(bào)告內(nèi)存泄露 
  15.             if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) { 
  16.                 reportLeak();                return new DefaultResourceLeak(obj, refQueue, allLeaks); 
  17.             }            return null
  18.         }        //每次需要分配 ByteBuf 時(shí),報(bào)告內(nèi)存泄露情況 
  19.         reportLeak();        return new DefaultResourceLeak(obj, refQueue, allLeaks); 
  20.     } 

再來看看返回對(duì)象——DefaultResourceLeak,他的實(shí)現(xiàn)方式如下:

  1. private static final class DefaultResourceLeak<T>            extends WeakReference<Object> implements ResourceLeakTracker<T>, ResourceLeak { 

它繼承了虛引用WeakReference,虛引用完全不影響目標(biāo)對(duì)象的垃圾回收,但是會(huì)在目標(biāo)對(duì)象被VM垃圾回收時(shí)加入到引用隊(duì)列,

正常情況下ResourceLeak對(duì)象,會(huì)將監(jiān)控的資源的引用計(jì)數(shù)為0時(shí)被清理掉。

但是當(dāng)資源的引用計(jì)數(shù)失常,ResourceLeak對(duì)象也會(huì)被加入到引用隊(duì)列.

存在著這樣一種情況:沒有成對(duì)調(diào)用ByteBuf的retain和relaease方法,導(dǎo)致ByteBuf沒有被正常釋放,當(dāng) ResourceLeak(引用隊(duì)列) 中存在元素時(shí),即表明有內(nèi)存泄露。

Netty中的 reportLeak()方法來報(bào)告內(nèi)存泄露情況,通過檢查引用隊(duì)列來判斷是否有內(nèi)存泄露,并報(bào)告跟蹤情況.

方法代碼如下:

  1. View Code 

Handler中的內(nèi)存處理機(jī)制

Netty中有handler鏈,消息有本Handler傳到下一個(gè)Handler。所以Netty引入了一個(gè)規(guī)則,誰是***使用者,誰負(fù)責(zé)釋放。

根據(jù)誰***使用誰負(fù)責(zé)釋放的原則,每個(gè)Handler對(duì)消息可能有三種處理方式

對(duì)原消息不做處理,調(diào)用 ctx.fireChannelRead(msg)把原消息往下傳,那不用做什么釋放。

將原消息轉(zhuǎn)化為新的消息并調(diào)用 ctx.fireChannelRead(newMsg)往下傳,那必須把原消息release掉。

如果已經(jīng)不再調(diào)用ctx.fireChannelRead(msg)傳遞任何消息,那更要把原消息release掉。

假設(shè)每一個(gè)Handler都把消息往下傳,Handler并也不知道誰是啟動(dòng)Netty時(shí)所設(shè)定的Handler鏈的***一員,所以Netty在Handler鏈的最末補(bǔ)了一個(gè)TailHandler,如果此時(shí)消息仍然是ReferenceCounted類型就會(huì)被release掉。

總結(jié):

1.Netty在不同的內(nèi)存泄漏檢測(cè)級(jí)別情況下,采樣概率是不一樣的,在Simple情況下出現(xiàn)了Leak,要設(shè)置“-Dio.netty.leakDetectionLevel=advanced”再跑一次代碼,找到創(chuàng)建和訪問的地方。

2.Netty中的內(nèi)存泄露檢測(cè)是通過對(duì)ByteBuf對(duì)象進(jìn)行裝飾,利用虛引用和引用計(jì)數(shù)來對(duì)非池中的直接內(nèi)存和內(nèi)存池中內(nèi)存進(jìn)行跟蹤,判斷是否發(fā)生內(nèi)存泄露。

3.計(jì)數(shù)器基于 AtomicIntegerFieldUpdater,因?yàn)锽yteBuf對(duì)象很多,如果都把int包一層AtomicInteger花銷較大,而AtomicIntegerFieldUpdater只需要一個(gè)全局的靜態(tài)變量。

Netty中的內(nèi)存單位

Netty中將內(nèi)存池分為五種不同的形態(tài):Arena,ChunkList,Chunk,Page,SubPage.

首先來看Netty***的內(nèi)存單位PoolArena——連續(xù)的內(nèi)存塊。它是由多個(gè)PoolChunkList和兩個(gè)SubPagePools(一個(gè)是tinySubPagePool,一個(gè)是smallSubPagePool)組成的。如下圖所示:

 

1.PoolChunkList是一個(gè)雙向的鏈表,PoolChunkList負(fù)責(zé)管理多個(gè)PoolChunk的生命周期。

2.PoolChunk中包含多個(gè)Page,Page的大小默認(rèn)是8192字節(jié),也可以設(shè)置系統(tǒng)變量io.netty.allocator.pageSize來改變頁的大小。自定義頁大小有如下限制:1.必須大于4096字節(jié),2.必須是2的整次數(shù)冪。

3.塊(PoolChunk)的大小是由頁的大小和maxOrder算出來的,計(jì)算公式是: chunkSize = 2^{maxOrder} * pageSize。 maxOrder的默認(rèn)值是11,也可以通過io.netty.allocator.maxOrder系統(tǒng)變量設(shè)置,只能是0-14的范圍,所以chunksize的默認(rèn)大小為:(2^11)*8192=16MB

Page中包含多個(gè)SubPage。

PoolChunk內(nèi)部維護(hù)了一個(gè)平衡二叉樹,如下圖所示:

 

PoolSubPage

通常一個(gè)頁(page)的大小就達(dá)到了10^13(8192字節(jié)),通常一次申請(qǐng)分配內(nèi)存沒有這么大,可能很小。

于是Netty將頁(page)劃分成更小的片段——SubPage

 

Netty定義這樣的內(nèi)存單元是為了更好的分配內(nèi)存,接下來看一下一個(gè)ByteBuf是如何在內(nèi)存池中申請(qǐng)內(nèi)存的。

Netty如何分配內(nèi)存池中的內(nèi)存?

分配原則:

內(nèi)存池中的內(nèi)存分配是在PoolArea中進(jìn)行的。

  1. 申請(qǐng)小于PageSize(默認(rèn)8192字節(jié))的內(nèi)存,會(huì)在SubPagePools中進(jìn)行分配,如果申請(qǐng)內(nèi)存小于512字節(jié),則會(huì)在tingSubPagePools中進(jìn)行分配,如果大于512小于PageSize字節(jié),則會(huì)在smallSubPagePools進(jìn)行分配。
  2. 申請(qǐng)大于PageSize的內(nèi)存,則會(huì)在PoolChunkList中進(jìn)行分配。
  3. 申請(qǐng)大于ChunkSize的內(nèi)存,則不會(huì)在內(nèi)存池中申請(qǐng),而且也不會(huì)重用該內(nèi)存。

應(yīng)用中在內(nèi)存池中申請(qǐng)內(nèi)存的方法:

  1. // 在內(nèi)存池中申請(qǐng) 直接內(nèi)存 
  2.         ByteBuf directByteBuf = ByteBufAllocator.DEFAULT.directBuffer(1024);        // 在內(nèi)存池中申請(qǐng) 堆內(nèi)存 
  3.         ByteBuf heapByteBuf = ByteBufAllocator.DEFAULT.heapBuffer(1024); 

接下來,一層一層的看下來,在Netty中申請(qǐng)內(nèi)存是如何實(shí)現(xiàn)的。就拿申請(qǐng)直接內(nèi)存舉例,首先看directBuffer方法。

  1. // directBuffer方法實(shí)現(xiàn) 
  2.     @Override 
  3.     public ByteBuf directBuffer(int initialCapacity) {        return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY); 
  4.     }     
  5.     // 校驗(yàn)申請(qǐng)大小,返回申請(qǐng)的直接內(nèi)存 
  6.     @Override 
  7.     public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {        if (initialCapacity == 0 && maxCapacity == 0) {            return emptyBuf; 
  8.         } 
  9.         validate(initialCapacity, maxCapacity);        return newDirectBuffer(initialCapacity, maxCapacity); 
  10.     }    //PooledByteBufAllocator類中的 newDirectBuffer方法的實(shí)現(xiàn) 
  11.     @Override 
  12.     protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {        // Netty避免每個(gè)線程對(duì)內(nèi)存池的競(jìng)爭(zhēng),在每個(gè)線程都提供了PoolThreadCache線程內(nèi)的內(nèi)存池 
  13.         PoolThreadCache cache = threadCache.get(); 
  14.         PoolArena<ByteBuffer> directArena = cache.directArena;       // 如果緩存存在,則分配內(nèi)存 
  15.         final ByteBuf buf;        if (directArena != null) { 
  16.             buf = directArena.allocate(cache, initialCapacity, maxCapacity); 
  17.         } else {       // 緩存不存在,則分配非池內(nèi)存 
  18.             buf = PlatformDependent.hasUnsafe() ? 
  19.                     UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity); 
  20.         } 
  21.     // 通過toLeakAwareBuffer包裝成內(nèi)存泄漏檢測(cè)的buffer 
  22.         return toLeakAwareBuffer(buf); 
  23.     } 

一般情況下,內(nèi)存都是在buf = directArena.allocate(cache, initialCapacity, maxCapacity)這行代碼進(jìn)行內(nèi)存分配的,也就是說在內(nèi)存的連續(xù)塊PoolArena中進(jìn)行的內(nèi)存分配。

接下來,我們根據(jù)內(nèi)存分配原則來進(jìn)行內(nèi)存研讀PoolArena中的allocate方法。

  1. 1     PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) { 2         PooledByteBuf<T> buf = newByteBuf(maxCapacity); 3         allocate(cache, buf, reqCapacity); 4         return buf; 5     } 6  
  2.  7     private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) { 8         final int normCapacity = normalizeCapacity(reqCapacity); 9         if (isTinyOrSmall(normCapacity)) { // capacity < pageSize10             int tableIdx;11             PoolSubpage<T>[] table;12             boolean tiny = isTiny(normCapacity);13             if (tiny) { // < 51214 15           // 如果申請(qǐng)內(nèi)存小于512字節(jié),則會(huì)在tingSubPagePools中進(jìn)行分配16                 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {17                     // was able to allocate out of the cache so move on18                     return;19                 }20                 tableIdx = tinyIdx(normCapacity);21                 table = tinySubpagePools;22             } else {23           // 如果大于512小于PageSize字節(jié),則會(huì)在smallSubPagePools進(jìn)行分配24                 if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {25                     // was able to allocate out of the cache so move on26                     return;27                 }28                 tableIdx = smallIdx(normCapacity);29                 table = smallSubpagePools;30             }31 32             final PoolSubpage<T> head = table[tableIdx];33 34             /** 
  3. 35              * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and 
  4. 36              * {@link PoolChunk#free(long)} may modify the doubly linked list as well. 
  5. 37              */38             synchronized (head) {39                 final PoolSubpage<T> s = head.next;40                 if (s != head) {41                     assert s.doNotDestroy && s.elemSize == normCapacity;42                     long handle = s.allocate();43                     assert handle >= 0;44                     s.chunk.initBufWithSubpage(buf, handle, reqCapacity);45                     incTinySmallAllocation(tiny);46                     return;47                 }48             }49             synchronized (this) {50                 allocateNormal(buf, reqCapacity, normCapacity);51             }52 53             incTinySmallAllocation(tiny);54             return;55         }56         if (normCapacity <= chunkSize) {57             if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {58                 // was able to allocate out of the cache so move on59                 return;60             }61             synchronized (this) {62                 allocateNormal(buf, reqCapacity, normCapacity);63                 ++allocationsNormal;64             }65         } else {66             // Huge allocations are never served via the cache so just call allocateHuge67             allocateHuge(buf, reqCapacity);68         }69     } 

如何使用內(nèi)存池?

底層IO處理線程的緩沖區(qū)使用堆外直接緩沖區(qū),減少一次IO復(fù)制。業(yè)務(wù)消息的編解碼使用堆緩沖區(qū),分配效率更高,而且不涉及到內(nèi)核緩沖區(qū)的復(fù)制問題。

Netty默認(rèn)不使用內(nèi)存池,需要在創(chuàng)建服務(wù)端或者客戶端的時(shí)候進(jìn)行配置。

  1. //Boss線程池內(nèi)存池配置. 
  2. .option(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT) //Work線程池內(nèi)存池配置.  
  3. .childOption(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT); 

本人的想法是:

1.I/O處理線程使內(nèi)存池中的直接內(nèi)存,開啟以上配置

2.在handler處理業(yè)務(wù)的時(shí)候,使用內(nèi)存池中的堆內(nèi)存

還有一點(diǎn)值得注意的是:在使用完內(nèi)存池中的ByteBuf,一定要記得釋放,即調(diào)用release():

  1. // 在內(nèi)存池中申請(qǐng) 直接內(nèi)存 
  2.         ByteBuf directByteBuf = ByteBufAllocator.DEFAULT.directBuffer(1024);        // 歸還到內(nèi)存池 
  3.         directByteBuf.release(); 

如果handler繼承了SimpleChannelInboundHandler,那么它將會(huì)自動(dòng)釋放Bytefuf.詳情可見:

  1. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {        boolean release = true;        try {            if (acceptInboundMessage(msg)) {                @SuppressWarnings("unchecked"
  2.                 I imsg = (I) msg; 
  3.                 channelRead0(ctx, imsg); 
  4.             } else { 
  5.                 release = false
  6.                 ctx.fireChannelRead(msg); 
  7.             } 
  8.         } finally { 
  9.       // autoRelease默認(rèn)為true 
  10.             if (autoRelease && release) { 
  11.       // 釋放Bytebuf,歸還到內(nèi)存池 
  12.                 ReferenceCountUtil.release(msg); 
  13.             } 
  14.         } 
  15.     } 

零拷貝:

該部分是重點(diǎn)介紹的部分,首先將它與傳統(tǒng)的I/O read和write操作作對(duì)比,看看有什么不同,首先需要理解一下用戶態(tài)和內(nèi)存態(tài)的概念

用戶態(tài)(User Mode)和內(nèi)核態(tài)(Kernel Mode),也可以叫用戶空間和內(nèi)核

用戶態(tài):受限的訪問內(nèi)存,并且不允許訪問硬件設(shè)備。

內(nèi)核態(tài):本質(zhì)上是一個(gè)軟件,可以控制計(jì)算機(jī)的硬件資源(如網(wǎng)卡,硬盤),可以訪問內(nèi)存所有數(shù)據(jù)。

用戶程序都是運(yùn)行在用戶態(tài)中的,比如JVM,就是用戶程序,所以它運(yùn)行在用戶態(tài)中。

用戶態(tài)是不能直接訪問硬件設(shè)備的,如果需要一次I/O操作,那就必須利用系統(tǒng)調(diào)用機(jī)制切換到內(nèi)核態(tài)(用戶態(tài)與內(nèi)核態(tài)之間的轉(zhuǎn)換稱為上下文切換),進(jìn)行硬盤讀寫。

比如說一次傳統(tǒng)網(wǎng)絡(luò)I/O:

***步,從用戶態(tài)切換到內(nèi)核態(tài),將用戶緩沖區(qū)的數(shù)據(jù)拷貝到內(nèi)核緩沖區(qū),執(zhí)行send操作。

第二步,數(shù)據(jù)發(fā)送由底層的操作系統(tǒng)進(jìn)行,此時(shí)從內(nèi)核態(tài)切換到用戶態(tài),將內(nèi)核緩存區(qū)的數(shù)據(jù)拷貝到網(wǎng)卡的緩沖區(qū)

總結(jié):也就是一次普通的網(wǎng)絡(luò)I/O,至少經(jīng)過兩次上下文切換,和兩次內(nèi)存拷貝。

什么是零拷貝? 

當(dāng)需要傳輸?shù)臄?shù)據(jù)遠(yuǎn)大于內(nèi)核緩沖區(qū)的大小時(shí),內(nèi)核緩沖區(qū)就成為I/O的性能瓶頸。零拷貝就是杜絕了內(nèi)核緩沖區(qū)與用戶緩沖區(qū)的的數(shù)據(jù)拷貝。

所以零拷貝適合大數(shù)據(jù)量的傳輸。

拿傳統(tǒng)的網(wǎng)絡(luò)I/O做對(duì)比,零拷貝I/O是怎樣的一個(gè)過程:

用戶程序執(zhí)行transferTo(),將用戶緩沖區(qū)待發(fā)送的數(shù)據(jù)拷貝到網(wǎng)卡緩沖區(qū)。

很簡(jiǎn)單,一步完成,中間少了用戶態(tài)到內(nèi)存態(tài)的拷貝。

Netty中零拷貝如何實(shí)現(xiàn)

Netty的中零拷貝與上述零拷貝是不一樣的,它并不是系統(tǒng)層面上的零拷貝,只是相對(duì)于ByteBuf而言的。

Netty中的零拷貝:

1.CompositeByteBuf,將多個(gè)ByteBuf合并為一個(gè)邏輯上的ByteBuf,避免了各個(gè)ByteBuf之間的拷貝。

使用方式:

  1. CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer(); 
  2. compositeByteBuf.addComponents(true, ByteBuf1, ByteBuf1); 

注意: addComponents***個(gè)參數(shù)必須為true,那么writeIndex才不為0,才能從compositeByteBuf中讀到數(shù)據(jù)。

2.wrapedBuffer()方法,將byte[]數(shù)組包裝成ByteBuf對(duì)象。

  1. byte[] bytes = data.getBytes();ByteBuf byteBuf = Unpooled.wrappedBuffer(bytes); 

Unpooled.wrappedBuffer(bytes)就是進(jìn)行了byte[]數(shù)組的包裝工作,過程中不存在內(nèi)存拷貝。

即包裝出來的ByteBuf和byte[]數(shù)組指向了同一個(gè)存儲(chǔ)空間。因?yàn)橹狄?,所以bytes修改也會(huì)影響 byteBuf 的值。

3.ByteBuf的分割,slice()方法。將一個(gè)ByteBuf對(duì)象切分成多個(gè)ByteBuf對(duì)象。

  1. ByteBuf directByteBuf = ByteBufAllocator.DEFAULT.directBuffer(1024);ByteBuf header = directByteBuf.slice(0,50);ByteBuf body = directByteBuf.slice(51,1024); 

header和body兩個(gè)ByteBuf對(duì)象實(shí)際上還是指向directByteBuf的存儲(chǔ)空間。

總結(jié):

本文很長(zhǎng)很長(zhǎng),博主陸陸續(xù)續(xù)寫了有一個(gè)月的時(shí)間。但是只是窺探Netty內(nèi)存池中的冰山一角,更多是要在實(shí)際項(xiàng)目中進(jìn)行驗(yàn)證才能起到效果。

 

責(zé)任編輯:武曉燕 來源: 漫談Java架構(gòu)
相關(guān)推薦

2009-11-16 17:26:17

Oracle優(yōu)化緩沖區(qū)

2021-12-09 09:30:38

字節(jié)流文件緩沖區(qū)

2011-07-20 10:54:14

C++

2009-11-16 16:59:24

Oracle優(yōu)化庫高速

2025-04-27 08:25:00

Netty零拷貝內(nèi)存

2020-10-29 08:41:20

JavaNetty緩沖

2022-05-13 09:02:34

LinuxBufferCache

2019-02-27 13:58:29

漏洞緩沖區(qū)溢出系統(tǒng)安全

2017-01-09 17:03:34

2011-12-14 16:30:42

javanio

2020-10-27 09:51:18

漏洞

2009-09-24 18:16:40

2009-11-16 17:08:59

Oracle日志緩沖區(qū)

2018-01-26 14:52:43

2017-07-04 17:09:10

Map環(huán)形緩沖區(qū)數(shù)據(jù)

2014-07-30 11:21:46

2010-12-27 10:21:21

2009-07-15 15:50:48

Jython線程

2023-10-09 23:01:09

MySQL數(shù)據(jù)庫

2010-09-08 15:43:18

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)