Python中最快解壓zip文件的方法
假設(shè)現(xiàn)在的上下文(LCTT 譯注:context,計算機術(shù)語,此處意為業(yè)務(wù)情景)是這樣的:一個 zip 文件被上傳到一個Web 服務(wù)中,然后 Python 需要解壓這個 zip 文件然后分析和處理其中的每個文件。這個特殊的應(yīng)用查看每個文件各自的名稱和大小,并和已經(jīng)上傳到 AWS S3 上的文件進(jìn)行比較,如果文件(和 AWS S3 上的相比)有所不同或者文件本身更新,那么就將它上傳到 AWS S3。
Uploads today
挑戰(zhàn)在于這些 zip 文件太大了。它們的平均大小是 560MB 但是其中一些大于 1GB。這些文件中大多數(shù)是文本文件,但是其中同樣也有一些巨大的二進(jìn)制文件。不同尋常的是,每個 zip 文件包含 100 個文件但是其中 1-3 個文件卻占據(jù)了多達(dá) 95% 的 zip 文件大小。
最開始我嘗試在內(nèi)存中解壓文件,并且每次只處理一個文件。在各種內(nèi)存爆炸和 EC2 耗盡內(nèi)存的情況下,這個方法壯烈失敗了。我覺得這個原因是這樣的。最開始你有 1GB 文件在內(nèi)存中,然后你現(xiàn)在解壓每個文件,在內(nèi)存中大約就要占用 2-3GB。所以,在很多次測試之后,解決方案是將這些 zip 文件復(fù)制到磁盤上(在臨時目錄 /tmp
中),然后遍歷這些文件。這次情況好多了但是我仍然注意到了整個解壓過程花費了巨量的時間。是否可能有方法優(yōu)化呢?
原始函數(shù)
首先是下面這些模擬對 zip 文件中文件實際操作的普通函數(shù):
def _count_file(fn):
with open(fn, 'rb') as f:
return _count_file_object(f)
def _count_file_object(f):
# Note that this iterates on 'f'.
# You *could* do 'return len(f.read())'
# which would be faster but potentially memory
# inefficient and unrealistic in terms of this
# benchmark experiment.
total = 0
for line in f:
total += len(line)
return total
這里是可能最簡單的另一個函數(shù):
def f1(fn, dest):
with open(fn, 'rb') as f:
zf = zipfile.ZipFile(f)
zf.extractall(dest)
total = 0
for root, dirs, files in os.walk(dest):
for file_ in files:
fn = os.path.join(root, file_)
total += _count_file(fn)
return total
如果我更仔細(xì)地分析一下,我將會發(fā)現(xiàn)這個函數(shù)花費時間 40% 運行 extractall
,60% 的時間在遍歷各個文件并讀取其長度。
***步嘗試
我的***步嘗試是使用線程。先創(chuàng)建一個 zipfile.ZipFile
的實例,展開其中的每個文件名,然后為每一個文件開始一個線程。每個線程都給它一個函數(shù)來做“實質(zhì)工作”(在這個基準(zhǔn)測試中,就是遍歷每個文件然后獲取它的名稱)。實際業(yè)務(wù)中的函數(shù)進(jìn)行的工作是復(fù)雜的 S3、Redis 和 PostgreSQL 操作,但是在我的基準(zhǔn)測試中我只需要制作一個可以找出文件長度的函數(shù)就好了。線程池函數(shù):
def f2(fn, dest):
def unzip_member(zf, member, dest):
zf.extract(member, dest)
fn = os.path.join(dest, member.filename)
return _count_file(fn)
with open(fn, 'rb') as f:
zf = zipfile.ZipFile(f)
futures = []
with concurrent.futures.ThreadPoolExecutor() as executor:
for member in zf.infolist():
futures.append(
executor.submit(
unzip_member,
zf,
member,
dest,
)
)
total = 0
for future in concurrent.futures.as_completed(futures):
total += future.result()
return total
結(jié)果:加速 ~10%
第二步嘗試
所以可能是 GIL(LCTT 譯注:Global Interpreter Lock,一種全局鎖,CPython 中的一個概念)阻礙了我。最自然的想法是嘗試使用多線程在多個 CPU 上分配工作。但是這樣做有缺點,那就是你不能傳遞一個非可 pickle 序列化的對象(LCTT 譯注:意為只有可 pickle 序列化的對象可以被傳遞),所以你只能發(fā)送文件名到之后的函數(shù)中:
def unzip_member_f3(zip_filepath, filename, dest):
with open(zip_filepath, 'rb') as f:
zf = zipfile.ZipFile(f)
zf.extract(filename, dest)
fn = os.path.join(dest, filename)
return _count_file(fn)
def f3(fn, dest):
with open(fn, 'rb') as f:
zf = zipfile.ZipFile(f)
futures = []
with concurrent.futures.ProcessPoolExecutor() as executor:
for member in zf.infolist():
futures.append(
executor.submit(
unzip_member_f3,
fn,
member.filename,
dest,
)
)
total = 0
for future in concurrent.futures.as_completed(futures):
total += future.result()
return total
結(jié)果: 加速 ~300%
這是作弊
使用處理器池的問題是這樣需要存儲在磁盤上的原始 .zip
文件。所以為了在我的 web 服務(wù)器上使用這個解決方案,我首先得要將內(nèi)存中的 zip 文件保存到磁盤,然后調(diào)用這個函數(shù)。這樣做的代價我不是很清楚但是應(yīng)該不低。
好吧,再翻翻看又沒有損失??赡埽鈮哼^程加速到足以彌補這樣做的損失了吧。
但是一定記??!這個優(yōu)化取決于使用所有可用的 CPU。如果一些其它的 CPU 需要執(zhí)行在 gunicorn
中的其它事務(wù)呢?這時,這些其它進(jìn)程必須等待,直到有 CPU 可用。由于在這個服務(wù)器上有其他的事務(wù)正在進(jìn)行,我不是很確定我想要在進(jìn)程中接管所有其他 CPU。
結(jié)論
一步一步地做這個任務(wù)的這個過程感覺挺好的。你被限制在一個 CPU 上但是表現(xiàn)仍然特別好。同樣地,一定要看看在f1
和 f2
兩段代碼之間的不同之處!利用 concurrent.futures
池類你可以獲取到允許使用的 CPU 的個數(shù),但是這樣做同樣給人感覺不是很好。如果你在虛擬環(huán)境中獲取的個數(shù)是錯的呢?或者可用的個數(shù)太低以致無法從負(fù)載分配獲取好處并且現(xiàn)在你僅僅是為了移動負(fù)載而支付營運開支呢?
我將會繼續(xù)使用 zipfile.ZipFile(file_buffer).extractall(temp_dir)
。這個工作這樣做已經(jīng)足夠好了。
想試試手嗎?
我使用一個 c5.4xlarge
EC2 服務(wù)器來進(jìn)行我的基準(zhǔn)測試。文件可以從此處下載:
wget https://www.peterbe.com/unzip-in-parallel/hack.unzip-in-parallel.py
wget https://www.peterbe.com/unzip-in-parallel/symbols-2017-11-27T14_15_30.zip
這里的 .zip
文件有 34MB。和在服務(wù)器上的相比已經(jīng)小了很多。
hack.unzip-in-parallel.py
文件里是一團(tuán)糟。它包含了大量可怕的修正和丑陋的代碼,但是這只是一個開始。