自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

10 億級(jí)海量數(shù)據(jù)運(yùn)算下,Apache Spark 的四個(gè)技術(shù)應(yīng)用實(shí)踐

原創(chuàng)
大數(shù)據(jù) Spark
本文主要介紹 Apache Spark 如何實(shí)現(xiàn)蘇寧中臺(tái)商品價(jià)格信息的 TB 級(jí)別復(fù)雜業(yè)務(wù)數(shù)據(jù)處理運(yùn)算,以及其中碰到的問(wèn)題和解決方案。

[[196475]]

【51CTO.com原創(chuàng)稿件】2013 年,蘇寧大數(shù)據(jù)團(tuán)隊(duì)以 Hadoop 生態(tài)系統(tǒng)為核心構(gòu)建了整套大數(shù)據(jù)平臺(tái),為整個(gè)蘇寧集團(tuán)所有業(yè)務(wù)團(tuán)隊(duì)提供大數(shù)據(jù)的存儲(chǔ)以及計(jì)算能力。

在蘇寧中臺(tái)供應(yīng)鏈計(jì)算等應(yīng)用場(chǎng)景下,我們基于 Apache Spark 來(lái)構(gòu)建整套零售核心數(shù)據(jù)計(jì)算與分析平臺(tái),解決海量數(shù)據(jù)離線和在線計(jì)算時(shí)效和性能問(wèn)題。

本文主要介紹 Apache Spark 如何實(shí)現(xiàn)蘇寧中臺(tái)商品價(jià)格信息的 TB 級(jí)別復(fù)雜業(yè)務(wù)數(shù)據(jù)處理運(yùn)算,以及其中碰到的問(wèn)題和解決方案。

蘇寧大數(shù)據(jù)平臺(tái)和整體框架結(jié)構(gòu)

蘇寧大數(shù)據(jù)平臺(tái)的整體架構(gòu)以開(kāi)源的基礎(chǔ)平臺(tái)為主,輔助以自研的組件。

圖 1:蘇寧大數(shù)據(jù)平臺(tái)架構(gòu)

圖 2:大數(shù)據(jù)開(kāi)發(fā)平臺(tái)

綜合商品價(jià)格運(yùn)算系統(tǒng)中 Spark 的應(yīng)用

在整個(gè)綜合商品價(jià)格運(yùn)算系統(tǒng)中,我們對(duì)供應(yīng)鏈等數(shù)據(jù)進(jìn)行了整合,生成了目前全部可售商品的價(jià)格庫(kù)存等數(shù)據(jù)。

該數(shù)據(jù)的整合涉及到多個(gè)外圍系統(tǒng)的數(shù)據(jù)整合和業(yè)務(wù)的執(zhí)行。在該項(xiàng)目中,我們運(yùn)用 Spark 技術(shù)來(lái)解決海量數(shù)據(jù)抽取、海量數(shù)據(jù)運(yùn)算的問(wèn)題。

整體流程可以描述為:

  • 使用 Spark 從上游系統(tǒng)的 DB2、MySQL 生產(chǎn)環(huán)境備庫(kù)中抽取全量數(shù)據(jù)。
  • 使用 Spark 進(jìn)行數(shù)據(jù)的關(guān)聯(lián)和聚合,將各個(gè)源頭數(shù)據(jù)加工轉(zhuǎn)換成計(jì)算所需要的數(shù)據(jù)維度。
  • 運(yùn)用 Spark 的 Map 進(jìn)行全量數(shù)據(jù)的運(yùn)算轉(zhuǎn)換。
  • 存儲(chǔ)結(jié)果到 HDFS 中,并且在 Hive 表中建立外部表映射到 HDFS 目錄。

下面講述 Apache Spark 的四個(gè)技術(shù)應(yīng)用實(shí)踐。

使用 DataFrame 實(shí)現(xiàn)異構(gòu)數(shù)據(jù)庫(kù)海量數(shù)據(jù)抽取

數(shù)據(jù)處理的規(guī)劃:由于上游系統(tǒng)的數(shù)據(jù),尚未同步到大數(shù)據(jù)存儲(chǔ)系統(tǒng)中(HDFS,Hive,HBase),項(xiàng)目需要獨(dú)立進(jìn)行數(shù)據(jù)的 ETL 工作。

這些上游系統(tǒng)的數(shù)據(jù)存在以下特點(diǎn):

  • 數(shù)據(jù)量較大:兩個(gè)主要數(shù)據(jù)源頭數(shù)據(jù)量在 10 億級(jí)。
  • 存儲(chǔ)介質(zhì)不同:DB2,MySQL,Hive。
  • 存儲(chǔ)的分布不同:業(yè)務(wù)庫(kù)有 10 個(gè)庫(kù) 1000 張表,也有 5 個(gè)庫(kù) 100 張表,和不分表等存儲(chǔ)結(jié)構(gòu),分庫(kù)的規(guī)則有取表名后綴的模,有取表名后綴的區(qū)間等。
  • 在系統(tǒng)需求上,又需要將整個(gè)運(yùn)算任務(wù)壓縮到 1 小時(shí)之內(nèi)。我們最終采用的方案是使用 SparkSQL 的 JDBC 接口直接進(jìn)行數(shù)據(jù)的抽取和計(jì)算,相當(dāng)于將數(shù)據(jù) ETL 和數(shù)據(jù)的業(yè)務(wù)處理放置在一個(gè)程序中。
  • 相對(duì)于 Sqoop,這樣的解決方案是輕量級(jí)的,1000 個(gè) DataFrame 的 Load 要比 1000 個(gè) Sqoop 任務(wù)的資源消耗要低很多,以及調(diào)度開(kāi)銷(xiāo)的消耗也少很多。
  • 便于數(shù)據(jù)業(yè)務(wù)代碼,業(yè)務(wù)針對(duì)動(dòng)態(tài)表的切換,可以將讀取當(dāng)前表編號(hào)的模塊直接嵌入到 Spark 的代碼中進(jìn)行。

對(duì)于 DataFrame 在加載數(shù)據(jù)前的數(shù)據(jù)庫(kù) Schema 性能問(wèn)題,有了一個(gè)較好的優(yōu)化方案,以下是優(yōu)化前和優(yōu)化后,DataFrame 在創(chuàng)建過(guò)程中的流程:

圖 3 :優(yōu)化前 DataFrame 創(chuàng)建流程

圖 4 :優(yōu)化后的 DataFrame 創(chuàng)建流程

在使用該方案后,任務(wù) DataFrame 的 Load 數(shù)據(jù)時(shí)間從原先的接近 30 分鐘縮短為 5 分鐘以?xún)?nèi)。

使用 SparkSQL 結(jié)合 ZipPartition 實(shí)現(xiàn)多層次多維度數(shù)據(jù)關(guān)聯(lián)和優(yōu)化

電商和互聯(lián)網(wǎng)的運(yùn)營(yíng)一般會(huì)涉及到數(shù)據(jù)維度的擴(kuò)散,為了簡(jiǎn)化運(yùn)營(yíng)端的操作難度和提高數(shù)據(jù)提供方的性能,一般會(huì)使用維度擴(kuò)散的方案,將上游運(yùn)營(yíng)系統(tǒng)的數(shù)據(jù)進(jìn)行數(shù)據(jù)的擴(kuò)散,放置到下游數(shù)據(jù)使用方<K,V>存儲(chǔ)中。

在進(jìn)行維度擴(kuò)散時(shí)經(jīng)常會(huì)有數(shù)據(jù)層次的問(wèn)題,數(shù)據(jù)需要多層的關(guān)聯(lián)和層次不同的數(shù)據(jù)需要關(guān)聯(lián)的情況。例如,上游運(yùn)營(yíng)的數(shù)據(jù)為全國(guó)、地區(qū)、城市維度并且有優(yōu)先級(jí),下游數(shù)據(jù)服務(wù)方的維度統(tǒng)一為城市維度。

圖 5:多層關(guān)聯(lián)示意圖

在應(yīng)用系統(tǒng)中,一般的做法是使用點(diǎn)關(guān)聯(lián)查詢(xún),從***優(yōu)先級(jí)的維度進(jìn)行關(guān)聯(lián),關(guān)聯(lián)到則返回,關(guān)聯(lián)不到則繼續(xù)向下一個(gè)優(yōu)先級(jí)進(jìn)行關(guān)聯(lián),直到最終結(jié)果。

但是在 Spark 中,我們是需要運(yùn)用類(lèi)似于數(shù)據(jù)庫(kù)關(guān)聯(lián)的模式解決該問(wèn)題。

使用***優(yōu)先級(jí)進(jìn)行左關(guān)聯(lián),然后過(guò)濾出未關(guān)聯(lián)到的數(shù)據(jù),再依次將未關(guān)聯(lián)到的數(shù)據(jù)進(jìn)行下面優(yōu)先級(jí)的關(guān)聯(lián),直到生成結(jié)果??梢员硎緸椋?/p>

  1. DataSet LeftJoin DimensionA =>    DataSetA 
  2. DataSetA Filter(A.Field == NULL) =>    DataSetToJoinB 
  3. DataSetA    Filter(A.Field != NULL) => DataSetAFinal 
  4. DataSetToJoinB LeftJoin DimensionB =>    DataSetB 
  5.               … 
  6. DataSetFinal = DataSetAFinal UNION    DataSetBFinal UNION DataSetCFinal 

這里就出現(xiàn)一個(gè)問(wèn)題:

  • 同樣的數(shù)據(jù)被多次使用,這里從技術(shù)上可以采用 Cache 的方法應(yīng)對(duì)。
  • 在使用 Cache 對(duì)數(shù)據(jù)緩存的方法上,假如***優(yōu)先級(jí)的數(shù)據(jù)少,則實(shí)際上大量的數(shù)據(jù)都會(huì)需要 Cache 并且落到***一層。

針對(duì)可能緩存多次數(shù)據(jù)的問(wèn)題,我們嘗試了另外一種方法,全部進(jìn)行左關(guān)聯(lián),并且?guī)蟽?yōu)先級(jí)。最終,我們使用 Group By 的方法對(duì)優(yōu)先級(jí)進(jìn)行了處理,可以表示為:

  1. DataSet    LeftJoin DimensionA with A => DataSetA 
  2. DataSet    LeftJoin DimensionB with B => DataSetB 
  3. DataSet    LeftJoin DimensionC with C => DataSetC 
  4. DataSetA    UNION DataSetB UINION DataSetC => DataSetToGroupBy 
  5. DataSetFinal    = (DataSetToGroupBy GroupBy Dimension).ApplyPriority() 

這里方案就只需要進(jìn)行一次的數(shù)據(jù) Cache。

最終我們根據(jù)兩種方案對(duì)實(shí)際測(cè)試的結(jié)果進(jìn)行取舍,該部分的優(yōu)化和一般的數(shù)據(jù)庫(kù)優(yōu)化一樣,都需要考慮到實(shí)際的數(shù)據(jù)關(guān)聯(lián)的情況和業(yè)務(wù)要求,獲取***化的方案。已達(dá)到***的任務(wù)運(yùn)行效益。

通過(guò)并行控制 DAG,優(yōu)化執(zhí)行時(shí)間

在進(jìn)行復(fù)雜業(yè)務(wù)的處理過(guò)程中,我們發(fā)現(xiàn)有部分?jǐn)?shù)據(jù)未進(jìn)行分表分庫(kù),并且數(shù)據(jù)量相對(duì)較大(約 20 億),這部分?jǐn)?shù)據(jù)的載入效率直接影響了后面的整個(gè)效率。

例如,我們正常使用 128 核進(jìn)行運(yùn)算,但一旦運(yùn)行到該步驟,則變成了單個(gè)核進(jìn)行運(yùn)行,時(shí)間長(zhǎng)達(dá) 5 分鐘。而 5 分鐘對(duì)于系統(tǒng)運(yùn)算有嚴(yán)格時(shí)間區(qū)間要求的業(yè)務(wù)需求是非常嚴(yán)峻的。

針對(duì)該問(wèn)題,我們使用 Driver 端并行提交任務(wù)的方法進(jìn)行解決。這是根據(jù)我們的任務(wù)模式所決定的,Driver 需要大量的時(shí)間建立數(shù)據(jù)的整個(gè)流程(4000 多張表的 DataFrame),并在最終存儲(chǔ)結(jié)果 action 代碼執(zhí)行之前就進(jìn)行數(shù)據(jù)的加載。

具體操作流程是:

  • 將該部分?jǐn)?shù)據(jù)標(biāo)記為 Cache。
  • 執(zhí)行 countSync 直接進(jìn)行數(shù)據(jù)加載。
  • 在必須用到該步驟的流程進(jìn)行 Get 命令阻塞 Driver 主線程,確保數(shù)據(jù)加載后進(jìn)行后續(xù)的操作。
  • 后續(xù)步驟直接使用已經(jīng)緩存的數(shù)據(jù)進(jìn)行運(yùn)算。

該方案可以用下圖表示:

圖 6:并行優(yōu)化示意圖

計(jì)算結(jié)果是通過(guò)并行的加載后,我們將整個(gè)的流程縮短了 5 分鐘。我們也在其他對(duì)于運(yùn)行時(shí)間有嚴(yán)格要求的項(xiàng)目中使用該方法對(duì)于業(yè)務(wù)流程中需要獨(dú)立計(jì)算,資源占用小,但是耗時(shí)較長(zhǎng)的模塊進(jìn)行了優(yōu)化,都一定程度上縮短了所需要的時(shí)間。

Spark 的 ClassLoader 所帶來(lái)的問(wèn)題和解決方案

該問(wèn)題出現(xiàn)的原因是公司對(duì)運(yùn)維和研發(fā)的數(shù)據(jù)隔離有明確的要求,代碼和研發(fā)的配置中不允許出現(xiàn)生產(chǎn)環(huán)境的數(shù)據(jù)庫(kù)配置。

原先的配置都是由運(yùn)維在 Websphere,Wildfly 等中間件中,通過(guò) JNDI 的方法配置給實(shí)際的應(yīng)用程序。

但是項(xiàng)目中又需要使用 JDBC 去直接連接生產(chǎn)環(huán)境的數(shù)據(jù)庫(kù),這樣就帶來(lái)了數(shù)據(jù)庫(kù)連接的問(wèn)題。

我們采用自定義的 JDBC 封裝原先的 JDBC,讓外層的 JDBC 以 Token 的方式獲取實(shí)際的數(shù)據(jù)庫(kù)連接,并且由實(shí)際的 JDBC 進(jìn)行操作。

在這種需求下,我們?cè)谔幚磉^(guò)程中考慮到目前分布式協(xié)調(diào)組件的壓力,將數(shù)據(jù)庫(kù)的 Token 封裝到 Jar 包,使用 ClassLoader 去讀取 Token 數(shù)據(jù)。

正常情況下,我們?cè)?Spark 端使用到的 Classloader 順序來(lái)加載 Token 文件:

ExtClassLoader-> AppClassLoader -> MutableURLClassLoader.

然而,在集群運(yùn)行時(shí),我們?cè)?Driver 主線程中的 ClassLoader 是 AppClassLoader,而它無(wú)法讀取到 http 的 jar 包里面的 token 文件。

我們進(jìn)行了簡(jiǎn)單的方案,將當(dāng)期線程中的 ClassLoader 替換成為當(dāng)期業(yè)務(wù)代碼類(lèi)的 ClassLoader。

Spark 應(yīng)用的實(shí)踐總結(jié)

我們?cè)趦r(jià)格運(yùn)算系統(tǒng)中使用了 SOA 模式中的 Aggregate Reporting 模式,將多個(gè)產(chǎn)品線的數(shù)據(jù)進(jìn)行了整合,提供了一個(gè)業(yè)務(wù)聚合的數(shù)據(jù)集市。

在模塊設(shè)計(jì)上,我們將數(shù)據(jù)的抽取和數(shù)據(jù)的運(yùn)算集成到一起,這樣在帶來(lái)效率和便利的同時(shí)也必然帶來(lái)模塊耦合。

從目前的實(shí)踐中,可以得出幾個(gè)結(jié)論:

  • 使用 JDBC 抽取大量數(shù)據(jù)表直接進(jìn)行計(jì)算是可行的,尤其針對(duì)分表分庫(kù)和可以進(jìn)行并行抽取的數(shù)據(jù)庫(kù)。但是有幾點(diǎn)需要特別關(guān)注:

①DataFrame 在大量創(chuàng)建時(shí),對(duì)帶來(lái)的 Driver 端時(shí)間消耗需要進(jìn)行優(yōu)化,減少數(shù)據(jù)流轉(zhuǎn)的時(shí)間。

②數(shù)據(jù)源中一旦有任何一個(gè)出現(xiàn)失敗,整個(gè)任務(wù)就可能需要重新運(yùn)算,這個(gè)是將 ETL 和運(yùn)算放到一個(gè)模塊帶來(lái)的問(wèn)題。

③DataFrame 大量并行抽取時(shí),對(duì)數(shù)據(jù)庫(kù)的 IO 壓力是比較大的,Spark 難以對(duì)這部分的并行度進(jìn)行控制。

需要根據(jù)實(shí)際情況考慮使用合并抽取(Union多張表),或者進(jìn)行任務(wù)拆分。任務(wù)拆分可以解決抽取時(shí)并行度和計(jì)算時(shí)并行度和資源要求不一樣的問(wèn)題。

  • ZipPartition 是數(shù)據(jù)關(guān)聯(lián)的最終解決方案,通過(guò) SparkSQL 和 RDD 的 JOIN 時(shí)維度層次不一樣,或者其他難以 JOIN 的問(wèn)題都可以通過(guò)這個(gè)方法解決。

但由于 ZipPartition 是 RDD 底層的操作,開(kāi)發(fā)人員幾乎完全控制 Worker 的關(guān)聯(lián)模型,它的性能調(diào)優(yōu)顯得尤其重要。

而對(duì)于有層次結(jié)構(gòu)的關(guān)聯(lián),使用 JOIN – FILTER –UNION 和 JOIN – UNION –GROUPBY,則需要分析實(shí)際數(shù)據(jù)的情況,再進(jìn)行取舍。

  • 在 Driver 端進(jìn)行并行可以解決部分計(jì)算時(shí)間的問(wèn)題,但需要滿足幾個(gè)條件:

①可以并行的時(shí)間點(diǎn),必須是在集群相對(duì)空閑的時(shí)候,例如:Driver 端在進(jìn)行初始化 RDD 的時(shí)候,在編排大型 DAG 的時(shí)候。

②可并行的數(shù)據(jù)必須在最終使用之前確認(rèn)加載到 Cache。

③不應(yīng)該對(duì)過(guò)于大的數(shù)據(jù)集進(jìn)行并行加載操作。

  • Spark 在 ClassLoader 層面進(jìn)行了變動(dòng),將配置文件封裝在 jar 包中有可能無(wú)法讀取,正確的做法是將配置信息使用同一配置管理,例如基于 Zookeeper 配置。

蘇寧未來(lái)的開(kāi)發(fā)方向

蘇寧在 IT 的系統(tǒng)架構(gòu)方面未來(lái)會(huì)根據(jù)各種不同的應(yīng)用需求使用異構(gòu)化的數(shù)據(jù)存儲(chǔ),在數(shù)據(jù)架構(gòu)層面逐漸向 DataLake+Big DataWarehousing 的模式發(fā)展。

數(shù)據(jù)服務(wù)必須具備以下能力:

  • 數(shù)據(jù)治理能力:各個(gè)系統(tǒng)通過(guò)統(tǒng)一的規(guī)范進(jìn)行數(shù)據(jù)的業(yè)務(wù)分類(lèi),數(shù)據(jù)輸出。其他系統(tǒng)和分析人員可以使用統(tǒng)一的數(shù)據(jù)規(guī)范運(yùn)用數(shù)據(jù)。

數(shù)據(jù)需要能夠支持快速的輸出,并且保持與業(yè)務(wù)系統(tǒng)實(shí)時(shí)同步。

  • 數(shù)據(jù)分析能力:在統(tǒng)一數(shù)據(jù)治理的基礎(chǔ)上進(jìn)行數(shù)據(jù)的分析,挖掘數(shù)據(jù)對(duì)于運(yùn)營(yíng)系統(tǒng)和銷(xiāo)售系統(tǒng)的價(jià)值。

我們將使用 Apache Spark 平臺(tái)結(jié)合 Hadoop 生態(tài)圈的其他工具進(jìn)行開(kāi)發(fā),逐步形成以 Spark,Storm 等為引擎的一體化數(shù)據(jù)處理分析平臺(tái),提升整個(gè)蘇寧的數(shù)據(jù)運(yùn)用能力。

話題討論

當(dāng)前大數(shù)據(jù)架構(gòu)最火熱的莫過(guò)于分布式計(jì)算架構(gòu) Hadoop 和流數(shù)據(jù)處理框架 Spark/Storm 這兩類(lèi)。

網(wǎng)上逐漸有一種聲音說(shuō) Hadoop 的日子已經(jīng)快到頭了,真是這樣嗎?未來(lái)大數(shù)據(jù)架構(gòu)究竟該走向何方呢? 歡迎大家直接在底部參與評(píng)論!

[[196477]]

王卓偉

蘇寧云商 IT 總部高級(jí)技術(shù)經(jīng)理

擁有多年 IT 平臺(tái)研發(fā)和管理工作經(jīng)驗(yàn),先后在聯(lián)創(chuàng),焦點(diǎn),蘇寧等大型互聯(lián)網(wǎng)和 IT 企業(yè)工作,現(xiàn)主要承擔(dān)蘇寧易購(gòu)價(jià)格數(shù)據(jù)分析和實(shí)時(shí)處理優(yōu)化提升,負(fù)責(zé)價(jià)格搜索響應(yīng)提升系統(tǒng),在大數(shù)據(jù)和數(shù)據(jù)分析上有多年的實(shí)戰(zhàn)經(jīng)驗(yàn),專(zhuān)注 Java,大數(shù)據(jù),SOA 等技術(shù)領(lǐng)域。

【51CTO原創(chuàng)稿件,合作站點(diǎn)轉(zhuǎn)載請(qǐng)注明原文作者和出處為51CTO.com】

責(zé)任編輯:武曉燕 來(lái)源: 51CTO技術(shù)棧
相關(guān)推薦

2022-03-29 15:17:51

數(shù)據(jù)安全網(wǎng)絡(luò)安全

2016-08-22 15:15:14

數(shù)據(jù)實(shí)踐

2022-09-13 15:22:04

邊緣計(jì)算云計(jì)算

2020-08-11 07:00:00

大數(shù)據(jù)IT技術(shù)

2022-10-26 14:55:53

AIoT物聯(lián)網(wǎng)人工智能

2021-08-02 10:22:29

大數(shù)據(jù)安全云計(jì)算數(shù)據(jù)安全

2021-03-08 15:42:54

數(shù)據(jù)庫(kù)Apache Iceb開(kāi)源

2022-04-24 22:57:10

混合云云計(jì)算數(shù)據(jù)安全

2023-11-09 18:07:25

Pycharm插件

2022-03-30 14:13:53

安全漏洞首席信息安全官

2023-11-03 00:28:44

ApacheFlink

2022-02-08 23:16:34

元宇宙技術(shù)VR/AR

2011-07-01 16:07:18

云應(yīng)用集成云計(jì)算

2020-12-29 08:00:00

Windows 10Windows微軟

2021-12-03 14:37:38

數(shù)據(jù)備份存儲(chǔ)備份

2024-01-25 08:59:52

大數(shù)據(jù)vivo架構(gòu)

2020-09-10 10:16:09

開(kāi)源代碼安全性漏洞惡意組件

2011-07-25 14:39:06

組策略

2022-12-13 14:41:01

技術(shù)人工智能

2010-06-29 09:06:39

Java思想Java虛擬機(jī)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)