自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Spark刷爆磁盤與Java弱引用的關(guān)系

存儲(chǔ) 存儲(chǔ)設(shè)備 Spark
今天,給大家分享下。Java的引用在大數(shù)據(jù)框架下的引用案例。

 [[351561]]

 

一 引用基本概念

 

如下面,定義兩個(gè)變量num,str,存儲(chǔ)模型大致如下圖:

  1. int num = 6; 
  2. String str = “浪尖聊大數(shù)據(jù)”; 

 

變量num值直接從6修改為了8;變量str只是修改了其保存的地址,從0x88修改為0x86,對(duì)象 “浪尖聊大數(shù)據(jù) ”本身還在內(nèi)存中,并沒(méi)有被修改。只是內(nèi)存中新增了對(duì)象 “浪尖是帥哥”。

二 值傳遞&引用傳遞

 

舉例說(shuō)明引用傳遞和值傳遞:

  1. 第一個(gè)栗子:基本類型 
  2. void foo(int value) { 
  3.     value = 88; 
  4. foo(num); // num 沒(méi)有被改變 
  5.  
  6. 第二個(gè)栗子:沒(méi)有提供改變自身方法的引用類型 
  7. void foo(String text) { 
  8.     text = "mac"
  9. foo(str); // str 也沒(méi)有被改變 
  10.  
  11. 第三個(gè)栗子:提供了改變自身方法的引用類型 
  12. StringBuilder sb = new StringBuilder("vivo"); 
  13. void foo(StringBuilder builder) { 
  14.     builder.append("5"); 
  15. foo(sb); // sb 被改變了,變成了"vivo5"。 
  16.  
  17. 第四個(gè)栗子:提供了改變自身方法的引用類型,但是不使用,而是使用賦值運(yùn)算符。 
  18. StringBuilder sb = new StringBuilder("oppo"); 
  19. void foo(StringBuilder builder) { 
  20.     builder = new StringBuilder("vivo"); 
  21. foo(sb); // sb 沒(méi)有被改變,還是 "oppo"。 

三 引用的類型

  1. 單純的申明一個(gè)軟引用,指向一個(gè)person對(duì)象 
  2. 1 SoftReference pSoftReference=new SoftReference(new Person(“張三”,12)); 
  3.  
  4. 聲明一個(gè)引用隊(duì)列 
  5. ReferenceQueue<Person> queue = new ReferenceQueue<>(); 
  6.  
  7. 聲明一個(gè)person對(duì)象,李四,obj是其強(qiáng)引用 
  8. Person obj = new Person(“李四”,13); 
  9.  
  10. 使軟引用softRef指向李四對(duì)應(yīng)的對(duì)象,并且將該軟引用關(guān)聯(lián)到引用隊(duì)列 
  11. 2 SoftReference softRef = new SoftReference<Object>(obj,queue); 
  12.  
  13. 聲明一個(gè)person對(duì)象,名叫王酒,并保證其僅含軟引用,且將軟引用關(guān)聯(lián)到引用隊(duì)列queue 
  14. 3 SoftReference softRef = new SoftReference<Object>(new Person(“王酒”,15),queue); 
  15.  
  16. 使用很簡(jiǎn)單softRef.get即可獲取對(duì)應(yīng)的value。 

  1. WeakReference<Person> weakReference = new WeakReference<>(new Person(“浪尖”,18)); 
  2.  
  3. 聲明一個(gè)引用隊(duì)列 
  4. ReferenceQueue<Person> queue = new ReferenceQueue<>(); 
  5.  
  6. 聲明一個(gè)person對(duì)象,李四,obj是其強(qiáng)引用 
  7. Person obj = new Person(“李四”,13); 
  8.  
  9. 聲明一個(gè)弱引用,指向強(qiáng)引用obj所指向的對(duì)象,同時(shí)該引用綁定到引用隊(duì)列queue。 
  10. WeakReference weakRef = new WeakReference<Object>(obj,queue); 
  11.  
  12. 使用弱引用也很簡(jiǎn)單,weakRef.get 

  1. 聲明引用隊(duì)列 
  2. ReferenceQueue queue = new ReferenceQueue(); 
  3.  
  4. 聲明一個(gè)虛引用 
  5. PhantomReference<Person> reference = new PhantomReference<Person>(new Person(“浪尖”,18), queue); 
  6.  
  7. 獲取虛引用的值,直接為null,因?yàn)闊o(wú)法通過(guò)虛引用獲取引用對(duì)象。 
  8. System.out.println(reference.get()); 

 

 

 


 

 

四 Threadlocal如何使用弱引用


 

 

五 spark如何使用弱引用進(jìn)行數(shù)據(jù)清理

 

 


 

 

shuffle相關(guān)的引用,實(shí)際上是在ShuffleDependency內(nèi)部實(shí)現(xiàn)了,shuffle狀態(tài)注冊(cè)到ContextCleaner過(guò)程:

  1. _rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this)) 

然后,我們翻開(kāi)registerShuffleForCleanup函數(shù)源碼可以看到,注釋的大致意思是注冊(cè)ShuffleDependency目的是在垃圾回收的時(shí)候清除掉它對(duì)應(yīng)的數(shù)據(jù):

  1. /** Register a ShuffleDependency for cleanup when it is garbage collected. */ 
  2.   def registerShuffleForCleanup(shuffleDependency: ShuffleDependency[_, _, _]): Unit = { 
  3.     registerForCleanup(shuffleDependency, CleanShuffle(shuffleDependency.shuffleId)) 
  4.   } 

其中,registerForCleanup函數(shù)如下:

  1. /** Register an object for cleanup. */ 
  2.   private def registerForCleanup(objectForCleanup: AnyRef, task: CleanupTask): Unit = { 
  3.     referenceBuffer.add(new CleanupTaskWeakReference(task, objectForCleanup, referenceQueue)) 
  4.   } 

referenceBuffer主要作用保存CleanupTaskWeakReference弱引用,確保在引用隊(duì)列沒(méi)處理前,弱引用不會(huì)被垃圾回收。

  1. /** 
  2.    * A buffer to ensure that `CleanupTaskWeakReference`s are not garbage collected as long as they 
  3.    * have not been handled by the reference queue. 
  4.    */ 
  5.   private val referenceBuffer = 
  6.     Collections.newSetFromMap[CleanupTaskWeakReference](new ConcurrentHashMap) 

ContextCleaner內(nèi)部有一個(gè)線程,循環(huán)從引用隊(duì)列里取被垃圾回收的RDD等相關(guān)弱引用,然后完成對(duì)應(yīng)的數(shù)據(jù)清除工作。

  1. private val cleaningThread = new Thread() { override def run(): Unit = keepCleaning() } 

其中,keepCleaning函數(shù),如下:

  1. /** Keep cleaning RDD, shuffle, and broadcast state. */ 
  2.   private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) { 
  3.     while (!stopped) { 
  4.       try { 
  5.         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT)) 
  6.           .map(_.asInstanceOf[CleanupTaskWeakReference]) 
  7.         // Synchronize here to avoid being interrupted on stop() 
  8.         synchronized { 
  9.           reference.foreach { ref => 
  10.             logDebug("Got cleaning task " + ref.task) 
  11.             referenceBuffer.remove(ref) 
  12.             ref.task match { 
  13.               case CleanRDD(rddId) => 
  14.                 doCleanupRDD(rddId, blocking = blockOnCleanupTasks) 
  15.               case CleanShuffle(shuffleId) => 
  16.                 doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks) 
  17.               case CleanBroadcast(broadcastId) => 
  18.                 doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks) 
  19.               case CleanAccum(accId) => 
  20.                 doCleanupAccum(accId, blocking = blockOnCleanupTasks) 
  21.               case CleanCheckpoint(rddId) => 
  22.                 doCleanCheckpoint(rddId) 
  23.             } 
  24.           } 
  25.         } 
  26.       } catch { 
  27.         case ie: InterruptedException if stopped => // ignore 
  28.         case e: Exception => logError("Error in cleaning thread", e) 
  29.       } 
  30.     } 
  31.   } 

shuffle數(shù)據(jù)清除的函數(shù)是doCleanupShuffle,具體內(nèi)容如下:

  1. /** Perform shuffle cleanup. */ 
  2.   def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = { 
  3.     try { 
  4.       logDebug("Cleaning shuffle " + shuffleId) 
  5.       mapOutputTrackerMaster.unregisterShuffle(shuffleId) 
  6.       shuffleDriverComponents.removeShuffle(shuffleId, blocking) 
  7.       listeners.asScala.foreach(_.shuffleCleaned(shuffleId)) 
  8.       logDebug("Cleaned shuffle " + shuffleId) 
  9.     } catch { 
  10.       case e: Exception => logError("Error cleaning shuffle " + shuffleId, e) 
  11.     } 
  12.   } 

細(xì)節(jié)就不細(xì)展開(kāi)了。

 

ContextCleaner的start函數(shù)被調(diào)用后,實(shí)際上啟動(dòng)了一個(gè)調(diào)度線程,每隔30min主動(dòng)調(diào)用了一次System.gc(),來(lái)觸發(fā)垃圾回收。

  1. /** Start the cleaner. */ 
  2.   def start(): Unit = { 
  3.     cleaningThread.setDaemon(true
  4.     cleaningThread.setName("Spark Context Cleaner"
  5.     cleaningThread.start() 
  6.     periodicGCService.scheduleAtFixedRate(() => System.gc(), 
  7.       periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS) 
  8.   } 

具體參數(shù)是:

  1. spark.cleaner.periodicGC.interval 

本文轉(zhuǎn)載自微信公眾號(hào)「浪尖聊大數(shù)據(jù)」,可以通過(guò)以下二維碼關(guān)注。轉(zhuǎn)載本文請(qǐng)聯(lián)系浪尖聊大數(shù)據(jù)公眾號(hào)。

 

 

責(zé)任編輯:武曉燕 來(lái)源: 浪尖聊大數(shù)據(jù)
相關(guān)推薦

2013-08-19 17:14:04

.Net強(qiáng)引用弱引用

2020-12-02 09:01:40

Java基礎(chǔ)

2015-11-02 17:20:00

Java弱引用

2009-06-16 11:26:22

弱引用內(nèi)存泄露

2021-01-07 14:20:55

JavaGC

2024-05-20 08:58:13

Java引用類型垃圾回收器

2020-11-12 07:49:18

MySQL

2021-12-09 15:45:09

Python弱引用代碼

2017-10-13 10:36:33

SparkSpark-Strea關(guān)系

2009-06-19 16:19:23

Java對(duì)象引用

2021-10-08 21:00:52

數(shù)據(jù)弱引用對(duì)象

2013-06-09 13:24:00

程序員Bug

2022-01-02 06:55:08

Node.js ObjectWrapAddon

2021-11-25 07:42:11

命令Linux系統(tǒng)

2013-09-16 16:48:50

Android優(yōu)化軟引用

2017-12-15 16:03:27

2018-11-16 16:10:28

JavaOOM編程語(yǔ)言

2021-10-18 15:50:49

Android強(qiáng)引用軟引用

2020-12-28 11:13:24

比特幣數(shù)據(jù)匿名幣

2021-02-21 00:22:32

技術(shù)團(tuán)隊(duì)工具
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)