利用 SMACK 堆棧構建可擴展數(shù)據(jù)處理平臺
在今天的文章中,我們將著重探討如何利用SMACK(即Spark、Mesos、Akka、Cassandra以及Kafka)堆棧構建可擴展數(shù)據(jù)處理平臺。雖然這套堆棧僅由數(shù)個簡單部分組成,但其能夠實現(xiàn)大量不同系統(tǒng)設計。除了純粹的批量或者流處理機制之外,我們亦可借此實現(xiàn)復雜的Lambda以及Kappa架構。
在本文開始闡述之前,讓我們首先立足于已有生產項目經(jīng)驗從設計與示例入手進行說明。
綜述
- Spark - 一套高速通用型引擎,用于實現(xiàn)分布式大規(guī)模數(shù)據(jù)處理任務。
- Mesos - 集群資源管理系統(tǒng),能夠立足于分布式應用程序提供行之有效的資源隔離與共享能力。
- Akka - 一套用于在JVM之上構建高并發(fā)、分布式及彈性消息驅動型應用程序的工具包與運行時。
- Cassandra - 一套分布式高可用性數(shù)據(jù)庫,旨在跨越多座數(shù)據(jù)中心處理大規(guī)模數(shù)據(jù)。
- Kafka -一套高吞吐能力、低延遲、分布式消息收發(fā)系統(tǒng)/提交日志方案,旨在處理實時數(shù)據(jù)供給。
存儲層: Cassandra
Cassandra一直以其高可用性與高吞吐能力兩大特性而備受矚目,其同時能夠處理極為可觀的寫入負載并具備節(jié)點故障容錯能力。以CAP原則為基礎,Cassandra能夠為業(yè)務運營提供可調整的一致性/可用性水平。
更有趣的是,Cassandra在處理數(shù)據(jù)時擁有線性可擴展能力(即可通過向集群當中添加節(jié)點的方式實現(xiàn)負載增容)并能夠提供跨數(shù)據(jù)中心復制(簡稱XDCR)能力。事實上,跨數(shù)據(jù)中心復制功能除了數(shù)據(jù)復制,同時也能夠實現(xiàn)以下各類擴展用例:
地理分布式數(shù)據(jù)中心處理面向特定區(qū)域或者客戶周邊位置之數(shù)據(jù)。
在不同數(shù)據(jù)中心之間者數(shù)據(jù)遷移,從而實現(xiàn)故障后恢復或者將數(shù)據(jù)移動至新數(shù)據(jù)中心。
對運營工作負載與分析工作負載加以拆分。
但上述特性也都有著自己的實現(xiàn)成本,而對于Cassandra而言這種成本體現(xiàn)為數(shù)據(jù)模型——這意味著我們需要通過聚類對分區(qū)鍵及入口進行分組/分類,從而實現(xiàn)嵌套有序映射。以下為簡單示例:
CREATE TABLE campaign( id uuid, year int, month int, day int, views bigint, clicks bigint, PRIMARY KEY (id, year, month, day) ); INSERT INTO campaign(id, year, month, day, views, clicks) VALUES(40b08953-a…,2015, 9, 10, 1000, 42); SELECT views, clicks FROM campaign WHERE id=40b08953-a… and year=2015 and month>8;
為了獲取某一范圍內的特定數(shù)據(jù),我們必須指定全鍵,且不允許除列表內***一列之外的其它任何范圍劃定得以執(zhí)行。這種限制用于針對不同范圍進行多重掃描限定,否則其可能帶來隨機磁盤訪問并拖慢整體性能表現(xiàn)。這意味著該數(shù)據(jù)模型必須根據(jù)讀取查詢進行認真設計,從而限制讀取/掃描量——但這同時也會導致對新查詢的支持靈活性有所下降。
那么如果我們需要將某些表加入到其它表當中,又該如何處理?讓我們考慮下一種場景:針對特定月份對全部活動進行總體訪問量計算。
CREATE TABLE event( id uuid, ad_id uuid, campaign uuid, ts bigint, type text, PRIMARY KEY(id) );
在特定模型之下,實現(xiàn)這一目標的惟一辦法就是讀取全部活動、讀取全部事件、匯總各屬性值(其與活動id相匹配)并將其分配給活動。實現(xiàn)這類應用程序操作顯然***挑戰(zhàn),因為保存在Casandra中的數(shù)據(jù)總量往往非常龐大,內存容量根本不足以加以容納。因此我們必須以分布式方式對此類數(shù)據(jù)加以處理,而Spark在這類用例中將發(fā)揮重要作用。
處理層: Spark
Spark的抽象核心主要涉及RDD(即彈性分布式數(shù)據(jù)集,一套分布式元素集合)以及由以下四個主要階段構成的工作流:
- RDD操作(轉換與操作)以DAG(即有向無環(huán)圖)形式進行
- DAG會根據(jù)各任務階段進行拆分,并隨后被提交至集群管理器
- 各階段無需混洗/重新分配即可與任務相結合
- 任務運行在工作程序之上,而結果隨后返回至客戶端
以下為我們如何利用Spark與Cassandra解決上述問題:
val sc = new SparkContext(conf) case class Event(id: UUID, ad_id: UUID, campaign: UUID, ts: Long, `type`: String) sc.cassandraTable[Event]("keyspace", "event") .filter(e => e.`type` == "view" && checkMonth(e.ts)) .map(e => (e.campaign, 1)) .reduceByKey(_ + _) .collect()
指向Cassandra的交互通過Spark-Cassandra-連接器負責執(zhí)行,其能夠讓整個流程變得更為直觀且簡便。另有一個非常有趣的選項能夠幫助大家實現(xiàn)對NoSQL存儲內容的交互——SparkSQL,其能夠將SQL語句翻譯成一系列RDD操作。
case class CampaignReport(id: String, views: Long, clicks: Long) sql("""SELECT campaign.id as id, campaign.views as views, campaign.clicks as clicks, event.type as type FROM campaign JOIN event ON campaign.id = event.campaign """).rdd .groupBy(row => row.getAs[String]("id")) .map{ case (id, rows) => val views = rows.head.getAs[Long]("views") val clicks = rows.head.getAs[Long]("clicks") val res = rows.groupBy(row => row.getAs[String]("type")).mapValues(_.size) CampaignReport(id, views = views + res("view"), clicks = clicks + res("click")) }.saveToCassandra(“keyspace”, “campaign_report”)
通過幾行代碼,我們已經(jīng)能夠實現(xiàn)原生Lambda設計——其復雜度顯然較高,但這一示例表明大家完全有能力以簡單方式實現(xiàn)既定功能。
類MapReduce解決方案:拉近處理與數(shù)據(jù)間的距離
Spark-Cassandra連接器擁有數(shù)據(jù)位置識別能力,并會從集群內距離最近的節(jié)點處讀取數(shù)據(jù),從而***程度降低數(shù)據(jù)在網(wǎng)絡中的傳輸需求。為了充分發(fā)揮Spark-C*連接器的數(shù)據(jù)位置識別能力,大家應當讓Spark工作程序與Cassandra節(jié)點并行協(xié)作。
除了Spark與Cassandra的協(xié)作之外,我們也有理由將運營(或者高寫入強度)集群同分析集群區(qū)分開來,從而保證:
- 不同集群能夠獨立進行規(guī)模伸縮
- 數(shù)據(jù)由Cassandra負責復制,而無需其它機制介入
- 分析集群擁有不同的讀取/寫入負載模式
- 分析集群能夠容納額外數(shù)據(jù)(例如詞典)與處理結果
- Spark對資源的影響只局限于單一集群當中
下面讓我們再次回顧Spark的應用程序部署選項:
目前我們擁有三種主要集群資源管理器選項可供選擇:
- 單獨使用Spark——Spark作為主體,各工作程序以獨立應用程序的形式安裝并執(zhí)行(這明顯會增加額外資源負擔,且只支持為每工作程序分配靜態(tài)資源)
- 如果大家已經(jīng)擁有Hadoop生態(tài)系統(tǒng),那么YARN絕對是個不錯的選項
- Mesos自誕生之初就在設計中考慮到對集群資源的動態(tài)分配,而且除了Hadoop應用程序之外,同時也適合處理各類異構工作負載
Mesos架構
Mesos集群由各主節(jié)點構成,它們負責資源供應與調度,而各從節(jié)點則實際承擔任務執(zhí)行負載。在HA模式當中,我們利用多個主ZooKeeper 節(jié)點負責進行主節(jié)點選擇與服務發(fā)現(xiàn)。Mesos之上執(zhí)行的各應用程序被稱為“框架(Framework)”,并利用API處理資源供應及將任務提交至 Mesos??傮w來講,其任務執(zhí)行流程由以下幾個步驟構成:
- 從節(jié)點為主節(jié)點提供可用資源
- 主節(jié)點向框架發(fā)送資源供應
- 調度程序回應這些任務及每任務資源需求
- 主節(jié)點將任務發(fā)送至從節(jié)點
將Spark、Mesos以及Cassandra加以結合
正如之前所提到,Spark工作程序應當與Cassandra節(jié)點協(xié)作,從而實現(xiàn)數(shù)據(jù)位置識別能力以降低網(wǎng)絡流量與Cassandra集群負載。下圖所示為利用Mesos實現(xiàn)這一目標的可行部署場景示例:
- Mesos主節(jié)點與ZooKeeper協(xié)作
- Mesos從節(jié)點與Cassandra節(jié)點協(xié)作,從而為Spark提供更理想的數(shù)據(jù)位置
- Spark二進制文件部署至全部工作節(jié)點當中,而spark-env.sh則配置以合適的主端點及執(zhí)行器jar位置
- Spark執(zhí)行器JAR被上傳至S3/HDFS當中
根據(jù)以上設置流程Spark任務可利用簡單的spark-submit調用從任意安裝有Spark二進制文件并上傳有包含實際任務邏輯jar的工作節(jié)點被提交至集群中。
spark-submit --class io.datastrophic.SparkJob /etc/jobs/spark-jobs.jar
由于現(xiàn)有選項已經(jīng)能夠運行Docker化Spark,因此我們不必將二進制文件分發(fā)至每個單一集群節(jié)點當中。
定期與長期運行任務之執(zhí)行機制
每套數(shù)據(jù)處理系統(tǒng)遲早都要面對兩種必不可少的任務運行類別:定期批量匯聚型定期/階段性任務以及以數(shù)據(jù)流處理為代表的長期任務。這兩類任務的一大主要要求在于容錯能力——各任務必須始終保持運行,即使集群節(jié)點發(fā)生故障。Mesos提供兩套出色的框架以分別支持這兩種任務類別。
Marathon是一套專門用于實現(xiàn)長期運行任務高容錯性的架構,且支持與ZooKeeper相配合之HA模式。其能夠運行Docker并提供出色的REST API。以下shell命令示例為通過運行spark-submit實現(xiàn)簡單任務配置:
Chronos擁有與Marathon相同的特性,但其設計目標在于運行定期任務,而且總體而言其分布式HA cron支持任務圖譜。以下示例為利用簡單的bash腳本實現(xiàn)S3壓縮任務配置:
目前已經(jīng)有多種框架方案可供選擇,或者正處于積極開發(fā)當中以對接各類系統(tǒng)中所廣泛采用的Mesos資源管理功能。下面列舉其中一部分典型代表:
- Hadoop
- Cassandra
- Kafka
- Myriad: YARN on Mesos
- Storm
- Samza
數(shù)據(jù)提取
到目前為止可謂一切順利:存儲層已經(jīng)設計完成,資源管理機制設置妥當,而各任務亦經(jīng)過配置。接下來惟一要做的就是數(shù)據(jù)處理工作了。
假定輸入數(shù)據(jù)將以極高速率涌來,這時端點要順利應對就需要滿足以下要求:
- 提供高吞吐能力/低延遲
- 具備彈性
- 可輕松實現(xiàn)規(guī)模擴展
- 支持背壓
背壓能力并非必需,不過將其作為選項來應對負載峰值是個不錯的選擇。
Akka能夠***支持以上要求,而且基本上其設計目標恰好是提供這套功能集。下面來看Akka的特性:
- JVM面向JVM的角色模型實現(xiàn)能力
- 基于消息且支持異步架構
- 強制執(zhí)行非共享可變狀態(tài)
- 可輕松由單一進程擴展至設備集群
- 利用自上而下之監(jiān)督機制實現(xiàn)角色層級
- 不僅是并發(fā)框架:akka-http、akka-stream以及akka-persistence
以下簡要示例展示了三個負責處理JSON HttpRequest的角色,它們將該請求解析為域模型例類,并將其保存在Cassandra當中:
class HttpActor extends Actor { def receive = { case req: HttpRequest => system.actorOf(Props[JsonParserActor]) ! req.body case e: Event => system.actorOf(Props[CassandraWriterActor]) ! e } } class JsonParserActor extends Actor { def receive = { case s: String => Try(Json.parse(s).as[Event]) match { case Failure(ex) => //error handling code case Success(event) => sender ! event } } } class CassandraWriterActor extends Actor with ActorLogging { //for demo purposes, session initialized here val session = Cluster.builder() .addContactPoint("cassandra.host") .build() .connect() override def receive: Receive = { case event: Event => val statement = new SimpleStatement(event.createQuery) .setConsistencyLevel(ConsistencyLevel.QUORUM) Try(session.execute(statement)) match { case Failure(ex) => //error handling code case Success => sender ! WriteSuccessfull } } }
看起來只需幾行代碼即可實現(xiàn)上述目標,不過利用Akka向Cassandra當中寫入原始數(shù)據(jù)(即事件)卻有可能帶來以下問題:
-Cassandra的設計思路仍然偏重高速交付而非批量處理,因此必須對輸入數(shù)據(jù)進行預匯聚。
-匯聚/匯總所帶來的計算時間會隨著數(shù)據(jù)總量的增長而逐步加長。
-由于采用無狀態(tài)設計模式,各角色并不適合用于執(zhí)行匯聚任務。
-微批量機制能夠在一定程度上解決這個難題。
-仍然需要為原始數(shù)據(jù)提供某種可靠的緩沖機制
Kafka充當輸入數(shù)據(jù)之緩沖機制
為了保留輸入數(shù)據(jù)并對其進行預匯聚/處理,我們也可以使用某種類型的分布式提交日志機制。在以下用例中,消費程序將批量讀取數(shù)據(jù),對其進行處理并將其以預匯聚形式保存在Cassandra當中。該示例說明了如何利用akka-http通過HTTP將JSON數(shù)據(jù)發(fā)布至Kafka當中:
val config = new ProducerConfig(KafkaConfig()) lazy val producer = new KafkaProducer[A, A](config) val topic = “raw_events” val routes: Route = { post{ decodeRequest{ entity(as[String]){ str => JsonParser.parse(str).validate[Event] match { case s: JsSuccess[String] => producer.send(new KeyedMessage(topic, str)) case e: JsError => BadRequest -> JsError.toFlatJson(e).toString() } } } } } object AkkaHttpMicroservice extends App with Service { Http().bindAndHandle(routes, config.getString("http.interface"), config.getInt("http.port")) }
數(shù)據(jù)消費:Spark Streaming
盡管Akka也能夠用于消耗來自Kafka的流數(shù)據(jù),但將Spark納入生態(tài)系統(tǒng)以引入Spark Streaming能夠切實解決以下難題:
以下代碼示例闡述了如何利用Spark Streaming消費來自Kinesis的事件流:
val ssc = new StreamingContext(conf, Seconds(10)) val kinesisStream = KinesisUtils.createStream(ssc,appName,streamName, endpointURL,regionName, InitialPositionInStream.LATEST, Duration(checkpointInterval), StorageLevel.MEMORY_ONLY) } //transforming given stream to Event and saving to C* kinesisStream.map(JsonUtils.byteArrayToEvent) .saveToCassandra(keyspace, table) ssc.start() ssc.awaitTermination()
故障設計:備份與補丁安裝
通常來講,故障設計是任何系統(tǒng)當中最為枯燥的部分,但其重要性顯然不容質疑——當數(shù)據(jù)中心不可用或者需要對崩潰狀況加以分析時,盡可能保障數(shù)據(jù)免于丟失可謂至關重要。
那么為什么要將數(shù)據(jù)存儲在Kafka/Kinesis當中?截至目前,Kinesis仍然是惟一在無需備份的情況下能夠確保全部處理結果丟失后保留數(shù)據(jù)的解決方案。雖然Kafka也能夠支持數(shù)據(jù)長期保留,但硬件持有成本仍是個需要認真考慮的問題,因為S3存儲服務的使用成本要遠低于支持Kafka 所需要的大量實例——另外,S3也提供非常理想的服務水平協(xié)議。
除了備份能力,恢復/補丁安裝策略還應當考慮到前期與測試需求,從而保證任何與數(shù)據(jù)相關的問題能夠得到迅速解決。程序員們在匯聚任務或者重復數(shù)據(jù)刪除操作中可能不慎破壞計算結果,因此修復這類錯誤的能力就變得非常關鍵。簡化這類操作任務的一種簡便方式在于在數(shù)據(jù)模型當中引入冪等機制,這樣同一操作的多次重復將產生相同的結果(例如SQL更新屬于冪等操作,而計數(shù)遞增則不屬于)。
以下示例為Spark任務讀取S3備份并將其載入至Cassandra:
val sc = new SparkContext(conf) sc.textFile(s"s3n://bucket/2015/*/*.gz") .map(s => Try(JsonUtils.stringToEvent(s))) .filter(_.isSuccess).map(_.get) .saveToCassandra(config.keyspace, config.table)
宏觀構成
利用SMACK構建數(shù)據(jù)平臺頂層設計
縱觀全文,SMACK堆棧的卓越能力包括:
- 簡明的工具儲備以解決范圍極廣的各類數(shù)據(jù)處理場景
- 軟件方案久經(jīng)考驗且擁有廣泛普及度,背后亦具備強大的技術社區(qū)
- 易于實現(xiàn)規(guī)模伸縮與數(shù)據(jù)復制,且提供較低延遲水平
- 統(tǒng)一化集群管理以實現(xiàn)異構負載
- 可面向任意應用程序類型的單一平臺
- 面向不同架構設計(批量、流數(shù)據(jù)、Lambda、Kappa)的實現(xiàn)平臺
- 出色的產品發(fā)布速度(例如用于MVP驗證)