MapReduce Source Code Analysis: The Ring Buffer
This article takes the ring buffer of the Map phase on its own and analyzes two things: its data structure, and the path data follows into the ring buffer and then out to disk as spills.
Ring buffer data structure
In the Map phase, records produced by map are first placed in memory. That region of memory is the ring buffer.
The ring buffer is defined in MapTask.MapOutputBuffer. The relevant fields are:
```java
// k/v accounting
// IntBuffer view holding the metadata; every entry is an int (4 bytes)
private IntBuffer kvmeta; // metadata overlay on backing store
int kvstart;              // marks origin of spill metadata
int kvend;                // marks end of spill metadata
int kvindex;              // marks end of fully serialized records
// boundary separating the metadata from the key/value contents;
// both live in the same ring buffer, so they must be kept apart
int equator;              // marks origin of meta/serialization
int bufstart;             // marks beginning of spill
int bufend;               // marks beginning of collectable
int bufmark;              // marks end of record
int bufindex;             // marks end of collected
int bufvoid;              // marks the point where we should stop
                          // reading at the end of the buffer
// byte array holding the serialized keys and values; its unit is the byte (do not confuse with kvmeta)
byte[] kvbuffer;          // main output buffer
private final byte[] b0 = new byte[0];
// offsets, relative to kvindex, of the entry fields holding the value/key start positions in kvbuffer
private static final int VALSTART = 0;         // val offset in acct
private static final int KEYSTART = 1;         // key offset in acct
// offset, relative to kvindex, of the field holding the partition
private static final int PARTITION = 2;        // partition offset in acct
private static final int VALLEN = 3;           // length of value
// number of ints one key/value pair's metadata occupies in kvmeta
private static final int NMETA = 4;            // num meta ints
// number of bytes one key/value pair's metadata occupies in kvmeta
private static final int METASIZE = NMETA * 4; // size in bytes
```
The ring buffer is really just an array. It holds the serialized key/value data together with the key/value metadata. Metadata is stored as ints: each key/value pair has one metadata entry made of 4 ints. The first int stores the start position of the value, the second the start position of the key, the third the partition, and the last the length of the value.
Inside the ring buffer, the serialized key/value data and the metadata are separated by equator. Key/value bytes are stored in the direction of increasing index, metadata in the direction of decreasing index. If the array is viewed as a ring, then starting at equator the key/value data runs clockwise and the metadata runs counter-clockwise.
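To make the shared storage concrete, here is a minimal, self-contained sketch. The class MetaLayoutSketch and its putMeta method are hypothetical, not Hadoop code. It assumes a tiny 64-byte buffer with equator = 0 and shows how one byte[] doubles as an IntBuffer through ByteBuffer.wrap(...).asIntBuffer(): each record's four ints are written at kvindex, and kvindex then moves one entry in the decreasing direction, wrapping around at 0.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.IntBuffer;

/** Hypothetical sketch: one byte[] backs both the k/v bytes and an int view used for metadata. */
class MetaLayoutSketch {
    static final int VALSTART = 0, KEYSTART = 1, PARTITION = 2, VALLEN = 3;
    static final int NMETA = 4;             // ints per metadata entry
    static final int METASIZE = NMETA * 4;  // bytes per metadata entry

    final byte[] kvbuffer;
    final IntBuffer kvmeta;
    int kvindex;                            // int index of the next free metadata entry

    MetaLayoutSketch(int bytes) {
        kvbuffer = new byte[bytes - bytes % METASIZE];
        // the IntBuffer is a view: writes through kvmeta land in kvbuffer
        kvmeta = ByteBuffer.wrap(kvbuffer).order(ByteOrder.nativeOrder()).asIntBuffer();
        // equator = 0: the first entry occupies the last METASIZE bytes of the array
        kvindex = (kvbuffer.length - METASIZE) / 4;
    }

    /** Stores one record's metadata, then moves kvindex down one entry, wrapping at 0. */
    void putMeta(int keystart, int valstart, int vallen, int partition) {
        kvmeta.put(kvindex + VALSTART, valstart);
        kvmeta.put(kvindex + KEYSTART, keystart);
        kvmeta.put(kvindex + PARTITION, partition);
        kvmeta.put(kvindex + VALLEN, vallen);
        kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
    }
}
```

With 64 bytes, kvmeta has 16 ints, so the first entry sits at int index 12. After four records kvindex wraps from 0 back to 12, which is the counter-clockwise movement described above.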
Initialization
The ring buffer structure is created in MapOutputBuffer.init.
```java
public void init(MapOutputCollector.Context context
                ) throws IOException, ClassNotFoundException {
  ...
  // MAP_SORT_SPILL_PERCENT = mapreduce.map.sort.spill.percent
  // fraction of the map-side buffer that may fill before a spill starts
  //sanity checks
  final float spillper =
    job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);
  // IO_SORT_MB = "mapreduce.task.io.sort.mb"
  // size of the map-side buffer, in MB
  // its byte size (sortmb << 20) is aligned down to a multiple of METASIZE (16) below
  final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);
  // memory limit for all spill indices cached in memory
  indexCacheMemoryLimit = job.getInt(JobContext.INDEX_CACHE_MEMORY_LIMIT,
                                     INDEX_CACHE_MEMORY_LIMIT_DEFAULT);
  ...
  // sorter implementation, pluggable via "map.sort.class"; the default is a modified quicksort
  sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
        QuickSort.class, IndexedSorter.class), job);
  // buffers and accounting
  // IO_SORT_MB is in MB; shifting left by 20 bits converts it to bytes
  int maxMemUsage = sortmb << 20;
  // METASIZE is the length of one metadata entry: 4 int fields
  // (VALSTART, KEYSTART, PARTITION, VALLEN) of 4 bytes each, i.e. 16 bytes.
  // Round maxMemUsage down to a whole number of metadata entries.
  maxMemUsage -= maxMemUsage % METASIZE;
  // backing byte array, shared by the serialized key/value data and the metadata
  kvbuffer = new byte[maxMemUsage];
  bufvoid = kvbuffer.length;
  // view the same bytes as an IntBuffer, kvmeta, whose unit is an int (4 bytes)
  kvmeta = ByteBuffer.wrap(kvbuffer)
     .order(ByteOrder.nativeOrder())
     .asIntBuffer();
  // set the boundary between the key/value bytes and kvmeta
  setEquator(0);
  bufstart = bufend = bufindex = equator;
  kvstart = kvend = kvindex;
  // maximum number of metadata entries kvmeta can hold
  maxRec = kvmeta.capacity() / NMETA;
  // spill threshold in bytes: kvbuffer.length * spillper
  // (sortmb * spillper would be in MB)
  softLimit = (int)(kvbuffer.length * spillper);
  // important: the running budget that decides when collect/write check for a spill
  bufferRemaining = softLimit;
  ...
  // k/v serialization
  comparator = job.getOutputKeyComparator();
  keyClass = (Class<K>)job.getMapOutputKeyClass();
  valClass = (Class<V>)job.getMapOutputValueClass();
  serializationFactory = new SerializationFactory(job);
  keySerializer = serializationFactory.getSerializer(keyClass);
  // keys are serialized into bb
  keySerializer.open(bb);
  valSerializer = serializationFactory.getSerializer(valClass);
  // values are serialized into bb as well
  valSerializer.open(bb);
  ...
  // combiner
  ...
  spillInProgress = false;
  // in the final merge, a configured combiner runs only if there are at least this many spills
  minSpillsForCombine = job.getInt(JobContext.MAP_COMBINE_MIN_SPILLS, 3);
  spillThread.setDaemon(true);
  spillThread.setName("SpillThread");
  spillLock.lock();
  try {
    spillThread.start();
    while (!spillThreadRunning) {
      spillDone.await();
    }
  } catch (InterruptedException e) {
    throw new IOException("Spill thread failed to initialize", e);
  } finally {
    spillLock.unlock();
  }
  if (sortSpillException != null) {
    throw new IOException("Spill thread failed to initialize",
        sortSpillException);
  }
}
```
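init builds and initializes the ring buffer. The size of the map-side ring buffer, sortmb, is set by mapreduce.task.io.sort.mb and defaults to 100 MB.
The same buffer also stores the metadata, one METASIZE (16-byte) entry per record, so its byte size is rounded down to a whole number of entries: maxMemUsage = sortmb << 20, then maxMemUsage -= maxMemUsage % METASIZE. Because 2^20 is itself a multiple of 16, this rounding removes nothing when sortmb is a whole number of MB. maxMemUsage is then used to allocate the kvbuffer byte array, and kvmeta is created as an int view of that same array. Finally the marker fields are initialized. setEquator(0) places the boundary between the key/value data and the metadata at 0 and sets kvindex to ((aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4, i.e. one METASIZE counter-clockwise from the equator on the ring. kvstart = kvend = kvindex are then set from kvindex, bufstart = bufend = bufindex = equator from equator, and bufvoid = kvbuffer.length. bufvoid marks the point at which reading must stop at the end of the buffer.
For efficiency, a spill starts once buffer usage reaches a threshold. The threshold is tracked through bufferRemaining, which is initialized by softLimit = (int)(kvbuffer.length * spillper); bufferRemaining = softLimit;. Note that softLimit is computed from kvbuffer.length, a byte count, rather than from sortmb * spillper, which is in MB. Since kvbuffer.length equals sortmb << 20 whenever sortmb is a whole number of MB, softLimit is effectively sortmb * spillper converted to bytes and truncated to an int.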
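The sizing arithmetic can be checked in isolation. The hypothetical helper BufferSizing below repeats the computations from init so the numbers for the 100 MB default can be verified directly. It assumes sortmb stays below 2048, because 2048 << 20 no longer fits in a positive int.

```java
/** Hypothetical helper repeating the sizing arithmetic of MapOutputBuffer.init. */
class BufferSizing {
    static final int NMETA = 4;
    static final int METASIZE = NMETA * 4;   // 16 bytes per metadata entry

    /** sortmb in MB -> usable bytes, rounded down to whole metadata entries. */
    static int maxMemUsage(int sortmb) {
        int bytes = sortmb << 20;             // MB -> bytes (overflows for sortmb >= 2048)
        return bytes - bytes % METASIZE;      // a no-op here: 1 << 20 is a multiple of 16
    }

    /** Spill threshold in bytes, computed from the buffer length as init does. */
    static int softLimit(int sortmb, float spillper) {
        return (int) (maxMemUsage(sortmb) * spillper);
    }

    /** Number of metadata entries kvmeta can hold: (length / 4 ints) / NMETA. */
    static int maxRec(int sortmb) {
        return (maxMemUsage(sortmb) / 4) / NMETA;
    }
}
```

For sortmb = 100, maxMemUsage is 104857600 bytes and maxRec is 6553600 entries. With spillper = 0.5f the soft limit is exactly 52428800 bytes. With the default 0.8f the float product is truncated, which is why init casts to int.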
Here is the code of setEquator:
```java
// called as setEquator(0) from init
private void setEquator(int pos) {
  equator = pos;
  // set index prior to first entry, aligned at meta boundary
  // aligned: pos rounded down to a METASIZE boundary (in bytes); the first meta entry ends here
  final int aligned = pos - (pos % METASIZE);
  // Cast one of the operands to long to avoid integer overflow
  // int index at which the first metadata entry starts
  kvindex = (int)
    (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
  LOG.info("(EQUATOR) " + pos + " kvi " + kvindex +
      "(" + (kvindex * 4) + ")");
}
```
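The index arithmetic in setEquator can be reproduced on its own. In this sketch, kvindexFor is a hypothetical helper that takes the equator position and the buffer length. The expected values were worked out by hand from the formula above.

```java
/** Hypothetical restatement of the index arithmetic inside setEquator. */
class EquatorMath {
    static final int METASIZE = 16;

    /** Int index of the first metadata entry for an equator at byte position pos. */
    static int kvindexFor(int pos, int length) {
        // round the equator down to a METASIZE boundary
        final int aligned = pos - (pos % METASIZE);
        // step one entry "counter-clockwise" from aligned, wrapping around the array;
        // long arithmetic avoids int overflow for buffers close to Integer.MAX_VALUE
        return (int) (((long) aligned - METASIZE + length) % length) / 4;
    }
}
```

With a 64-byte buffer and equator 0, the first entry occupies bytes 48 to 63 (int index 12). With equator 40, aligned is 32, so the entry occupies bytes 16 to 31 (int index 4), and bytes 32 to 39 are left unused by the alignment. For the 100 MB default and equator 0, kvindex is 26214396.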
After initialization, the abstract structure of the buffer is as shown in the figure below:
[Figure: ring buffer data structure]
Writing to the buffer
Map writes data through NewOutputCollector.write, which calls collector.collect to put it into the buffer. Before that, NewOutputCollector.write has already computed the partition of each record. Here is collect:
```java
// MapOutputBuffer.collect
public synchronized void collect(K key, V value, final int partition
                                 ) throws IOException {
  ...
  // on every collect, first charge one metadata entry against the budget, then check it
  bufferRemaining -= METASIZE;
  if (bufferRemaining <= 0) {
    // start spill if the thread is not running and the soft limit has been
    // reached
    spillLock.lock();
    try {
      do {
        // spillInProgress is false before the first spill
        if (!spillInProgress) {
          // byte position of kvindex
          final int kvbidx = 4 * kvindex;
          // byte position of kvend
          final int kvbend = 4 * kvend;
          // serialized, unspilled bytes always lie between kvindex and
          // bufindex, crossing the equator. Note that any void space
          // created by a reset must be included in "used" bytes
          final int bUsed = distanceTo(kvbidx, bufindex);
          final boolean bufsoftlimit = bUsed >= softLimit;
          if ((kvbend + METASIZE) % kvbuffer.length !=
              equator - (equator % METASIZE)) {
            // spill finished, reclaim space
            resetSpill();
            bufferRemaining = Math.min(
                distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                softLimit - bUsed) - METASIZE;
            continue;
          } else if (bufsoftlimit && kvindex != kvend) {
            // spill records, if any collected; check latter, as it may
            // be possible for metadata alignment to hit spill pcnt
            startSpill();
            final int avgRec = (int)
              (mapOutputByteCounter.getCounter() /
              mapOutputRecordCounter.getCounter());
            // leave at least half the split buffer for serialization data
            // ensure that kvindex >= bufindex
            final int distkvi = distanceTo(bufindex, kvbidx);
            final int newPos = (bufindex +
              Math.max(2 * METASIZE - 1,
                      Math.min(distkvi / 2,
                               distkvi / (METASIZE + avgRec) * METASIZE)))
              % kvbuffer.length;
            setEquator(newPos);
            bufmark = bufindex = newPos;
            final int serBound = 4 * kvend;
            // bytes remaining before the lock must be held and limits
            // checked is the minimum of three arcs: the metadata space, the
            // serialization space, and the soft limit
            bufferRemaining = Math.min(
                // metadata max
                distanceTo(bufend, newPos),
                Math.min(
                  // serialization max
                  distanceTo(newPos, serBound),
                  // soft limit
                  softLimit)) - 2 * METASIZE;
          }
        }
      } while (false);
    } finally {
      spillLock.unlock();
    }
  }
  // write the key, the value and the metadata into the buffer
  try {
    // serialize key bytes into buffer
    int keystart = bufindex;
    // serializing the key into kvbuffer advances bufindex
    keySerializer.serialize(key);
    // if the key was split at bufvoid, move it into contiguous space:
    // the sort compares keys as contiguous byte ranges
    if (bufindex < keystart) {
      // wrapped the key; must make contiguous
      bb.shiftBufferedKey();
      keystart = 0;
    }
    // serialize value bytes into buffer
    final int valstart = bufindex;
    valSerializer.serialize(value);
    // It's possible for records to have zero length, i.e. the serializer
    // will perform no writes. To ensure that the boundary conditions are
    // checked and that the kvindex invariant is maintained, perform a
    // zero-length write into the buffer. The logic monitoring this could be
    // moved into collect, but this is cleaner and inexpensive. For now, it
    // is acceptable.
    bb.write(b0, 0, 0);
    // the record must be marked after the preceding write, as the metadata
    // for this record are not yet written
    int valend = bb.markRecord();
    mapOutputRecordCounter.increment(1);
    mapOutputByteCounter.increment(
        distanceTo(keystart, valend, bufvoid));
    // write accounting info
    kvmeta.put(kvindex + PARTITION, partition);
    kvmeta.put(kvindex + KEYSTART, keystart);
    kvmeta.put(kvindex + VALSTART, valstart);
    kvmeta.put(kvindex + VALLEN, distanceTo(valstart, valend));
    // advance kvindex
    kvindex = (kvindex - NMETA + kvmeta.capacity()) % kvmeta.capacity();
  } catch (MapBufferTooSmallException e) {
    LOG.info("Record too large for in-memory buffer: " + e.getMessage());
    spillSingleRecord(key, value, partition);
    mapOutputRecordCounter.increment(1);
    return;
  }
}
```
Each time data is written, collect first executes bufferRemaining -= METASIZE and then checks bufferRemaining.
If it is still greater than 0, the serialized key/value pair and its metadata are written straight into the buffer. Keys and values are written in serialized form, through a chain of calls: Serializer.serialize(key/value) -> WritableSerializer.serialize(key/value) -> the Writable's own write(dataOut), e.g. BytesWritable.write(dataOut) -> DataOutputStream.write(bytes, 0, size) -> MapOutputBuffer.Buffer.write(b, off, len). The last call, MapOutputBuffer.Buffer.write(b, off, len), copies the data into kvbuffer. Here is write:
```java
public void write(byte b[], int off, int len)
    throws IOException {
  // must always verify the invariant that at least METASIZE bytes are
  // available beyond kvindex, even when len == 0
  bufferRemaining -= len;
  if (bufferRemaining <= 0) {
    // writing these bytes could exhaust available buffer space or fill
    // the buffer to soft limit. check if spill or blocking are necessary
    boolean blockwrite = false;
    spillLock.lock();
    try {
      do {
        checkSpillException();

        final int kvbidx = 4 * kvindex;
        final int kvbend = 4 * kvend;
        // ser distance to key index
        final int distkvi = distanceTo(bufindex, kvbidx);
        // ser distance to spill end index
        final int distkve = distanceTo(bufindex, kvbend);

        // if kvindex is closer than kvend, then a spill is neither in
        // progress nor complete and reset since the lock was held. The
        // write should block only if there is insufficient space to
        // complete the current write, write the metadata for this record,
        // and write the metadata for the next record. If kvend is closer,
        // then the write should block if there is too little space for
        // either the metadata or the current write. Note that collect
        // ensures its metadata requirement with a zero-length write
        blockwrite = distkvi <= distkve
          ? distkvi <= len + 2 * METASIZE
          : distkve <= len || distanceTo(bufend, kvbidx) < 2 * METASIZE;

        if (!spillInProgress) {
          if (blockwrite) {
            if ((kvbend + METASIZE) % kvbuffer.length !=
                equator - (equator % METASIZE)) {
              // spill finished, reclaim space
              // need to use meta exclusively; zero-len rec & 100% spill
              // pcnt would fail
              resetSpill(); // resetSpill doesn't move bufindex, kvindex
              bufferRemaining = Math.min(
                  distkvi - 2 * METASIZE,
                  softLimit - distanceTo(kvbidx, bufindex)) - len;
              continue;
            }
            // we have records we can spill; only spill if blocked
            if (kvindex != kvend) {
              startSpill();
              // Blocked on this write, waiting for the spill just
              // initiated to finish. Instead of repositioning the marker
              // and copying the partial record, we set the record start
              // to be the new equator
              setEquator(bufmark);
            } else {
              // We have no buffered records, and this record is too large
              // to write into kvbuffer. We must spill it directly from
              // collect
              final int size = distanceTo(bufstart, bufindex) + len;
              setEquator(0);
              bufstart = bufend = bufindex = equator;
              kvstart = kvend = kvindex;
              bufvoid = kvbuffer.length;
              throw new MapBufferTooSmallException(size + " bytes");
            }
          }
        }

        if (blockwrite) {
          // wait for spill
          try {
            while (spillInProgress) {
              reporter.progress();
              spillDone.await();
            }
          } catch (InterruptedException e) {
              throw new IOException(
                  "Buffer interrupted while waiting for the writer", e);
          }
        }
      } while (blockwrite);
    } finally {
      spillLock.unlock();
    }
  }
  // here, we know that we have sufficient space to write
  if (bufindex + len > bufvoid) {
    final int gaplen = bufvoid - bufindex;
    System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
    len -= gaplen;
    off += gaplen;
    bufindex = 0;
  }
  System.arraycopy(b, off, kvbuffer, bufindex, len);
  bufindex += len;
}
```
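write copies the key/value bytes into kvbuffer. If bufindex + len exceeds bufvoid, the data is split: the first part is written between bufindex and bufvoid, bufindex is reset to 0, and the remainder is written from the start of the array. write does not distinguish between keys and values. After the key has been written, collect checks bufindex < keystart; if bufindex is smaller, the key was split across the end of the buffer, and bb.shiftBufferedKey() is called. A value is simply written, with no check for whether it was split. Keys must not be split because the sort compares them, as contiguous byte ranges.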
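Here is a simplified version of the copy at the end of Buffer.write, together with the bufindex < keystart test that collect uses to detect a split key. It is a sketch only: RingWriteSketch and writeKey are hypothetical names, and all locking, spill and blocking logic is left out, so it assumes there is always room to write.

```java
/** Hypothetical simplification of Buffer.write's wrap-around copy; no locking or spill checks. */
class RingWriteSketch {
    final byte[] kvbuffer;
    int bufindex;   // next byte to write
    int bufvoid;    // writes wrap to 0 once they reach this point

    RingWriteSketch(int length) {
        kvbuffer = new byte[length];
        bufvoid = length;
    }

    void write(byte[] b, int off, int len) {
        if (bufindex + len > bufvoid) {
            // split: fill up to bufvoid, then continue at the start of the array
            final int gaplen = bufvoid - bufindex;
            System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
            len -= gaplen;
            off += gaplen;
            bufindex = 0;
        }
        System.arraycopy(b, off, kvbuffer, bufindex, len);
        bufindex += len;
    }

    /** Mirrors collect(): a key wrapped iff bufindex ended up below where the key started. */
    boolean writeKey(byte[] key) {
        int keystart = bufindex;
        write(key, 0, key.length);
        return bufindex < keystart;
    }
}
```

In an 8-byte buffer with bufindex = 6, a 4-byte key lands in positions 6, 7, 0 and 1. bufindex becomes 2, which is below keystart 6, so collect would call shiftBufferedKey.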
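Note that if the data to be written is too long and kvindex == kvend (no records are buffered), write throws MapBufferTooSmallException. collect catches it and calls spillSingleRecord to spill that record straight to disk. In other words, a single record that is too long bypasses the buffer and goes directly to disk.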
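Whether write blocks, and therefore whether an oversized record ends in MapBufferTooSmallException, comes down to the blockwrite predicate. The sketch below lifts that expression out of Buffer.write. BlockWriteCheck is hypothetical, and its distanceTo is assumed to match Hadoop's helper, which measures the forward (clockwise) distance from i to j on a ring of size mod: i <= j ? j - i : mod - i + j.

```java
/** Hypothetical extraction of the blockwrite predicate from Buffer.write. */
class BlockWriteCheck {
    static final int METASIZE = 16;

    /** Assumed forward distance from i to j on a ring of size mod. */
    static int distanceTo(int i, int j, int mod) {
        return i <= j ? j - i : mod - i + j;
    }

    /**
     * @param bufindex next byte to write
     * @param kvbidx   byte position of kvindex (4 * kvindex)
     * @param kvbend   byte position of kvend (4 * kvend)
     * @param bufend   end of the serialized data being spilled
     * @param len      bytes this write needs
     * @param length   kvbuffer.length
     */
    static boolean blockwrite(int bufindex, int kvbidx, int kvbend, int bufend,
                              int len, int length) {
        final int distkvi = distanceTo(bufindex, kvbidx, length);
        final int distkve = distanceTo(bufindex, kvbend, length);
        // kvindex closer: need room for this write plus two metadata entries;
        // kvend closer: need room for the write and for metadata up to bufend
        return distkvi <= distkve
            ? distkvi <= len + 2 * METASIZE
            : distkve <= len || distanceTo(bufend, kvbidx, length) < 2 * METASIZE;
    }
}
```

Take an empty 1024-byte buffer (equator 0, so kvindex = kvend = 252 and kvbidx = kvbend = 1008). A 100-byte write does not block, but a 2000-byte write does. Because kvindex == kvend and no spill is running, write would then reset the markers and throw MapBufferTooSmallException.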
Here is the code of bb.shiftBufferedKey():
```java
// BlockingBuffer.shiftBufferedKey
protected void shiftBufferedKey() throws IOException {
  // spillLock unnecessary; both kvend and kvindex are current
  int headbytelen = bufvoid - bufmark;
  bufvoid = bufmark;
  final int kvbidx = 4 * kvindex;
  final int kvbend = 4 * kvend;
  final int avail =
    Math.min(distanceTo(0, kvbidx), distanceTo(0, kvbend));
  if (bufindex + headbytelen < avail) {
    System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
    System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
    bufindex += headbytelen;
    bufferRemaining -= kvbuffer.length - bufvoid;
  } else {
    byte[] keytmp = new byte[bufindex];
    System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
    bufindex = 0;
    out.write(kvbuffer, bufmark, headbytelen);
    out.write(keytmp);
  }
}
```
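shiftBufferedKey first moves bufvoid back to bufmark, so the key fragment at the end of the array is now past bufvoid. It then checks whether the head of the buffer has enough space to hold the whole key. If it does not, the part of the key at the start of the array is copied into keytmp, bufindex is reset to 0, and the key is written again in two pieces through Buffer.write, which may block or spill. If there is enough space, two copies are done: first the part of the key at the start of the array is moved forward to position headbytelen, then the part of the key at the end of the array is copied to the start. Finally bufindex is advanced and bufferRemaining is reduced by the void space left after bufvoid.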
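The "enough space" branch is an in-place rotation done with two System.arraycopy calls. The sketch assumes the avail check has already passed, so the shifted tail cannot overwrite metadata or the key's head; ShiftKeySketch.shift is a hypothetical name. The first copy relies on System.arraycopy being specified to behave correctly when source and destination overlap.

```java
/** Hypothetical standalone version of the "enough space" branch of shiftBufferedKey. */
class ShiftKeySketch {
    /**
     * The key starts at bufmark, runs to bufvoid, and continues at 0 up to bufindex.
     * Rearranges it to occupy [0, headbytelen + bufindex) and returns the new bufindex.
     * The caller would also set bufvoid = bufmark, as the original does first.
     */
    static int shift(byte[] kvbuffer, int bufmark, int bufvoid, int bufindex) {
        final int headbytelen = bufvoid - bufmark;   // key bytes sitting at the array's end
        // 1) slide the wrapped part at the array's start right by headbytelen
        System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
        // 2) copy the key's first bytes from the array's end to position 0
        System.arraycopy(kvbuffer, bufmark, kvbuffer, 0, headbytelen);
        return bufindex + headbytelen;
    }
}
```

For example, a 10-byte buffer {3, 4, 0, 0, 0, 0, 0, 0, 1, 2} holding the key 1 2 3 4 (bufmark 8, bufvoid 10, bufindex 2) becomes {1, 2, 3, 4, ...} with bufindex 4.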
After the key and value are written, collect writes the metadata and moves kvindex to the next entry.
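Spill
That completes one write into the buffer. As more data is written, bufferRemaining eventually drops to 0 or below and collect prepares to spill. On the first spill spillInProgress is false, and bUsed = distanceTo(kvbidx, bufindex) is computed. If (kvbend + METASIZE) % kvbuffer.length == equator - (equator % METASIZE) (no finished spill left space to reclaim), bUsed >= softLimit, and kvindex != kvend (at least one record is buffered), a spill is started by calling startSpill: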
```java
private void startSpill() {
  // metadata boundary of this spill: the entry written last
  kvend = (kvindex + NMETA) % kvmeta.capacity();
  // key/value boundary of this spill: the end of the last complete record
  bufend = bufmark;
  // mark the spill as running
  spillInProgress = true;
  ...
  // wake the spill thread through the spillReady condition of the ReentrantLock spillLock
  spillReady.signal();
}
```
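The branch collect takes once bufferRemaining <= 0 can be written as a pure function of the markers. The sketch below is a hypothetical restatement: CollectSpillDecision is not Hadoop code, and distanceTo is assumed to be the forward ring distance. RESET_SPILL corresponds to reclaiming space through resetSpill, and START_SPILL to the startSpill call shown above.

```java
/** Hypothetical restatement of collect()'s locked branch once bufferRemaining <= 0. */
class CollectSpillDecision {
    static final int METASIZE = 16;

    enum Action { RESET_SPILL, START_SPILL, NONE }

    /** Assumed to match Hadoop's helper: forward (clockwise) distance from i to j on a ring. */
    static int distanceTo(int i, int j, int mod) {
        return i <= j ? j - i : mod - i + j;
    }

    static Action decide(boolean spillInProgress, int kvindex, int kvend,
                         int bufindex, int equator, int softLimit, int length) {
        if (spillInProgress) {
            return Action.NONE;                  // a spill is running: keep writing
        }
        final int kvbidx = 4 * kvindex;
        final int kvbend = 4 * kvend;
        final int bUsed = distanceTo(kvbidx, bufindex, length);
        // kvend is no longer the entry just below the (aligned) equator:
        // a finished spill left space that can be reclaimed
        if ((kvbend + METASIZE) % length != equator - (equator % METASIZE)) {
            return Action.RESET_SPILL;
        }
        if (bUsed >= softLimit && kvindex != kvend) {
            return Action.START_SPILL;           // soft limit reached and records exist
        }
        return Action.NONE;
    }
}
```

Take a 1024-byte buffer with equator 0 and softLimit 819, filled with 50-byte records. After 12 records (bufindex 600, kvindex 204) bUsed is 808 and nothing happens. After 13 records (bufindex 650, kvindex 200) bUsed is 874 and the spill starts.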
After startSpill wakes the spill thread, that thread performs the spill. Meanwhile, map's writes into the buffer are not blocked, so equator and bufferRemaining have to be reset. Here is how they are set:
```java
// average record length of the key/value data written so far
final int avgRec = (int) (mapOutputByteCounter.getCounter() /
    mapOutputRecordCounter.getCounter());
// leave at least half the split buffer for serialization data
// ensure that kvindex >= bufindex
// size of the free space between bufindex and kvindex
final int distkvi = distanceTo(bufindex, kvbidx);
// position of the new equator
final int newPos = (bufindex +
  Math.max(2 * METASIZE - 1,
          Math.min(distkvi / 2,
                   distkvi / (METASIZE + avgRec) * METASIZE)))
  % kvbuffer.length;
setEquator(newPos);
bufmark = bufindex = newPos;
final int serBound = 4 * kvend;
// bytes remaining before the lock must be held and limits
// checked is the minimum of three arcs: the metadata space, the
// serialization space, and the soft limit
bufferRemaining = Math.min(
    // metadata max
    distanceTo(bufend, newPos),
    Math.min(
      // serialization max
      distanceTo(newPos, serBound),
      // soft limit
      softLimit)) - 2 * METASIZE;
```
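equator is the boundary between the key/value data and the metadata. To keep as much space as possible for key/value data, at most half of distkvi is given to metadata. avgRec is used to estimate how many record-plus-metadata pairs fit into distkvi, and from that count how much space their metadata would take; the smaller of distkvi / 2 and that estimate is chosen. In addition, distkvi must hold at least one metadata entry (METASIZE), and computing kvindex rounds the equator down to an aligned position, which can lose up to METASIZE - 1 bytes. Taking all of this into account, the new equator is (bufindex + Math.max(2 * METASIZE - 1, Math.min(distkvi / 2, distkvi / (METASIZE + avgRec) * METASIZE))) % kvbuffer.length. Once the equator is chosen, bufmark = bufindex = newPos and kvindex are set. bufstart, bufend, kvstart and kvend are left unchanged at this point, because they mark the boundaries of the data being spilled.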
Once the spill has started, less space is available, so bufferRemaining, which controls spilling, must be reset as well. It is set to the minimum of three values minus 2*METASIZE. The three values are the space available to metadata, distanceTo(bufend, newPos); the space available to key/value data, distanceTo(newPos, serBound); and softLimit. Why subtract 2*METASIZE? My best explanation is the following. A record is already on its way into the buffer, so the METASIZE its metadata will occupy must be taken out of bufferRemaining. The other METASIZE is the entry between kvindex and kvend (before the spill), which the equator calculation did not include.
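Plugging numbers into the two formulas helps. The hypothetical helpers below repeat the newPos and bufferRemaining expressions, with distanceTo assumed to be the forward ring distance. The numbers in the note were worked out by hand for a 1024-byte buffer.

```java
/** Hypothetical reproduction of the post-startSpill equator and bufferRemaining arithmetic. */
class EquatorAfterSpill {
    static final int METASIZE = 16;

    /** Assumed forward distance from i to j on a ring of size mod. */
    static int distanceTo(int i, int j, int mod) {
        return i <= j ? j - i : mod - i + j;
    }

    /** New equator: at most half the free arc, or the estimated metadata share if smaller. */
    static int newEquator(int bufindex, int kvbidx, int avgRec, int length) {
        final int distkvi = distanceTo(bufindex, kvbidx, length);   // free arc
        return (bufindex + Math.max(2 * METASIZE - 1,
                Math.min(distkvi / 2, distkvi / (METASIZE + avgRec) * METASIZE))) % length;
    }

    /** Budget until the next check: the smallest of three arcs, minus two entries. */
    static int bufferRemaining(int bufend, int newPos, int kvend, int softLimit, int length) {
        final int serBound = 4 * kvend;
        return Math.min(distanceTo(bufend, newPos, length),              // metadata max
                Math.min(distanceTo(newPos, serBound, length),           // serialization max
                         softLimit)) - 2 * METASIZE;
    }
}
```

Say 13 records of 50 bytes have been written (bufindex 650, kvindex 200, so kvbidx 800 and, after startSpill, kvend 204). Then avgRec = 50 and distkvi = 150, and the new equator is 650 + max(31, min(75, 32)) = 682. bufferRemaining becomes min(32, min(134, 819)) - 32 = 0, so the very next collect re-enters the locked section. With more free space (bufindex 100, kvbidx 900, avgRec 34), the estimate 256 beats distkvi / 2 = 400, giving an equator of 356.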
Next, the SpillThread. Here is its run method:
```java
public void run() {
  spillLock.lock();
  spillThreadRunning = true;
  try {
    while (true) {
      spillDone.signal();
      // no spill requested: park SpillThread until startSpill signals spillReady
      while (!spillInProgress) {
        spillReady.await();
      }
      try {
        spillLock.unlock();
        // once woken, sort the buffered records and spill them to disk
        sortAndSpill();
      } catch (Throwable t) {
        sortSpillException = t;
      } finally {
        spillLock.lock();
        if (bufend < bufstart) {
          bufvoid = kvbuffer.length;
        }
        kvstart = kvend;
        bufstart = bufend;
        spillInProgress = false;
      }
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
  } finally {
    spillLock.unlock();
    spillThreadRunning = false;
  }
}
```
The core of run is sortAndSpill:
```java
private void sortAndSpill() throws IOException, ClassNotFoundException,
                                   InterruptedException {
  //approximate the length of the output file to be the length of the
  //buffer + header lengths for the partitions
  final long size = distanceTo(bufstart, bufend, bufvoid) +
              partitions * APPROX_HEADER_LENGTH;
  FSDataOutputStream out = null;
  try {
    // create spill file
    // spillRec collects the index entries of this spill
    final SpillRecord spillRec = new SpillRecord(partitions);
    // path of the spill file on disk
    final Path filename =
        mapOutputFile.getSpillFileForWrite(numSpills, size);
    // open the output stream
    out = rfs.create(filename);
    // mstart: index of the metadata entry at kvend (kvend / NMETA)
    final int mstart = kvend / NMETA;
    // mend: one past the entry at kvstart, unwrapped when kvstart < kvend;
    // the entries to spill are [mstart, mend), i.e. mend - mstart records
    final int mend = 1 + // kvend is a valid record
      (kvstart >= kvend
      ? kvstart
      : kvmeta.capacity() + kvstart) / NMETA;
    // sort: only the metadata entries are reordered within kvmeta;
    // MapOutputBuffer.compare orders by partition first, then by key
    sorter.sort(MapOutputBuffer.this, mstart, mend, reporter);
    int spindex = mstart;
    // rec describes where each partition lives in the spill file
    final IndexRecord rec = new IndexRecord();
    final InMemValBytes value = new InMemValBytes();
    for (int i = 0; i < partitions; ++i) {
      // spill files use the IFile format
      IFile.Writer<K, V> writer = null;
      try {
        long segmentStart = out.getPos();
        FSDataOutputStream partitionOut = CryptoUtils.wrapIfNecessary(job, out);
        writer = new Writer<K, V>(job, partitionOut, keyClass, valClass, codec,
                                  spilledRecordsCounter);
        // check whether a combiner is configured before writing
        if (combinerRunner == null) {
          // spill directly
          DataInputBuffer key = new DataInputBuffer();
          // write every record of partition i
          while (spindex < mend &&
              kvmeta.get(offsetFor(spindex % maxRec) + PARTITION) == i) {
            final int kvoff = offsetFor(spindex % maxRec);
            int keystart = kvmeta.get(kvoff + KEYSTART);
            int valstart = kvmeta.get(kvoff + VALSTART);
            key.reset(kvbuffer, keystart, valstart - keystart);
            getVBytesForOffset(kvoff, value);
            writer.append(key, value);
            ++spindex;
          }
        } else {
          int spstart = spindex;
          while (spindex < mend &&
              kvmeta.get(offsetFor(spindex % maxRec)
                        + PARTITION) == i) {
            ++spindex;
          }
          // Note: we would like to avoid the combiner if we've fewer
          // than some threshold of records for a partition
          if (spstart != spindex) {
            combineCollector.setWriter(writer);
            RawKeyValueIterator kvIter =
              new MRResultIterator(spstart, spindex);
            combinerRunner.combine(kvIter, combineCollector);
          }
        }
        // close the writer
        writer.close();
        // record offsets
        // record where partition i was written
        rec.startOffset = segmentStart;
        rec.rawLength = writer.getRawLength() + CryptoUtils.cryptoPadding(job);
        rec.partLength = writer.getCompressedLength() + CryptoUtils.cryptoPadding(job);
        // spillRec holds this spill's per-partition entries so that the later
        // heap-based merge can locate each partition's data
        spillRec.putIndex(rec, i);
        writer = null;
      } finally {
        if (null != writer) writer.close();
      }
    }
    // if the cached indices have already reached the limit, write this
    // spill's index to disk; from then on every later spill does the same
    if (totalIndexCacheMemory >= indexCacheMemoryLimit) {
      // create spill index file
      Path indexFilename =
          mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
              * MAP_OUTPUT_INDEX_RECORD_LENGTH);
      spillRec.writeToFile(indexFilename, job);
    } else {
      indexCacheList.add(spillRec);
      totalIndexCacheMemory +=
        spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
    }
    LOG.info("Finished spill " + numSpills);
    ++numSpills;
  } finally {
    if (out != null) out.close();
  }
}
```
In sortAndSpill, mstart and mend give the number of records to spill to disk. sorter.sort sorts the metadata, first by partition and then by key; the sort only changes the order of the metadata entries.
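The metadata entries covered by one spill, including the wrapped case, can be checked with a small sketch. SpillRange.slots is hypothetical. It assumes offsetFor(slot) = slot * NMETA (offsetFor itself is not shown in this article), which is how the loop in sortAndSpill would address entries through offsetFor(spindex % maxRec). The sort then only permutes these entries by (partition, key).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper listing the metadata entries (slots) covered by one spill. */
class SpillRange {
    static final int NMETA = 4;

    /** Slot numbers in [mstart, mend), folded back into range with % maxRec. */
    static List<Integer> slots(int kvstart, int kvend, int capacity) {
        final int maxRec = capacity / NMETA;
        final int mstart = kvend / NMETA;
        // kvend is a valid record; unwrap kvstart when it lies below kvend
        final int mend = 1 + (kvstart >= kvend ? kvstart : capacity + kvstart) / NMETA;
        List<Integer> out = new ArrayList<>();
        for (int i = mstart; i < mend; ++i) {
            out.add(i % maxRec);
        }
        return out;
    }
}
```

With a 16-int kvmeta (4 entries), kvstart 12 and kvend 4 give slots [1, 2, 3]. After a wrap, kvstart 4 and kvend 12 give [3, 0, 1]: three records either way, mend - mstart.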
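After sorting, sortAndSpill checks whether there is a combiner. Without one, records are written to disk directly, with one IndexRecord per partition. With one, an MRResultIterator (kvIter) is created over that partition's range of records, and combinerRunner.combine runs the combiner on it.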
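The per-partition loop produces one contiguous segment per partition plus an index entry recording where it starts and how long it is. The sketch below models that idea with a deliberately simplified, hypothetical record format (a 4-byte length prefix followed by the bytes). It is not the IFile format, it has no compression, encryption or combiner, and Segment only loosely mirrors IndexRecord.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical, simplified spill writer: one segment per partition; NOT the IFile format. */
class PartitionSegments {
    /** Per-partition index entry, loosely modeled on IndexRecord. */
    static final class Segment {
        final long startOffset;
        final long length;
        Segment(long startOffset, long length) {
            this.startOffset = startOffset;
            this.length = length;
        }
    }

    /** Records are assumed to be sorted by partition already, as after sorter.sort. */
    static Segment[] write(int partitions, int[] partitionOf, byte[][] payload,
                           ByteArrayOutputStream sink) {
        DataOutputStream out = new DataOutputStream(sink);
        Segment[] index = new Segment[partitions];
        int spindex = 0;
        try {
            for (int p = 0; p < partitions; ++p) {
                long segmentStart = out.size();
                // consume every record of partition p; an empty partition gets a 0-length segment
                while (spindex < partitionOf.length && partitionOf[spindex] == p) {
                    out.writeInt(payload[spindex].length);   // hypothetical length prefix
                    out.write(payload[spindex]);
                    ++spindex;
                }
                index[p] = new Segment(segmentStart, out.size() - segmentStart);
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);   // not expected with ByteArrayOutputStream
        }
        return index;
    }
}
```

Three records in partitions 0, 0 and 2 give segments (0, 11), (11, 0) and (11, 5). Every partition gets an index entry, even an empty one, so a reader can seek straight to partition p.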
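After the data is on disk, the spillRec belonging to spillx.out is kept in memory through indexCacheList.add(spillRec). If the memory already used, totalIndexCacheMemory, has reached indexCacheMemoryLimit, an index file is created instead, and this spill's spillRec (like those of all later spills) is written to disk.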
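The index-cache rule can be modeled in a few lines. IndexCacheModel is hypothetical, and the 24-byte entry size is an assumption (three longs: startOffset, rawLength and partLength, the fields set on rec above). Because the check runs before the current spill is added, the cache can end up above the limit before later spills start going to disk.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of the index-cache decision at the end of sortAndSpill. */
class IndexCacheModel {
    // Assumption: one index entry per partition = three longs = 24 bytes
    static final int INDEX_RECORD_LENGTH = 24;

    final long limit;
    long totalIndexCacheMemory;
    final List<Integer> cachedSpills = new ArrayList<>();
    final List<Integer> spillsWrittenToDisk = new ArrayList<>();

    IndexCacheModel(long limit) {
        this.limit = limit;
    }

    void onSpillFinished(int spillNo, int partitions) {
        if (totalIndexCacheMemory >= limit) {
            spillsWrittenToDisk.add(spillNo);   // would get its own index file on disk
        } else {
            cachedSpills.add(spillNo);          // kept in indexCacheList
            totalIndexCacheMemory += (long) partitions * INDEX_RECORD_LENGTH;
        }
    }
}
```

With a 100-byte limit and 2 partitions (48 bytes per spill), spills 0 to 2 are cached (144 bytes in total), and spills 3 and 4 get index files on disk.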
Finally the spill counter is incremented. When sortAndSpill returns, run executes its finally block: it sets kvstart = kvend and bufstart = bufend, and sets spillInProgress back to false.
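The hand-off between collect and SpillThread uses a ReentrantLock with two Conditions. The sketch below is a hypothetical, stripped-down model. sortAndSpill is replaced by a placeholder, spillDone is signalled in the finally block rather than at the top of the loop as in run, and the collector waits for every spill with awaitUninterruptibly, as a blocked write would. demo runs two spill requests end to end. The essential property is unchanged: the flag is only read and written under spillLock, and the spill itself runs with the lock released so collect can keep writing.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical, stripped-down model of the collect/SpillThread handshake. */
class SpillHandshake {
    final ReentrantLock spillLock = new ReentrantLock();
    final Condition spillReady = spillLock.newCondition();  // collector -> spill thread
    final Condition spillDone = spillLock.newCondition();   // spill thread -> collector
    boolean spillInProgress;                                // guarded by spillLock
    int spillsCompleted;                                    // guarded by spillLock

    /** Spill-thread side: wait for a request, "spill" without the lock, publish completion. */
    void runSpillLoop(int spillsToServe) throws InterruptedException {
        spillLock.lock();
        try {
            for (int n = 0; n < spillsToServe; n++) {
                while (!spillInProgress) {
                    spillReady.await();             // releases spillLock while waiting
                }
                spillLock.unlock();                 // sortAndSpill() would run here, unlocked
                try {
                    // placeholder for sortAndSpill()
                } finally {
                    spillLock.lock();
                    spillsCompleted++;              // stands in for kvstart = kvend, bufstart = bufend
                    spillInProgress = false;
                    spillDone.signalAll();
                }
            }
        } finally {
            spillLock.unlock();
        }
    }

    /** Collector side: the startSpill() signal, then wait as a blocked write would. */
    void requestSpillAndWait() {
        spillLock.lock();
        try {
            spillInProgress = true;
            spillReady.signal();
            while (spillInProgress) {
                spillDone.awaitUninterruptibly();
            }
        } finally {
            spillLock.unlock();
        }
    }

    /** Runs two spill requests end to end and returns how many spills completed. */
    static int demo() {
        SpillHandshake h = new SpillHandshake();
        Thread spillThread = new Thread(() -> {
            try {
                h.runSpillLoop(2);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "SpillThread");
        spillThread.setDaemon(true);
        spillThread.start();
        h.requestSpillAndWait();
        h.requestSpillAndWait();
        try {
            spillThread.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        h.spillLock.lock();
        try {
            return h.spillsCompleted;
        } finally {
            h.spillLock.unlock();
        }
    }
}
```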
While the spill runs, map keeps writing into the buffer by calling collect. Back in collect:
```java
// MapOutputBuffer.collect
public synchronized void collect(K key, V value, final int partition
                                 ) throws IOException {
  ...
  // on every collect, first charge one metadata entry against the budget, then check it
  bufferRemaining -= METASIZE;
  if (bufferRemaining <= 0) {
    // start spill if the thread is not running and the soft limit has been
    // reached
    spillLock.lock();
    try {
      do {
        // spillInProgress is false before the first spill
        if (!spillInProgress) {
          // byte position of kvindex
          final int kvbidx = 4 * kvindex;
          // byte position of kvend
          final int kvbend = 4 * kvend;
          // serialized, unspilled bytes always lie between kvindex and
          // bufindex, crossing the equator. Note that any void space
          // created by a reset must be included in "used" bytes
          final int bUsed = distanceTo(kvbidx, bufindex);
          final boolean bufsoftlimit = bUsed >= softLimit;
          if ((kvbend + METASIZE) % kvbuffer.length !=
              equator - (equator % METASIZE)) {
            // spill finished, reclaim space
            resetSpill();
            bufferRemaining = Math.min(
                distanceTo(bufindex, kvbidx) - 2 * METASIZE,
                softLimit - bUsed) - METASIZE;
            continue;
          } else if (bufsoftlimit && kvindex != kvend) {
            ...
          }
        }
      } while (false);
    } finally {
      spillLock.unlock();
    }
  }
  ...
}
```
When a new record is to be written, collect again executes bufferRemaining -= METASIZE. This bufferRemaining was reset when the spill started, so it is normally smaller than the initial softLimit. When it drops to 0 or below, collect enters the if block. spillInProgress is now false, so it also enters if (!spillInProgress). kvend still marks the end of the spilled region set by startSpill, while the equator has since moved, so (kvbend + METASIZE) % kvbuffer.length != equator - (equator % METASIZE) holds. resetSpill() is therefore called, which sets kvstart, kvend, bufstart and bufend from the equator chosen at the last startSpill. Part of the buffer has now been written to disk and a large amount of space is free again, so bufferRemaining is reset and no spill happens this time.
bufferRemaining becomes Math.min(distanceTo(bufindex, kvbidx) - 2 * METASIZE, softLimit - bUsed) - METASIZE.
The final METASIZE is the one this record already subtracted from bufferRemaining when it entered collect. Why 2*METASIZE is subtracted inside the min, I have not been able to work out.
```java
private void resetSpill() {
  final int e = equator;
  bufstart = bufend = e;
  final int aligned = e - (e % METASIZE);
  // set start/end to point to first meta record
  // Cast one of the operands to long to avoid integer overflow
  kvstart = kvend = (int)
    (((long)aligned - METASIZE + kvbuffer.length) % kvbuffer.length) / 4;
  LOG.info("(RESET) equator " + e + " kv " + kvstart + "(" +
    (kvstart * 4) + ")" + " kvi " + kvindex + "(" + (kvindex * 4) + ")");
}
```
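resetSpill works because it re-establishes exactly the equality that collect and write test for. The sketch below keeps only the markers involved; ResetSpillSketch and spillFinished are hypothetical names. Take a 1024-byte buffer whose equator has moved to 682 while kvend still points at the old spill (204). The "spill finished" test is true. After resetSpill, kvstart = kvend = 164 and bufstart = bufend = 682, and the test is false again until the next spill moves the equator.

```java
/** Hypothetical check that resetSpill() makes the "spill finished" test false again. */
class ResetSpillSketch {
    static final int METASIZE = 16;
    final int length;                   // kvbuffer.length
    int equator, bufstart, bufend, kvstart, kvend;

    ResetSpillSketch(int length) {
        this.length = length;
    }

    /** The test collect()/write() use to detect a finished spill whose space can be reclaimed. */
    boolean spillFinished() {
        final int kvbend = 4 * kvend;
        return (kvbend + METASIZE) % length != equator - (equator % METASIZE);
    }

    /** Same arithmetic as resetSpill: both spill boundaries collapse onto the equator. */
    void resetSpill() {
        final int e = equator;
        bufstart = bufend = e;
        final int aligned = e - (e % METASIZE);
        kvstart = kvend = (int) (((long) aligned - METASIZE + length) % length) / 4;
    }
}
```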
When bufferRemaining drops to 0 or below again, the next spill starts, and from then on the same cycle repeats. That concludes the analysis of the ring buffer.
[This article is an original contribution by 51CTO columnist Wang Senfeng (王森豐). Please credit the source when reposting.]