自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Spark streaming中持久保存的RDD/有狀態(tài)的內(nèi)存

存儲(chǔ) 存儲(chǔ)軟件 Spark
以spark streaming為例,就是希望有個(gè)數(shù)據(jù)集能夠在當(dāng)前批次中更新,再下個(gè)批次后又可以繼續(xù)訪問(wèn)。一個(gè)最簡(jiǎn)單的實(shí)現(xiàn)是在driver的內(nèi)存中,我們可以自行保存一個(gè)大的內(nèi)存結(jié)構(gòu)。這種hack的方式就是我們無(wú)法利用spark提供的分布式計(jì)算的能力。

在面向流處理的分布式計(jì)算中,經(jīng)常會(huì)有這種需求,希望需要處理的某個(gè)數(shù)據(jù)集能夠不隨著流式數(shù)據(jù)的流逝而消失。

以spark streaming為例,就是希望有個(gè)數(shù)據(jù)集能夠在當(dāng)前批次中更新,再下個(gè)批次后又可以繼續(xù)訪問(wèn)。一個(gè)最簡(jiǎn)單的實(shí)現(xiàn)是在driver的內(nèi)存中,我們可以自行保存一個(gè)大的內(nèi)存結(jié)構(gòu)。這種hack的方式就是我們無(wú)法利用spark提供的分布式計(jì)算的能力。

對(duì)此,spark streaming提供了stateful streaming, 可以創(chuàng)建一個(gè)有狀態(tài)的DStream,我們可以操作一個(gè)跨越不同批次的RDD。

[[226324]]

1 updateStateByKey

該方法提供了這樣的一種機(jī)制: 維護(hù)了一個(gè)可以跨越不同批次的RDD, 姑且成為StateRDD,在每個(gè)批次遍歷StateRDD的所有數(shù)據(jù),對(duì)每條數(shù)據(jù)執(zhí)行update方法。當(dāng)update方法返回None時(shí),淘汰StateRDD中的該條數(shù)據(jù)。

具體接口如下:

  1. /** 
  2.  * Return a new "state" DStream where the state for each key is updated by applying 
  3.  * the given function on the previous state of the key and the new values of each key
  4.  * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. 
  5.  * @param updateFunc State update function. If `this` function returns None, then 
  6.  *                   corresponding state key-value pair will be eliminated. 
  7.  * @param numPartitions Number of partitions of each RDD in the new DStream. 
  8.  * @tparam S State type 
  9.  */ 
  10. def updateStateByKey[S: ClassTag]( 
  11.     updateFunc: (Seq[V], Option[S]) => Option[S], 
  12.     numPartitions: Int 
  13.   ): DStream[(K, S)] = ssc.withScope { 
  14.   updateStateByKey(updateFunc, defaultPartitioner(numPartitions)) 

即用戶(hù)需要實(shí)現(xiàn)一個(gè)updateFunc的函數(shù),該函數(shù)的參數(shù):

Seq[V] 該批次中相同key的數(shù)據(jù),以Seq數(shù)組形式傳遞

Option[S] 歷史狀態(tài)中的數(shù)據(jù)

返回值: 返回需要保持的歷史狀態(tài)數(shù)據(jù),為None時(shí)表示刪除該數(shù)據(jù)

  1. def updateStateFunc(lines: Seq[Array[String]], state: Option[Array[String]]): Option[Array[String]] = {...} 

這種做法簡(jiǎn)單清晰明了,但是其中有一些可以?xún)?yōu)化的地方:

a) 如果DRDD增長(zhǎng)到比較大的時(shí)候,而每個(gè)進(jìn)入的批次數(shù)據(jù)量相比并不大,此時(shí)每次都需要遍歷DRDD,無(wú)論該批次中是否有數(shù)據(jù)需要更新DRDD。這種情況有的時(shí)候可能會(huì)引發(fā)性能問(wèn)題。

b) 需要用戶(hù)自定義數(shù)據(jù)的淘汰機(jī)制。有的時(shí)候顯得不是那么方便。

c) 返回的類(lèi)型需要和緩存中的類(lèi)型相同。類(lèi)型不能發(fā)生改變。

2 mapWithState

該接口是對(duì)updateSateByKey的改良,解決了updateStateFunc中可以?xún)?yōu)化的地方:

  1. * :: Experimental :: 
  2. Return a [[MapWithStateDStream]] by applying a function to every key-value element of 
  3. * `this` stream, while maintaining some state data for each unique key. The mapping function 
  4. and other specification (e.g. partitioners, timeouts, initial state data, etc.) of this 
  5. * transformation can be specified using [[StateSpec]] class. The state data is accessible in 
  6. as a parameter of type [[State]] in the mapping function
  7. * Example of using `mapWithState`: 
  8. * {{{ 
  9. *    // A mapping function that maintains an integer state and return a String 
  10. *    def mappingFunction(key: String, value: Option[Int], state: State[Int]): Option[String] = { 
  11. *      // Use state.exists(), state.get(), state.update() and state.remove() 
  12. *      // to manage state, and return the necessary string 
  13. *    } 
  14. *    val spec = StateSpec.function(mappingFunction).numPartitions(10) 
  15. *    val mapWithStateDStream = keyValueDStream.mapWithState[StateType, MappedType](spec) 
  16. * }}} 
  17. * @param spec          Specification of this transformation 
  18. * @tparam StateType    Class type of the state data 
  19. * @tparam MappedType   Class type of the mapped data 
  20. */ 
  21. @Experimental 
  22. def mapWithState[StateType: ClassTag, MappedType: ClassTag]( 
  23.     spec: StateSpec[K, V, StateType, MappedType] 
  24.   ): MapWithStateDStream[K, V, StateType, MappedType] = { 
  25.   new MapWithStateDStreamImpl[K, V, StateType, MappedType]( 
  26.     self, 
  27.     spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]] 
  28.   ) 

其中spec封裝了用戶(hù)自定義的函數(shù),用以更新緩存數(shù)據(jù):

  1. mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType 

實(shí)現(xiàn)樣例如下:

  1. val mappingFunc = (k: String, line: Option[Array[String]], state: State[Array[String]]) => {...} 

參數(shù)分別代表:

數(shù)據(jù)的key: k

RDD中的每行數(shù)據(jù): line

state: 緩存數(shù)據(jù)

當(dāng)對(duì)state調(diào)用remove方法時(shí),該數(shù)據(jù)會(huì)被刪除。

注意,如果數(shù)據(jù)超時(shí),不要調(diào)用remove方法,因?yàn)閟park會(huì)在mappingFunc后自動(dòng)調(diào)用remove。

a) 與updateStateByKey 每次都要遍歷緩存數(shù)據(jù)不同,mapWithState每次遍歷每個(gè)批次中的數(shù)據(jù),更新緩存中的數(shù)據(jù)。對(duì)于緩存數(shù)據(jù)較大的情況來(lái)說(shuō),性能會(huì)有較大提升。

b) 提供了內(nèi)置的超時(shí)機(jī)制,當(dāng)數(shù)據(jù)一定時(shí)間內(nèi)沒(méi)有更新時(shí),淘汰相應(yīng)數(shù)據(jù)。

注意,當(dāng)有數(shù)據(jù)到來(lái)或者有超時(shí)發(fā)生時(shí),mappingFunc都會(huì)被調(diào)用。

3 checkpointing

通常情況下,在一個(gè)DStream鐘,對(duì)RDD的各種轉(zhuǎn)換而依賴(lài)的數(shù)據(jù)都是來(lái)自于當(dāng)前批次中。但是當(dāng)在進(jìn)行有狀態(tài)的transformations時(shí),包括updateStateByKey/reduceByKeyAndWindow 、mapWithSate,還會(huì)依賴(lài)于以前批次的數(shù)據(jù),RDD的容錯(cuò)機(jī)制,在異常情況需要重新計(jì)算RDD時(shí),需要以前批次的RDD信息。如果這個(gè)依賴(lài)的鏈路過(guò)長(zhǎng),會(huì)需要大量的內(nèi)存,即使有些RDD的數(shù)據(jù)在內(nèi)存中,不需要計(jì)算。此時(shí)spark通過(guò)checkpoint來(lái)打破依賴(lài)鏈路。checkpoint會(huì)生成一個(gè)新的RDD到hdfs中,該RDD是計(jì)算后的結(jié)果集,而沒(méi)有對(duì)之前的RDD依賴(lài)。

此時(shí)一定要啟用checkpointing,以進(jìn)行周期性的RDD Checkpointing

在StateDstream在實(shí)現(xiàn)RDD的compute方法時(shí),就是將之前的PreStateRDD與當(dāng)前批次中依賴(lài)的ParentRDD進(jìn)行合并。

而checkpoint的實(shí)現(xiàn)是將上述合并的RDD寫(xiě)入HDFS中。

現(xiàn)在checkpoint的實(shí)現(xiàn)中,數(shù)據(jù)寫(xiě)入hdfs的過(guò)程是由一個(gè)固定的線程池異步完成的。一種存在的風(fēng)險(xiǎn)是上次checkpoint的數(shù)據(jù)尚未完成,此次又來(lái)了新的要寫(xiě)的checkpoint數(shù)據(jù),會(huì)加大集群的負(fù)載,可能會(huì)引發(fā)一系列的問(wèn)題。

4 checkpoint周期設(shè)置:

對(duì)mapWithStateByKey/updateStateByKey返回的DStream可以調(diào)用checkpoint方法設(shè)置checkpoint的周期。注意傳遞的時(shí)間只能是批次時(shí)間的整數(shù)倍。

另外,對(duì)于mapWithState而言,checkpoint執(zhí)行時(shí),才會(huì)進(jìn)行數(shù)據(jù)的刪除。 State.remove方法只是設(shè)置狀態(tài),標(biāo)記為刪除,數(shù)據(jù)并不會(huì)真的刪除。 SnapShot方法還是可以獲取得到。

責(zé)任編輯:武曉燕 來(lái)源: 數(shù)客聯(lián)盟
相關(guān)推薦

2016-10-24 09:52:45

SparkRDD容錯(cuò)

2016-10-24 23:04:56

SparkRDD數(shù)據(jù)

2016-01-28 10:11:30

Spark StreaSpark大數(shù)據(jù)平臺(tái)

2017-04-25 09:50:16

SparkRDD核心

2017-08-14 10:30:13

SparkSpark Strea擴(kuò)容

2017-06-06 08:31:10

Spark Strea計(jì)算模型監(jiān)控

2016-12-19 14:35:32

Spark Strea原理剖析數(shù)據(jù)

2017-10-13 10:36:33

SparkSpark-Strea關(guān)系

2024-04-30 11:14:19

KubernetesReplicaSet數(shù)量

2023-10-24 20:32:40

大數(shù)據(jù)

2019-10-08 11:10:18

React自動(dòng)保存前端

2021-08-20 16:37:42

SparkSpark Strea

2019-12-13 08:25:26

FlinkSpark Strea流數(shù)據(jù)

2018-04-09 12:25:11

2016-05-11 10:29:54

Spark Strea數(shù)據(jù)清理Spark

2018-05-28 08:54:45

SparkRDD Cache緩存

2021-07-09 10:27:12

SparkStreaming系統(tǒng)

2017-08-04 10:58:55

RDDSpark算子

2018-05-10 09:51:39

Spark內(nèi)存Hadoop

2019-10-17 09:25:56

Spark StreaPVUV
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)