?用 PySpark ML 構(gòu)建機器學習模型
?Spark 是一種專門用于交互式查詢、機器學習和實時工作負載的開源框架,而 PySpark 是 Python 使用 Spark 的庫。
PySpark 是一種用于大規(guī)模執(zhí)行探索性數(shù)據(jù)分析、構(gòu)建機器學習管道以及為數(shù)據(jù)平臺創(chuàng)建 ETL 的出色語言。如果你已經(jīng)熟悉 Python 和 Pandas 等庫,那么 PySpark 是一種很好的學習語言,可以創(chuàng)建更具可擴展性的分析和管道。
這篇文章的目的是展示如何使用 PySpark 構(gòu)建機器學習模型。
Conda 創(chuàng)建 python 虛擬環(huán)境
conda將幾乎所有的工具、第三方包都當作package進行管理,甚至包括python 和conda自身。Anaconda是一個打包的集合,里面預裝好了conda、某個版本的python、各種packages等。
1.安裝Anaconda。
打開命令行輸入conda -V檢驗是否安裝及當前conda的版本。
通過Anaconda安裝默認版本的Python,3.6的對應的是 Anaconda3-5.2,5.3以后的都是python 3.7。
(https://repo.anaconda.com/archive/)
2.conda常用的命令
1) 查看安裝了哪些包
conda list
2) 查看當前存在哪些虛擬環(huán)境
conda env list
conda info -e
3) 檢查更新當前conda
conda update conda
3.Python創(chuàng)建虛擬環(huán)境
conda create -n your_env_name python=x.x
anaconda命令創(chuàng)建python版本為x.x,名字為your_env_name的虛擬環(huán)境。your_env_name文件可以在Anaconda安裝目錄envs文件下找到。
4.激活或者切換虛擬環(huán)境
打開命令行,輸入python --version檢查當前 python 版本。
Linux: source activate your_env_nam
Windows: activate your_env_name
5.對虛擬環(huán)境中安裝額外的包
conda install -n your_env_name [package]
6.關閉虛擬環(huán)境
(即從當前環(huán)境退出返回使用PATH環(huán)境中的默認python版本)
deactivate env_name
# 或者`activate root`切回root環(huán)境
Linux下:source deactivate
7.刪除虛擬環(huán)境
conda remove -n your_env_name --all
8.刪除環(huán)境鐘的某個包
conda remove --name $your_env_name $package_name
9.設置國內(nèi)鏡像
http://Anaconda.org 的服務器在國外,安裝多個packages時,conda下載的速度經(jīng)常很慢。清華TUNA鏡像源有Anaconda倉庫的鏡像,將其加入conda的配置即可:
# 添加Anaconda的TUNA鏡像
conda config --add channels https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/free/
# 設置搜索時顯示通道地址
conda config --set show_channel_urls yes
10.恢復默認鏡像
conda config --remove-key channels
安裝 PySpark
PySpark 的安裝過程和其他 python 的包一樣簡單(例如 Pandas、Numpy、scikit-learn)。
一件重要的事情是,首先確保你的機器上已經(jīng)安裝了java。然后你可以在你的 jupyter notebook 上運行 PySpark。
探索數(shù)據(jù)
我們使用糖尿病數(shù)據(jù)集,它與美國國家糖尿病、消化和腎臟疾病研究所的糖尿病疾病有關。分類目標是預測患者是否患有糖尿?。ㄊ?否)。
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ml-diabetes').getOrCreate()
df = spark.read.csv('diabetes.csv', header = True, inferSchema = True)
df.printSchema()
數(shù)據(jù)集由幾個醫(yī)學預測變量和一個目標變量 Outcome 組成。預測變量包括患者的懷孕次數(shù)、BMI、胰島素水平、年齡等。
- Pregnancies:懷孕次數(shù)
- Glucose:2小時內(nèi)口服葡萄糖耐量試驗的血糖濃度
- BloodPressure:舒張壓(mm Hg)
- SkinThickness:三頭肌皮膚褶皺厚度(mm)
- Insulin:2小時血清胰島素(mu U/ml)
- BMI:身體質(zhì)量指數(shù)(體重單位kg/(身高單位m)2)
- diabespedigreefunction:糖尿病譜系功能
- Age:年齡(年)
- Outcome:類變量(0或1)
- 輸入變量: 葡萄糖、血壓、BMI、年齡、懷孕、胰島素、皮膚厚度、糖尿病譜系函數(shù)。
- 輸出變量: 結(jié)果。
看看前五個觀察結(jié)果。Pandas 數(shù)據(jù)框比 Spark DataFrame.show() 更漂亮。
import pandas as pd
pd.DataFrame(df.take(5),
columns=df.columns).transpose()
在 PySpark 中,您可以使用 Pandas 的 DataFrame 顯示數(shù)據(jù) toPandas()。
df.toPandas()
檢查類是完全平衡的!
df.groupby('Outcome').count().toPandas()
描述性統(tǒng)計
numeric_features = [t[0] for t in df.dtypes if t[1] == 'int']
df.select(numeric_features)\
.describe()\
.toPandas()\
.transpose()
自變量之間的相關性
from pandas.plotting import scatter_matrix
numeric_data = df.select(numeric_features).toPandas()
axs = scatter_matrix(numeric_data, figsize=(8, 8));
# Rotate axis labels and remove axis ticks
n = len(numeric_data.columns)
for i in range(n):
v = axs[i, 0]
v.yaxis.label.set_rotation(0)
v.yaxis.label.set_ha('right')
v.set_yticks(())
h = axs[n-1, i]
h.xaxis.label.set_rotation(90)
h.set_xticks(())
數(shù)據(jù)準備和特征工程
在這一部分中,我們將刪除不必要的列并填充缺失值。最后,為機器學習模型選擇特征。這些功能將分為訓練和測試兩部分。
缺失數(shù)據(jù)處理
from pyspark.sql.functions import isnull, when, count, col
df.select([count(when(isnull(c), c)).alias(c)
for c in df.columns]).show()
這個數(shù)據(jù)集很棒,沒有任何缺失值。
不必要的列丟棄
dataset = dataset.drop('SkinThickness')
dataset = dataset.drop('Insulin')
dataset = dataset.drop('DiabetesPedigreeFunction')
dataset = dataset.drop('Pregnancies')
dataset.show()
特征轉(zhuǎn)換為向量
VectorAssembler —— 將多列合并為向量列的特征轉(zhuǎn)換器。
# 用VectorAssembler合并所有特性
required_features = ['Glucose',
'BloodPressure',
'BMI',
'Age']
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
inputCols=required_features,
outputCol='features')
transformed_data = assembler.transform(dataset)
transformed_data.show()
現(xiàn)在特征轉(zhuǎn)換為向量已完成。
訓練和測試拆分
將數(shù)據(jù)隨機分成訓練集和測試集,并設置可重復性的種子。
(training_data, test_data) = transformed_data.randomSplit([0.8,0.2], seed =2020)
print("訓練數(shù)據(jù)集總數(shù): " + str(training_data.count()))
print("測試數(shù)據(jù)集總數(shù): " + str(test_data.count()))
訓練數(shù)據(jù)集總數(shù):620
測試數(shù)據(jù)集數(shù)量:148
機器學習模型構(gòu)建
隨機森林分類器
隨機森林是一種監(jiān)督學習算法,用于分類和回歸。但是,它主要用于分類問題。眾所周知,森林是由樹木組成的,樹木越多,森林越茂盛。類似地,隨機森林算法在數(shù)據(jù)樣本上創(chuàng)建決策樹,然后從每個樣本中獲取預測,最后通過投票選擇最佳解決方案。這是一種比單個決策樹更好的集成方法,因為它通過對結(jié)果進行平均來減少過擬合。
from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(labelCol='Outcome',
featuresCol='features',
maxDepth=5)
model = rf.fit(training_data)
rf_predictions = model.transform(test_data)
評估隨機森林分類器模型
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
multi_evaluator = MulticlassClassificationEvaluator(
labelCol = 'Outcome', metricName = 'accuracy')
print('Random Forest classifier Accuracy:', multi_evaluator.evaluate(rf_predictions))
Random Forest classifier Accuracy:0.79452
決策樹分類器
決策樹被廣泛使用,因為它們易于解釋、處理分類特征、擴展到多類分類設置、不需要特征縮放,并且能夠捕獲非線性和特征交互。
from pyspark.ml.classification import DecisionTreeClassifier
dt = DecisionTreeClassifier(featuresCol = 'features',
labelCol = 'Outcome',
maxDepth = 3)
dtModel = dt.fit(training_data)
dt_predictions = dtModel.transform(test_data)
dt_predictions.select('Glucose', 'BloodPressure',
'BMI', 'Age', 'Outcome').show(10)
評估決策樹模型
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
multi_evaluator = MulticlassClassificationEvaluator(
labelCol = 'Outcome',
metricName = 'accuracy')
print('Decision Tree Accuracy:',
multi_evaluator.evaluate(dt_predictions))
Decision Tree Accuracy: 0.78767
邏輯回歸模型
邏輯回歸是在因變量是二分(二元)時進行的適當回歸分析。與所有回歸分析一樣,邏輯回歸是一種預測分析。邏輯回歸用于描述數(shù)據(jù)并解釋一個因二元變量與一個或多個名義、序數(shù)、區(qū)間或比率水平自變量之間的關系。當因變量(目標)是分類時,使用邏輯回歸。
from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(featuresCol = 'features',
labelCol = 'Outcome',
maxIter=10)
lrModel = lr.fit(training_data)
lr_predictions = lrModel.transform(test_data)
評估我們的邏輯回歸模型。
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
multi_evaluator = MulticlassClassificationEvaluator(
labelCol = 'Outcome',
metricName = 'accuracy')
print('Logistic Regression Accuracy:',
multi_evaluator.evaluate(lr_predictions))
Logistic Regression Accuracy:0.78767
梯度提升樹分類器模型
梯度提升是一種用于回歸和分類問題的機器學習技術(shù),它以弱預測模型(通常是決策樹)的集合形式生成預測模型。
from pyspark.ml.classification import GBTClassifier
gb = GBTClassifier(
labelCol = 'Outcome',
featuresCol = 'features')
gbModel = gb.fit(training_data)
gb_predictions = gbModel.transform(test_data)
評估我們的梯度提升樹分類器。
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
multi_evaluator = MulticlassClassificationEvaluator(
labelCol = 'Outcome',
metricName = 'accuracy')
print('Gradient-boosted Trees Accuracy:',
multi_evaluator.evaluate(gb_predictions))
Gradient-boosted Trees Accuracy:0.80137
結(jié)論
PySpark 是一種非常適合數(shù)據(jù)科學家學習的語言,因為它支持可擴展的分析和 ML 管道。如果您已經(jīng)熟悉 Python 和 Pandas,那么您的大部分知識都可以應用于 Spark??偠灾覀円呀?jīng)學習了如何使用 PySpark 構(gòu)建機器學習應用程序。我們嘗試了三種算法,梯度提升在我們的數(shù)據(jù)集上表現(xiàn)最好。