Hadoop 2.0 Yarn代碼:心跳驅(qū)動服務(wù)分析
主要涉及的java文件
hadoop-yarn-server-resourcemanager下的包
org.apache.hadoop.yarn.server.resourcemanager
ResourceTrackerService.java
org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
FifoScheduler.java
org.apache.hadoop.yarn.server.resourcemanager.rmnode
RMNodeImpl.java
hadoop-yarn-server-nodemanager下的包
org.apache.hadoop.yarn.server.nodemanager
NodeStatusUpdaterImpl.java
2.代碼分析
各個服務(wù)代碼已經(jīng)啟動,NodeStatusUpdate啟動后開始驅(qū)動整個Hadoop運行
1).NodeStatusUpdaterImpl(NodeManager端):
NodeStatusUpdaterImpl一經(jīng)被啟動,start()函數(shù)被調(diào)用,進行Hadoop RPC服務(wù)端的初始化操作(調(diào)用getServer函數(shù)創(chuàng)建服務(wù)等等)。
start()函數(shù)主要依次調(diào)用registerWithRM()函數(shù)和startStatusUpdater()函數(shù)
registerWithRM()函數(shù)
設(shè)置必要配置信息,和安全認證操作
利用Hadoop RPC遠程調(diào)用RM端ResourcesTrackerService下的registerNodeManager()方法,詳細見后面ResourcesTrackerService下的registerNodeManager()代碼分析
startStatusUpdater()函數(shù)
創(chuàng)建一個線程,然后啟動,所有操作都在運行while的循環(huán)中
設(shè)置、獲取和輸出必要配置信息,其中比較重要的調(diào)用getNodeStatus()方法,獲取本地Container和本地Node的狀態(tài),以供后面的nodeHeartbeat()方法使用
通過Hadoop RPC遠程調(diào)用RM端ResourcesTrackerService下的nodeHeartbeat()函數(shù),用while循環(huán)以一定時間間隔向RM發(fā)送心跳信息,心跳操作見下面ResourcesTrackerService下nodeHeartbeat()函數(shù)
nodeHeartbeat()將返回給NM信息,根據(jù)返回的response,根據(jù)response返回的信息標記不需要的Container和Application發(fā)送相應(yīng)的FINISH_CONTAINERS和FINISH_APPS給ContainerManager,進行清理操作----詳細見后面的代碼分析
2).ResourceTrackerService(ResourcesManager端):
ResourceTrackerService開頭與NodeStatusUpdaterImpl相似,start()函數(shù)被調(diào)用,初始化Hadoop RPC服務(wù)端,等待遠程來調(diào)用ResourceTrackerService中的函數(shù)
接上面的NodeStatusUpdaterImpl中對registerNodeManager()和nodeHeartbeat()的Hadoop RPC調(diào)用,詳細調(diào)用細節(jié)見下文
以下分成主要從兩個函數(shù)registerNodeManager()和nodeHeartbeat()開始分析,所以分成兩部分---
第一部分:
1).接前文ResourceTrackerService下的registerNodeManager()函數(shù)
首先獲取本地的NodeID,還有相應(yīng)的主機名、端口、請求資源信息。
進行安全認證等輔助操作,檢查NodeID所標記的Node是否"有效".如果“無效”的話,立即返回
Node“有效”說明此Node可用,于是創(chuàng)建RMNode(new RMNodeImpl)來識別這個Node的狀態(tài)和監(jiān)測在這個Node上運行的Container和Application
判斷其是否為新RMNode,如果是則向其發(fā)送RMNodeEventType.STARTED
如果不是新的RMNode,則發(fā)送RMNodeEventType.RECONNECTED到RMNode,重新連接Node,見附加代碼分析。
最后返回給調(diào)用方操作結(jié)果。
2).RMNodeImpl:當接收RMNodeEventType.STARTED后(接1)),發(fā)生狀態(tài)轉(zhuǎn)移NodeState(NEW→RUNNING),Transition函數(shù)被調(diào)用
向調(diào)度器(FifoScheduler)發(fā)送NODE_ADDED。
判斷這個Node是否Inactive,如果在Inactive中則,則先將這個Node移除出Inactive,否則增加ActiveNodes的數(shù)目。
3).FifoScheduler:接受NODE_ADDED事件,調(diào)用addNode()函數(shù),向RM報告新添加的Node的狀態(tài)
addNode函數(shù)被調(diào)用,首先將接收到的RMNode的NodeID和其相應(yīng)新創(chuàng)建的SchedulerNode(包含對資源的各種操作)放在ConcurrentHashMap類型的node對象中。
再調(diào)用Resources下的addTo()函數(shù),累加Node的資源數(shù)量,來計算集群中擁有的資源數(shù)量
至此NM端的Node已經(jīng)添加到RM的管轄范圍下,NM成功注冊到RM
附加代碼分析
附加2).RMNodeImpl:當RMNode接收RMNodeEventType.RECONNECTED(接1)),則保持當前狀態(tài)不變(RUNNING或者UNHEALTHY),Transition函數(shù)被調(diào)用
首先向調(diào)度器(FifoScheduler)發(fā)送NODE_REMOVED事件,刪除當前Node
然后重新連接操作,如果新連接的Node與上一次斷開的Node為同一個,則直接向調(diào)度器發(fā)送NODE_ADDED事件,如果兩個Node不是同一個,則更新關(guān)于Node的參數(shù),再將新的Node加入ConcurrentHashMap類型的node對象中(與前面FifoScheduler中的是同一個)
最后向新的RMNode發(fā)送RMNodeEventType.STARTED
附加3).FifoScheduler:接到NODE_REMOVED事件,調(diào)用removeNode()函數(shù)
removeNode()函數(shù)中,先將此Node上的Container全部Kill掉,通過發(fā)送RMContainerEventType.KILL實現(xiàn),因為現(xiàn)在討論沒有Job運行,所以沒有Container可以Kill
從nodes中移出此Node,最后計算集群資源,將相應(yīng)Node的資源數(shù)量從集群資源總量扣除,完畢
第二部分
1).接前文ResourceTrackerService下的nodeHeartbeat()函數(shù),各個NM已經(jīng)注冊到RM上,則各個NM開始調(diào)用這個函數(shù)向RM發(fā)送“心跳”保持聯(lián)系,另外這里討論的是未提交Job的情況下
獲取所需操作的參數(shù)變量,例如NodeStatus、NodeId等
驗證發(fā)送這次“心跳的”NM是否已經(jīng)注冊到RM,若未注冊則返回給NM讓其進行重新(reboot)注冊到RM中(實際上就是讓NodeStatusUpdater跳過此次循環(huán))。
驗證這個NM是否“有效”(有效:主機隊列包含這個NM或者黑名單沒有這個NM),如“無效”,則發(fā)送RMNodeEventType.DECOMMISSION到NM相應(yīng)的RMNode中,并關(guān)閉(shutdown)這個NM---下接附加2)
驗證這次“心跳”是否與上一個“心跳”重復(fù)或者是不是新的“心跳”,這個通過心跳ID實現(xiàn),如果重復(fù)則輸出心跳重復(fù)信息,并且直接返回,如果不是新的心跳,則向RMNode發(fā)送RMNodeEventType.REBOOTING,然后返回reboot,讓NM“重啟”(和上面一樣NodeStatusUpdater跳過當此次循環(huán))---下接附加2)
設(shè)置新的“心跳”ID,獲取Container和Application的信息
向RMNode發(fā)送包含STATUS_UPDATE和Container、Application等信息的RMNodeStatusEvent,然后返回相應(yīng)設(shè)置好的response給調(diào)用者。
2).RMNodeImpl:RMNode接收到包含STATUS_UPDATE和Container、Application等信息的RMNodeStatusEvent,RMNodeImpl狀態(tài)遷移NodeState(RUNNING→UNHEALTHY或RUNNING→RUNNING),Transition函數(shù)被調(diào)用
首先從RMNodeStatusEvent獲得必要的變量,然后判斷相應(yīng)的Node的“健康”情況,如果出現(xiàn)問題,則向調(diào)度器NODE_REMOVED,則調(diào)度器將此NM從集群管理刪除(詳見第一部分 附加3)),然后發(fā)送NODE_UNUSABLE到NodeListManager,將其RMNode放到“unusable”的set集合當中,此時RMNodeImpl的NodeState(RUNNING→UNHEALTHY)
如果“健康” 則順利運行,獲取NM遠程傳過來的關(guān)于Container的信息(是在NM端用Hadoop RPC調(diào)用nodeHeartbeat()時傳送過來的),
說明:
由于這個地方討論的時候,無Job提交過來,NM端無Container啟動,NM發(fā)送到RM的事件里面的裝有Container狀態(tài)的List為空,所以只傳送“心跳” 表明NM的活動信息,并沒有實際處理,RM端也無Application處理,接受“心跳”只會向RMNode發(fā)送RMNodeCleanContainerEvent事件,清理可能互動的Container(對應(yīng)的代碼見FifoScheduler下的containerLaunchedOnNode函數(shù))。若詳見處理情況的運行狀態(tài),參見后面的文章:RM與NM代碼_心跳驅(qū)動服務(wù)分析_2 Container的配置和分配(Job提交)一篇。此時RMNodeImpl的NodeState(RUNNING→RUNNING)
到這為止,RM-NM端的代碼已經(jīng)啟動完成,RM和NM之間以一定的時間間隔用心跳交互信息,等待Job的提交
附加代碼分析
附加2)RMNodeImpl:當RMNode接收RMNodeEventType.DECOMMISSION),發(fā)生狀態(tài)轉(zhuǎn)移NodeState(RUNNING→DECOMMISSIONED),Transition函數(shù)被調(diào)用,
將DECOMMISSIONED設(shè)置為finalState
當接到RMNodeEventType.REBOOTING情況類似,最后將REBOOTING設(shè)置為finalState。
分析如下圖,其中白色線為第一部分,初始NM注冊到RM階段,黃色線為第二部分,NM發(fā)送“心跳”信息到RM階段
原文鏈接:http://www.cnblogs.com/biyeymyhjob/archive/2012/08/21/2648026.html
【編輯推薦】
- 小白學數(shù)據(jù)分析:怎么做流失分析
- 小白學數(shù)據(jù)分析之K-means理論篇
- 小白學數(shù)據(jù)分析之從購買記錄分析道具支付環(huán)節(jié)
- 小白學數(shù)據(jù)分析之付費滲透率
- 小白學數(shù)據(jù)分析之Excel制作INFOGRAPHIC