Storm源碼淺析之topology的提交
最近一直在讀twitter開源的這個分布式流計算框架——storm的源碼,還是有必要記錄下一些比較有意思的地方。我按照storm的主要概念進行組織,并且只分析我關注的東西,因此稱之為淺析。
一、介紹
Storm的開發(fā)語言主要是Java和Clojure,其中Java定義骨架,而Clojure編寫核心邏輯。源碼統(tǒng)計結(jié)果:
180 text files. 177 unique files. 7 files ignored. http://cloc.sourceforge.net v 1.55 T=1.0 s (171.0 files/s, 46869.0 lines/s) ------------------------------------------------------------------------------- Language files blank comment code ------------------------------------------------------------------------------- Java 125 5010 2414 25661 Lisp 33 732 283 4871 Python 7 742 433 4675 CSS 1 12 45 1837 ruby 2 22 0 104 Bourne Shell 1 0 0 6 Javascript 2 1 15 6 ------------------------------------------------------------------------------- SUM: 171 6519 3190 37160 -------------------------------------------------------------------------------
Java代碼25000多行,而Clojure(Lisp)只有4871行,說語言不重要再次證明是扯淡。
二、Topology和Nimbus
Topology是storm的核心理念,將spout和bolt組織成一個topology,運行在storm集群里,完成實時分析和計算的任務。這里我主要想介紹下topology部署到storm集群的大概過程。提交一個topology任務到Storm集群是通過StormSubmitter.submitTopology方法提交:
StormSubmitter.submitTopology(name, conf, builder.createTopology());
我們將topology打成jar包后,利用bin/storm這個python腳本,執(zhí)行如下命令:
bin/storm jar xxxx.jar com.taobao.MyTopology args
將jar包提交給storm集群。storm腳本會啟動JVM執(zhí)行Topology的main方法,執(zhí)行submitTopology的過程。而submitTopology會將jar文件上傳到nimbus,上傳是通過socket傳輸。在storm這個python腳本的jar方法里可以看到:
- def jar(jarfile, klass, *args):
- exec_storm_class(
- klass,
- jvmtype="-client",
- extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],
- args=args,
- prefix="export STORM_JAR=" + jarfile + ";")
通過環(huán)境變量找到jar包的地址,然后上傳。利用環(huán)境變量傳參是個小技巧。
其次,nimbus在接收到jar文件后,存放到數(shù)據(jù)目錄的inbox目錄,nimbus數(shù)據(jù)目錄的結(jié)構:
-nimbus -inbox -stormjar-57f1d694-2865-4b3b-8a7c-99104fc0aea3.jar -stormjar-76b4e316-b430-4215-9e26-4f33ba4ee520.jar -stormdist -storm-id -stormjar.jar -stormconf.ser -stormcode.ser
其中inbox用于存放提交的jar文件,每個jar文件都重命名為stormjar加上一個32位的UUID。而stormdist存放的是啟動topology后生成的文件,每個topology都分配一個唯一的id,ID的規(guī)則是“name-計數(shù)-時間戳”。啟動后的topology的jar文件名命名為storm.jar ,而它的配置經(jīng)過java序列化后存放在stormconf.ser文件,而stormcode.ser是將topology本身序列化后存放的文件。這些文件在部署的時候,supervisor會從這個目錄下載這些文件,然后在supervisor本地執(zhí)行這些代碼。
進入重點,topology任務的分配過程(zookeeper路徑說明忽略root):
1.在zookeeper上創(chuàng)建/taskheartbeats/{storm id} 路徑,用于任務的心跳檢測。storm對zookeeper的一個重要應用就是利用zk的臨時節(jié)點做存活檢測。task將定時刷新節(jié)點的時間戳,然后nimbus會檢測這個時間戳是否超過timeout設置。
2.從topology中獲取bolts,spouts設置的并行數(shù)目以及全局配置的***并行數(shù),然后產(chǎn)生task id列表,如[1 2 3 4]
3.在zookeeper上創(chuàng)建/tasks/{strom id}/{task id}路徑,并存儲task信息
4.開始分配任務(內(nèi)部稱為assignment), 具體步驟:
(1)從zk上獲得已有的assignment(新的toplogy當然沒有了)
(2)查找所有可用的slot,所謂slot就是可用的worker,在所有supervisor上配置的多個worker的端口。
(3)將任務均勻地分配給可用的worker,這里有兩種情況:
(a)task數(shù)目比worker多,例如task是[1 2 3 4],可用的slot只有[host1:port1 host2:port1],那么最終是這樣分配
- {1: [host1:port1] 2 : [host2:port1]
- 3 : [host1:port1] 4 : [host2:port1]}
,可以看到任務平均地分配在兩個worker上。
(b)如果task數(shù)目比worker少,例如task是[1 2],而worker有[host1:port1 host1:port2 host2:port1 host2:port2],那么首先會將woker排序,將不同host間隔排列,保證task不會全部分配到同一個worker上,也就是將worker排列成
[host1:port1 host2:port1 host1:port2 host2:port2]
,然后分配任務為
{1: host1:port1 , 2 : host2:port2}
(4)記錄啟動時間
(5)判斷現(xiàn)有的assignment是否跟重新分配的assignment相同,如果相同,不需要變更,否則更新assignment到zookeeper的/assignments/{storm id}上。
5.啟動topology,所謂啟動,只是將zookeeper上/storms/{storm id}對應的數(shù)據(jù)里的active設置為true。
6.nimbus會檢查task的心跳,如果發(fā)現(xiàn)task心跳超過超時時間,那么會重新跳到第4步做re-assignment。
原文鏈接:http://www.blogjava.net/killme2008/archive/2011/11/17/364112.html
【編輯推薦】