自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Flink與Spark Streaming誰(shuí)的拳頭更硬

大數(shù)據(jù) Spark
流數(shù)據(jù)(或數(shù)據(jù)流)是指在時(shí)間分布和數(shù)量上無(wú)限的一系列動(dòng)態(tài)數(shù)據(jù)集合體,數(shù)據(jù)的價(jià)值隨著時(shí)間的流逝而降低,因此必須實(shí)時(shí)計(jì)算給出秒級(jí)響應(yīng)。流式計(jì)算,顧名思義,就是對(duì)數(shù)據(jù)流進(jìn)行處理,是實(shí)時(shí)計(jì)算。

 前言

流數(shù)據(jù)(或數(shù)據(jù)流)是指在時(shí)間分布和數(shù)量上無(wú)限的一系列動(dòng)態(tài)數(shù)據(jù)集合體,數(shù)據(jù)的價(jià)值隨著時(shí)間的流逝而降低,因此必須實(shí)時(shí)計(jì)算給出秒級(jí)響應(yīng)。流式計(jì)算,顧名思義,就是對(duì)數(shù)據(jù)流進(jìn)行處理,是實(shí)時(shí)計(jì)算。

[[285452]]

架構(gòu)對(duì)比

生態(tài)

 

關(guān)于流式計(jì)算:Flink與Spark Streaming誰(shuí)的拳頭更硬

 

 

關(guān)于流式計(jì)算:Flink與Spark Streaming誰(shuí)的拳頭更硬

 

運(yùn)行模型

Spark Streaming 是微批處理,運(yùn)行的時(shí)候需要指定批處理的時(shí)間,每次運(yùn)行 job 時(shí)處理一個(gè)批次的數(shù)據(jù)

Flink 是基于事件驅(qū)動(dòng)的,事件可以理解為消息。事件驅(qū)動(dòng)的應(yīng)用程序是一種狀態(tài)應(yīng)用程序,它會(huì)從一個(gè)或者多個(gè)流中注入事件,通過(guò)觸發(fā)計(jì)算更新?tīng)顟B(tài),或外部動(dòng)作對(duì)注入的事件作出反應(yīng)。

運(yùn)行角色

Spark Streaming 運(yùn)行時(shí)的角色(standalone 模式)主要有:

Master:主要負(fù)責(zé)整體集群資源的管理和應(yīng)用程序調(diào)度;

Worker:負(fù)責(zé)單個(gè)節(jié)點(diǎn)的資源管理,driver 和 executor 的啟動(dòng)等;

Driver:用戶(hù)入口程序執(zhí)行的地方,即 SparkContext 執(zhí)行的地方,主要是 DAG 生成、stage 劃分、task 生成及調(diào)度;

Executor:負(fù)責(zé)執(zhí)行 task,反饋執(zhí)行狀態(tài)和執(zhí)行結(jié)果。

Flink 運(yùn)行時(shí)的角色(standalone 模式)主要有:

Jobmanager: 協(xié)調(diào)分布式執(zhí)行,他們調(diào)度任務(wù)、協(xié)調(diào) checkpoints、協(xié)調(diào)故障恢復(fù)等。至少有一個(gè) JobManager。高可用情況下可以啟動(dòng)多個(gè) JobManager,其中一個(gè)選舉為 leader,其余為 standby;

Taskmanager: 負(fù)責(zé)執(zhí)行具體的 tasks、緩存、交換數(shù)據(jù)流,至少有一個(gè) TaskManager;

Slot: 每個(gè) task slot 代表 TaskManager 的一個(gè)固定部分資源,Slot 的個(gè)數(shù)代表著 taskmanager 可并行執(zhí)行的 task 數(shù)。

編程模型對(duì)比

編程模型對(duì)比,主要是對(duì)比 flink 和 Spark Streaming 兩者在代碼編寫(xiě)上的區(qū)別。

Spark Streaming

Spark Streaming 與 kafka 的結(jié)合主要是兩種模型:

  • 基于 receiver dstream;
  • 基于 direct dstream。

以上兩種模型編程機(jī)構(gòu)近似,只是在 api 和內(nèi)部數(shù)據(jù)獲取有些區(qū)別,新版本的已經(jīng)取消了基于 receiver 這種模式,企業(yè)中通常采用基于 direct Dstream 的模式。

  1. val Array(brokers, topics) = args// 創(chuàng)建一個(gè)批處理時(shí)間是2s的context  
  2.  val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")  
  3.  val ssc = new StreamingContext(sparkConf, Seconds(2))  
  4.  // 使用broker和topic創(chuàng)建DirectStream  
  5.  val topicsSet = topics.split(",").toSet  
  6.  val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)  
  7.  val messages = KafkaUtils.createDirectStream[String, String]( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))  
  8.  // Get the lines, split them into words, count the words and print  
  9.  val lines = messages.map(_.value)  
  10.  val words = lines.flatMap(_.split(" "))  
  11.  val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)  
  12.  wordCounts.print() // 啟動(dòng)流  
  13.  ssc.start()  
  14.  ssc.awaitTermination() 

通過(guò)以上代碼我們可以 get 到:

  • 設(shè)置批處理時(shí)間
  • 創(chuàng)建數(shù)據(jù)流
  • 編寫(xiě)transform
  • 編寫(xiě)action
  • 啟動(dòng)執(zhí)行

Flink

接下來(lái)看 flink 與 kafka 結(jié)合是如何編寫(xiě)代碼的。Flink 與 kafka 結(jié)合是事件驅(qū)動(dòng),大家可能對(duì)此會(huì)有疑問(wèn),消費(fèi) kafka 的數(shù)據(jù)調(diào)用 poll 的時(shí)候是批量獲取數(shù)據(jù)的(可以設(shè)置批處理大小和超時(shí)時(shí)間),這就不能叫做事件觸發(fā)了。而實(shí)際上,flink 內(nèi)部對(duì) poll 出來(lái)的數(shù)據(jù)進(jìn)行了整理,然后逐條 emit,形成了事件觸發(fā)的機(jī)制。 下面的代碼是 flink 整合 kafka 作為 data source 和 data sink:

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
  2.  env.getConfig().disableSysoutLogging(); 
  3.  env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); 
  4.  env.enableCheckpointing(5000); // create a checkpoint every 5 seconds 
  5.  env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface 
  6.  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  
  7.  // ExecutionConfig.GlobalJobParameters 
  8.  env.getConfig().setGlobalJobParameters(null); DataStream<KafkaEvent> input = env 
  9.  .addSource( new FlinkKafkaConsumer010<>( 
  10.  parameterTool.getRequired("input-topic"), new KafkaEventSchema(), 
  11.  parameterTool.getProperties()) 
  12.  .assignTimestampsAndWatermarks(new CustomWatermarkExtractor())).setParallelism(1).rebalance() 
  13.  .keyBy("word"
  14.  .map(new RollingAdditionMapper()).setParallelism(0); 
  15.   
  16.  input.addSink( new FlinkKafkaProducer010<>( 
  17.  parameterTool.getRequired("output-topic"), new KafkaEventSchema(), 
  18.  parameterTool.getProperties())); 
  19.   
  20.  env.execute("Kafka 0.10 Example"); 

從 Flink 與 kafka 結(jié)合的代碼可以 get 到:

  • 注冊(cè)數(shù)據(jù) source
  • 編寫(xiě)運(yùn)行邏輯
  • 注冊(cè)數(shù)據(jù) sink

調(diào)用 env.execute 相比于 Spark Streaming 少了設(shè)置批處理時(shí)間,還有一個(gè)顯著的區(qū)別是 flink 的所有算子都是 lazy 形式的,調(diào)用 env.execute 會(huì)構(gòu)建 jobgraph。client 端負(fù)責(zé) Jobgraph 生成并提交它到集群運(yùn)行;而 Spark Streaming的操作算子分 action 和 transform,其中僅有 transform 是 lazy 形式,而且 DAG 生成、stage 劃分、任務(wù)調(diào)度是在 driver 端進(jìn)行的,在 client 模式下 driver 運(yùn)行于客戶(hù)端處。

任務(wù)調(diào)度原理

Spark 任務(wù)調(diào)度

Spark Streaming 任務(wù)如上文提到的是基于微批處理的,實(shí)際上每個(gè)批次都是一個(gè) Spark Core 的任務(wù)。對(duì)于編碼完成的 Spark Core 任務(wù)在生成到最終執(zhí)行結(jié)束主要包括以下幾個(gè)部分:

  • 構(gòu)建 DAG 圖;
  • 劃分 stage;
  • 生成 taskset;
  • 調(diào)度 task。

具體可參考圖 5:

 

關(guān)于流式計(jì)算:Flink與Spark Streaming誰(shuí)的拳頭更硬

 

對(duì)于 job 的調(diào)度執(zhí)行有 fifo 和 fair 兩種模式,Task 是根據(jù)數(shù)據(jù)本地性調(diào)度執(zhí)行的。 假設(shè)每個(gè) Spark Streaming 任務(wù)消費(fèi)的 kafka topic 有四個(gè)分區(qū),中間有一個(gè) transform操作(如 map)和一個(gè) reduce 操作,如圖 6 所示:

 

關(guān)于流式計(jì)算:Flink與Spark Streaming誰(shuí)的拳頭更硬

 

假設(shè)有兩個(gè) executor,其中每個(gè) executor 三個(gè)核,那么每個(gè)批次相應(yīng)的 task 運(yùn)行位置是固定的嗎?是否能預(yù)測(cè)? 由于數(shù)據(jù)本地性和調(diào)度不確定性,每個(gè)批次對(duì)應(yīng) kafka 分區(qū)生成的 task 運(yùn)行位置并不是固定的。

Flink 任務(wù)調(diào)度

對(duì)于 flink 的流任務(wù)客戶(hù)端首先會(huì)生成 StreamGraph,接著生成 JobGraph,然后將 jobGraph 提交給 Jobmanager 由它完成 jobGraph 到 ExecutionGraph 的轉(zhuǎn)變,最后由 jobManager 調(diào)度執(zhí)行。

 

關(guān)于流式計(jì)算:Flink與Spark Streaming誰(shuí)的拳頭更硬

 

如圖 7 所示有一個(gè)由 data source、MapFunction和 ReduceFunction 組成的程序,data source 和 MapFunction 的并發(fā)度都為 4,而 ReduceFunction 的并發(fā)度為 3。一個(gè)數(shù)據(jù)流由 Source-Map-Reduce 的順序組成,在具有 2 個(gè)TaskManager、每個(gè) TaskManager 都有 3 個(gè) Task Slot 的集群上運(yùn)行。

可以看出 flink 的拓?fù)渖商峤粓?zhí)行之后,除非故障,否則拓?fù)洳考?zhí)行位置不變,并行度由每一個(gè)算子并行度決定,類(lèi)似于 storm。而 spark Streaming 是每個(gè)批次都會(huì)根據(jù)數(shù)據(jù)本地性和資源情況進(jìn)行調(diào)度,無(wú)固定的執(zhí)行拓?fù)浣Y(jié)構(gòu)。 flink 是數(shù)據(jù)在拓?fù)浣Y(jié)構(gòu)里流動(dòng)執(zhí)行,而 Spark Streaming 則是對(duì)數(shù)據(jù)緩存批次并行處理。

時(shí)間機(jī)制對(duì)比

流處理的時(shí)間

流處理程序在時(shí)間概念上總共有三個(gè)時(shí)間概念:

  • 處理時(shí)間

處理時(shí)間是指每臺(tái)機(jī)器的系統(tǒng)時(shí)間,當(dāng)流程序采用處理時(shí)間時(shí)將使用運(yùn)行各個(gè)運(yùn)算符實(shí)例的機(jī)器時(shí)間。處理時(shí)間是最簡(jiǎn)單的時(shí)間概念,不需要流和機(jī)器之間的協(xié)調(diào),它能提供最好的性能和最低延遲。然而在分布式和異步環(huán)境中,處理時(shí)間不能提供消息事件的時(shí)序性保證,因?yàn)樗艿较鬏斞舆t,消息在算子之間流動(dòng)的速度等方面制約。

  • 事件時(shí)間

事件時(shí)間是指事件在其設(shè)備上發(fā)生的時(shí)間,這個(gè)時(shí)間在事件進(jìn)入 flink 之前已經(jīng)嵌入事件,然后 flink 可以提取該時(shí)間。基于事件時(shí)間進(jìn)行處理的流程序可以保證事件在處理的時(shí)候的順序性,但是基于事件時(shí)間的應(yīng)用程序必須要結(jié)合 watermark 機(jī)制?;谑录r(shí)間的處理往往有一定的滯后性,因?yàn)樗枰却罄m(xù)事件和處理無(wú)序事件,對(duì)于時(shí)間敏感的應(yīng)用使用的時(shí)候要慎重考慮。

  • 注入時(shí)間

注入時(shí)間是事件注入到 flink 的時(shí)間。事件在 source 算子處獲取 source 的當(dāng)前時(shí)間作為事件注入時(shí)間,后續(xù)的基于時(shí)間的處理算子會(huì)使用該時(shí)間處理數(shù)據(jù)。

相比于事件時(shí)間,注入時(shí)間不能夠處理無(wú)序事件或者滯后事件,但是應(yīng)用程序無(wú)序指定如何生成 watermark。在內(nèi)部注入時(shí)間程序的處理和事件時(shí)間類(lèi)似,但是時(shí)間戳分配和 watermark 生成都是自動(dòng)的。

圖 8 可以清晰地看出三種時(shí)間的區(qū)別:

 

關(guān)于流式計(jì)算:Flink與Spark Streaming誰(shuí)的拳頭更硬

 

Spark 時(shí)間機(jī)制

Spark Streaming 只支持處理時(shí)間,Structured streaming 支持處理時(shí)間和事件時(shí)間,同時(shí)支持 watermark 機(jī)制處理滯后數(shù)據(jù)。

Flink 時(shí)間機(jī)制

flink 支持三種時(shí)間機(jī)制:事件時(shí)間,注入時(shí)間,處理時(shí)間,同時(shí)支持 watermark 機(jī)制處理滯后數(shù)據(jù)。

kafka 動(dòng)態(tài)分區(qū)檢測(cè)

Spark Streaming

對(duì)于有實(shí)時(shí)處理業(yè)務(wù)需求的企業(yè),隨著業(yè)務(wù)增長(zhǎng)數(shù)據(jù)量也會(huì)同步增長(zhǎng),將導(dǎo)致原有的 kafka 分區(qū)數(shù)不滿足數(shù)據(jù)寫(xiě)入所需的并發(fā)度,需要擴(kuò)展 kafka 的分區(qū)或者增加 kafka 的 topic,這時(shí)就要求實(shí)時(shí)處理程序,如 SparkStreaming、flink 能檢測(cè)到 kafka 新增的 topic 、分區(qū)及消費(fèi)新增分區(qū)的數(shù)據(jù)。

接下來(lái)結(jié)合源碼分析,Spark Streaming 和 flink 在 kafka 新增 topic 或 partition 時(shí)能否動(dòng)態(tài)發(fā)現(xiàn)新增分區(qū)并消費(fèi)處理新增分區(qū)的數(shù)據(jù)。 Spark Streaming 與 kafka 結(jié)合有兩個(gè)區(qū)別比較大的版本,如圖 9 所示是官網(wǎng)給出的對(duì)比數(shù)據(jù):

 

關(guān)于流式計(jì)算:Flink與Spark Streaming誰(shuí)的拳頭更硬

 

其中確認(rèn)的是 Spark Streaming 與 kafka 0.8 版本結(jié)合不支持動(dòng)態(tài)分區(qū)檢測(cè),與 0.10 版本結(jié)合支持,接著通過(guò)源碼分析。

Spark Streaming 與 kafka 0.8 版本結(jié)合

*源碼分析只針對(duì)分區(qū)檢測(cè)

入口是 DirectKafkaInputDStream 的 compute:

  1. override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {// 改行代碼會(huì)計(jì)算這個(gè)job,要消費(fèi)的每個(gè)kafka分區(qū)的最大偏移 
  2.  val untilOffsets = clamp(latestLeaderOffsets(maxRetries))// 構(gòu)建KafkaRDD,用指定的分區(qū)數(shù)和要消費(fèi)的offset范圍 
  3.  val rdd = KafkaRDD[K, V, U, T, R]( 
  4.  context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler) // Report the record number and metadata of this batch interval to InputInfoTracker. 
  5.  val offsetRanges = currentOffsets.map { case (tp, fo) => 
  6.  val uo = untilOffsets(tp) OffsetRange(tp.topic, tp.partition, fo, uo.offset) 
  7.  } val description = offsetRanges.filter { offsetRange => 
  8.  // Don't display empty ranges. 
  9.  offsetRange.fromOffset != offsetRange.untilOffset 
  10.  }.map { offsetRange => 
  11.  s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" + 
  12.  s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}" 
  13.  }.mkString("\n") // Copy offsetRanges to immutable.List to prevent from being modified by the user 
  14.  val metadata = Map( "offsets" -> offsetRanges.toList, StreamInputInfo.METADATA_KEY_DESCRIPTION -> description) val inputInfo = StreamInputInfo(id, rdd.count, metadata) 
  15.  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) 
  16.  
  17.  currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset) Some(rdd) 
  18.  } 

第一行就是計(jì)算得到該批次生成 KafkaRDD 每個(gè)分區(qū)要消費(fèi)的最大 offset。 接著看 latestLeaderOffsets(maxRetries)

  1. @tailrec protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {// 可以看到的是用來(lái)指定獲取最大偏移分區(qū)的列表還是只有currentOffsets,沒(méi)有發(fā)現(xiàn)關(guān)于新增的分區(qū)的內(nèi)容。 
  2.  val o = kc.getLatestLeaderOffsets(currentOffsets.keySet) // Either.fold would confuse @tailrec, do it manually 
  3.  if (o.isLeft) { val err = o.left.get.toString if (retries <= 0) { throw new SparkException(err) 
  4.  } else { 
  5.  logError(err) Thread.sleep(kc.config.refreshLeaderBackoffMs) 
  6.  latestLeaderOffsets(retries - 1) 
  7.  } 
  8.  } else { 
  9.  o.right.get 
  10.  } 
  11.  } 

其中 protected var currentOffsets = fromOffsets,這個(gè)僅僅是在構(gòu)建 DirectKafkaInputDStream 的時(shí)候初始化,并在 compute 里面更新:

  1. currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset) 

中間沒(méi)有檢測(cè) kafka 新增 topic 或者分區(qū)的代碼,所以可以確認(rèn) Spark Streaming 與 kafka 0.8 的版本結(jié)合不支持動(dòng)態(tài)分區(qū)檢測(cè)。

Spark Streaming 與 kafka 0.10 版本結(jié)合

入口同樣是 DirectKafkaInputDStream 的 compute 方法,撿主要的部分說(shuō),Compute 里第一行也是計(jì)算當(dāng)前 job 生成 kafkardd 要消費(fèi)的每個(gè)分區(qū)的最大 offset:

// 獲取當(dāng)前生成job,要用到的KafkaRDD每個(gè)分區(qū)最大消費(fèi)偏移值 val untilOffsets = clamp(latestOffsets())

具體檢測(cè) kafka 新增 topic 或者分區(qū)的代碼在 latestOffsets()

  1. /**  
  2. Returns the latest (highest) available offsets, taking new partitions into account. */ 
  3.  protected def latestOffsets(): Map[TopicPartition, Long] = { val c = consumer 
  4.  paranoidPoll(c) // 獲取所有的分區(qū)信息 
  5.  val parts = c.assignment().asScala // make sure new partitions are reflected in currentOffsets 
  6.  // 做差獲取新增的分區(qū)信息 
  7.  val newPartitions = parts.diff(currentOffsets.keySet) // position for new partitions determined by auto.offset.reset if no commit 
  8.  // 新分區(qū)消費(fèi)位置,沒(méi)有記錄的化是由auto.offset.reset決定 
  9.  currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap // don't want to consume messages, so pause 
  10.  c.pause(newPartitions.asJava) // find latest available offsets 
  11.  c.seekToEnd(currentOffsets.keySet.asJava) 
  12.  parts.map(tp => tp -> c.position(tp)).toMap 
  13.  } 

該方法內(nèi)有獲取 kafka 新增分區(qū),并將其更新到 currentOffsets 的過(guò)程,所以可以驗(yàn)證 Spark Streaming 與 kafka 0.10 版本結(jié)合支持動(dòng)態(tài)分區(qū)檢測(cè)。

Flink

入口類(lèi)是 FlinkKafkaConsumerBase,該類(lèi)是所有 flink 的 kafka 消費(fèi)者的父類(lèi)。

 

關(guān)于流式計(jì)算:Flink與Spark Streaming誰(shuí)的拳頭更硬

 

在 FlinkKafkaConsumerBase 的 run 方法中,創(chuàng)建了 kafkaFetcher,實(shí)際上就是消費(fèi)者:

  1. this.kafkaFetcher = createFetcher( 
  2.  sourceContext, 
  3.  subscribedPartitionsToStartOffsets, 
  4.  periodicWatermarkAssigner, 
  5.  punctuatedWatermarkAssigner, 
  6.  (StreamingRuntimeContext) getRuntimeContext(), 
  7.  offsetCommitMode, 
  8.  getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP), 
  9.  useMetrics); 

接是創(chuàng)建了一個(gè)線程,該線程會(huì)定期檢測(cè) kafka 新增分區(qū),然后將其添加到 kafkaFetcher 里。

  1. if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) { final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>(); this.discoveryLoopThread = new Thread(new Runnable() { @Override 
  2.  public void run() { try { // --------------------- partition discovery loop --------------------- 
  3.  
  4.  List<KafkaTopicPartition> discoveredPartitions; // throughout the loop, we always eagerly check if we are still running before 
  5.  // performing the next operation, so that we can escape the loop as soon as possible 
  6.  
  7.  while (running) { if (LOG.isDebugEnabled()) { LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask()); 
  8.  } try { 
  9.  discoveredPartitions = partitionDiscoverer.discoverPartitions(); 
  10.  } catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) { // the partition discoverer may have been closed or woken up before or during the discovery; 
  11.  // this would only happen if the consumer was canceled; simply escape the loop 
  12.  break; 
  13.  } // no need to add the discovered partitions if we were closed during the meantime 
  14.  if (running && !discoveredPartitions.isEmpty()) { 
  15.  kafkaFetcher.addDiscoveredPartitions(discoveredPartitions); 
  16.  } // do not waste any time sleeping if we're not running anymore 
  17.  if (running && discoveryIntervalMillis != 0) { try { Thread.sleep(discoveryIntervalMillis); 
  18.  } catch (InterruptedException iex) { // may be interrupted if the consumer was canceled midway; simply escape the loop 
  19.  break; 
  20.  } 
  21.  } 
  22.  } 
  23.  } catch (Exception e) { 
  24.  discoveryLoopErrorRef.set(e); 
  25.  } finally { // calling cancel will also let the fetcher loop escape 
  26.  // (if not running, cancel() was already called) 
  27.  if (running) { 
  28.  cancel(); 
  29.  } 
  30.  } 
  31.  } 
  32.  }, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks()); 
  33.  
  34.  discoveryLoopThread.start(); 
  35.  kafkaFetcher.runFetchLoop(); 

上面,就是 flink 動(dòng)態(tài)發(fā)現(xiàn) kafka 新增分區(qū)的過(guò)程。不過(guò)與 Spark 無(wú)需做任何配置不同的是,flink 動(dòng)態(tài)發(fā)現(xiàn) kafka 新增分區(qū),這個(gè)功能需要被使能的。也很簡(jiǎn)單,需要將 flink.partition-discovery.interval-millis 該屬性設(shè)置為大于 0 即可。

容錯(cuò)機(jī)制及處理語(yǔ)義

本節(jié)內(nèi)容主要是想對(duì)比兩者在故障恢復(fù)及如何保證僅一次的處理語(yǔ)義。這個(gè)時(shí)候適合拋出一個(gè)問(wèn)題:實(shí)時(shí)處理的時(shí)候,如何保證數(shù)據(jù)僅一次處理語(yǔ)義?

Spark Streaming 保證僅一次處理

對(duì)于 Spark Streaming 任務(wù),我們可以設(shè)置 checkpoint,然后假如發(fā)生故障并重啟,我們可以從上次 checkpoint 之處恢復(fù),但是這個(gè)行為只能使得數(shù)據(jù)不丟失,可能會(huì)重復(fù)處理,不能做到恰一次處理語(yǔ)義。

對(duì)于 Spark Streaming 與 kafka 結(jié)合的 direct Stream 可以自己維護(hù) offset 到 zookeeper、kafka 或任何其它外部系統(tǒng),每次提交完結(jié)果之后再提交 offset,這樣故障恢復(fù)重啟可以利用上次提交的 offset 恢復(fù),保證數(shù)據(jù)不丟失。但是假如故障發(fā)生在提交結(jié)果之后、提交 offset 之前會(huì)導(dǎo)致數(shù)據(jù)多次處理,這個(gè)時(shí)候我們需要保證處理結(jié)果多次輸出不影響正常的業(yè)務(wù)。

由此可以分析,假設(shè)要保證數(shù)據(jù)恰一次處理語(yǔ)義,那么結(jié)果輸出和 offset 提交必須在一個(gè)事務(wù)內(nèi)完成。在這里有以下兩種做法:

  • repartition(1) Spark Streaming 輸出的 action 變成僅一個(gè) partition,這樣可以利用事務(wù)去做:
  1. Dstream.foreachRDD(rdd=>{ 
  2.  rdd.repartition(1).foreachPartition(partition=>{ // 開(kāi)啟事務(wù) 
  3.  partition.foreach(each=>{// 提交數(shù)據(jù) 
  4.  }) // 提交事務(wù) 
  5.  }) 
  6.  }) 
  • 將結(jié)果和 offset 一起提交

也就是結(jié)果數(shù)據(jù)包含 offset。這樣提交結(jié)果和提交 offset 就是一個(gè)操作完成,不會(huì)數(shù)據(jù)丟失,也不會(huì)重復(fù)處理。故障恢復(fù)的時(shí)候可以利用上次提交結(jié)果帶的 offset。

Flink 與 kafka 0.11 保證僅一次處理

若要 sink 支持僅一次語(yǔ)義,必須以事務(wù)的方式寫(xiě)數(shù)據(jù)到 Kafka,這樣當(dāng)提交事務(wù)時(shí)兩次 checkpoint 間的所有寫(xiě)入操作作為一個(gè)事務(wù)被提交。這確保了出現(xiàn)故障或崩潰時(shí)這些寫(xiě)入操作能夠被回滾。

在一個(gè)分布式且含有多個(gè)并發(fā)執(zhí)行 sink 的應(yīng)用中,僅僅執(zhí)行單次提交或回滾是不夠的,因?yàn)樗薪M件都必須對(duì)這些提交或回滾達(dá)成共識(shí),這樣才能保證得到一致性的結(jié)果。Flink 使用兩階段提交協(xié)議以及預(yù)提交(pre-commit)階段來(lái)解決這個(gè)問(wèn)題。

本例中的 Flink 應(yīng)用如圖 11 所示包含以下組件:

  • 一個(gè)source,從Kafka中讀取數(shù)據(jù)(即KafkaConsumer)
  • 一個(gè)時(shí)間窗口化的聚會(huì)操作
  • 一個(gè)sink,將結(jié)果寫(xiě)回到Kafka(即KafkaProducer)

 

關(guān)于流式計(jì)算:Flink與Spark Streaming誰(shuí)的拳頭更硬

 

下面詳細(xì)講解 flink 的兩段提交思路:

 

關(guān)于流式計(jì)算:Flink與Spark Streaming誰(shuí)的拳頭更硬

 

如圖 12 所示,F(xiàn)link checkpointing 開(kāi)始時(shí)便進(jìn)入到 pre-commit 階段。具體來(lái)說(shuō),一旦 checkpoint 開(kāi)始,F(xiàn)link 的 JobManager 向輸入流中寫(xiě)入一個(gè) checkpoint barrier ,將流中所有消息分割成屬于本次 checkpoint 的消息以及屬于下次 checkpoint 的,barrier 也會(huì)在操作算子間流轉(zhuǎn)。對(duì)于每個(gè) operator 來(lái)說(shuō),該 barrier 會(huì)觸發(fā) operator 狀態(tài)后端為該 operator 狀態(tài)打快照。data source 保存了 Kafka 的 offset,之后把 checkpoint barrier 傳遞到后續(xù)的 operator。

這種方式僅適用于 operator 僅有它的內(nèi)部狀態(tài)。內(nèi)部狀態(tài)是指 Flink state backends 保存和管理的內(nèi)容(如第二個(gè) operator 中 window 聚合算出來(lái)的 sum)。

當(dāng)一個(gè)進(jìn)程僅有它的內(nèi)部狀態(tài)的時(shí)候,除了在 checkpoint 之前將需要將數(shù)據(jù)更改寫(xiě)入到 state backend,不需要在預(yù)提交階段做其他的動(dòng)作。在 checkpoint 成功的時(shí)候,F(xiàn)link 會(huì)正確的提交這些寫(xiě)入,在 checkpoint 失敗的時(shí)候會(huì)終止提交,過(guò)程可見(jiàn)圖 13。

 

關(guān)于流式計(jì)算:Flink與Spark Streaming誰(shuí)的拳頭更硬

 

當(dāng)結(jié)合外部系統(tǒng)的時(shí)候,外部系統(tǒng)必須要支持可與兩階段提交協(xié)議捆綁使用的事務(wù)。顯然本例中的 sink 由于引入了 kafka sink,因此在預(yù)提交階段 data sink 必須預(yù)提交外部事務(wù)。如下圖:

 

關(guān)于流式計(jì)算:Flink與Spark Streaming誰(shuí)的拳頭更硬

 

當(dāng) barrier 在所有的算子中傳遞一遍,并且觸發(fā)的快照寫(xiě)入完成,預(yù)提交階段完成。所有的觸發(fā)狀態(tài)快照都被視為 checkpoint 的一部分,也可以說(shuō) checkpoint 是整個(gè)應(yīng)用程序的狀態(tài)快照,包括預(yù)提交外部狀態(tài)。出現(xiàn)故障可以從 checkpoint 恢復(fù)。下一步就是通知所有的操作算子 checkpoint 成功。該階段 jobmanager 會(huì)為每個(gè) operator 發(fā)起 checkpoint 已完成的回調(diào)邏輯。

本例中 data source 和窗口操作無(wú)外部狀態(tài),因此該階段,這兩個(gè)算子無(wú)需執(zhí)行任何邏輯,但是 data sink 是有外部狀態(tài)的,因此,此時(shí)我們必須提交外部事務(wù),如下圖:

 

關(guān)于流式計(jì)算:Flink與Spark Streaming誰(shuí)的拳頭更硬

 

以上就是 flink 實(shí)現(xiàn)恰一次處理的基本邏輯。

Back pressure

消費(fèi)者消費(fèi)的速度低于生產(chǎn)者生產(chǎn)的速度,為了使應(yīng)用正常,消費(fèi)者會(huì)反饋給生產(chǎn)者來(lái)調(diào)節(jié)生產(chǎn)者生產(chǎn)的速度,以使得消費(fèi)者需要多少,生產(chǎn)者生產(chǎn)多少。

*back pressure 后面一律稱(chēng)為背壓。

Spark Streaming 的背壓

Spark Streaming 跟 kafka 結(jié)合是存在背壓機(jī)制的,目標(biāo)是根據(jù)當(dāng)前 job 的處理情況來(lái)調(diào)節(jié)后續(xù)批次的獲取 kafka 消息的條數(shù)。為了達(dá)到這個(gè)目的,Spark Streaming 在原有的架構(gòu)上加入了一個(gè) RateController,利用的算法是 PID,需要的反饋數(shù)據(jù)是任務(wù)處理的結(jié)束時(shí)間、調(diào)度時(shí)間、處理時(shí)間、消息條數(shù),這些數(shù)據(jù)是通過(guò) SparkListener 體系獲得,然后通過(guò) PIDRateEsimator 的 compute 計(jì)算得到一個(gè)速率,進(jìn)而可以計(jì)算得到一個(gè) offset,然后跟限速設(shè)置最大消費(fèi)條數(shù)比較得到一個(gè)最終要消費(fèi)的消息最大 offset。

PIDRateEsimator 的 compute 方法如下:

  1. def compute( time: Long, // in milliseconds 
  2.  numElements: Long, processingDelay: Long, // in milliseconds 
  3.  schedulingDelay: Long // in milliseconds 
  4.  ): Option[Double] = { 
  5.  logTrace(s"\ntime = $time, # records = $numElements, " + 
  6.  s"processing time = $processingDelay, scheduling delay = $schedulingDelay") this.synchronized { if (time > latestTime && numElements > 0 && processingDelay > 0) { val delaySinceUpdate = (time - latestTime).toDouble / 1000 
  7.  
  8.  val processingRate = numElements.toDouble / processingDelay * 1000 
  9.  
  10.  val error = latestRate - processingRate val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis // in elements/(second ^ 2) 
  11.  val dError = (error - latestError) / delaySinceUpdate val newRate = (latestRate - proportional * error - 
  12.  integral * historicalError - 
  13.  derivative * dError).max(minRate) 
  14.  logTrace(s""" | latestRate = $latestRate, error = $error | latestError = $latestError, historicalError = $historicalError | delaySinceUpdate = $delaySinceUpdate, dError = $dError """.stripMargin) 
  15.  
  16.  latestTime = time if (firstRun) { 
  17.  latestRate = processingRate 
  18.  latestError = 0D 
  19.  firstRun = false 
  20.  logTrace("First run, rate estimation skipped") None 
  21.  } else { 
  22.  latestRate = newRate 
  23.  latestError = error 
  24.  logTrace(s"New rate = $newRate"Some(newRate) 
  25.  } 
  26.  } else { 
  27.  logTrace("Rate estimation skipped") None 
  28.  } 
  29.  } 
  30.  } 

Flink 的背壓

與 Spark Streaming 的背壓不同的是,F(xiàn)link 背壓是 jobmanager 針對(duì)每一個(gè) task 每 50ms 觸發(fā) 100 次 Thread.getStackTrace() 調(diào)用,求出阻塞的占比。過(guò)程如圖 16 所示:

 

關(guān)于流式計(jì)算:Flink與Spark Streaming誰(shuí)的拳頭更硬

 

阻塞占比在 web 上劃分了三個(gè)等級(jí):

  • OK: 0 <= Ratio <= 0.10,表示狀態(tài)良好;
  • LOW: 0.10 < Ratio <= 0.5,表示有待觀察;
  • HIGH: 0.5 < Ratio <= 1,表示要處理了。

 

責(zé)任編輯:武曉燕 來(lái)源: 今日頭條
相關(guān)推薦

2016-01-28 10:11:30

Spark StreaSpark大數(shù)據(jù)平臺(tái)

2023-10-24 20:32:40

大數(shù)據(jù)

2017-10-13 10:36:33

SparkSpark-Strea關(guān)系

2017-08-14 10:30:13

SparkSpark Strea擴(kuò)容

2017-06-06 08:31:10

Spark Strea計(jì)算模型監(jiān)控

2016-12-19 14:35:32

Spark Strea原理剖析數(shù)據(jù)

2010-04-23 22:06:13

機(jī)房監(jiān)控軟件Mocha BSM摩卡軟件

2017-10-11 11:10:02

Spark Strea大數(shù)據(jù)流式處理

2018-04-09 12:25:11

2016-05-11 10:29:54

Spark Strea數(shù)據(jù)清理Spark

2022-09-16 14:22:16

KafkaPulsar

2014-12-15 09:32:17

StormSpark

2015-11-03 16:59:54

SaaS

2021-08-20 16:37:42

SparkSpark Strea

2018-10-24 09:00:26

KafkaSpark數(shù)據(jù)

2013-05-31 10:07:59

大數(shù)據(jù)時(shí)代電影產(chǎn)業(yè)

2021-07-09 10:27:12

SparkStreaming系統(tǒng)

2018-04-18 08:54:28

RDD內(nèi)存Spark

2016-11-02 09:20:01

SparkHadoop MapR大數(shù)據(jù)

2019-10-17 09:25:56

Spark StreaPVUV
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)