自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Spark及Spark Streaming核心原理及實踐

大數(shù)據(jù) Spark
本文依次從spark生態(tài),原理,基本概念,spark streaming原理及實踐,還有spark調(diào)優(yōu)以及環(huán)境搭建等方面進行介紹,希望對大家有所幫助。

導語 : Spark 已經(jīng)成為廣告、報表以及推薦系統(tǒng)等大數(shù)據(jù)計算場景中***系統(tǒng),因效率高,易用以及通用性越來越得到大家的青睞,我自己最近半年在接觸spark以及spark streaming之后,對spark技術(shù)的使用有一些自己的經(jīng)驗積累以及心得體會,在此分享給大家。本文依次從spark生態(tài),原理,基本概念,spark streaming原理及實踐,還有spark調(diào)優(yōu)以及環(huán)境搭建等方面進行介紹,希望對大家有所幫助。

spark 生態(tài)及運行原理

 

Spark 特點

  • 運行速度快 => Spark擁有DAG執(zhí)行引擎,支持在內(nèi)存中對數(shù)據(jù)進行迭代計算。官方提供的數(shù)據(jù)表明,如果數(shù)據(jù)由磁盤讀取,速度是Hadoop MapReduce的10倍以上,如果數(shù)據(jù)從內(nèi)存中讀取,速度可以高達100多倍。
  • 適用場景廣泛 => 大數(shù)據(jù)分析統(tǒng)計,實時數(shù)據(jù)處理,圖計算及機器學習
  • 易用性 => 編寫簡單,支持80種以上的高級算子,支持多種語言,數(shù)據(jù)源豐富,可部署在多種集群中

容錯性高。Spark引進了彈性分布式數(shù)據(jù)集RDD (Resilient Distributed Dataset) 的抽象,它是分布在一組節(jié)點中的只讀對象集合,這些集合是彈性的,如果數(shù)據(jù)集一部分丟失,則可以根據(jù)“血統(tǒng)”(即充許基于數(shù)據(jù)衍生過程)對它們進行重建。另外在RDD計算時可以通過CheckPoint來實現(xiàn)容錯,而CheckPoint有兩種方式:CheckPoint Data,和Logging The Updates,用戶可以控制采用哪種方式來實現(xiàn)容錯。

Spark的適用場景

目前大數(shù)據(jù)處理場景有以下幾個類型:

  • 復雜的批量處理(Batch Data Processing),偏重點在于處理海量數(shù)據(jù)的能力,至于處理速度可忍受,通常的時間可能是在數(shù)十分鐘到數(shù)小時;
  • 基于歷史數(shù)據(jù)的交互式查詢(Interactive Query),通常的時間在數(shù)十秒到數(shù)十分鐘之間
  • 基于實時數(shù)據(jù)流的數(shù)據(jù)處理(Streaming Data Processing),通常在數(shù)百毫秒到數(shù)秒之間

Spark成功案例 目前大數(shù)據(jù)在互聯(lián)網(wǎng)公司主要應(yīng)用在廣告、報表、推薦系統(tǒng)等業(yè)務(wù)上。在廣告業(yè)務(wù)方面需要大數(shù)據(jù)做應(yīng)用分析、效果分析、定向優(yōu)化等,在推薦系統(tǒng)方面則需要大數(shù)據(jù)優(yōu)化相關(guān)排名、個性化推薦以及熱點點擊分析等。這些應(yīng)用場景的普遍特點是計算量大、效率要求高。騰訊 / yahoo / 淘寶 / 優(yōu)酷土豆

spark運行架構(gòu)

spark基礎(chǔ)運行架構(gòu)如下所示:

 

spark結(jié)合yarn集群背后的運行流程如下所示:

 

spark 運行流程:

Spark架構(gòu)采用了分布式計算中的Master-Slave模型。Master是對應(yīng)集群中的含有Master進程的節(jié)點,Slave是集群中含有Worker進程的節(jié)點。Master作為整個集群的控制器,負責整個集群的正常運行;Worker相當于計算節(jié)點,接收主節(jié)點命令與進行狀態(tài)匯報;Executor負責任務(wù)的執(zhí)行;Client作為用戶的客戶端負責提交應(yīng)用,Driver負責控制一個應(yīng)用的執(zhí)行。

Spark集群部署后,需要在主節(jié)點和從節(jié)點分別啟動Master進程和Worker進程,對整個集群進行控制。在一個Spark應(yīng)用的執(zhí)行過程中,Driver和Worker是兩個重要角色。Driver 程序是應(yīng)用邏輯執(zhí)行的起點,負責作業(yè)的調(diào)度,即Task任務(wù)的分發(fā),而多個Worker用來管理計算節(jié)點和創(chuàng)建Executor并行處理任務(wù)。在執(zhí)行階段,Driver會將Task和Task所依賴的file和jar序列化后傳遞給對應(yīng)的Worker機器,同時Executor對相應(yīng)數(shù)據(jù)分區(qū)的任務(wù)進行處理。

  • Excecutor /Task 每個程序自有,不同程序互相隔離,task多線程并行,
  • 集群對Spark透明,Spark只要能獲取相關(guān)節(jié)點和進程
  • Driver 與Executor保持通信,協(xié)作處理

三種集群模式:

  1. Standalone 獨立集群
  2. Mesos, apache mesos
  3. Yarn, hadoop yarn

基本概念:

  1. Application =>Spark的應(yīng)用程序,包含一個Driver program和若干Executor
  2. SparkContext => Spark應(yīng)用程序的入口,負責調(diào)度各個運算資源,協(xié)調(diào)各個Worker Node上的Executor
  3. Driver Program => 運行Application的main()函數(shù)并且創(chuàng)建SparkContext
  4. Executor => 是為Application運行在Worker node上的一個進程,該進程負責運行Task,并且負責將數(shù)據(jù)存在內(nèi)存或者磁盤上。每個Application都會申請各自的Executor來處理任務(wù)
  5. Cluster Manager =>在集群上獲取資源的外部服務(wù) (例如:Standalone、Mesos、Yarn)
  6. Worker Node => 集群中任何可以運行Application代碼的節(jié)點,運行一個或多個Executor進程
  7. Task => 運行在Executor上的工作單元
  8. Job => SparkContext提交的具體Action操作,常和Action對應(yīng)
  9. Stage => 每個Job會被拆分很多組task,每組任務(wù)被稱為Stage,也稱TaskSet
  10. RDD => 是Resilient distributed datasets的簡稱,中文為彈性分布式數(shù)據(jù)集;是Spark最核心的模塊和類
  11. DAGScheduler => 根據(jù)Job構(gòu)建基于Stage的DAG,并提交Stage給TaskScheduler
  12. TaskScheduler => 將Taskset提交給Worker node集群運行并返回結(jié)果
  13. Transformations => 是Spark API的一種類型,Transformation返回值還是一個RDD,所有的Transformation采用的都是懶策略,如果只是將Transformation提交是不會執(zhí)行計算的
  14. Action => 是Spark API的一種類型,Action返回值不是一個RDD,而是一個scala集合;計算只有在Action被提交的時候計算才被觸發(fā)。

Spark核心概念之RDD

 

Spark核心概念之Transformations / Actions

 

Transformation返回值還是一個RDD。它使用了鏈式調(diào)用的設(shè)計模式,對一個RDD進行計算后,變換成另外一個RDD,然后這個RDD又可以進行另外一次轉(zhuǎn)換。這個過程是分布式的。 Action返回值不是一個RDD。它要么是一個Scala的普通集合,要么是一個值,要么是空,最終或返回到Driver程序,或把RDD寫入到文件系統(tǒng)中。

Action是返回值返回給driver或者存儲到文件,是RDD到result的變換,Transformation是RDD到RDD的變換。

只有action執(zhí)行時,rdd才會被計算生成,這是rdd懶惰執(zhí)行的根本所在。

Spark核心概念之Jobs / Stage

  • Job => 包含多個task的并行計算,一個action觸發(fā)一個job
  • stage => 一個job會被拆為多組task,每組任務(wù)稱為一個stage,以shuffle進行劃分

 

Spark核心概念之Shuffle

以reduceByKey為例解釋shuffle過程。

 

在沒有task的文件分片合并下的shuffle過程如下:(spark.shuffle.consolidateFiles=false)

 

fetch 來的數(shù)據(jù)存放到哪里?

剛 fetch 來的 FileSegment 存放在 softBuffer 緩沖區(qū),經(jīng)過處理后的數(shù)據(jù)放在內(nèi)存 + 磁盤上。這里我們主要討論處理后的數(shù)據(jù),可以靈活設(shè)置這些數(shù)據(jù)是“只用內(nèi)存”還是“內(nèi)存+磁盤”。如果spark.shuffle.spill = false就只用內(nèi)存。由于不要求數(shù)據(jù)有序,shuffle write 的任務(wù)很簡單:將數(shù)據(jù) partition 好,并持久化。之所以要持久化,一方面是要減少內(nèi)存存儲空間壓力,另一方面也是為了 fault-tolerance。

shuffle之所以需要把中間結(jié)果放到磁盤文件中,是因為雖然上一批task結(jié)束了,下一批task還需要使用內(nèi)存。如果全部放在內(nèi)存中,內(nèi)存會不夠。另外一方面為了容錯,防止任務(wù)掛掉。

存在問題如下:

  1. 產(chǎn)生的 FileSegment 過多。每個 ShuffleMapTask 產(chǎn)生 R(reducer 個數(shù))個 FileSegment,M 個 ShuffleMapTask 就會產(chǎn)生 M * R 個文件。一般 Spark job 的 M 和 R 都很大,因此磁盤上會存在大量的數(shù)據(jù)文件。
  2. 緩沖區(qū)占用內(nèi)存空間大。每個 ShuffleMapTask 需要開 R 個 bucket,M 個 ShuffleMapTask 就會產(chǎn)生 MR 個 bucket。雖然一個 ShuffleMapTask 結(jié)束后,對應(yīng)的緩沖區(qū)可以被回收,但一個 worker node 上同時存在的 bucket 個數(shù)可以達到 cores R 個(一般 worker 同時可以運行 cores 個 ShuffleMapTask),占用的內(nèi)存空間也就達到了cores R 32 KB。對于 8 核 1000 個 reducer 來說,占用內(nèi)存就是 256MB。

為了解決上述問題,我們可以使用文件合并的功能。

在進行task的文件分片合并下的shuffle過程如下:(spark.shuffle.consolidateFiles=true)

 

可以明顯看出,在一個 core 上連續(xù)執(zhí)行的 ShuffleMapTasks 可以共用一個輸出文件 ShuffleFile。先執(zhí)行完的 ShuffleMapTask 形成 ShuffleBlock i,后執(zhí)行的 ShuffleMapTask 可以將輸出數(shù)據(jù)直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每個 ShuffleBlock 被稱為 FileSegment。下一個 stage 的 reducer 只需要 fetch 整個 ShuffleFile 就行了。這樣,每個 worker 持有的文件數(shù)降為 cores * R。FileConsolidation 功能可以通過spark.shuffle.consolidateFiles=true來開啟。

Spark核心概念之Cache

 

  1. val rdd1 = ... // 讀取hdfs數(shù)據(jù),加載成RDD  
  2. rdd1.cache  
  3. val rdd2 = rdd1.map(...)  
  4. val rdd3 = rdd1.filter(...)  
  5. rdd2.take(10).foreach(println)  
  6. rdd3.take(10).foreach(println)  
  7. rdd1.unpersist 

cache和unpersisit兩個操作比較特殊,他們既不是action也不是transformation。cache會將標記需要緩存的rdd,真正緩存是在***次被相關(guān)action調(diào)用后才緩存;unpersisit是抹掉該標記,并且立刻釋放內(nèi)存。只有action執(zhí)行時,rdd1才會開始創(chuàng)建并進行后續(xù)的rdd變換計算。

cache其實也是調(diào)用的persist持久化函數(shù),只是選擇的持久化級別為MEMORY_ONLY。

persist支持的RDD持久化級別如下:

 

需要注意的問題: Cache或shuffle場景序列化時, spark序列化不支持protobuf message,需要java 可以serializable的對象。一旦在序列化用到不支持java serializable的對象就會出現(xiàn)上述錯誤。 Spark只要寫磁盤,就會用到序列化。除了shuffle階段和persist會序列化,其他時候RDD處理都在內(nèi)存中,不會用到序列化。

Spark Streaming運行原理

spark程序是使用一個spark應(yīng)用實例一次性對一批歷史數(shù)據(jù)進行處理,spark streaming是將持續(xù)不斷輸入的數(shù)據(jù)流轉(zhuǎn)換成多個batch分片,使用一批spark應(yīng)用實例進行處理。

 

從原理上看,把傳統(tǒng)的spark批處理程序變成streaming程序,spark需要構(gòu)建什么?

 

 

需要構(gòu)建4個東西:

  • 一個靜態(tài)的 RDD DAG 的模板,來表示處理邏輯;
  • 一個動態(tài)的工作控制器,將連續(xù)的 streaming data 切分數(shù)據(jù)片段,并按照模板復制出新的 RDD 3. DAG 的實例,對數(shù)據(jù)片段進行處理;
  • Receiver進行原始數(shù)據(jù)的產(chǎn)生和導入;Receiver將接收到的數(shù)據(jù)合并為數(shù)據(jù)塊并存到內(nèi)存或硬盤中,供后續(xù)batch RDD進行消費
  • 對長時運行任務(wù)的保障,包括輸入數(shù)據(jù)的失效后的重構(gòu),處理任務(wù)的失敗后的重調(diào)。

具體streaming的詳細原理可以參考廣點通出品的源碼解析文章:

  • https://github.com/lw-lin/CoolplaySpark/blob/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97/0.1%20Spark%20Streaming%20%E5%AE%9E%E7%8E%B0%E6%80%9D%E8%B7%AF%E4%B8%8E%E6%A8%A1%E5%9D%97%E6%A6%82%E8%BF%B0.md#24

對于spark streaming需要注意以下三點:

盡量保證每個work節(jié)點中的數(shù)據(jù)不要落盤,以提升執(zhí)行效率。

 

保證每個batch的數(shù)據(jù)能夠在batch interval時間內(nèi)處理完畢,以免造成數(shù)據(jù)堆積。

 

使用steven提供的框架進行數(shù)據(jù)接收時的預處理,減少不必要數(shù)據(jù)的存儲和傳輸。從tdbank中接收后轉(zhuǎn)儲前進行過濾,而不是在task具體處理時才進行過濾。

 

 

Spark 資源調(diào)優(yōu)

內(nèi)存管理:

 

Executor的內(nèi)存主要分為三塊:

  • ***塊是讓task執(zhí)行我們自己編寫的代碼時使用,默認是占Executor總內(nèi)存的20%;
  • 第二塊是讓task通過shuffle過程拉取了上一個stage的task的輸出后,進行聚合等操作時使用,默認也是占Executor總內(nèi)存的20%;
  • 第三塊是讓RDD持久化時使用,默認占Executor總內(nèi)存的60%。

每個task以及每個executor占用的內(nèi)存需要分析一下。每個task處理一個partiiton的數(shù)據(jù),分片太少,會造成內(nèi)存不夠。

其他資源配置:

 

具體調(diào)優(yōu)可以參考美團點評出品的調(diào)優(yōu)文章:

  • http://tech.meituan.com/spark-tuning-basic.html
  • http://tech.meituan.com/spark-tuning-pro.html

Spark 環(huán)境搭建

spark tdw以及tdbank api文檔:

  • http://git.code.oa.com/tdw/tdw-spark-common/wikis/api
責任編輯:未麗燕 來源: 網(wǎng)絡(luò)大數(shù)據(jù)
相關(guān)推薦

2017-06-06 08:31:10

Spark Strea計算模型監(jiān)控

2016-12-19 14:35:32

Spark Strea原理剖析數(shù)據(jù)

2017-08-14 10:30:13

SparkSpark Strea擴容

2018-03-21 11:05:26

Spark大數(shù)據(jù)應(yīng)用程序

2017-10-13 10:36:33

SparkSpark-Strea關(guān)系

2020-12-04 14:31:45

大數(shù)據(jù)Spark

2016-01-28 10:11:30

Spark StreaSpark大數(shù)據(jù)平臺

2016-05-11 10:29:54

Spark Strea數(shù)據(jù)清理Spark

2017-04-13 09:10:06

Spark解釋關(guān)系

2020-05-26 15:05:30

Spark安裝環(huán)境

2021-12-26 00:03:25

Spark性能調(diào)優(yōu)

2019-09-30 08:28:53

Delta LakeSpark數(shù)據(jù)原理

2019-10-17 09:25:56

Spark StreaPVUV

2017-05-25 09:45:35

2016-10-24 09:52:45

SparkRDD容錯

2016-10-24 23:04:56

SparkRDD數(shù)據(jù)

2024-04-11 11:04:05

Redis

2023-10-24 20:32:40

大數(shù)據(jù)

2021-08-20 16:37:42

SparkSpark Strea

2019-12-13 08:25:26

FlinkSpark Strea流數(shù)據(jù)
點贊
收藏

51CTO技術(shù)棧公眾號