How to Send 100,000 HTTP Requests as Fast as Possible
Suppose you have a file containing 100,000 URLs, and you need to send an HTTP request to each one and print the status code of the response. How would you write the code to get through these tasks as quickly as possible?
Python offers many ways to do concurrency: the threading standard library, the higher-level concurrent.futures module, coroutines with asyncio, and third-party async libraries such as grequests. Any of them can satisfy the requirement above. Below, each approach is implemented one by one; the code in this article runs as-is and can serve as a reference for your future concurrent programming:
Queue + Multithreading
Define a queue with a capacity of 400, then start 200 threads, each of which repeatedly takes a URL from the queue and requests it.
The main thread reads the URLs from the file and puts them into the queue, then waits until every item in the queue has been taken and processed. The code is as follows:
```python
from threading import Thread
from queue import Queue
import sys

import requests

concurrent = 200

def doWork():
    # Each worker loops forever, pulling URLs off the queue.
    while True:
        url = q.get()
        status, url = getStatus(url)
        doSomethingWithResult(status, url)
        q.task_done()

def getStatus(ourl):
    try:
        res = requests.get(ourl)
        return res.status_code, ourl
    except Exception:
        return "error", ourl

def doSomethingWithResult(status, url):
    print(status, url)

# Bound the queue at twice the worker count so the reader thread
# can't race too far ahead of the workers.
q = Queue(concurrent * 2)
for i in range(concurrent):
    t = Thread(target=doWork)
    t.daemon = True  # daemon threads exit together with the main thread
    t.start()

try:
    for url in open("urllist.txt"):
        q.put(url.strip())
    q.join()  # block until every queued URL has been processed
except KeyboardInterrupt:
    sys.exit(1)
```
Run it, and each response's status code is printed next to its URL as the requests complete. Did you pick up a new skill?
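A speed-up worth noting for the script above: requests.get opens a fresh TCP connection for every call. A minimal variation (the thread_local helper and get_session are my additions, not part of the original script) gives each worker thread its own requests.Session so connections get reused:

```python
import threading

import requests

thread_local = threading.local()

def get_session() -> requests.Session:
    # Sessions aren't guaranteed thread-safe, so keep one per thread.
    if not hasattr(thread_local, "session"):
        thread_local.session = requests.Session()
    return thread_local.session

def getStatus(ourl):
    try:
        res = get_session().get(ourl)  # reuses the thread's pooled connection
        return res.status_code, ourl
    except Exception:
        return "error", ourl
```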
Thread Pool
If you want a thread pool, the higher-level concurrent.futures library is the recommended choice:
```python
import concurrent.futures

import requests

out = []
CONNECTIONS = 100  # size of the thread pool
TIMEOUT = 5        # per-request timeout in seconds

urls = []
with open("urllist.txt") as reader:
    for url in reader:
        urls.append(url.strip())

def load_url(url, timeout):
    ans = requests.get(url, timeout=timeout)
    return ans.status_code

with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    # Submit every URL; as_completed yields futures in completion order.
    future_to_url = (executor.submit(load_url, url, TIMEOUT) for url in urls)
    for future in concurrent.futures.as_completed(future_to_url):
        try:
            data = future.result()
        except Exception as exc:
            data = str(type(exc))
        finally:
            out.append(data)
            print(data)
```
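One caveat: the loop above prints the status code alone, so you can't tell which URL produced it. A small variant of the same pattern (following the example in the concurrent.futures documentation, reusing load_url, urls, CONNECTIONS, and TIMEOUT from the snippet above) keeps that mapping:

```python
with concurrent.futures.ThreadPoolExecutor(max_workers=CONNECTIONS) as executor:
    # Map each future back to the URL it was submitted with.
    future_to_url = {executor.submit(load_url, url, TIMEOUT): url for url in urls}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            print(future.result(), url)
        except Exception as exc:
            print(type(exc).__name__, url)
```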
Coroutines + aiohttp
Coroutines are another workhorse for concurrency:
```python
import asyncio

from aiohttp import ClientSession, ClientConnectorError

async def fetch_html(url: str, session: ClientSession, **kwargs) -> tuple:
    try:
        resp = await session.request(method="GET", url=url, **kwargs)
    except ClientConnectorError:
        # Report unreachable hosts as 404 so gather() still gets a tuple.
        return (url, 404)
    return (url, resp.status)

async def make_requests(urls: set, **kwargs) -> None:
    # A single shared session reuses connections across all requests.
    async with ClientSession() as session:
        tasks = []
        for url in urls:
            tasks.append(
                fetch_html(url=url, session=session, **kwargs)
            )
        results = await asyncio.gather(*tasks)

    for result in results:
        print(f'{result[1]} - {str(result[0])}')

if __name__ == "__main__":
    import sys
    assert sys.version_info >= (3, 7), "Script requires Python 3.7+."
    with open("urllist.txt") as infile:
        urls = set(map(str.strip, infile))
    asyncio.run(make_requests(urls=urls))
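```
One detail worth knowing: ClientSession's default TCPConnector allows at most 100 simultaneous connections, so even with 100,000 tasks created, only 100 requests are actually in flight at a time. A minimal sketch of tuning that cap (the limit of 200 and the test URL are arbitrary values of mine):

```python
import asyncio

from aiohttp import ClientSession, TCPConnector

async def main() -> None:
    # limit=200 raises the connector's cap on simultaneous connections
    # (aiohttp's default is 100).
    async with ClientSession(connector=TCPConnector(limit=200)) as session:
        async with session.get("http://httpbin.org/get") as resp:
            print(resp.status)

asyncio.run(main())
```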
grequests[1]
This is a third-party library, currently at 3.8K stars. It is essentially Requests + Gevent[2] and makes asynchronous HTTP requests much simpler. Under the hood, Gevent is still coroutines.
Install it before use:
```bash
pip install grequests
```
Using it really is simple:
```python
import grequests

urls = []
with open("urllist.txt") as reader:
    for url in reader:
        urls.append(url.strip())

rs = (grequests.get(u) for u in urls)
for result in grequests.map(rs):
    if result is None:  # failed requests come back as None
        continue
    print(result.status_code, result.url)
```
Note that grequests.map(rs) executes the requests concurrently; running it prints a status code and URL for each response.
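grequests.map also accepts a size parameter that caps the gevent pool, which helps when 100,000 simultaneous requests would overwhelm memory or the target server (a minimal sketch reusing rs from above; the value 100 is an arbitrary choice of mine):

```python
# size=100 means at most 100 requests run concurrently in the gevent pool.
for result in grequests.map(rs, size=100):
    if result is not None:
        print(result.status_code, result.url)
```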
You can also plug in an exception handler:
```python
>>> def exception_handler(request, exception):
...     print("Request failed")
>>> reqs = [
...     grequests.get('http://httpbin.org/delay/1', timeout=0.001),
...     grequests.get('http://fakedomain/'),
...     grequests.get('http://httpbin.org/status/500')]
>>> grequests.map(reqs, exception_handler=exception_handler)
Request failed
Request failed
[None, None, <Response [500]>]
```
Final Words
Today we covered several ways to send concurrent HTTP requests. People often claim that async (coroutines) outperforms multithreading, but it really depends on the scenario; no single approach fits every case. In an experiment of my own, also requesting URLs, coroutines slowed down noticeably once the concurrency went past 500.
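If you run into that wall, one common mitigation (a sketch of my own, not part of the experiment above) is to cap the number of in-flight coroutines with asyncio.Semaphore:

```python
import asyncio

from aiohttp import ClientSession

LIMIT = 500  # assumed cap on in-flight requests; tune for your workload

async def fetch(url: str, session: ClientSession, sem: asyncio.Semaphore) -> int:
    # Only LIMIT coroutines may hold the semaphore at once.
    async with sem:
        async with session.get(url) as resp:
            return resp.status

async def main() -> None:
    sem = asyncio.Semaphore(LIMIT)
    with open("urllist.txt") as f:
        urls = [line.strip() for line in f]
    async with ClientSession() as session:
        statuses = await asyncio.gather(
            *(fetch(u, session, sem) for u in urls), return_exceptions=True
        )
    for url, status in zip(urls, statuses):
        print(status, url)

asyncio.run(main())
```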