看大片 深入理解Spark的概念和編程方式
第一次聽聞Spark是2013年年末,當時筆者對Scala(Spark的編程語言)感興趣。一段時間之后做了一個有趣的數(shù)據(jù)科學項目,試圖預測泰坦尼克號上的生還情況(Kaggle競賽項目,通過使用機器學習預測泰坦尼克號上哪些乘客具備更高的生還可能性)。通過該項目可以更深入地理解Spark的概念和編程方式。
在本文Introduction to Apache Spark with Examples and Use Cases,作者RADEK OSTROWSKI將通過Kaggle競賽項目“預測泰坦尼克號上的生還情況”帶大家深入學習Spark。
以下為譯文
第一次聽聞Spark是2013年年末,當時筆者對Scala(Spark的編程語言)感興趣。一段時間之后做了一個有趣的數(shù)據(jù)科學項目,試圖預測泰坦尼克號上的生還情況(Kaggle競賽項目,通過使用機器學習預測泰坦尼克號上哪些乘客具備更高的生還可能性)。通過該項目可以更深入地理解Spark的概念和編程方式,強推薦想要精進Spark的開發(fā)人員拿該項目入手。
如今Spark在眾多互聯(lián)網(wǎng)公司被廣泛采用,例如Amazon、eBay和Yahoo等。許多公司擁有運行在上千個節(jié)點的Spark集群。根據(jù)Spark FAQ,已知最大的集群有著超過8000個節(jié)點。不難看出,Spark是一項值得關注和學習的技術。
本文通過一些實際案例和代碼示例對Spark進行介紹,案例和代碼示例部分出自Apache Spark官方網(wǎng)站,也有一部分出自《Learning Spark - Lightning-Fast Big Data Analysis》一書。
什么是 Apache Spark? 初步介紹
Spark是Apache的一個項目,被宣傳為"閃電般快速集群計算",它擁有繁榮的開源社區(qū),同時也是目前最活躍的Apache項目。
Spark提供了一個更快更通用的數(shù)據(jù)處理平臺。與Hadoop相比,運行在內(nèi)存中的程序,Spark的速度可以提高100倍,即使運行在磁盤上,其速度也能提高10倍。去年,Spark在處理速度方面已經(jīng)超越了Hadoop,僅利用十分之一于Hadoop平臺的機器,卻以3倍于Hadoop的速度完成了100TB數(shù)量級的Daytona GreySort比賽,成為了PB級別排序速度最快的開源引擎。
通過使用Spark所提供的超過80個高級函數(shù),讓更快速地完成編碼成為可能。大數(shù)據(jù)中的"Hello World!"(編程語言延續(xù)下來一個慣例):Word Count程序示例可以說明這一點,同樣的邏輯使用Java語言編寫MapReduce代碼需要50行左右,但在Spark(Scala評議實現(xiàn))中的實現(xiàn)非常簡單:
- sparkContext.textFile("hdfs://..."). flatMap(line => line.split(" ")). map(word => (word, 1)). reduceByKey(_ + _).saveAsTextFile("hdfs://...")
學習如Apache Spark的另一個重要途徑是使用交互式shell (REPL),使用REPL可以交互顯示代碼運行結果,實時測試每行代碼的運行結果,無需先編碼、再執(zhí)行整個作業(yè),如此便能縮短花在代碼上的工作時間,同時為即席數(shù)據(jù)分析提供了可能。
Spark的其他主要功能包括:
- 目前支持Scala,Java和Python三種語言的 API,并正在逐步支持其他語言(例如R語言);
- 能夠與Hadoop生態(tài)系統(tǒng)和數(shù)據(jù)源(HDFS,Amazon S3,Hive,HBase,Cassandra等)完美集成;
- 可以運行在Hadoop YARN或者Apache Mesos管理的集群上,也可以通過自帶的資源管理器獨立運行。
Spark 內(nèi)核之上還有許多強大的、更高級的庫作為補充,可以在同一應用程序中直接使用,目前有SparkSQL,Spark Streaming,MLlib(用于機器學習)和GraphX這四大組件庫,本文將對Spark Core及四大組件庫進行詳細介紹。當然,還有額外其它的Spark庫和擴展庫目前也處于開發(fā)中。
Spark Core
Spark Core是大規(guī)模并行計算和分布式數(shù)據(jù)處理的基礎引擎。它的職責有:
- 內(nèi)存管理和故障恢復;
- 調(diào)度、分發(fā)和監(jiān)控集群上的作業(yè);
- 與存儲系統(tǒng)進行交互。
Spark引入了RDD(彈性分布式數(shù)據(jù)集)的概念,RDD是一個不可變的容錯、分布式對象集合,支持并行操作。RDD可包含任何類型的對象,可通過加載外部數(shù)據(jù)集或通過Driver程序中的集合來完成創(chuàng)建。
RDD支持兩種類型的操作:
- 轉換(Transformations)指的是作用于一個RDD上并會產(chǎn)生包含結果的新RDD的操作(例如map, filter, join, union等)
- 動作(Actions)指的是作用于一個RDD之后,會觸發(fā)集群計算并得到返回值的操作(例如reduce,count,first等)
Spark中的轉換操作是“延遲的(lazy)”,意味著轉換時它們并不立即啟動計算并返回結果。相反,它們只是“記住”要執(zhí)行的操作和待執(zhí)行操作的數(shù)據(jù)集(例如文件)。轉換操作僅當產(chǎn)生調(diào)用action操作時才會觸發(fā)實際計算,完成后將結果返回到driver程序。這種設計使Spark能夠更有效地運行,例如,如果一個大文件以不同方式進行轉換操作并傳遞到首個action操作,此時Spark將只返回第一行的結果,而不是對整個文件執(zhí)行操作。
默認情況下,每次對其觸發(fā)執(zhí)行action操作時,都需要重新計算前面經(jīng)過轉換操作的RDD,不過,你也可以使用持久化或緩存方法在內(nèi)存中持久化RDD來避免這一問題,此時,Spark將在集群的內(nèi)存中保留這些元素,從而在下次使用時可以加速訪問。
SparkSQL
SparkSQL是Spark中支持SQL語言或者Hive查詢語言查詢數(shù)據(jù)的一個組件。它起先作為Apache Hive 端口運行在Spark之上(替代MapReduce),現(xiàn)在已經(jīng)被集成為Spark的一個重要組件。除支持各種數(shù)據(jù)源,它還可以使用代碼轉換來進行SQL查詢,功能十分強大。下面是兼容Hive查詢的示例:
- // sc is an existing SparkContext.
- val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
- sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
- sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") // Queries are expressed in HiveQL
- sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
Spark Streaming
Spark Streaming支持實時處理流數(shù)據(jù),例如生產(chǎn)環(huán)境中的Web服務器日志文件(例如 Apache Flume和 HDFS/S3),社交媒體數(shù)據(jù)(例如Twitter)和各種消息隊列中(例如Kafka)的實時數(shù)據(jù)。在引擎內(nèi)部,Spark Streaming接收輸入的數(shù)據(jù)流,與此同時將數(shù)據(jù)進行切分,形成數(shù)據(jù)片段(batch),然后交由Spark引擎處理,按數(shù)據(jù)片段生成最終的結果流,如下圖所示。
Spark Streaming API與Spark Core緊密結合,使得開發(fā)人員可以輕松地同時駕駛批處理和流數(shù)據(jù)。
MLlib
MLlib是一個提供多種算法的機器學習庫,目的是使用分類,回歸,聚類,協(xié)同過濾等算法能夠在集群上橫向擴展(可以查閱Toptal中關于機器學習的文章詳細了解)。MLlib中的一些算法也能夠與流數(shù)據(jù)一起使用,例如使用普通最小二乘法的線性回歸算法或k均值聚類算法(以及更多其他正在開發(fā)的算法)。Apache Mahout(一個Hadoop的機器學習庫)摒棄MapReduce并將所有的力量放在Spark MLlib上。
GraphX
GraphX是一個用于操作圖和執(zhí)行圖并行操作的庫。它為ETL即Extraction-Transformation-Loading、探索性分析和迭代圖計算提供了統(tǒng)一的工具。除了內(nèi)置的圖操作之外,它也提供了一個通用的圖算法庫如PageRank。
如何使用Apache Spark: 事件監(jiān)測用例
回答了“什么是Apache Spark?”的問題之后,現(xiàn)在回過頭來想想哪些類型的問題或者挑戰(zhàn)可以使Spark得到更有效的使用。
我最近偶然發(fā)現(xiàn)了一篇關于通過分析Twitter流來檢測地震的實驗,有趣的是,實驗結果已經(jīng)表明使用這種方式通知日本發(fā)生地震的速度會比日本氣象局更快。即使在文章中他們使用了與本文不同的技術,但我認為這是一個很好的例子,通過使用Spark編寫簡潔的代碼,同時又無需為兼容性、互操作性而編寫膠水代碼(glue code)。
首先,我們必須過濾出與“earthquake” 或 “shaking”等相關的tweets消息流,可以很容易地使用Spark Streaming實現(xiàn)此目的:
- TwitterUtils.createStream(...) .filter(_.getText.contains("earthquake") || _.getText.contains("shaking"))
然后,我們需要對tweets消息流進行語義分析,以確定它們表示的是否是當前正在發(fā)生的地震。例如,“Earthquake!”或者“Now it is shaking”等tweets消息將被視為正面匹配。而像“參加地震會議(Attending an Earthquake Conference)”或“昨天地震真可怕(The earthquake yesterday was scary)”等tweets消息則不會被匹配。文章的作者為實現(xiàn)此功能使用了支持向量機(SVM),我們這里也可以這么做,但是也可以嘗試使用流式計算實現(xiàn)的版本,下面是使用MLlib生成的代碼示例:
- // We would prepare some earthquake tweet data and load it in LIBSVM format.
- val data = MLUtils.loadLibSVMFile(sc, "sample_earthquate_tweets.txt") // Split data into training (60%) and test (40%).
- val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
- val training = splits(0).cache() val test = splits(1) // Run training algorithm to build the model
- val numIterations = 100
- val model = SVMWithSGD.train(training, numIterations) // Clear the default
- threshold. model.clearThreshold() // Compute raw scores on the test set.
- val scoreAndLabels = test.map { point => val score = model.predict(point.features) (score, point.label)} // Get evaluation metrics.
- val metrics = new BinaryClassificationMetrics(scoreAndLabels)
- val auROC = metrics.areaUnderROC()
- println("Area under ROC = " + auROC)
如果我們對該模型的預測率感到滿意,我們可以進入下一階段并在發(fā)生地震時作出反應。為了預測一個地震的發(fā)生,我們需要在規(guī)定的時間窗口內(nèi)(如文章中所描述的)檢測一定數(shù)量(即密度)的正向微博。需要注意的是,對于啟用Twitter位置服務的tweet消息,我們還會提取地震的位置。有了前面這些知識的鋪墊,我們可以使用SparkSQL查詢現(xiàn)有的Hive表(存儲著對接收地震通知感興趣的用戶)來檢索對應用戶的電子郵件地址,并向各用戶發(fā)送個性化的警告郵件,如下所示:
- // sc is an existing SparkContext.
- val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) // sendEmail is a custom function
- sqlContext.sql("FROM earthquake_warning_users SELECT firstName, lastName, city, email") .collect().foreach(sendEmail)
其他Apache Spark使用示例
Spark的使用場景當然不僅僅局限于對地震的檢測。
這里提供關于一個非常適合Spark技術處理的案例速查指南(但肯定沒有接近窮盡),這些案例中的場景都面臨著大數(shù)據(jù)普遍存在的速度(Velocity)、多樣性(Variety)和容量(Volume)問題。
在游戲行業(yè)中,處理和發(fā)現(xiàn)來自實時游戲事件流中的隱藏模式,并能夠即時對它們做出響應是公司能夠產(chǎn)生營收的關鍵能力,主要目的是為實現(xiàn)玩家留存,定向廣告,復雜等級的自動調(diào)整等。
在電子商務行業(yè)中,實時交易信息可以傳遞到流聚類算法如k-means或者協(xié)同過濾算法如ALS,然后其結果可以與其他非結構化數(shù)據(jù)源(例如客戶評論或者產(chǎn)品評論)相結合,并持續(xù)不斷地提高和改進推薦算法以適應新的發(fā)展趨勢。
在金融或安全行業(yè)中,Spark技術??梢詰糜谄墼p或入侵檢測系統(tǒng)、或應用于基于風險的身份驗證。Spark可以通過收集大量歸檔日志,同時結合外部數(shù)據(jù)源如泄露的數(shù)據(jù)、受損賬戶信息(如https://haveibeenpwned.com/)及來自外部連接/請求(如IP地理位置或時間)的數(shù)據(jù)來達到最好的結果。,
結論
總而言之,Spark幫助降低了具備挑戰(zhàn)性和計算密集型的海量實時或離線數(shù)據(jù)(包括結構化和非結構化數(shù)據(jù))處理任務的難度,無縫集成相關復雜功能,如機器學習和圖形算法。Spark的大數(shù)據(jù)處理能力將惠及大眾,請盡情嘗試!