Java 和 Python 是當(dāng)今最流行的兩種計(jì)算機(jī)語(yǔ)言。兩者都非常成熟,并提供了工具和技術(shù)生態(tài)系統(tǒng),幫助我們解決數(shù)據(jù)科學(xué)領(lǐng)域出現(xiàn)的挑戰(zhàn)性問(wèn)題。每種語(yǔ)言都各有優(yōu)勢(shì),我們要知道什么時(shí)候應(yīng)該使用哪種工具,或者什么時(shí)候它們應(yīng)該協(xié)同工作相互補(bǔ)充。
Python 是一種動(dòng)態(tài)類(lèi)型語(yǔ)言,使用起來(lái)非常簡(jiǎn)單,如果我們不想接觸復(fù)雜的程序,它肯定是進(jìn)行復(fù)雜計(jì)算的首選語(yǔ)言。Python 提供了優(yōu)秀的庫(kù)(Pandas、NumPy、Matplotlib、ScyPy、PyTorch、TensorFlow 等)來(lái)支持對(duì)數(shù)據(jù)結(jié)構(gòu)或數(shù)組的邏輯、數(shù)學(xué)和科學(xué)操作。
Java 是一種非常健壯的語(yǔ)言,具有強(qiáng)類(lèi)型,因此有更嚴(yán)格的語(yǔ)法規(guī)則,所以不易出現(xiàn)程序錯(cuò)誤。與Python一樣,它也提供了大量的庫(kù)來(lái)處理數(shù)據(jù)結(jié)構(gòu)、線性代數(shù)、機(jī)器學(xué)習(xí)和數(shù)據(jù)處理(ND4J、Mahout、Spark、Deeplearning4J 等)。
本文將介紹如何對(duì)大量表格數(shù)據(jù)進(jìn)行簡(jiǎn)單的數(shù)據(jù)分析,并使用 Java 和 Python 計(jì)算一些統(tǒng)計(jì)數(shù)據(jù)。我們可以看到使用各個(gè)平臺(tái)進(jìn)行數(shù)據(jù)分析的不同技術(shù),對(duì)比它們的擴(kuò)展方式,以及應(yīng)用并行計(jì)算來(lái)提高其性能的可行性。
提出問(wèn)題
我們要對(duì)不同州的一大批城市的價(jià)格做一個(gè)簡(jiǎn)單的分析,這里假設(shè)有一個(gè)包含此信息的 CSV 文件。閱讀文件并繼續(xù)過(guò)濾掉一些州,并將剩余的州按城市-州分組以進(jìn)行一些基本統(tǒng)計(jì)。希望能夠找到有效執(zhí)行的解決方案,并且能夠隨著輸入數(shù)據(jù)規(guī)模的增長(zhǎng)而有良好的擴(kuò)展。
數(shù)據(jù)樣本是:
城市 | 州 | 基本價(jià)格 | 實(shí)際價(jià)格 |
La Jose | PA | 34.17 | 33.19 |
Preachers Slough | WA | 27,46 | 90.17 |
Doonan Corners | NY | 92.0 | 162.46 |
Doonan Corners | NY | 97.45 | 159.46 |
Castle Rock | WA | 162.16 | 943.21 |
Marble Rock | IA | 97.13 | 391.49 |
Mineral | CA | 99.13 | 289.37 |
Blountville | IN | 92.50 | 557.66 |
Blountsville | IN | 122.50 | 557.66 |
Coe | IN | 187.85 | 943.98 |
Cecilia | KY | 92.85 | 273.61 |
目的是展示如何使用 Java 和 Python 解決這些類(lèi)型的問(wèn)題。該示例非常簡(jiǎn)單且范圍有限,但很容易拓展到更具挑戰(zhàn)性的問(wèn)題。
Java 的方法
首先定義一個(gè)封裝數(shù)據(jù)元素的 Java 記錄:
record InputEntry(String city, String state, double basePrice, double actualPrice) {}
記錄(record)是 JDK 14 中引入的一種新型類(lèi)型聲明。它是定義提供構(gòu)造函數(shù)、訪問(wèn)器、equals 和哈希實(shí)現(xiàn)的不可變類(lèi)的一種簡(jiǎn)捷方式。
接下來(lái),讀取 CVS 文件并將它們?cè)黾拥揭粋€(gè)列表中:
List<InputEntry> inputEntries = readRecordEntriesFromCSVFile(recordEntries.csv);
為了按城市和州對(duì)輸入的元素進(jìn)行分組,將其定義:
record CityState(String city, String state) {};
使用以下類(lèi)來(lái)封裝屬于一個(gè)組的所有元素的統(tǒng)計(jì)信息:
record StatsAggregation(StatsAccumulator basePrice, StatsAccumulator actualPrice) {}
StatsAccumulator是Guava 庫(kù)的一部分??梢詫㈦p精度值集合添加到類(lèi)中,它會(huì)計(jì)算基本統(tǒng)計(jì)數(shù)據(jù),例如計(jì)數(shù)、平均值、方差或標(biāo)準(zhǔn)差??梢允褂肧tatsAccumulator來(lái)獲取InputEntry的basePrice和actualPrice的統(tǒng)計(jì)數(shù)據(jù)。
現(xiàn)在我們已經(jīng)擁有了解決問(wèn)題的所有材料。Java Streams提供了一個(gè)強(qiáng)大的框架來(lái)實(shí)現(xiàn)數(shù)據(jù)操作和分析。它的聲明式編程風(fēng)格,對(duì)選擇、過(guò)濾、分組和聚合的支持,簡(jiǎn)化了數(shù)據(jù)操作和統(tǒng)計(jì)分析。它的框架還提供了一個(gè)強(qiáng)大的實(shí)現(xiàn),可以處理大量的(甚至是無(wú)限的流),并通過(guò)使用并行性、懶惰性和短路操作來(lái)高效處理。所有這些特性使Java Streams成為解決這類(lèi)問(wèn)題的絕佳選擇。實(shí)現(xiàn)非常簡(jiǎn)單:
Map<CityState, StatsAggregation> stats = inputEntries.stream().
filter(i -> !(i.state().equals("MN") || i.state().equals("CA"))).collect(
groupingBy(entry -> new CityState(entry.city(), entry.state()),
collectingAndThen(Collectors.toList(),
list -> {StatsAccumulator sac = new StatsAccumulator();
sac.addAll(list.stream().mapToDouble(InputEntry::basePrice));
StatsAccumulator sas = new StatsAccumulator();
sas.addAll(list.stream().mapToDouble(InputEntry::actualPrice));
return new StatsAggregation(sac, sas);}
)));
在代碼的第 2 行,我們使用Stream::filter. 這是一個(gè)布爾值函數(shù),用于過(guò)濾列表中的元素。可以實(shí)現(xiàn)一個(gè) lambda 表達(dá)式來(lái)刪除任何包含“MN”或“CA”狀態(tài)的元素。
然后繼續(xù)收集列表的元素并調(diào)用Collectors::groupingBy()(第 3 行),它接受兩個(gè)參數(shù):
- 一個(gè)分類(lèi)功能,使用CityState記錄來(lái)做城市和州的分組(第3行)。
- 下游的收集器,包含屬于同一<城州>的元素。使用Collectors::collectingAndThen(第 4 行),它采用兩個(gè)參數(shù)分兩步進(jìn)行歸約:
·我們使用Collectors::toList(第 4 行),它返回一個(gè)收集器,它將屬于同一<城州>的所有元素放到一個(gè)列表中。
·隨后對(duì)這個(gè)列表進(jìn)行了整理轉(zhuǎn)換。使用一個(gè)lambda函數(shù)(第5行至第9行)來(lái)定義兩個(gè)StatsAccumulator(s),在這里分別計(jì)算前一個(gè)列表中的basePrice和actualPrice元素的統(tǒng)計(jì)數(shù)據(jù)。最后,返回到新創(chuàng)建的包含這些元素的StatsAggregation記錄。
正如前文所述,使用Java Streams的優(yōu)勢(shì)之一是,它提供了一種簡(jiǎn)單的機(jī)制,可以使用多線程進(jìn)行并行處理。這允許利用CPU的多核資源,同時(shí)執(zhí)行多個(gè)線程。只要在流中添加一個(gè) "parallel":
Map<CityState, StatsAggregation> stats = inputEntries.stream().parallel().
這導(dǎo)致流框架將元素列表細(xì)分為多個(gè)部分,并同時(shí)在單獨(dú)的線程中運(yùn)行它們。隨著所有不同的線程完成它們的計(jì)算,框架將它們串行添加到生成的 Map 中。
在第4行中使用Collectors::groupingByConcurrent而不是Collectors:groupingBy。在這種情況下,框架使用并發(fā)映射,允許將來(lái)自不同線程的元素直接插入到此映射中,而不必串行組合。
有了這三種可能性,可以檢查它們?nèi)绾螆?zhí)行之前的統(tǒng)計(jì)計(jì)算(不包括從 CSV 文件加載數(shù)據(jù)的時(shí)間),因?yàn)榧虞d量從500萬(wàn)條翻倍到2000萬(wàn)條:
串行 | 平行 | 并行 & GroupByConcurrent | |
五百萬(wàn)個(gè)元素 | 3.045 秒 | 1.941 秒 | 1.436 秒 |
一千萬(wàn)個(gè)元素 | 6.405 秒 | 2.876 秒 | 2.785 秒 |
兩千萬(wàn)個(gè)元素 | 8.507 秒 | 4.956 秒 | 4.537 秒 |
可以看到并行運(yùn)行大大提高了性能;隨著負(fù)載的增加,時(shí)間幾乎減半。使用 GroupByConcurrent 還可額外獲得 10% 的收益。
最后,得到結(jié)果是微不足道的;例如,要獲得印第安納州 Blountsville 的統(tǒng)計(jì)數(shù)據(jù),我們只需要:
StatsAggregation aggreg = stateAggr.get(new CityState("Blountsville ", "IN"));
System.out.println("Blountsville, IN");
System.out.println("basePrice.mean: " + aggreg.basePrice().mean());
System.out.println("basePrice.populationVariance: " + aggreg.basePrice().populationVariance());
System.out.println("basePrice.populationStandardDeviation: " + aggreg.basePrice().populationStandardDeviation());
System.out.println("actualPrice.mean: " + aggreg.basePrice().mean());
System.out.println("actualPrice.populationVariance: " + aggreg.actualPrice().populationVariance());
System.out.println("actualPrice.populationStandardDeviation: " + aggreg.actualPrice().populationStandardDeviation());
得到的結(jié)果:
Blountsville : IN
basePrice.mean: 50.302588996763795
basePrice.sampleVariance: 830.7527439246837
basePrice.sampleStandardDeviation: 28.822781682632293
basePrice.count: 309
basePrice.min: 0.56
basePrice.max: 99.59
actualPrice.mean: 508.8927831715211
actualPrice.sampleVariance: 78883.35878833274
actualPrice.sampleStandardDeviation: 280.86181440048546
actualPrice.count: 309
actualPrice.min: 0.49
actualPrice.max: 999.33
Python的方法
在 Python 中,有幾個(gè)庫(kù)可以處理數(shù)據(jù)統(tǒng)計(jì)和分析。其中,Pandas 庫(kù)非常適合處理大量表格數(shù)據(jù),它提供了非常有效的過(guò)濾、分組和統(tǒng)計(jì)分析方法。
使用 Python 分析以前的數(shù)據(jù):
import pandas as pd
def group_aggregations(df_group_by):
df_result = df_group_by.agg(
{'basePrice': ['count', 'min', 'max', 'mean', 'std', 'var'],
'actualPrice': ['count', 'min', 'max', 'mean', 'std', 'var']}
)
return df_result
if __name__ == '__main__':
df = pd.read_csv("recordEntries.csv")
excluded_states = ['MN', 'CA']
df_st = df.loc[~ df['state'].isin(excluded_states)]
group_by = df_st.groupby(['city', 'state'], sort=False)
aggregated_results = group_aggregations(group_by)
在主要部分,先調(diào)用pandas.read_csv()(第 11 行)將文件中用逗號(hào)分隔的值加載到 PandasDataFrame中。
在第13行,使用~df['state'].isin(excluded_states)來(lái)得到一個(gè)Pandas系列的布爾值,使用pandas.loc()來(lái)過(guò)濾其中不包括的州(MN和CA)。
接下來(lái),在第14行使用DataFrame.groupby()來(lái)按城市和州進(jìn)行分組。結(jié)果由group_aggregations()處理,保存每個(gè)組的basePrice和actualPrice的統(tǒng)計(jì)數(shù)據(jù)。
在Python中打印結(jié)果是非常直接的。IN和Blountsville的結(jié)果:
print(aggregated_results.loc['Blountsville', 'IN']['basePrice'])
print(aggregated_results.loc['Blountsville', 'IN']['actualPrice'])
統(tǒng)計(jì)數(shù)據(jù):
base_price:
Name: (Blountsville, IN), dtype: float64
count 309.000000
min 0.560000
max 99.590000
mean 50.302589
std 28.822782
var 830.752744
actual_price:
Name: (Blountsville, IN), dtype: float64
count 309.000000
min 0.490000
max 999.330000
mean 508.892783
std 280.861814
var 78883.358788
為了并行運(yùn)行前面的代碼,我們必須記住,Python并不像Java那樣支持細(xì)粒度的鎖機(jī)制。必須解決好與全局解釋器鎖(GIL)的問(wèn)題,無(wú)論你有多少個(gè)CPU多核或線程,一次只允許一個(gè)線程執(zhí)行。
為了支持并發(fā),我們必須考慮到有一個(gè)CPU 密集型進(jìn)程,因此,最好的方法是使用multiprocessing。所以需要修改代碼:
from multiprocessing import Pool
import pandas as pd
def aggreg_basePrice(df_group):
ct_st, grp = df_group
return ct_st, grp.basePrice.agg(['count', 'min', 'max', 'mean', 'std', 'var'])
if __name__ == '__main__':
df = pd.read_csv("recordEntries.csv")
start = time.perf_counter()
excluded_states = ['MN', 'CA']
filtr = ~ df['state'].isin(excluded_states)
df_st = df.loc[filtr]
grouped_by_ct_st = df_st.groupby(['city', 'state'], sort=False)
with Pool() as p:
list_parallel = p.map(aggreg_basePrice, [(ct_st, grouped) for ct_st, grouped in grouped_by_ct_st])
print(f'Time elapsed parallel: {round(finish - start, 2)} sec')
和之前一樣,使用Pandas groupby()來(lái)獲得按城市和州分組的數(shù)據(jù)(第14行)。在下一行,使用多進(jìn)程庫(kù)提供的Pool()來(lái)映射分組的數(shù)據(jù),使用aggreg_basePrice來(lái)計(jì)算每組的統(tǒng)計(jì)數(shù)據(jù)。Pool()會(huì)對(duì)數(shù)據(jù)進(jìn)行分割,并在幾個(gè)平行的獨(dú)立進(jìn)程中進(jìn)行統(tǒng)計(jì)計(jì)算。
正如下面的表格中所示,多進(jìn)程比串行運(yùn)行進(jìn)程慢得多。因此,對(duì)于這些類(lèi)型的問(wèn)題,不值得使用這種方法。
可以使用另一種并發(fā)運(yùn)行代碼 - Modin。Modin提供了一種無(wú)縫的方式來(lái)并行化你的代碼,當(dāng)你必須處理大量的數(shù)據(jù)時(shí)是非常有用的。將導(dǎo)入語(yǔ)句從import pandas as pd改為import modin.pandas as pd,可以并行運(yùn)行代碼,并利用環(huán)境中可能存在的內(nèi)核集群來(lái)加速代碼的執(zhí)行。
下面的表格是剛剛涉及的不同場(chǎng)景的運(yùn)行時(shí)間(和以前一樣,不包括從CSV文件中讀取數(shù)據(jù)的時(shí)間):
串行 | 多進(jìn)程 | Modin 過(guò)程 | |
五百萬(wàn)個(gè)元素 | 1.94 秒 | 20.25 秒 | 6.99 秒 |
一千萬(wàn)個(gè)元素 | 4.07 秒 | 25.1 秒 | 12.88 秒 |
兩千萬(wàn)個(gè)元素 | 7.62 秒 | 36.2 秒 | 25.94 秒 |
根據(jù)表格顯示,在Python中串行運(yùn)行代碼甚至比在Java中更快。然而,使用多進(jìn)程會(huì)大大降低性能。使用Moding可以改善結(jié)果,使串行運(yùn)行進(jìn)程更有利。值得一提的是,和以前一樣,我們?cè)谟?jì)算時(shí)間時(shí)不包括從CSV文件中讀取數(shù)據(jù)的時(shí)間。
可以發(fā)現(xiàn),對(duì)于 Pandas 中的 CPU 密集型進(jìn)程來(lái)說(shuō),并行化代碼是沒(méi)有優(yōu)勢(shì)的。從某種意義上說(shuō),這反映了 Pandas 最初的架構(gòu)方式。Pandas 在串行模式下的運(yùn)行速度令人印象深刻,而且即使處理大量數(shù)據(jù)也具有很好的擴(kuò)展性。
需要指出的是,Python中統(tǒng)計(jì)數(shù)字的計(jì)算速度取決于它的執(zhí)行方式。為了得到快速的計(jì)算,需要應(yīng)用到統(tǒng)計(jì)函數(shù)。例如,做一個(gè)簡(jiǎn)單的pandas.DataFrame.describe()來(lái)獲得統(tǒng)計(jì)信息,運(yùn)行速度會(huì)非常慢。
Java 的 Streams 或 Python 的 Pandas 是對(duì)大量數(shù)據(jù)進(jìn)行分析和統(tǒng)計(jì)的兩個(gè)絕佳選擇。兩者都有非常可靠的框架,以及足夠的支持,能夠?qū)崿F(xiàn)出色的性能和可擴(kuò)展性。
Java 提供了非常強(qiáng)大的基礎(chǔ)架構(gòu),非常適合處理復(fù)雜的程序流。它非常高效,可以有效地并行運(yùn)行進(jìn)程。適用于快速獲得結(jié)果。
Python 非常適合做數(shù)學(xué)和統(tǒng)計(jì)。它非常簡(jiǎn)單,相當(dāng)快,非常適合進(jìn)行復(fù)雜的計(jì)算。
譯者介紹
翟珂,51CTO社區(qū)編輯,目前在杭州從事軟件研發(fā)工作,做過(guò)電商、征信等方面的系統(tǒng),享受分享知識(shí)的過(guò)程,充實(shí)自己的生活。
原文標(biāo)題:??Data Statistics and Analysis With Java and Python???,作者:??Manu Barriola??