用Spark機(jī)器學(xué)習(xí)數(shù)據(jù)流水線進(jìn)行廣告檢測
在這篇文章中,我們Spark的其它機(jī)器學(xué)習(xí)API,名為Spark ML,如果要用數(shù)據(jù)流水線來開發(fā)大數(shù)據(jù)應(yīng)用程序的話,這個(gè)是推薦的解決方案。關(guān)鍵點(diǎn):
- 了解機(jī)器學(xué)習(xí)數(shù)據(jù)流水線有關(guān)內(nèi)容。
- 怎么用Apache Spark機(jī)器學(xué)習(xí)包來實(shí)現(xiàn)機(jī)器學(xué)習(xí)數(shù)據(jù)流水線。
- 數(shù)據(jù)價(jià)值鏈處理的步驟。
- Spark機(jī)器學(xué)習(xí)流水線模塊和API。
- 文字分類和廣告檢測用例。
Spark ML(spark.ml)包提供了構(gòu)建在DataFrame之上的機(jī)器學(xué)習(xí)API,它已經(jīng)成了Spark SQL庫的核心部分。這個(gè)包可以用于開發(fā)和管理機(jī)器學(xué)習(xí)流水線。它也可以提供特征抽取器、轉(zhuǎn)換器、選擇器,并支持分類、匯聚和分簇等機(jī)器學(xué)習(xí)技術(shù)。這些全都對開發(fā)機(jī)器學(xué)習(xí)解決方案至關(guān)重要。
在這里我們看看如何使用Apache Spark來做探索式數(shù)據(jù)分析(Exploratory Data Analysis)、開發(fā)機(jī)器學(xué)習(xí)流水線,并使用Spark ML包中提供的API和算法。
因?yàn)橹С謽?gòu)建機(jī)器學(xué)習(xí)數(shù)據(jù)流水線,Apache Spark框架現(xiàn)在已經(jīng)成了一個(gè)非常不錯(cuò)的選擇,可以用于構(gòu)建一個(gè)全面的用例,包括ETL、指量分析、實(shí)時(shí)流分析、機(jī)器學(xué)習(xí)、圖處理和可視化等。
機(jī)器學(xué)習(xí)數(shù)據(jù)流水線
機(jī)器學(xué)習(xí)流水線可以用于創(chuàng)建、調(diào)節(jié)和檢驗(yàn)機(jī)器學(xué)習(xí)工作流程序等。機(jī)器學(xué)習(xí)流水線可以幫助我們更加專注于項(xiàng)目中的大數(shù)據(jù)需求和機(jī)器學(xué)習(xí)任務(wù)等,而不是把時(shí)間和精力花在基礎(chǔ)設(shè)施和分布式計(jì)算領(lǐng)域上。它也可以在處理機(jī)器學(xué)習(xí)問題時(shí)幫助我們,在探索階段我們要開發(fā)迭代式功能和組合模型。
機(jī)器學(xué)習(xí)工作流通常需要包括一系列的處理和學(xué)習(xí)階段。機(jī)器學(xué)習(xí)數(shù)據(jù)流水線常被描述為一種階段的序列,每個(gè)階段或者是一個(gè)轉(zhuǎn)換器模塊,或者是個(gè)估計(jì)器模塊。這些階段會按順序執(zhí)行,輸入數(shù)據(jù)在流水線中流經(jīng)每個(gè)階段時(shí)會被處理和轉(zhuǎn)換。
機(jī)器學(xué)習(xí)開發(fā)框架要支持分布式計(jì)算,并作為組裝流水線模塊的工具。還有一些其它的構(gòu)建數(shù)據(jù)流水線的需求,包括容錯(cuò)、資源管理、可擴(kuò)展性和可維護(hù)性等。
在真實(shí)項(xiàng)目中,機(jī)器學(xué)習(xí)工作流解決方案也包括模型導(dǎo)入導(dǎo)出工具、交叉驗(yàn)證來選擇參數(shù)、為多個(gè)數(shù)據(jù)源積累數(shù)據(jù)等。它們也提供了一些像功能抽取、選擇和統(tǒng)計(jì)等的數(shù)據(jù)工具。這些框架支持機(jī)器學(xué)習(xí)流水線持久化來保存和導(dǎo)入機(jī)器學(xué)習(xí)模型和流水線,以備將來使用。
機(jī)器學(xué)習(xí)工作流的概念和工作流處理器的組合已經(jīng)在多種不同系統(tǒng)中越來越受歡迎。象scikit-learn和GraphLab等大數(shù)據(jù)處理框架也使用流水線的概念來構(gòu)建系統(tǒng)。
一個(gè)典型的數(shù)據(jù)價(jià)值鏈流程包括如下步驟:
- 發(fā)現(xiàn)
- 注入
- 處理
- 保存
- 整合
- 分析
- 展示
機(jī)器學(xué)習(xí)數(shù)據(jù)流水線所用的方法都是類似的。下圖展示了在機(jī)器學(xué)習(xí)流水線處理中涉及到的不同步驟。
表一:機(jī)器學(xué)習(xí)流水線處理步驟
這些步驟也可以用下面的圖一表示。
圖一:機(jī)器學(xué)習(xí)數(shù)據(jù)流水線處理流圖
接下來讓我們一起看看每個(gè)步驟的細(xì)節(jié)。
數(shù)據(jù)注入:我們收集起來供給機(jī)器學(xué)習(xí)流水線應(yīng)用程序的數(shù)據(jù)可以來自于多種數(shù)據(jù)源,數(shù)據(jù)規(guī)模也是從幾百GB到幾TB都可以。而且,大數(shù)據(jù)應(yīng)用程序還有一個(gè)特征,就是注入不同格式的數(shù)據(jù)。
數(shù)據(jù)清洗:數(shù)據(jù)清洗這一步在整個(gè)數(shù)據(jù)分析流水線中是***步,也是至關(guān)重要的一步,也可以叫做數(shù)據(jù)清理或數(shù)據(jù)轉(zhuǎn)換,這一步主要是要把輸入數(shù)據(jù)變成結(jié)構(gòu)化的,以方便后續(xù)的數(shù)據(jù)處理和預(yù)測性分析。依進(jìn)入到系統(tǒng)中的數(shù)據(jù)質(zhì)量不同,總處理時(shí)間的60%-70%會被花在數(shù)據(jù)清洗上,把數(shù)據(jù)轉(zhuǎn)成合適的格式,這樣才能把機(jī)器學(xué)習(xí)模型應(yīng)用到數(shù)據(jù)上。
數(shù)據(jù)總會有各種各樣的質(zhì)量問題,比如數(shù)據(jù)不完整,或者數(shù)據(jù)項(xiàng)不正確或不合法等。數(shù)據(jù)清洗過程通常會使用各種不同的方法,包括定制轉(zhuǎn)換器等,用流水線中的定制的轉(zhuǎn)換器去執(zhí)行數(shù)據(jù)清洗動作。
稀疏或粗粒數(shù)據(jù)是數(shù)據(jù)分析中的另一個(gè)挑戰(zhàn)。在這方面總會發(fā)生許多極端案例,所以我們要用上面講到的數(shù)據(jù)清洗技術(shù)來保證輸入到數(shù)據(jù)流水線中的數(shù)據(jù)必須是高質(zhì)量的。
伴隨著我們對問題的深入理解,每一次的連續(xù)嘗試和不斷地更新模型,數(shù)據(jù)清洗也通常是個(gè)迭代的過程。象Trifacta、OpenRefine或ActiveClean等數(shù)據(jù)轉(zhuǎn)換工具都可以用來完成數(shù)據(jù)清洗任務(wù)。
特征抽?。涸谔卣鞒槿?有時(shí)候也叫特征工程)這一步,我們會用特征哈希(Hashing Term Frequency)和Word2Vec等技術(shù)來從原始數(shù)據(jù)中抽取具體的功能。這一步的輸出結(jié)果常常也包括一個(gè)匯編模塊,會一起傳入下一個(gè)步驟進(jìn)行處理。
模型訓(xùn)練:機(jī)器學(xué)習(xí)模型訓(xùn)練包括提供一個(gè)算法,并提供一些訓(xùn)練數(shù)據(jù)讓模型可以學(xué)習(xí)。學(xué)習(xí)算法會從訓(xùn)練數(shù)據(jù)中發(fā)現(xiàn)模式,并生成輸出模型。
模型驗(yàn)證:這一步包評估和調(diào)整機(jī)器學(xué)習(xí)模型,以衡量用它來做預(yù)測的有效性。如這篇文章所說,對于二進(jìn)制分類模型評估指標(biāo)可以用接收者操作特征(Receiver Operating Characteristic,ROC)曲線。ROC曲線可以表現(xiàn)一個(gè)二進(jìn)制分類器系統(tǒng)的性能。創(chuàng)建它的方法是在不同的閾值設(shè)置下描繪真陽性率(True Positive Rate,TPR)和假陽性率(False Positive Rate,F(xiàn)PR)之間的對應(yīng)關(guān)系。
模型選擇:模型選擇指讓轉(zhuǎn)換器和估計(jì)器用數(shù)據(jù)去選擇參數(shù)。這在機(jī)器學(xué)習(xí)流水線處理過程中也是關(guān)鍵的一步。ParamGridBuilder和CrossValidator等類都提供了API來選擇機(jī)器學(xué)習(xí)模型。
模型部署:一旦選好了正確的模型,我們就可以開始部署,輸入新數(shù)據(jù)并得到預(yù)測性的分析結(jié)果。我們也可以把機(jī)器學(xué)習(xí)模型部署成網(wǎng)頁服務(wù)。
Spark機(jī)器學(xué)習(xí)
機(jī)器學(xué)習(xí)流水線API是在Apache Spark框架1.2版中引入的。它給開發(fā)者們提供了API來創(chuàng)建并執(zhí)行復(fù)雜的機(jī)器學(xué)習(xí)工作流。流水線API的目標(biāo)是通過為不同機(jī)器學(xué)習(xí)概念提供標(biāo)準(zhǔn)化的API,來讓用戶可以快速并輕松地組建并配置可行的分布式機(jī)器學(xué)習(xí)流水線。流水線API包含在org.apache.spark.ml包中。
Spark ML也有助于把多種機(jī)器學(xué)習(xí)算法組合到一條流水線中。
Spark機(jī)器學(xué)習(xí)API被分成了兩個(gè)包,分別是spark.mllib和spark.ml。其中spark.ml包包括了基于RDD構(gòu)建的原始API。而spark.ml包則提供了構(gòu)建于DataFrame之上的高級API,用于構(gòu)建機(jī)器學(xué)習(xí)流水線。
基于RDD的MLlib庫API現(xiàn)在處于維護(hù)模式。
如下面圖二所示,Spark ML是Apache Spark生態(tài)系統(tǒng)中的一個(gè)非常重要的大數(shù)據(jù)分析庫。
圖二:包括了Spark ML的Spark生態(tài)系統(tǒng)
機(jī)器學(xué)習(xí)流水線模塊
機(jī)器學(xué)習(xí)數(shù)據(jù)流水線包括了完成數(shù)據(jù)分析任務(wù)所需要的多個(gè)模塊。數(shù)據(jù)流水線的關(guān)鍵模塊被列在了下面:
- 數(shù)據(jù)集
- 流水線
- 流水線的階段
- 轉(zhuǎn)換器
- 估計(jì)器
- 評估器
- 參數(shù)(和參數(shù)地圖)
接下來我們簡單看看這些模塊可以怎么對應(yīng)到整體的步驟中。
數(shù)據(jù)集:在機(jī)器學(xué)習(xí)流水線中是使用DataFrame來表現(xiàn)數(shù)據(jù)集的。它也允許按有名字的字段保存結(jié)構(gòu)化數(shù)據(jù)。這些字段可以用于保存文字、功能向量、真實(shí)標(biāo)簽和預(yù)測。
流水線:機(jī)器學(xué)習(xí)工作流被建模為流水線,這包括了一系列的階段。每個(gè)階段都對輸入數(shù)據(jù)進(jìn)行處理,為下一個(gè)階段產(chǎn)生輸出數(shù)據(jù)。一個(gè)流水線把多個(gè)轉(zhuǎn)換器和估計(jì)器串連起來,描述一個(gè)機(jī)器學(xué)習(xí)工作流。
流水線的階段:我們定義兩種階段,轉(zhuǎn)換器和估計(jì)器。
轉(zhuǎn)換器:算法可以把一個(gè)DataFrame轉(zhuǎn)換成另一個(gè)DataFrame。比如,機(jī)器學(xué)習(xí)模型就是一個(gè)轉(zhuǎn)換器,用于把一個(gè)有特征的DataFrame轉(zhuǎn)換成一個(gè)有預(yù)測信息的DataFrame。
轉(zhuǎn)換器會把一個(gè)DataFrame轉(zhuǎn)成另一個(gè)DataFrame,同時(shí)為它加入新的特征。比如在Spark ML包中,OneHotEncoder就會把一個(gè)有標(biāo)簽索引的字段轉(zhuǎn)換成一個(gè)有向量特征的字段。每個(gè)轉(zhuǎn)換器都有一個(gè)transform()函數(shù),被調(diào)用時(shí)就會把一個(gè)DataFrame轉(zhuǎn)換成另一個(gè)。
估計(jì)器:估計(jì)器就是一種機(jī)器學(xué)習(xí)算法,會從你提供的數(shù)據(jù)中進(jìn)行學(xué)習(xí)。估計(jì)器的輸入是一個(gè)DataFrame,輸出就是一個(gè)轉(zhuǎn)換器。估計(jì)器用于訓(xùn)練模型,它生成轉(zhuǎn)換器。比如,邏輯回歸估計(jì)器就會產(chǎn)生邏輯回歸轉(zhuǎn)換器。另一個(gè)例子是把K-Means做為估計(jì)器,它接受訓(xùn)練數(shù)據(jù),生成K-Means模型,就是一個(gè)轉(zhuǎn)換器。
參數(shù):機(jī)器學(xué)習(xí)模塊會使用通用的API來描述參數(shù)。參數(shù)的例子之一就是模型要使用的***迭代次數(shù)。
下圖展示的是一個(gè)用作文字分類的數(shù)據(jù)流水線的各個(gè)模塊。
圖三:使用Spark ML的數(shù)據(jù)流水線
用例
機(jī)器學(xué)習(xí)流水線的用例之一就是文字分類。這種用例通常包括如下步驟:
- 清洗文字?jǐn)?shù)據(jù)
- 將數(shù)據(jù)轉(zhuǎn)化成特征向量,并且
- 訓(xùn)練分類模型
在文字分類中,在進(jìn)行分類模型(類似SVM)的訓(xùn)練之前,會進(jìn)行n-gram抽象和TF-IDF特征權(quán)重等數(shù)據(jù)預(yù)處理。
另一個(gè)機(jī)器學(xué)習(xí)流水線用例就是在這篇文章中描述的圖像分類。
還有很多種其它機(jī)器學(xué)習(xí)用例,包括欺詐檢測(使用分類模型,這也是監(jiān)督式學(xué)習(xí)的一部分),用戶分區(qū)(聚簇模型,這也是非監(jiān)督式學(xué)習(xí)的一部分)。
TF-IDF
詞頻-逆向文檔頻率(Term Frequency - Inverse Document Frequency,TF-IDF)是一種在給定樣本集合內(nèi)評估一個(gè)詞的重要程度的靜態(tài)評估方法。這是一種信息獲取算法,用于在一個(gè)文檔集合內(nèi)給一個(gè)詞的重要性打分。
TF:如果一個(gè)詞在一份文檔中反復(fù)出現(xiàn),那這個(gè)詞就比較重要。具體計(jì)算方法為:
TF = (# of times word X appears in a document) / (Total # of
words in the document)
IDF:但如果一個(gè)詞在多份文檔中都頻繁出現(xiàn)(比如the,and,of等),那就說明這個(gè)詞沒有什么實(shí)際意義,因此就要降低它的評分。
示例程序
下面我們看個(gè)示例程序,了解一下Spark ML包可以怎樣用在大數(shù)據(jù)處理系統(tǒng)中。我們會開發(fā)一個(gè)文檔分類程序,用于區(qū)別程序輸入數(shù)據(jù)中的廣告內(nèi)容。測試用的輸入數(shù)據(jù)集包括文檔、電子郵件或其它任何從外部系統(tǒng)中收到的可能包含廣告的內(nèi)容。
我們將使用在Strata Hadoop World Conference研討會上討論的“用Spark構(gòu)建機(jī)器學(xué)習(xí)應(yīng)用”的廣告檢測示例來構(gòu)建我們的示例程序。
用例
這個(gè)用例會對發(fā)送到我們的系統(tǒng)中的各種不同消息進(jìn)行分析。有些消息里面是含有廣告信息的,但有些消息里面沒有。我們的目標(biāo)就是要用Spark ML API找出那些包含了廣告的消息。
算法
我們將使用機(jī)器學(xué)習(xí)中的邏輯回歸算法。邏輯回歸是一種回歸分析模型,可以基于一個(gè)或多個(gè)獨(dú)立變量來預(yù)測得到是或非的可能結(jié)果。
詳細(xì)的解決方案
接下來咱們看看這個(gè)Spark ML示例程序的細(xì)節(jié),以及運(yùn)行步驟。
數(shù)據(jù)注入:我們會把包含廣告的數(shù)據(jù)(文本文件)和不包含廣告的數(shù)據(jù)都導(dǎo)入。
數(shù)據(jù)清洗:在示例程序中,我們不做任何特別的數(shù)據(jù)清洗操作。我們只是把所有的數(shù)據(jù)都匯聚到一個(gè)DataFrame對象中。
我們隨機(jī)地從訓(xùn)練數(shù)據(jù)和測試數(shù)據(jù)中選擇一些數(shù)據(jù),創(chuàng)建一個(gè)數(shù)組對象。在這個(gè)例子中我們的選擇是70%的訓(xùn)練數(shù)據(jù),和30%的測試數(shù)據(jù)。
在后續(xù)的流水線操作中我們分別用這兩個(gè)數(shù)據(jù)對象來訓(xùn)練模型和做預(yù)測。
我們的機(jī)器學(xué)習(xí)數(shù)據(jù)流水線包括四步:
- Tokenizer
- HashingTF
- IDF
- LR
創(chuàng)建一個(gè)流水線對象,并且在流水線中設(shè)置上面的各個(gè)階段。然后我們就可以按照例子,基于訓(xùn)練數(shù)據(jù)來創(chuàng)建一個(gè)邏輯回歸模型。
現(xiàn)在,我們再使用測試數(shù)據(jù)(新數(shù)據(jù)集)來用模型做預(yù)測。
下面圖四中展示了例子程序的架構(gòu)圖。
圖4:數(shù)據(jù)分類程序架構(gòu)圖
技術(shù)
在實(shí)現(xiàn)機(jī)器學(xué)習(xí)流水線解決方案時(shí)我們用到了下面的技術(shù)。
表二:在機(jī)器學(xué)習(xí)例子中用到的技術(shù)和工具
Spark ML程序
根據(jù)研討會上的例子而寫成的機(jī)器學(xué)習(xí)代碼是用Scala編程語言寫的,我們可以直接使用Spark Shell控制臺來運(yùn)行這個(gè)程序。
廣告檢測Scala代碼片段:
***步:創(chuàng)建一個(gè)定制的類,用來存儲廣告內(nèi)容的細(xì)節(jié)。
- case class SpamDocument(file: String, text: String, label:
- Double)
第二步:初始化SQLContext,并通過隱式轉(zhuǎn)換方法來把Scala對象轉(zhuǎn)換成DataFrame。然后從存放著輸入文件的指定目錄導(dǎo)入數(shù)據(jù)集,結(jié)果會返回RDD對象。然后由這兩個(gè)數(shù)據(jù)集的RDD對象創(chuàng)建DataFrame對象。
- val sqlContext = new SQLContext(sc)
- import sqlContext.implicits._
- //
- // Load the data files with spam
- //
- val rddSData = sc.wholeTextFiles("SPAM_DATA_FILE_DIR", 1)
- val dfSData = rddSData.map(d => SpamDocument(d._1, d._2,1)).toDF()
- dfSData.show()
- //
- // Load the data files with no spam
- //
- val rddNSData = sc.wholeTextFiles("NO_SPAM_DATA_FILE_DIR",
- 1)
- val dfNSData = rddNSData.map(d => SpamDocument(d._1,d._2, 0)).toDF()
- dfNSData.show()
第三步:現(xiàn)在,把數(shù)據(jù)集匯聚起來,然后根據(jù)70%和30%的比例來把整份數(shù)據(jù)拆分成訓(xùn)練數(shù)據(jù)和測試數(shù)據(jù)。
- //
- // Aggregate both data frames
- //
- val dfAllData = dfSData.unionAll(dfNSData)
- dfAllData.show()
- //
- // Split the data into 70% training data and 30% test data
- //
- val Array(trainingData, testData) =
- dfAllData.randomSplit(Array(0.7, 0.3))
第四步:現(xiàn)在可以配置機(jī)器學(xué)習(xí)數(shù)據(jù)流水線了,要創(chuàng)建我們在文章前面部分討論到的幾個(gè)部分:Tokenizer、HashingTF和IDF。然后再用訓(xùn)練數(shù)據(jù)創(chuàng)建回歸模型,在這個(gè)例子中是邏輯回歸。
- //
- // Configure the ML data pipeline
- //
- //
- // Create the Tokenizer step
- //
- val tokenizer = new Tokenizer()
- .setInputCol("text")
- .setOutputCol("words")
- //
- // Create the TF and IDF steps
- //
- val hashingTF = new HashingTF()
- .setInputCol(tokenizer.getOutputCol)
- .setOutputCol("rawFeatures")
- val idf = new
- IDF().setInputCol("rawFeatures").setOutputCol("features")
- //
- // Create the Logistic Regression step
- //
- val lr = new LogisticRegression()
- .setMaxIter(5)
- lr.setLabelCol("label")
- lr.setFeaturesCol("features")
- //
- // Create the pipeline
- //
- val pipeline = new Pipeline()
- .setStages(Array(tokenizer, hashingTF, idf, lr))
- val lrModel = pipeline.fit(trainingData)
- println(lrModel.toString())
第五步:***,我們調(diào)用邏輯回歸模型中的轉(zhuǎn)換方法來用測試數(shù)據(jù)做預(yù)測。
- //
- // Make predictions.
- //
- val predictions = lrModel.transform(testData)
- //
- // Display prediction results
- //
- predictions.select("file", "text", "label", "features", "prediction").show(300)
結(jié)論
Spark機(jī)器學(xué)習(xí)庫是Apache Spark框架中最重要的庫之一。它用于實(shí)現(xiàn)數(shù)據(jù)流水線。在這篇文章中,我們了解了如何使用Spark ML包的API以及用它來實(shí)現(xiàn)一個(gè)文本分類用例。
接下來的內(nèi)容
圖數(shù)據(jù)模型是關(guān)于在數(shù)據(jù)模型中不同的實(shí)體之間的連接和關(guān)系的。圖數(shù)據(jù)處理技術(shù)最近受到了很多關(guān)注,因?yàn)榭梢杂盟鼇斫鉀Q許多問題,包括欺詐檢測和開發(fā)推薦引擎等。
Spark框架提供了一個(gè)庫,專門用于圖數(shù)據(jù)分析。我們在這個(gè)系列的文章中,接下來會了解這個(gè)名為Spark GraphX的庫。我們會用Spark GraphX來開發(fā)一個(gè)示例程序,用于圖數(shù)據(jù)處理和分析。