Python數(shù)據(jù)預處理:使用Dask和Numba并行化加速
如果你善于使用Pandas變換數(shù)據(jù)、創(chuàng)建特征以及清洗數(shù)據(jù)等,那么你就能夠輕松地使用Dask和Numba并行加速你的工作。單純從速度上比較,Dask完勝Python,而Numba打敗Dask,那么Numba+Dask基本上算是無敵的存在。
將數(shù)值計算分成Numba sub-function和使用Dask map_partition+apply,而不是使用Pandas。對于100萬行數(shù)據(jù),使用Pandas方法和混合數(shù)值計算創(chuàng)建新特征的速度比使用Numba+Dask方法的速度要慢許多倍。
Python:60.9x | Dask:8.4x | Numba:5.8x |Numba+Dask:1x
作為舊金山大學的一名數(shù)據(jù)科學碩士,會經(jīng)常跟數(shù)據(jù)打交道。使用Apply函數(shù)是我用來創(chuàng)建新特征或清理數(shù)據(jù)的眾多技巧之一?,F(xiàn)在,我只是一名數(shù)據(jù)科學家,而不是計算機科學方面的專家,但我是一個喜歡搗鼓并使得代碼運行更快的程序員?,F(xiàn)在,我將會分享我在并行應用上的經(jīng)驗。
大多Python愛好者可能了解Python實現(xiàn)的全局解釋器鎖(GIL),GIL會占用計算機中所有的CPU性能。更糟糕的是,我們主要的數(shù)據(jù)處理包,比如Pandas,很少能實現(xiàn)并行處理代碼。
Apply函數(shù)vs Multiprocessing.map
- %time df.some_col.apply(lambda x : clean_transform_kthx(x))
- Wall time: HAH! RIP BUDDY
- # WHY YOU NO RUN IN PARALLEL!?
Tidyverse已經(jīng)為處理數(shù)據(jù)做了一些美好的事情,Plyr是我最喜愛的數(shù)據(jù)包之一,它允許R語言使用者輕松地并行化他們的數(shù)據(jù)應用。Hadley Wickham說過:
“plyr是一套處理一組問題的工具:需要把一個大的數(shù)據(jù)結構分解成一些均勻的數(shù)據(jù)塊,之后對每一數(shù)據(jù)塊應用一個函數(shù),***將所有結果組合在一起。”
對于Python而言,我希望有類似于plyr這樣的數(shù)據(jù)包可供使用。然而,目前這樣的數(shù)據(jù)包還不存在,但我可以使用并行數(shù)據(jù)包構成一個簡單的解決方案。
Dask
之前在Spark上花費了一些時間,因此當我開始使用Dask時,還是比較容易地掌握其重點內(nèi)容。Dask被設計成能夠在多核CPU上并行處理任務,此外也借鑒了許多Pandas的語法規(guī)則。
現(xiàn)在開始本文所舉例子。對于最近的數(shù)據(jù)挑戰(zhàn)而言,我試圖獲取一個外部數(shù)據(jù)源(包含許多地理編碼點),并將其與要分析的一大堆街區(qū)相匹配。在計算歐幾里得距離的同時,使用***啟發(fā)式將***值分配給一個街區(qū)。
最初的apply:
- my_df.apply(lambda x: nearest_street(x.lat,x.lon),axis=1)
Dask apply:
- dd.from_pandas(my_df,npartitions=nCores).\
- map_partitions(\
- lambda df : df.apply(\
- lambda x : nearest_street(x.lat,x.lon),axis=1)).\
- compute(get=get)
- # imports at the end
二者看起來很相似,apply核心語句是map_partitions,***有一個compute()語句。此外,不得不對npartitions初始化。 分區(qū)的工作原理就是將Pandas數(shù)據(jù)幀劃分成塊,對于我的電腦而言,配置是6核-12線程,我只需告訴它使用的是12分區(qū),Dask就會完成剩下的工作。
接下來,將map_partitions的lambda函數(shù)應用于每個分區(qū)。由于許多數(shù)據(jù)處理代碼都是獨立地運行,所以不必過多地擔心這些操作的順序問題。***,compute()函數(shù)告訴Dask來處理剩余的事情,并把最終計算結果反饋給我。在這里,compute()調(diào)用Dask將apply適用于每個分區(qū),并使其并行處理。
由于我通過迭代行來生成一個新隊列(特征),而Dask apply只在列上起作用,因此我沒有使用Dask apply,以下是Dask程序:
- from dask import dataframe as dd
- from dask.multiprocessing import get
- from multiprocessing import cpu_count
- nCores = cpu_count()
由于我是根據(jù)一些簡單的線性運算(基本上是勾股定理)對數(shù)據(jù)進行分類,所以認為使用類似下面的Python代碼會運行得更快一些。
- for i in intersections:
- l3 = np.sqrt( (i[0] - [1])**2 + (i[2] - i[3])**2 )
- # ... Some more of these
- dist = l1 + l2
- if dist < (l3 * 1.2):
- matches.append(dist)
- # ... More stuff
- ### you get the idea, there's a for-loop checking to see if
- ### my points are close to my streets and then returning
- closest
- ### I even used numpy, that means fast right?
Broadcasting用以描述Numpy中對兩個形狀不同的矩陣進行數(shù)學計算的處理機制。假設我有一個數(shù)組,我會通過迭代并逐個變換每個單元格來改變它
- # over one array
- for cell in array:
- cell * CONSTANT - CONSTANT2
- # over two arrays
- for i in range(len(array)):
- array[i] = array[i] + array2[i]
相反,我完全可以跳過for循環(huán),并對整個數(shù)組執(zhí)行操作。Numpy與broadcasting混合使用,用來執(zhí)行元素智能乘積(對位相乘)。
- # over one array
- (array * CONSTANT) - CONSTANT2
- # over two arrays of same length
- # different lengths follow broadcasting rules
- array = array - array2
Broadcasting可以實現(xiàn)更多的功能,現(xiàn)在看看骨架代碼:
- from numba import jit
- @jit # numba magic
- def some_func()
- l3_arr = np.sqrt( (intersections[:,0] -
- intersections[:,1])**2 +\
- (intersections[:,2] -
- intersections[:,3])**2 )
- # now l3 is an array containing all of my block lengths
- # likewise, l1 and l2 are now equal sized arrays
- # containing distance of point to all intersections
- dist = l1_arr + l2_arr
- match_arr = dist < (l3_arr * 1.2)
- # so instead of iterating, I just immediately compare all
- of my
- # point-to-street distances at once and have a handy
- # boolean index
從本質(zhì)上講,代碼的功能是改變數(shù)組。好的一方面是運行很快,甚至能和Dask并行處理速度比較。其次,如果使用的是最基本的Numpy和Python,那么就可以及時編譯任何函數(shù)。壞的一面在于它只適合Numpy和簡單Python語法。我不得不把所有的數(shù)值計算從我的函數(shù)轉(zhuǎn)換成子函數(shù),但其計算速度會增加得非???。
將其一起使用
簡單地使用map_partition()就可以將Numba函數(shù)與Dask結合在一起,如果并行操作和broadcasting能夠密切合作以加快運行速度,那么對于大數(shù)據(jù)集而言,將會看到其運行速度得到大幅提升。
上面的***張圖表明,沒有broadcasting的線性計算其表現(xiàn)不佳,并行處理和Dask對速度提升也有效果。此外,可以明顯地發(fā)現(xiàn),Dask和Numba組合的性能優(yōu)于其它方法。
上面的第二張圖稍微有些復雜,其橫坐標是對行數(shù)取對數(shù)。從第二張圖可以發(fā)現(xiàn),對于1k到10k這樣小的數(shù)據(jù)集,單獨使用Numba的性能要比聯(lián)合使用Numba+Dask的性能更好,盡管在大數(shù)據(jù)集上Numba+Dask的性能非常好。
優(yōu)化
為了能夠使用Numba編譯JIT,我重寫了函數(shù)以更好地利用broadcasting。之后,重新運行這些函數(shù)后發(fā)現(xiàn),平均而言,對于相同的代碼,JIT的執(zhí)行速度大約快了24%。
可以肯定的說,一定有進一步的優(yōu)化方法使得執(zhí)行速度更快,但目前沒有發(fā)現(xiàn)。Dask是一個非常友好的工具,本文使用Dask+Numba實現(xiàn)的***成果是提升運行速度60倍。如果你知道其它的提升執(zhí)行速度的技巧,歡迎在留言區(qū)分享。
作者信息
Ernest Kim,舊金山大學碩士生,專注于機器學習、數(shù)據(jù)科學。