How Spark Filling Up the Disk Relates to Java Weak References
1 Basic concepts of references
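Define two variables, num and str, as shown below. In the storage model, num holds its value directly, while str holds the address of a String object:
- int num = 6;
- String str = "浪尖聊大數據";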
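Suppose the two variables are then reassigned (num = 8; str = "浪尖是帥哥"). The value of num changes directly from 6 to 8. The variable str only changes the address it stores, say from 0x88 to 0x86: the object "浪尖聊大數據" is still in memory and has not been modified, and a new object, "浪尖是帥哥", has simply been added to memory.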
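The reassignment described above can be sketched in a few lines of Java. This is only an illustration: the class name ReferenceBasics, the extra alias variable, and the ASCII stand-in strings ("old text", "new text") are assumptions made for the example and do not appear in the original.

```java
final class ReferenceBasics {
    /** Returns {num as text, what str points to now, what alias still points to}. */
    static String[] reassign() {
        int num = 6;
        String str = "old text";   // ASCII stand-in for the article's first literal
        String alias = str;        // a second reference to the same String object

        num = 8;                   // the primitive's value itself is overwritten
        str = "new text";          // str now stores the address of a different object

        // alias still reaches the first object, which was never modified
        return new String[] { String.valueOf(num), str, alias };
    }

    public static void main(String[] args) {
        for (String s : reassign()) {
            System.out.println(s);
        }
    }
}
```

The alias variable is there only to make the point observable: the first String object is still reachable and unchanged after str has been pointed elsewhere.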
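2 Pass by value & pass by reference
The examples below illustrate the difference between passing by value and passing by reference. Strictly speaking, Java always passes arguments by value; for a reference type, the value copied into the method is the reference itself, which is why the outcomes below differ: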
- Example 1: a primitive type
- void foo(int value) {
-     value = 88;
- }
- foo(num); // num is unchanged
- Example 2: a reference type that offers no method for modifying itself
- void foo(String text) {
-     text = "mac";
- }
- foo(str); // str is unchanged as well
- Example 3: a reference type that offers a method for modifying itself
- StringBuilder sb = new StringBuilder("vivo");
- void foo(StringBuilder builder) {
-     builder.append("5");
- }
- foo(sb); // sb is changed and now holds "vivo5"
- Example 4: a reference type that offers a method for modifying itself, but the method is not used; the assignment operator is used instead
- StringBuilder sb = new StringBuilder("oppo");
- void foo(StringBuilder builder) {
-     builder = new StringBuilder("vivo");
- }
- foo(sb); // sb is unchanged and still holds "oppo"
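The four examples above can be combined into one small runnable class. The class name PassByValueDemo and the distinct method names are assumptions made so that all four cases can live side by side; the method bodies follow the snippets above.

```java
final class PassByValueDemo {
    // Example 1: reassigning a primitive parameter only changes the local copy.
    static void setInt(int value) {
        value = 88;
    }

    // Example 2: reassigning a String parameter only rebinds the local copy of the reference.
    static void setString(String text) {
        text = "mac";
    }

    // Example 3: mutating the object through the copied reference is visible to the caller.
    static void appendTo(StringBuilder builder) {
        builder.append("5");
    }

    // Example 4: assigning a new object to the parameter leaves the caller's object alone.
    static void rebind(StringBuilder builder) {
        builder = new StringBuilder("vivo");
    }

    public static void main(String[] args) {
        int num = 6;
        setInt(num);
        String str = "text";
        setString(str);
        StringBuilder mutated = new StringBuilder("vivo");
        appendTo(mutated);
        StringBuilder rebound = new StringBuilder("oppo");
        rebind(rebound);
        System.out.println(num + " " + str + " " + mutated + " " + rebound);
    }
}
```

In every case the method receives a copy. Only example 3 changes what the caller sees, because it modifies the shared object rather than the copied reference.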
3 Types of references
- Simply declare a soft reference that points to a Person object
- SoftReference<Person> pSoftReference = new SoftReference<>(new Person("張三", 12));
- Declare a reference queue
- ReferenceQueue<Person> queue = new ReferenceQueue<>();
- Declare a Person object, 李四; obj is a strong reference to it
- Person obj = new Person("李四", 13);
- Make the soft reference softRef point to the object for 李四, and associate the soft reference with the reference queue
- SoftReference<Person> softRef = new SoftReference<>(obj, queue);
- Alternatively, create a Person named 王酒 that is reachable only through a soft reference, and associate that soft reference with the reference queue queue
- SoftReference<Person> softRef = new SoftReference<>(new Person("王酒", 15), queue);
- Usage is simple: softRef.get() returns the referent, or null once it has been collected.
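- Declare a weak reference that points to a Person object
- WeakReference<Person> weakReference = new WeakReference<>(new Person("浪尖", 18));
- Declare a reference queue
- ReferenceQueue<Person> queue = new ReferenceQueue<>();
- Declare a Person object, 李四; obj is a strong reference to it
- Person obj = new Person("李四", 13);
- Declare a weak reference to the object that the strong reference obj points to, and bind that reference to the reference queue queue
- WeakReference<Person> weakRef = new WeakReference<>(obj, queue);
- Using a weak reference is just as simple: call weakRef.get(), which returns null once the referent has been collected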
- Declare a reference queue
- ReferenceQueue<Person> queue = new ReferenceQueue<>();
- Declare a phantom reference
- PhantomReference<Person> reference = new PhantomReference<>(new Person("浪尖", 18), queue);
- Reading the phantom reference's value always yields null, because the referent cannot be retrieved through a phantom reference
- System.out.println(reference.get());
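The snippets above can be tied together in a small, deterministic sketch. The Person class here is a hypothetical stand-in, since the original does not show its definition, and Reference.enqueue() is called by hand to imitate what the garbage collector does once the referent is no longer strongly reachable. Real collection timing is up to the JVM and is deliberately not relied on.

```java
import java.lang.ref.PhantomReference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.SoftReference;
import java.lang.ref.WeakReference;

final class ReferenceKindsDemo {
    /** Hypothetical stand-in for the article's Person class. */
    static final class Person {
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    /** While a strong reference exists, soft and weak references both return the referent. */
    static boolean getReturnsReferentWhileStronglyHeld() {
        Person obj = new Person("Li Si", 13);
        SoftReference<Person> soft = new SoftReference<>(obj);
        WeakReference<Person> weak = new WeakReference<>(obj);
        return soft.get() == obj && weak.get() == obj;
    }

    /** A phantom reference never hands out its referent. */
    static boolean phantomGetIsNull() {
        ReferenceQueue<Person> queue = new ReferenceQueue<>();
        Person obj = new Person("Langjian", 18);
        PhantomReference<Person> phantom = new PhantomReference<>(obj, queue);
        return phantom.get() == null && obj != null;
    }

    /** An enqueued reference shows up in the queue it was registered with. */
    static boolean enqueuedReferenceIsPolled() {
        ReferenceQueue<Person> queue = new ReferenceQueue<>();
        Person obj = new Person("Li Si", 13);
        WeakReference<Person> weak = new WeakReference<>(obj, queue);
        weak.enqueue();                       // manual stand-in for the collector
        return queue.poll() == weak && obj != null;
    }

    public static void main(String[] args) {
        System.out.println(getReturnsReferentWhileStronglyHeld());
        System.out.println(phantomGetIsNull());
        System.out.println(enqueuedReferenceIsPolled());
    }
}
```

The reference queue is the part that matters for the rest of the article: it is how a program learns that an object has gone away, without holding the object itself.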
4 How ThreadLocal uses weak references
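In the JDK, each thread's ThreadLocalMap stores entries whose key, the ThreadLocal object, is held through a WeakReference, while the value is held strongly. The sketch below imitates only that shape. The class WeakKeyEntryDemo and its Entry are illustrative assumptions, not the JDK's code, and clear() is called by hand in place of the garbage collector.

```java
import java.lang.ref.WeakReference;

final class WeakKeyEntryDemo {
    /** Same shape as a ThreadLocalMap entry: weakly held key, strongly held value. */
    static final class Entry extends WeakReference<ThreadLocal<?>> {
        Object value;

        Entry(ThreadLocal<?> key, Object value) {
            super(key);          // the key is the weak referent
            this.value = value;  // the value is an ordinary strong field
        }
    }

    /** Shows that a cleared key leaves a stale entry whose value is still strongly held. */
    static boolean staleEntryKeepsValue() {
        ThreadLocal<String> key = new ThreadLocal<>();
        Entry entry = new Entry(key, "payload");
        boolean keyVisible = entry.get() == key;

        entry.clear();           // manual stand-in for the collector clearing the weak key

        return keyVisible && entry.get() == null && "payload".equals(entry.value);
    }

    public static void main(String[] args) {
        System.out.println(staleEntryKeepsValue());
    }
}
```

The weak key lets an abandoned ThreadLocal be collected, but the value survives until the stale entry is removed, which is why calling remove() on a ThreadLocal after use is the usual advice.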
5 How Spark uses weak references to clean up data
The shuffle-related reference is set up inside ShuffleDependency, which registers its shuffle state with the ContextCleaner:
- _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
Opening the source of registerShuffleForCleanup, the comment says, roughly, that the ShuffleDependency is registered so that its data can be cleaned up when it is garbage collected:
- /** Register a ShuffleDependency for cleanup when it is garbage collected. */
- def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = {
-   registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId))
- }
The registerForCleanup function is as follows:
- /** Register an object for cleanup. */
- private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = {
-   referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue))
- }
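The main job of referenceBuffer is to hold the CleanupTaskWeakReference instances, so that the weak references themselves are not garbage collected before the reference queue has handled them.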
- /**
-  * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they
-  * have not been handled by the reference queue.
-  */
- private val referenceBuffer =
-   Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap)
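ContextCleaner runs an internal thread that loops, taking from the reference queue the weak references whose RDDs and other objects have been garbage collected, and then performs the corresponding data cleanup.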
- private val cleaningThread = new Thread() { override def run(): Unit = keepCleaning() }
The keepCleaning function is as follows:
- /** Keep cleaning RDD, shuffle, and broadcast state. */
- private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
-   while (!stopped) {
-     try {
-       val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
-         .map(_.asInstanceOf[CleanupTaskWeakReference])
-       // Synchronize here to avoid being interrupted on stop()
-       synchronized {
-         reference.foreach { ref =>
-           logDebug("Got cleaning task " + ref.task)
-           referenceBuffer.remove(ref)
-           ref.task match {
-             case CleanRDD(rddId) =>
-               doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
-             case CleanShuffle(shuffleId) =>
-               doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
-             case CleanBroadcast(broadcastId) =>
-               doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
-             case CleanAccum(accId) =>
-               doCleanupAccum(accId, blocking = blockOnCleanupTasks)
-             case CleanCheckpoint(rddId) =>
-               doCleanCheckpoint(rddId)
-           }
-         }
-       }
-     } catch {
-       case ie: InterruptedException if stopped => // ignore
-       case e: Exception => logError("Error in cleaning thread", e)
-     }
-   }
- }
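The register-then-poll pattern that ContextCleaner follows can be imitated in plain Java. The sketch below is not Spark's code: MiniCleaner, TaskRef, and the string task labels are hypothetical names chosen for the example, and the test calls enqueue() by hand instead of waiting for a real garbage collection.

```java
import java.lang.ref.Reference;
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class MiniCleaner {
    /** Weak reference that remembers which cleanup to run for its referent. */
    static final class TaskRef extends WeakReference<Object> {
        final String task;

        TaskRef(String task, Object referent, ReferenceQueue<Object> queue) {
            super(referent, queue);
            this.task = task;
        }
    }

    private final ReferenceQueue<Object> queue = new ReferenceQueue<>();
    // Keeps the TaskRef objects themselves alive until the queue has delivered them.
    private final Set<TaskRef> buffer = Collections.newSetFromMap(new ConcurrentHashMap<>());
    final List<String> executed = new ArrayList<>();

    /** Registers a cleanup task to run after owner becomes unreachable. */
    TaskRef register(Object owner, String task) {
        TaskRef ref = new TaskRef(task, owner, queue);
        buffer.add(ref);
        return ref;
    }

    /** One iteration of the polling loop; returns true if a task was handled. */
    boolean cleanOnce(long timeoutMillis) {
        try {
            Reference<?> polled = queue.remove(timeoutMillis);
            if (polled == null) {
                return false;                 // timed out, nothing was collected
            }
            TaskRef ref = (TaskRef) polled;
            buffer.remove(ref);
            executed.add(ref.task);           // a real cleaner would delete data here
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int pending() {
        return buffer.size();
    }
}
```

The buffer exists for the same reason as referenceBuffer above: if nothing held the TaskRef strongly, it could be collected together with its referent and the cleanup task would never be delivered.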
Shuffle data is removed by the doCleanupShuffle function, shown here:
- /** Perform shuffle cleanup. */
- def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
-   try {
-     logDebug("Cleaning shuffle " + shuffleId)
-     mapOutputTrackerMaster.unregisterShuffle(shuffleId)
-     shuffleDriverComponents.removeShuffle(shuffleId, blocking)
-     listeners.asScala.foreach(_.shuffleCleaned(shuffleId))
-     logDebug("Cleaned shuffle " + shuffleId)
-   } catch {
-     case e: Exception => logError("Error cleaning shuffle " + shuffleId, e)
-   }
- }
The details are not covered further here.
Once ContextCleaner's start function has been called, it also starts a scheduler thread that, by default, calls System.gc() every 30 minutes to trigger garbage collection.
- /** Start the cleaner. */
- def start(): Unit = {
-   cleaningThread.setDaemon(true)
-   cleaningThread.setName("Spark Context Cleaner")
-   cleaningThread.start()
-   periodicGCService.scheduleAtFixedRate(() => System.gc(),
-     periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
- }
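The periodic trigger can be reproduced with a ScheduledExecutorService from the JDK. The sketch below is an illustration under assumptions: the class PeriodicGcSketch, its method names, and the millisecond interval used for demonstration are hypothetical, and only the scheduleAtFixedRate call mirrors the source above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class PeriodicGcSketch {
    /** Schedules action at a fixed rate on a single daemon thread. */
    static ScheduledExecutorService start(Runnable action, long interval, TimeUnit unit) {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "periodic-gc-sketch");
            t.setDaemon(true);   // do not keep the JVM alive just for this timer
            return t;
        });
        // The first run happens after one interval, then repeats every interval.
        service.scheduleAtFixedRate(action, interval, interval, unit);
        return service;
    }

    /** Returns true if the GC request ran at least `times` times within waitMillis. */
    static boolean firesAtLeast(int times, long intervalMillis, long waitMillis) {
        CountDownLatch latch = new CountDownLatch(times);
        ScheduledExecutorService service = start(() -> {
            System.gc();         // a request to the JVM, which decides how to honor it
            latch.countDown();
        }, intervalMillis, TimeUnit.MILLISECONDS);
        try {
            return latch.await(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            service.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(firesAtLeast(2, 20, 5000));
    }
}
```

The point of forcing a collection on a timer is that weak references are only enqueued when a collection actually runs. On a driver with a large heap and little allocation pressure, that might otherwise happen rarely, leaving shuffle files on disk in the meantime.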
The configuration parameter that controls this interval is:
- spark.cleaner.periodicGC.interval
This article is reposted from the WeChat official account 「浪尖聊大數據」. To repost it, please contact that account.