Python網(wǎng)絡(luò)爬蟲(chóng)的同步和異步
一、同步與異步
- #同步編程(同一時(shí)間只能做一件事,做完了才能做下一件事情)
- <-a_url-><-b_url-><-c_url->
- #異步編程 (可以近似的理解成同一時(shí)間有多個(gè)事情在做,但有先后)
- <-a_url->
- <-b_url->
- <-c_url->
- <-d_url->
- <-e_url->
- <-f_url->
- <-g_url->
- <-h_url->
- <--i_url-->
- <--j_url-->
模板
- import asyncio
- #函數(shù)名:做現(xiàn)在的任務(wù)時(shí)不等待,能繼續(xù)做別的任務(wù)。
- async def donow_meantime_dontwait(url):
- response = await requests.get(url)
- #函數(shù)名:快速高效的做任務(wù)
- async def fast_do_your_thing():
- await asyncio.wait([donow_meantime_dontwait(url) for url in urls])
- #下面兩行都是套路,記住就好
- loop = asyncio.get_event_loop()
- loop.run_until_complete(fast_do_your_thing())
tips:
await表達(dá)式中的對(duì)象必須是awaitable
requests不支持非阻塞
aiohttp是用于異步請(qǐng)求的庫(kù)
代碼
- import asyncio
- import requests
- import time
- import aiohttp
- urls = ['https://book.douban.com/tag/小說(shuō)','https://book.douban.com/tag/科幻',
- 'https://book.douban.com/tag/漫畫(huà)','https://book.douban.com/tag/奇幻',
- 'https://book.douban.com/tag/歷史','https://book.douban.com/tag/經(jīng)濟(jì)學(xué)']
- async def requests_meantime_dont_wait(url):
- print(url)
- async with aiohttp.ClientSession() as session:
- async with session.get(url) as resp:
- print(resp.status)
- print("{url} 得到響應(yīng)".format(url=url))
- async def fast_requsts(urls):
- start = time.time()
- await asyncio.wait([requests_meantime_dont_wait(url) for url in urls])
- end = time.time()
- print("Complete in {} seconds".format(end - start))
- loop = asyncio.get_event_loop()
- loop.run_until_complete(fast_requsts(urls))
gevent簡(jiǎn)介
gevent是一個(gè)python的并發(fā)庫(kù),它為各種并發(fā)和網(wǎng)絡(luò)相關(guān)的任務(wù)提供了整潔的API。
gevent中用到的主要模式是greenlet,它是以C擴(kuò)展模塊形式接入Python的輕量級(jí)協(xié)程。 greenlet全部運(yùn)行在主程序操作系統(tǒng)進(jìn)程的內(nèi)部,但它們被協(xié)作式地調(diào)度。
猴子補(bǔ)丁
requests庫(kù)是阻塞式的,為了將requests同步更改為異步。只有將requests庫(kù)阻塞式更改為非阻塞,異步操作才能實(shí)現(xiàn)。
而gevent庫(kù)中的猴子補(bǔ)?。╩onkey patch),gevent能夠修改標(biāo)準(zhǔn)庫(kù)里面大部分的阻塞式系統(tǒng)調(diào)用。這樣在不改變?cè)写a的情況下,將應(yīng)用的阻塞式方法,變成協(xié)程式的(異步)。
代碼
- from gevent import monkey
- import gevent
- import requests
- import time
- monkey.patch_all()
- def req(url):
- print(url)
- resp = requests.get(url)
- print(resp.status_code,url)
- def synchronous_times(urls):
- """同步請(qǐng)求運(yùn)行時(shí)間"""
- start = time.time()
- for url in urls:
- req(url)
- end = time.time()
- print('同步執(zhí)行時(shí)間 {} s'.format(end-start))
- def asynchronous_times(urls):
- """異步請(qǐng)求運(yùn)行時(shí)間"""
- start = time.time()
- gevent.joinall([gevent.spawn(req,url) for url in urls])
- end = time.time()
- print('異步執(zhí)行時(shí)間 {} s'.format(end - start))
- urls = ['https://book.douban.com/tag/小說(shuō)','https://book.douban.com/tag/科幻',
- 'https://book.douban.com/tag/漫畫(huà)','https://book.douban.com/tag/奇幻',
- 'https://book.douban.com/tag/歷史','https://book.douban.com/tag/經(jīng)濟(jì)學(xué)']
- synchronous_times(urls)
- asynchronous_times(urls)
gevent:異步理論與實(shí)戰(zhàn)
gevent庫(kù)中使用的最核心的是Greenlet-一種用C寫(xiě)的輕量級(jí)python模塊。在任意時(shí)間,系統(tǒng)只能允許一個(gè)Greenlet處于運(yùn)行狀態(tài)
一個(gè)greenlet遇到IO操作時(shí),比如訪問(wèn)網(wǎng)絡(luò),就自動(dòng)切換到其他的greenlet,等到IO操作完成,再在適當(dāng)?shù)臅r(shí)候切換回來(lái)繼續(xù)執(zhí)行。由于IO操作非常耗時(shí),經(jīng)常使程序處于等待狀態(tài),有了gevent為我們自動(dòng)切換協(xié)程,就保證總有g(shù)reenlet在運(yùn)行,而不是等待IO。
串行和異步
高并發(fā)的核心是讓一個(gè)大的任務(wù)分成一批子任務(wù),并且子任務(wù)會(huì)被被系統(tǒng)高效率的調(diào)度,實(shí)現(xiàn)同步或者異步。在兩個(gè)子任務(wù)之間切換,也就是經(jīng)常說(shuō)到的上下文切換。
同步就是讓子任務(wù)串行,而異步有點(diǎn)影分身之術(shù),但在任意時(shí)間點(diǎn),真身只有一個(gè),子任務(wù)并不是真正的并行,而是充分利用了碎片化的時(shí)間,讓程序不要浪費(fèi)在等待上。這就是異步,效率杠桿的。
gevent中的上下文切換是通過(guò)yield實(shí)現(xiàn)。在這個(gè)例子中,我們會(huì)有兩個(gè)子任務(wù),互相利用對(duì)方等待的時(shí)間做自己的事情。這里我們使用gevent.sleep(0)代表程序會(huì)在這里停0秒。
- import gevent
- def foo():
- print('Running in foo')
- gevent.sleep(0)
- print('Explicit context switch to foo again')
- def bar():
- print('Explicit context to bar')
- gevent.sleep(0)
- print('Implicit context switch back to bar')
- gevent.joinall([
- gevent.spawn(foo),
- gevent.spawn(bar)
- ])
運(yùn)行的順序:
- Running in foo
- Explicit context to bar
- Explicit context switch to foo again
- Implicit context switch back to bar
同步異步的順序問(wèn)題
同步運(yùn)行就是串行,123456...,但是異步的順序是隨機(jī)的任意的(根據(jù)子任務(wù)消耗的時(shí)間而定)
代碼
- import gevent
- import random
- def task(pid):
- """
- Some non-deterministic task
- """
- gevent.sleep(random.randint(0,2)*0.001)
- print('Task %s done' % pid)
- #同步(結(jié)果更像串行)
- def synchronous():
- for i in range(1,10):
- task(i)
- #異步(結(jié)果更像亂步)
- def asynchronous():
- threads = [gevent.spawn(task, i) for i in range(10)]
- gevent.joinall(threads)
- print('Synchronous同步:')
- synchronous()
- print('Asynchronous異步:')
- asynchronous()
輸出
Synchronous同步:
- Task 1 done
- Task 2 done
- Task 3 done
- Task 4 done
- Task 5 done
- Task 6 done
- Task 7 done
- Task 8 done
- Task 9 done
Asynchronous異步:
- Task 1 done
- Task 5 done
- Task 6 done
- Task 2 done
- Task 4 done
- Task 7 done
- Task 8 done
- Task 9 done
- Task 0 done
- Task 3 done
同步案例中所有的任務(wù)都是按照順序執(zhí)行,這導(dǎo)致主程序是阻塞式的(阻塞會(huì)暫停主程序的執(zhí)行)。
gevent.spawn會(huì)對(duì)傳入的任務(wù)(子任務(wù)集合)進(jìn)行進(jìn)行調(diào)度,gevent.joinall方法會(huì)阻塞當(dāng)前程序,除非所有的greenlet都執(zhí)行完畢,程序才會(huì)結(jié)束。
實(shí)戰(zhàn)
實(shí)現(xiàn)gevent到底怎么用,把異步訪問(wèn)得到的數(shù)據(jù)提取出來(lái)。
在有道詞典搜索框輸入“hello”按回車。觀察數(shù)據(jù)請(qǐng)求情況 觀察有道的url構(gòu)建。
分析url規(guī)律
- #url構(gòu)建只需要傳入word即可
- url = "http://dict.youdao.com/w/eng/{}/".format(word)
解析網(wǎng)頁(yè)數(shù)據(jù)
- def fetch_word_info(word):
- url = "http://dict.youdao.com/w/eng/{}/".format(word)
- resp = requests.get(url,headers=headers)
- doc = pq(resp.text)
- pros = ''
- for pro in doc.items('.baav .pronounce'):
- pros+=pro.text()
- description = ''
- for li in doc.items('#phrsListTab .trans-container ul li'):
- description +=li.text()
- return {'word':word,'音標(biāo)':pros,'注釋':description}
因?yàn)閞equests庫(kù)在任何時(shí)候只允許有一個(gè)訪問(wèn)結(jié)束完全結(jié)束后,才能進(jìn)行下一次訪問(wèn)。無(wú)法通過(guò)正規(guī)途徑拓展成異步,因此這里使用了monkey補(bǔ)丁
同步代碼
- import requests
- from pyquery import PyQuery as pq
- import gevent
- import time
- import gevent.monkey
- gevent.monkey.patch_all()
- words = ['good','bad','cool',
- 'hot','nice','better',
- 'head','up','down',
- 'right','left','east']
- def synchronous():
- start = time.time()
- print('同步開(kāi)始了')
- for word in words:
- print(fetch_word_info(word))
- end = time.time()
- print("同步運(yùn)行時(shí)間: %s 秒" % str(end - start))
- #執(zhí)行同步
- synchronous()
異步代碼
- import requests
- from pyquery import PyQuery as pq
- import gevent
- import time
- import gevent.monkey
- gevent.monkey.patch_all()
- words = ['good','bad','cool',
- 'hot','nice','better',
- 'head','up','down',
- 'right','left','east']
- def asynchronous():
- start = time.time()
- print('異步開(kāi)始了')
- events = [gevent.spawn(fetch_word_info,word) for word in words]
- wordinfos = gevent.joinall(events)
- for wordinfo in wordinfos:
- #獲取到數(shù)據(jù)get方法
- print(wordinfo.get())
- end = time.time()
- print("異步運(yùn)行時(shí)間: %s 秒"%str(end-start))
- #執(zhí)行異步
- asynchronous()
我們可以對(duì)待爬網(wǎng)站實(shí)時(shí)異步訪問(wèn),速度會(huì)大大提高。我們現(xiàn)在是爬取12個(gè)詞語(yǔ)的信息,也就是說(shuō)一瞬間我們對(duì)網(wǎng)站訪問(wèn)了12次,這還沒(méi)啥問(wèn)題,假如爬10000+個(gè)詞語(yǔ),使用gevent的話,那幾秒鐘之內(nèi)就給網(wǎng)站一股腦的發(fā)請(qǐng)求,說(shuō)不定網(wǎng)站就把爬蟲(chóng)封了。
解決辦法
將列表等分為若干個(gè)子列表,分批爬取。舉例我們有一個(gè)數(shù)字列表(0-19),要均勻的等分為4份,也就是子列表有5個(gè)數(shù)。下面是我在stackoverflow查找到的列表等分方案:
方法1
- seqence = list(range(20))
- size = 5 #子列表長(zhǎng)度
- output = [seqence[i:i+size] for i in range(0, len(seqence), size)]
- print(output)
方法2
- chunks = lambda seq, size: [seq[i: i+size] for i in range(0, len(seq), size)]
- print(chunks(seq, 5))
方法3
- def chunks(seq,size):
- for i in range(0,len(seq), size):
- yield seq[i:i+size]
- prinT(chunks(seq,5))
- for x in chunks(req,5):
- print(x)
數(shù)據(jù)量不大的情況下,選哪一種方法都可以。如果特別大,建議使用方法3.
動(dòng)手實(shí)現(xiàn)
- import requests
- from pyquery import PyQuery as pq
- import gevent
- import time
- import gevent.monkey
- gevent.monkey.patch_all()
- words = ['good','bad','cool',
- 'hot','nice','better',
- 'head','up','down',
- 'right','left','east']
- def fetch_word_info(word):
- url = "http://dict.youdao.com/w/eng/{}/".format(word)
- resp = requests.get(url,headers=headers)
- doc = pq(resp.text)
- pros = ''
- for pro in doc.items('.baav .pronounce'):
- pros+=pro.text()
- description = ''
- for li in doc.items('#phrsListTab .trans-container ul li'):
- description +=li.text()
- return {'word':word,'音標(biāo)':pros,'注釋':description}
- def asynchronous(words):
- start = time.time()
- print('異步開(kāi)始了')
- chunks = lambda seq, size: [seq[i: i + size] for i in range(0, len(seq), size)]
- for subwords in chunks(words,3):
- events = [gevent.spawn(fetch_word_info, word) for word in subwords]
- wordinfos = gevent.joinall(events)
- for wordinfo in wordinfos:
- # 獲取到數(shù)據(jù)get方法
- print(wordinfo.get())
- time.sleep(1)
- end = time.time()
- print("異步運(yùn)行時(shí)間: %s 秒" % str(end - start))
- asynchronous(words)