自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

解析SparkStreaming和Kafka集成的兩種方式

大數(shù)據(jù) Kafka Spark
Spark Streaming是基于微批處理的流式計算引擎,通常是利用Spark Core或者Spark Core與Spark Sql一起來處理數(shù)據(jù)。在企業(yè)實時處理架構中,通常將Spark Streaming和Kafka集成作為整個大數(shù)據(jù)處理架構的核心環(huán)節(jié)之一。

Spark Streaming是基于微批處理的流式計算引擎,通常是利用Spark Core或者Spark Core與Spark Sql一起來處理數(shù)據(jù)。在企業(yè)實時處理架構中,通常將Spark Streaming和Kafka集成作為整個大數(shù)據(jù)處理架構的核心環(huán)節(jié)之一。

針對不同的Spark、Kafka版本,集成處理數(shù)據(jù)的方式分為兩種:Receiver based Approach和Direct Approach,不同集成版本處理方式的支持,可參考下圖:

解析SparkStreaming和Kafka集成的兩種方式

Receiver based Approach

基于receiver的方式是使用kafka消費者高階API實現(xiàn)的。

對于所有的receiver,它通過kafka接收的數(shù)據(jù)會被存儲于spark的executors上,底層是寫入BlockManager中,默認200ms生成一個block(通過配置參數(shù)spark.streaming.blockInterval決定)。然后由spark streaming提交的job構建BlockRdd,最終以spark core任務的形式運行。

關于receiver方式,有以下幾點需要注意:

  • receiver作為一個常駐線程調度到executor上運行,占用一個cpu
  • receiver個數(shù)由KafkaUtils.createStream調用次數(shù)決定,一次一個receiver
  • kafka中的topic分區(qū)并不能關聯(lián)產生在spark streaming中的rdd分區(qū)
  • 增加在KafkaUtils.createStream()中的指定的topic分區(qū)數(shù),僅僅增加了單個receiver消費的topic的線程數(shù),它不會增加處理數(shù)據(jù)中的并行的spark的數(shù)量【topicMap[topic,num_threads]map的value對應的數(shù)值是每個topic對應的消費線程數(shù)】
  • receiver默認200ms生成一個block,建議根據(jù)數(shù)據(jù)量大小調整block生成周期。
  • receiver接收的數(shù)據(jù)會放入到BlockManager,每個executor都會有一個BlockManager實例,由于數(shù)據(jù)本地性,那些存在receiver的executor會被調度執(zhí)行更多的task,就會導致某些executor比較空閑

建議通過參數(shù)spark.locality.wait調整數(shù)據(jù)本地性。該參數(shù)設置的不合理,比如設置為10而任務2s就處理結束,就會導致越來越多的任務調度到數(shù)據(jù)存在的executor上執(zhí)行,導致任務執(zhí)行緩慢甚至失敗(要和數(shù)據(jù)傾斜區(qū)分開)

多個kafka輸入的DStreams可以使用不同的groups、topics創(chuàng)建,使用多個receivers接收處理數(shù)據(jù)

兩種receiver可靠的receiver:

  • 可靠的receiver在接收到數(shù)據(jù)并通過復制機制存儲在spark中時準確的向可靠的數(shù)據(jù)源發(fā)送ack確認不可靠的receiver:
  • 不可靠的receiver不會向數(shù)據(jù)源發(fā)送數(shù)據(jù)已接收確認。 這適用于用于不支持ack的數(shù)據(jù)源當然,我們也可以自定義receiver。
  • receiver處理數(shù)據(jù)可靠性默認情況下,receiver是可能丟失數(shù)據(jù)的。
  • 可以通過設置spark.streaming.receiver.writeAheadLog.enable為true開啟預寫日志機制,將數(shù)據(jù)先寫入一個可靠地分布式文件系統(tǒng)如hdfs,確保數(shù)據(jù)不丟失,但會失去一定性能

限制消費者消費的最大速率涉及三個參數(shù):

  • spark.streaming.backpressure.enabled:默認是false,設置為true,就開啟了背壓機制;
  • spark.streaming.backpressure.initialRate:默認沒設置初始消費速率,第一次啟動時每個receiver接收數(shù)據(jù)的最大值;
  • spark.streaming.receiver.maxRate:默認值沒設置,每個receiver接收數(shù)據(jù)的最大速率(每秒記錄數(shù))。每個流每秒最多將消費此數(shù)量的記錄,將此配置設置為0或負數(shù)將不會對最大速率進行限制

在產生job時,會將當前job有效范圍內的所有block組成一個BlockRDD,一個block對應一個分區(qū)

kafka082版本消費者高階API中,有分組的概念,建議使消費者組內的線程數(shù)(消費者個數(shù))和kafka分區(qū)數(shù)保持一致。如果多于分區(qū)數(shù),會有部分消費者處于空閑狀態(tài)

Direct Approach

direct approach是spark streaming不使用receiver集成kafka的方式,一般在企業(yè)生產環(huán)境中使用較多。相較于receiver,有以下特點:

1.不使用receiver

不需要創(chuàng)建多個kafka streams并聚合它們

減少不必要的CPU占用

減少了receiver接收數(shù)據(jù)寫入BlockManager,然后運行時再通過blockId、網(wǎng)絡傳輸、磁盤讀取等來獲取數(shù)據(jù)的整個過程,提升了效率

無需wal,進一步減少磁盤IO操作

2.direct方式生的rdd是KafkaRDD,它的分區(qū)數(shù)與kafka分區(qū)數(shù)保持一致一樣多的rdd分區(qū)來消費,更方便我們對并行度進行控制

注意:在shuffle或者repartition操作后生成的rdd,這種對應關系會失效

3.可以手動維護offset,實現(xiàn)exactly once語義

4.數(shù)據(jù)本地性問題。在KafkaRDD在compute函數(shù)中,使用SimpleConsumer根據(jù)指定的topic、分區(qū)、offset去讀取kafka數(shù)據(jù)。

但在010版本后,又存在假如kafka和spark處于同一集群存在數(shù)據(jù)本地性的問題

5.限制消費者消費的最大速率

spark.streaming.kafka.maxRatePerPartition:從每個kafka分區(qū)讀取數(shù)據(jù)的最大速率(每秒記錄數(shù))。這是針對每個分區(qū)進行限速,需要事先知道kafka分區(qū)數(shù),來評估系統(tǒng)的吞吐量。

責任編輯:未麗燕 來源: 阿里云棲社區(qū)
相關推薦

2010-07-27 15:03:37

Flex ArrayC

2011-08-08 14:13:47

iPhone XML NSXMLParse

2023-11-09 09:09:36

ZookeeperCP組件

2010-07-13 14:54:15

Perl面向對象編程

2011-03-03 10:26:04

Pureftpd

2021-05-27 10:57:01

TCP定時器網(wǎng)絡協(xié)議

2010-08-06 09:38:11

Flex讀取XML

2023-03-29 13:06:36

2011-07-01 17:50:13

Python 多線程

2009-06-25 13:43:00

Buffalo AJA

2010-10-21 16:24:18

sql server升

2012-12-24 13:30:34

iOS

2010-09-07 11:09:59

2011-04-02 09:48:38

深拷貝

2016-11-07 09:02:02

Malloc內存syscall

2011-06-16 10:02:08

JAVA靜態(tài)載入

2010-07-15 14:38:55

Perl eval函數(shù)

2009-09-08 15:22:20

Spring依賴注入

2010-08-03 13:27:04

FlexBuilder

2024-02-04 09:24:45

MyBatisSQL語句Spring
點贊
收藏

51CTO技術棧公眾號