代碼詳解Python多線程、多進(jìn)程、協(xié)程
一、前言
很多時(shí)候我們寫了一個(gè)爬蟲,實(shí)現(xiàn)了需求后會(huì)發(fā)現(xiàn)了很多值得改進(jìn)的地方,其中很重要的一點(diǎn)就是爬取速度。本文就通過代碼講解如何使用多進(jìn)程、多線程、協(xié)程來提升爬取速度。注意:我們不深入介紹理論和原理,一切都在代碼中。
二、同步
首先我們寫一個(gè)簡(jiǎn)化的爬蟲,對(duì)各個(gè)功能細(xì)分,有意識(shí)進(jìn)行函數(shù)式編程。下面代碼的目的是訪問300次百度頁面并返回狀態(tài)碼,其中parse_1函數(shù)可以設(shè)定循環(huán)次數(shù),每次循環(huán)將當(dāng)前循環(huán)數(shù)(從0開始)和url傳入parse_2函數(shù)。
import requests def parse_1(): url = 'https://www.baidu.com' for i in range(300): parse_2(url) def parse_2(url): response = requests.get(url) print(response.status_code) if __name__ == '__main__': parse_1()
性能的消耗主要在IO請(qǐng)求中,當(dāng)單進(jìn)程單線程模式下請(qǐng)求URL時(shí)必然會(huì)引起等待
示例代碼就是典型的串行邏輯,parse_1將url和循環(huán)數(shù)傳遞給parse_2,parse_2請(qǐng)求并返回狀態(tài)碼后parse_1繼續(xù)迭代一次,重復(fù)之前步驟
三、多線程
因?yàn)镃PU在執(zhí)行程序時(shí)每個(gè)時(shí)間刻度上只會(huì)存在一個(gè)線程,因此多線程實(shí)際上提高了進(jìn)程的使用率從而提高了CPU的使用率
實(shí)現(xiàn)多線程的庫有很多,這里用concurrent.futures中的ThreadPoolExecutor來演示。介紹ThreadPoolExecutor庫是因?yàn)樗啾绕渌麕齑a更簡(jiǎn)潔
為了方便說明問題,下面代碼中如果是新增加的部分,代碼行前會(huì)加上 > 符號(hào)便于觀察說明問題,實(shí)際運(yùn)行需要去掉
import requests > from concurrent.futures import ThreadPoolExecutor def parse_1(): url = 'https://www.baidu.com' # 建立線程池 > pool = ThreadPoolExecutor(6) for i in range(300): > pool.submit(parse_2, url) > pool.shutdown(wait=True) def parse_2(url): response = requests.get(url) print(response.status_code) if __name__ == '__main__': parse_1()
跟同步相對(duì)的就是異步。異步就是彼此獨(dú)立,在等待某事件的過程中繼續(xù)做自己的事,不需要等待這一事件完成后再工作。線程就是實(shí)現(xiàn)異步的一個(gè)方式,也就是說多線程是異步處理異步就意味著不知道處理結(jié)果,有時(shí)候我們需要了解處理結(jié)果,就可以采用回調(diào)
import requests from concurrent.futures import ThreadPoolExecutor # 增加回調(diào)函數(shù) > def callback(future): > print(future.result()) def parse_1(): url = 'https://www.baidu.com' pool = ThreadPoolExecutor(6) for i in range(300): > results = pool.submit(parse_2, url) # 回調(diào)的關(guān)鍵步驟 > results.add_done_callback(callback) pool.shutdown(wait=True) def parse_2(url): response = requests.get(url) print(response.status_code) if __name__ == '__main__': parse_1()
Python實(shí)現(xiàn)多線程有一個(gè)無數(shù)人詬病的GIL(全局解釋器鎖),但多線程對(duì)于爬取網(wǎng)頁這種多數(shù)屬于IO密集型的任務(wù)依舊很合適。
四、多進(jìn)程
多進(jìn)程用兩個(gè)方法實(shí)現(xiàn):ProcessPoolExecutor和multiprocessing
1. ProcessPoolExecutor
和實(shí)現(xiàn)多線程的ThreadPoolExecutor類似
import requests > from concurrent.futures import ProcessPoolExecutor def parse_1(): url = 'https://www.baidu.com' # 建立線程池 > pool = ProcessPoolExecutor(6) for i in range(300): > pool.submit(parse_2, url) > pool.shutdown(wait=True) def parse_2(url): response = requests.get(url) print(response.status_code) if __name__ == '__main__': parse_1()
可以看到改動(dòng)了兩次類名,代碼依舊很簡(jiǎn)潔,同理也可以添加回調(diào)函數(shù)
import requests from concurrent.futures import ProcessPoolExecutor > def callback(future): > print(future.result()) def parse_1(): url = 'https://www.baidu.com' pool = ProcessPoolExecutor(6) for i in range(300): > results = pool.submit(parse_2, url) > results.add_done_callback(callback) pool.shutdown(wait=True) def parse_2(url): response = requests.get(url) print(response.status_code) if __name__ == '__main__': parse_1()
2. multiprocessing
直接看代碼,一切都在注釋中。
import requests > from multiprocessing import Pool def parse_1(): url = 'https://www.baidu.com' # 建池 > pool = Pool(processes=5) # 存放結(jié)果 > res_lst = [] for i in range(300): # 把任務(wù)加入池中 > res = pool.apply_async(func=parse_2, args=(url,)) # 獲取完成的結(jié)果(需要取出) > res_lst.append(res) # 存放最終結(jié)果(也可以直接存儲(chǔ)或者print) > good_res_lst = [] > for res in res_lst: # 利用get獲取處理后的結(jié)果 > good_res = res.get() # 判斷結(jié)果的好壞 > if good_res: > good_res_lst.append(good_res) # 關(guān)閉和等待完成 > pool.close() > pool.join() def parse_2(url): response = requests.get(url) print(response.status_code) if __name__ == '__main__': parse_1()
可以看到multiprocessing庫的代碼稍繁瑣,但支持更多的拓展。多進(jìn)程和多線程確實(shí)能夠達(dá)到加速的目的,但如果遇到IO阻塞會(huì)出現(xiàn)線程或者進(jìn)程的浪費(fèi),因此有一個(gè)更好的方法……
五、異步非阻塞
協(xié)程+回調(diào)配合動(dòng)態(tài)協(xié)作就可以達(dá)到異步非阻塞的目的,本質(zhì)只用了一個(gè)線程,所以很大程度利用了資源
實(shí)現(xiàn)異步非阻塞經(jīng)典是利用asyncio庫+yield,為了方便利用逐漸出現(xiàn)了更上層的封裝 aiohttp,要想更好的理解異步非阻塞最好還是深入了解asyncio庫。而gevent是一個(gè)非常方便實(shí)現(xiàn)協(xié)程的庫
import requests > from gevent import monkey # 猴子補(bǔ)丁是協(xié)作運(yùn)行的靈魂 > monkey.patch_all() > import gevent def parse_1(): url = 'https://www.baidu.com' # 建立任務(wù)列表 > tasks_list = [] for i in range(300): > task = gevent.spawn(parse_2, url) > tasks_list.append(task) > gevent.joinall(tasks_list) def parse_2(url): response = requests.get(url) print(response.status_code) if __name__ == '__main__': parse_1()
gevent能很大提速,也引入了新的問題:如果我們不想速度太快給服務(wù)器造成太大負(fù)擔(dān)怎么辦?如果是多進(jìn)程多線程的建池方法,可以控制池內(nèi)數(shù)量。如果用gevent想要控制速度也有一個(gè)不錯(cuò)的方法:建立隊(duì)列。gevent中也提供了Quene類,下面代碼改動(dòng)較大
import requests from gevent import monkey monkey.patch_all() import gevent > from gevent.queue import Queue def parse_1(): url = 'https://www.baidu.com' tasks_list = [] # 實(shí)例化隊(duì)列 > quene = Queue() for i in range(300): # 全部url壓入隊(duì)列 > quene.put_nowait(url) # 兩路隊(duì)列 > for _ in range(2): > task = gevent.spawn(parse_2) > tasks_list.append(task) gevent.joinall(tasks_list) # 不需要傳入?yún)?shù),都在隊(duì)列中 > def parse_2(): # 循環(huán)判斷隊(duì)列是否為空 > while not quene.empty(): # 彈出隊(duì)列 > url = quene.get_nowait() response = requests.get(url) # 判斷隊(duì)列狀態(tài) > print(quene.qsize(), response.status_code) if __name__ == '__main__': parse_1()
結(jié)束語
以上就是幾種常用的加速方法。如果對(duì)代碼測(cè)試感興趣可以利用time模塊判斷運(yùn)行時(shí)間。爬蟲的加速是重要技能,但適當(dāng)控制速度也是爬蟲工作者的良好習(xí)慣,不要給服務(wù)器太大壓力,拜拜~