10 億級(jí)海量數(shù)據(jù)運(yùn)算下,Apache Spark 的四個(gè)技術(shù)應(yīng)用實(shí)踐
原創(chuàng)【51CTO.com原創(chuàng)稿件】2013 年,蘇寧大數(shù)據(jù)團(tuán)隊(duì)以 Hadoop 生態(tài)系統(tǒng)為核心構(gòu)建了整套大數(shù)據(jù)平臺(tái),為整個(gè)蘇寧集團(tuán)所有業(yè)務(wù)團(tuán)隊(duì)提供大數(shù)據(jù)的存儲(chǔ)以及計(jì)算能力。
在蘇寧中臺(tái)供應(yīng)鏈計(jì)算等應(yīng)用場(chǎng)景下,我們基于 Apache Spark 來(lái)構(gòu)建整套零售核心數(shù)據(jù)計(jì)算與分析平臺(tái),解決海量數(shù)據(jù)離線和在線計(jì)算時(shí)效和性能問(wèn)題。
本文主要介紹 Apache Spark 如何實(shí)現(xiàn)蘇寧中臺(tái)商品價(jià)格信息的 TB 級(jí)別復(fù)雜業(yè)務(wù)數(shù)據(jù)處理運(yùn)算,以及其中碰到的問(wèn)題和解決方案。
蘇寧大數(shù)據(jù)平臺(tái)和整體框架結(jié)構(gòu)
蘇寧大數(shù)據(jù)平臺(tái)的整體架構(gòu)以開(kāi)源的基礎(chǔ)平臺(tái)為主,輔助以自研的組件。
圖 1:蘇寧大數(shù)據(jù)平臺(tái)架構(gòu)
圖 2:大數(shù)據(jù)開(kāi)發(fā)平臺(tái)
綜合商品價(jià)格運(yùn)算系統(tǒng)中 Spark 的應(yīng)用
在整個(gè)綜合商品價(jià)格運(yùn)算系統(tǒng)中,我們對(duì)供應(yīng)鏈等數(shù)據(jù)進(jìn)行了整合,生成了目前全部可售商品的價(jià)格庫(kù)存等數(shù)據(jù)。
該數(shù)據(jù)的整合涉及到多個(gè)外圍系統(tǒng)的數(shù)據(jù)整合和業(yè)務(wù)的執(zhí)行。在該項(xiàng)目中,我們運(yùn)用 Spark 技術(shù)來(lái)解決海量數(shù)據(jù)抽取、海量數(shù)據(jù)運(yùn)算的問(wèn)題。
整體流程可以描述為:
- 使用 Spark 從上游系統(tǒng)的 DB2、MySQL 生產(chǎn)環(huán)境備庫(kù)中抽取全量數(shù)據(jù)。
- 使用 Spark 進(jìn)行數(shù)據(jù)的關(guān)聯(lián)和聚合,將各個(gè)源頭數(shù)據(jù)加工轉(zhuǎn)換成計(jì)算所需要的數(shù)據(jù)維度。
- 運(yùn)用 Spark 的 Map 進(jìn)行全量數(shù)據(jù)的運(yùn)算轉(zhuǎn)換。
- 存儲(chǔ)結(jié)果到 HDFS 中,并且在 Hive 表中建立外部表映射到 HDFS 目錄。
下面講述 Apache Spark 的四個(gè)技術(shù)應(yīng)用實(shí)踐。
使用 DataFrame 實(shí)現(xiàn)異構(gòu)數(shù)據(jù)庫(kù)海量數(shù)據(jù)抽取
數(shù)據(jù)處理的規(guī)劃:由于上游系統(tǒng)的數(shù)據(jù),尚未同步到大數(shù)據(jù)存儲(chǔ)系統(tǒng)中(HDFS,Hive,HBase),項(xiàng)目需要獨(dú)立進(jìn)行數(shù)據(jù)的 ETL 工作。
這些上游系統(tǒng)的數(shù)據(jù)存在以下特點(diǎn):
- 數(shù)據(jù)量較大:兩個(gè)主要數(shù)據(jù)源頭數(shù)據(jù)量在 10 億級(jí)。
- 存儲(chǔ)介質(zhì)不同:DB2,MySQL,Hive。
- 存儲(chǔ)的分布不同:業(yè)務(wù)庫(kù)有 10 個(gè)庫(kù) 1000 張表,也有 5 個(gè)庫(kù) 100 張表,和不分表等存儲(chǔ)結(jié)構(gòu),分庫(kù)的規(guī)則有取表名后綴的模,有取表名后綴的區(qū)間等。
- 在系統(tǒng)需求上,又需要將整個(gè)運(yùn)算任務(wù)壓縮到 1 小時(shí)之內(nèi)。我們最終采用的方案是使用 SparkSQL 的 JDBC 接口直接進(jìn)行數(shù)據(jù)的抽取和計(jì)算,相當(dāng)于將數(shù)據(jù) ETL 和數(shù)據(jù)的業(yè)務(wù)處理放置在一個(gè)程序中。
- 相對(duì)于 Sqoop,這樣的解決方案是輕量級(jí)的,1000 個(gè) DataFrame 的 Load 要比 1000 個(gè) Sqoop 任務(wù)的資源消耗要低很多,以及調(diào)度開(kāi)銷(xiāo)的消耗也少很多。
- 便于數(shù)據(jù)業(yè)務(wù)代碼,業(yè)務(wù)針對(duì)動(dòng)態(tài)表的切換,可以將讀取當(dāng)前表編號(hào)的模塊直接嵌入到 Spark 的代碼中進(jìn)行。
對(duì)于 DataFrame 在加載數(shù)據(jù)前的數(shù)據(jù)庫(kù) Schema 性能問(wèn)題,有了一個(gè)較好的優(yōu)化方案,以下是優(yōu)化前和優(yōu)化后,DataFrame 在創(chuàng)建過(guò)程中的流程:
圖 3 :優(yōu)化前 DataFrame 創(chuàng)建流程
圖 4 :優(yōu)化后的 DataFrame 創(chuàng)建流程
在使用該方案后,任務(wù) DataFrame 的 Load 數(shù)據(jù)時(shí)間從原先的接近 30 分鐘縮短為 5 分鐘以?xún)?nèi)。
使用 SparkSQL 結(jié)合 ZipPartition 實(shí)現(xiàn)多層次多維度數(shù)據(jù)關(guān)聯(lián)和優(yōu)化
電商和互聯(lián)網(wǎng)的運(yùn)營(yíng)一般會(huì)涉及到數(shù)據(jù)維度的擴(kuò)散,為了簡(jiǎn)化運(yùn)營(yíng)端的操作難度和提高數(shù)據(jù)提供方的性能,一般會(huì)使用維度擴(kuò)散的方案,將上游運(yùn)營(yíng)系統(tǒng)的數(shù)據(jù)進(jìn)行數(shù)據(jù)的擴(kuò)散,放置到下游數(shù)據(jù)使用方<K,V>存儲(chǔ)中。
在進(jìn)行維度擴(kuò)散時(shí)經(jīng)常會(huì)有數(shù)據(jù)層次的問(wèn)題,數(shù)據(jù)需要多層的關(guān)聯(lián)和層次不同的數(shù)據(jù)需要關(guān)聯(lián)的情況。例如,上游運(yùn)營(yíng)的數(shù)據(jù)為全國(guó)、地區(qū)、城市維度并且有優(yōu)先級(jí),下游數(shù)據(jù)服務(wù)方的維度統(tǒng)一為城市維度。
圖 5:多層關(guān)聯(lián)示意圖
在應(yīng)用系統(tǒng)中,一般的做法是使用點(diǎn)關(guān)聯(lián)查詢(xún),從***優(yōu)先級(jí)的維度進(jìn)行關(guān)聯(lián),關(guān)聯(lián)到則返回,關(guān)聯(lián)不到則繼續(xù)向下一個(gè)優(yōu)先級(jí)進(jìn)行關(guān)聯(lián),直到最終結(jié)果。
但是在 Spark 中,我們是需要運(yùn)用類(lèi)似于數(shù)據(jù)庫(kù)關(guān)聯(lián)的模式解決該問(wèn)題。
使用***優(yōu)先級(jí)進(jìn)行左關(guān)聯(lián),然后過(guò)濾出未關(guān)聯(lián)到的數(shù)據(jù),再依次將未關(guān)聯(lián)到的數(shù)據(jù)進(jìn)行下面優(yōu)先級(jí)的關(guān)聯(lián),直到生成結(jié)果??梢员硎緸椋?/p>
- DataSet LeftJoin DimensionA => DataSetA
- DataSetA Filter(A.Field == NULL) => DataSetToJoinB
- DataSetA Filter(A.Field != NULL) => DataSetAFinal
- DataSetToJoinB LeftJoin DimensionB => DataSetB
- …
- DataSetFinal = DataSetAFinal UNION DataSetBFinal UNION DataSetCFinal
這里就出現(xiàn)一個(gè)問(wèn)題:
- 同樣的數(shù)據(jù)被多次使用,這里從技術(shù)上可以采用 Cache 的方法應(yīng)對(duì)。
- 在使用 Cache 對(duì)數(shù)據(jù)緩存的方法上,假如***優(yōu)先級(jí)的數(shù)據(jù)少,則實(shí)際上大量的數(shù)據(jù)都會(huì)需要 Cache 并且落到***一層。
針對(duì)可能緩存多次數(shù)據(jù)的問(wèn)題,我們嘗試了另外一種方法,全部進(jìn)行左關(guān)聯(lián),并且?guī)蟽?yōu)先級(jí)。最終,我們使用 Group By 的方法對(duì)優(yōu)先級(jí)進(jìn)行了處理,可以表示為:
- DataSet LeftJoin DimensionA with A => DataSetA
- DataSet LeftJoin DimensionB with B => DataSetB
- DataSet LeftJoin DimensionC with C => DataSetC
- DataSetA UNION DataSetB UINION DataSetC => DataSetToGroupBy
- DataSetFinal = (DataSetToGroupBy GroupBy Dimension).ApplyPriority()
這里方案就只需要進(jìn)行一次的數(shù)據(jù) Cache。
最終我們根據(jù)兩種方案對(duì)實(shí)際測(cè)試的結(jié)果進(jìn)行取舍,該部分的優(yōu)化和一般的數(shù)據(jù)庫(kù)優(yōu)化一樣,都需要考慮到實(shí)際的數(shù)據(jù)關(guān)聯(lián)的情況和業(yè)務(wù)要求,獲取***化的方案。已達(dá)到***的任務(wù)運(yùn)行效益。
通過(guò)并行控制 DAG,優(yōu)化執(zhí)行時(shí)間
在進(jìn)行復(fù)雜業(yè)務(wù)的處理過(guò)程中,我們發(fā)現(xiàn)有部分?jǐn)?shù)據(jù)未進(jìn)行分表分庫(kù),并且數(shù)據(jù)量相對(duì)較大(約 20 億),這部分?jǐn)?shù)據(jù)的載入效率直接影響了后面的整個(gè)效率。
例如,我們正常使用 128 核進(jìn)行運(yùn)算,但一旦運(yùn)行到該步驟,則變成了單個(gè)核進(jìn)行運(yùn)行,時(shí)間長(zhǎng)達(dá) 5 分鐘。而 5 分鐘對(duì)于系統(tǒng)運(yùn)算有嚴(yán)格時(shí)間區(qū)間要求的業(yè)務(wù)需求是非常嚴(yán)峻的。
針對(duì)該問(wèn)題,我們使用 Driver 端并行提交任務(wù)的方法進(jìn)行解決。這是根據(jù)我們的任務(wù)模式所決定的,Driver 需要大量的時(shí)間建立數(shù)據(jù)的整個(gè)流程(4000 多張表的 DataFrame),并在最終存儲(chǔ)結(jié)果 action 代碼執(zhí)行之前就進(jìn)行數(shù)據(jù)的加載。
具體操作流程是:
- 將該部分?jǐn)?shù)據(jù)標(biāo)記為 Cache。
- 執(zhí)行 countSync 直接進(jìn)行數(shù)據(jù)加載。
- 在必須用到該步驟的流程進(jìn)行 Get 命令阻塞 Driver 主線程,確保數(shù)據(jù)加載后進(jìn)行后續(xù)的操作。
- 后續(xù)步驟直接使用已經(jīng)緩存的數(shù)據(jù)進(jìn)行運(yùn)算。
該方案可以用下圖表示:
圖 6:并行優(yōu)化示意圖
計(jì)算結(jié)果是通過(guò)并行的加載后,我們將整個(gè)的流程縮短了 5 分鐘。我們也在其他對(duì)于運(yùn)行時(shí)間有嚴(yán)格要求的項(xiàng)目中使用該方法對(duì)于業(yè)務(wù)流程中需要獨(dú)立計(jì)算,資源占用小,但是耗時(shí)較長(zhǎng)的模塊進(jìn)行了優(yōu)化,都一定程度上縮短了所需要的時(shí)間。
Spark 的 ClassLoader 所帶來(lái)的問(wèn)題和解決方案
該問(wèn)題出現(xiàn)的原因是公司對(duì)運(yùn)維和研發(fā)的數(shù)據(jù)隔離有明確的要求,代碼和研發(fā)的配置中不允許出現(xiàn)生產(chǎn)環(huán)境的數(shù)據(jù)庫(kù)配置。
原先的配置都是由運(yùn)維在 Websphere,Wildfly 等中間件中,通過(guò) JNDI 的方法配置給實(shí)際的應(yīng)用程序。
但是項(xiàng)目中又需要使用 JDBC 去直接連接生產(chǎn)環(huán)境的數(shù)據(jù)庫(kù),這樣就帶來(lái)了數(shù)據(jù)庫(kù)連接的問(wèn)題。
我們采用自定義的 JDBC 封裝原先的 JDBC,讓外層的 JDBC 以 Token 的方式獲取實(shí)際的數(shù)據(jù)庫(kù)連接,并且由實(shí)際的 JDBC 進(jìn)行操作。
在這種需求下,我們?cè)谔幚磉^(guò)程中考慮到目前分布式協(xié)調(diào)組件的壓力,將數(shù)據(jù)庫(kù)的 Token 封裝到 Jar 包,使用 ClassLoader 去讀取 Token 數(shù)據(jù)。
正常情況下,我們?cè)?Spark 端使用到的 Classloader 順序來(lái)加載 Token 文件:
ExtClassLoader-> AppClassLoader -> MutableURLClassLoader.
然而,在集群運(yùn)行時(shí),我們?cè)?Driver 主線程中的 ClassLoader 是 AppClassLoader,而它無(wú)法讀取到 http 的 jar 包里面的 token 文件。
我們進(jìn)行了簡(jiǎn)單的方案,將當(dāng)期線程中的 ClassLoader 替換成為當(dāng)期業(yè)務(wù)代碼類(lèi)的 ClassLoader。
Spark 應(yīng)用的實(shí)踐總結(jié)
我們?cè)趦r(jià)格運(yùn)算系統(tǒng)中使用了 SOA 模式中的 Aggregate Reporting 模式,將多個(gè)產(chǎn)品線的數(shù)據(jù)進(jìn)行了整合,提供了一個(gè)業(yè)務(wù)聚合的數(shù)據(jù)集市。
在模塊設(shè)計(jì)上,我們將數(shù)據(jù)的抽取和數(shù)據(jù)的運(yùn)算集成到一起,這樣在帶來(lái)效率和便利的同時(shí)也必然帶來(lái)模塊耦合。
從目前的實(shí)踐中,可以得出幾個(gè)結(jié)論:
- 使用 JDBC 抽取大量數(shù)據(jù)表直接進(jìn)行計(jì)算是可行的,尤其針對(duì)分表分庫(kù)和可以進(jìn)行并行抽取的數(shù)據(jù)庫(kù)。但是有幾點(diǎn)需要特別關(guān)注:
①DataFrame 在大量創(chuàng)建時(shí),對(duì)帶來(lái)的 Driver 端時(shí)間消耗需要進(jìn)行優(yōu)化,減少數(shù)據(jù)流轉(zhuǎn)的時(shí)間。
②數(shù)據(jù)源中一旦有任何一個(gè)出現(xiàn)失敗,整個(gè)任務(wù)就可能需要重新運(yùn)算,這個(gè)是將 ETL 和運(yùn)算放到一個(gè)模塊帶來(lái)的問(wèn)題。
③DataFrame 大量并行抽取時(shí),對(duì)數(shù)據(jù)庫(kù)的 IO 壓力是比較大的,Spark 難以對(duì)這部分的并行度進(jìn)行控制。
需要根據(jù)實(shí)際情況考慮使用合并抽取(Union多張表),或者進(jìn)行任務(wù)拆分。任務(wù)拆分可以解決抽取時(shí)并行度和計(jì)算時(shí)并行度和資源要求不一樣的問(wèn)題。
- ZipPartition 是數(shù)據(jù)關(guān)聯(lián)的最終解決方案,通過(guò) SparkSQL 和 RDD 的 JOIN 時(shí)維度層次不一樣,或者其他難以 JOIN 的問(wèn)題都可以通過(guò)這個(gè)方法解決。
但由于 ZipPartition 是 RDD 底層的操作,開(kāi)發(fā)人員幾乎完全控制 Worker 的關(guān)聯(lián)模型,它的性能調(diào)優(yōu)顯得尤其重要。
而對(duì)于有層次結(jié)構(gòu)的關(guān)聯(lián),使用 JOIN – FILTER –UNION 和 JOIN – UNION –GROUPBY,則需要分析實(shí)際數(shù)據(jù)的情況,再進(jìn)行取舍。
- 在 Driver 端進(jìn)行并行可以解決部分計(jì)算時(shí)間的問(wèn)題,但需要滿足幾個(gè)條件:
①可以并行的時(shí)間點(diǎn),必須是在集群相對(duì)空閑的時(shí)候,例如:Driver 端在進(jìn)行初始化 RDD 的時(shí)候,在編排大型 DAG 的時(shí)候。
②可并行的數(shù)據(jù)必須在最終使用之前確認(rèn)加載到 Cache。
③不應(yīng)該對(duì)過(guò)于大的數(shù)據(jù)集進(jìn)行并行加載操作。
- Spark 在 ClassLoader 層面進(jìn)行了變動(dòng),將配置文件封裝在 jar 包中有可能無(wú)法讀取,正確的做法是將配置信息使用同一配置管理,例如基于 Zookeeper 配置。
蘇寧未來(lái)的開(kāi)發(fā)方向
蘇寧在 IT 的系統(tǒng)架構(gòu)方面未來(lái)會(huì)根據(jù)各種不同的應(yīng)用需求使用異構(gòu)化的數(shù)據(jù)存儲(chǔ),在數(shù)據(jù)架構(gòu)層面逐漸向 DataLake+Big DataWarehousing 的模式發(fā)展。
數(shù)據(jù)服務(wù)必須具備以下能力:
- 數(shù)據(jù)治理能力:各個(gè)系統(tǒng)通過(guò)統(tǒng)一的規(guī)范進(jìn)行數(shù)據(jù)的業(yè)務(wù)分類(lèi),數(shù)據(jù)輸出。其他系統(tǒng)和分析人員可以使用統(tǒng)一的數(shù)據(jù)規(guī)范運(yùn)用數(shù)據(jù)。
數(shù)據(jù)需要能夠支持快速的輸出,并且保持與業(yè)務(wù)系統(tǒng)實(shí)時(shí)同步。
- 數(shù)據(jù)分析能力:在統(tǒng)一數(shù)據(jù)治理的基礎(chǔ)上進(jìn)行數(shù)據(jù)的分析,挖掘數(shù)據(jù)對(duì)于運(yùn)營(yíng)系統(tǒng)和銷(xiāo)售系統(tǒng)的價(jià)值。
我們將使用 Apache Spark 平臺(tái)結(jié)合 Hadoop 生態(tài)圈的其他工具進(jìn)行開(kāi)發(fā),逐步形成以 Spark,Storm 等為引擎的一體化數(shù)據(jù)處理分析平臺(tái),提升整個(gè)蘇寧的數(shù)據(jù)運(yùn)用能力。
話題討論
當(dāng)前大數(shù)據(jù)架構(gòu)最火熱的莫過(guò)于分布式計(jì)算架構(gòu) Hadoop 和流數(shù)據(jù)處理框架 Spark/Storm 這兩類(lèi)。
網(wǎng)上逐漸有一種聲音說(shuō) Hadoop 的日子已經(jīng)快到頭了,真是這樣嗎?未來(lái)大數(shù)據(jù)架構(gòu)究竟該走向何方呢? 歡迎大家直接在底部參與評(píng)論!
王卓偉
蘇寧云商 IT 總部高級(jí)技術(shù)經(jīng)理
擁有多年 IT 平臺(tái)研發(fā)和管理工作經(jīng)驗(yàn),先后在聯(lián)創(chuàng),焦點(diǎn),蘇寧等大型互聯(lián)網(wǎng)和 IT 企業(yè)工作,現(xiàn)主要承擔(dān)蘇寧易購(gòu)價(jià)格數(shù)據(jù)分析和實(shí)時(shí)處理優(yōu)化提升,負(fù)責(zé)價(jià)格搜索響應(yīng)提升系統(tǒng),在大數(shù)據(jù)和數(shù)據(jù)分析上有多年的實(shí)戰(zhàn)經(jīng)驗(yàn),專(zhuān)注 Java,大數(shù)據(jù),SOA 等技術(shù)領(lǐng)域。
【51CTO原創(chuàng)稿件,合作站點(diǎn)轉(zhuǎn)載請(qǐng)注明原文作者和出處為51CTO.com】