一篇文章帶你了解Python的分布式進程接口
一、前言
在Thread和Process中,應(yīng)當優(yōu)選Process,因為Process更穩(wěn)定,而且,Process可以分布到多臺機器上,而Thread最多只能分布到同一臺機器的多個CPU上。
Python的multiprocessing模塊不但支持多進程,其中managers子模塊還支持把多進程分布到多臺機器上。可以寫一個服務(wù)進程作為調(diào)度者,將任務(wù)分布到其他多個進程中,依靠網(wǎng)絡(luò)通信進行管理。
二、案例分析
在做爬蟲程序時,抓取某個網(wǎng)站的所有圖片,如果使用多進程的話,一般是一個進程負責抓取圖片的鏈接地址,將鏈接地址放到queue中,另外的進程負責 從queue中取鏈接地址進行下載和存儲到本地。
怎么用分布式進程實現(xiàn)?
一臺機器上的進程負責抓取鏈接地址,其他機器上的進程負責系在存儲。那么遇到的主要問題是將queue 暴露到網(wǎng)絡(luò)中,讓其他機器進程都可以訪問,分布式進程就是將這個過程進行了封裝,可以將這個過程稱為本地隊列的網(wǎng)絡(luò)化。
例:
1.py
- from multiprocessing.managers import BaseManager
- from multiprocessing import freeze_support, Queue
- # 任務(wù)個數(shù)
- task_number = 10
- # 收發(fā)隊列
- task_quue = Queue(task_number)
- result_queue = Queue(task_number)
- def get_task():
- return task_quue
- def get_result():
- return result_queue
- # 創(chuàng)建類似的queueManager
- class QueueManager(BaseManager):
- pass
- def win_run():
- # 注冊在網(wǎng)絡(luò)上,callable 關(guān)聯(lián)了Queue 對象
- # 將Queue對象在網(wǎng)絡(luò)中暴露
- # window下綁定調(diào)用接口不能直接使用lambda,所以只能先定義函數(shù)再綁定
- QueueManager.register('get_task_queue', callable=get_task)
- QueueManager.register('get_result_queue', callable=get_result)
- # 綁定端口和設(shè)置驗證口令
- manager = QueueManager(address=('127.0.0.1', 8001), authkey='qiye'.encode())
- # 啟動管理,監(jiān)聽信息通道
- manager.start()
- try:
- # 通過網(wǎng)絡(luò)獲取任務(wù)隊列和結(jié)果隊列
- task = manager.get_task_queue()
- result = manager.get_result_queue()
- # 添加任務(wù)
- for url in ["ImageUrl_" + str(i) for i in range(10)]:
- print('url is %s' % url)
- task.put(url)
- print('try get result')
- for i in range(10):
- print('result is %s' % result.get(timeout=10))
- except:
- print('Manager error')
- finally:
- manager.shutdown()
- if __name__ == '__main__':
- freeze_support()
- win_run()
連接服務(wù)器,端口和驗證口令注意保持與服務(wù)器進程中完全一致從網(wǎng)絡(luò)獲取Queue,進行本地化,從task隊列獲取任務(wù),并且把結(jié)果寫入result隊列
2.py
- #coding:utf-8
- import time
- from multiprocessing.managers import BaseManager
- # 創(chuàng)建類似的Manager:
- class Manager(BaseManager):
- pass
- #使用QueueManager注冊獲取Queue的方法名稱
- Manager.register('get_task_queue')
- Manager.register('get_result_queue')
- #連接到服務(wù)器:
- server_addr = '127.0.0.1'
- print('Connect to server %s...' % server_addr)
- # 端口和驗證口令注意保持與服務(wù)進程設(shè)置的完全一致:
- m = Manager(address=(server_addr, 8001), authkey='qiye')
- # 從網(wǎng)絡(luò)連接:
- m.connect()
- #獲取Queue的對象:
- task = m.get_task_queue()
- result = m.get_result_queue()
- #從task隊列取任務(wù),并把結(jié)果寫入result隊列:
- while(not task.empty()):
- image_url = task.get(True,timeout=5)
- print('run task download %s...' % image_url)
- time.sleep(1)
- result.put('%s--->success'%image_url)
- #結(jié)束:
- print('worker exit.')
任務(wù)進程要通過網(wǎng)絡(luò)連接到服務(wù)進程,所以要指定服務(wù)進程的IP。
運行結(jié)果如下:
獲取圖片地址,將地址傳到2.py。
接收1.py傳遞的地址,進行圖片的下載,控制臺顯示爬取結(jié)果。
三、總結(jié)
本文基于Python基礎(chǔ),Python的分布式進程接口簡單,封裝良好,適合需要把繁重任務(wù)分布到多臺機器的環(huán)境下。通過講解Queue的作用是用來傳遞任務(wù)和接收結(jié)果。
歡迎大家積極嘗試,有時候看到別人實現(xiàn)起來很簡單,但是到自己動手實現(xiàn)的時候,總會有各種各樣的問題,切勿眼高手低,勤動手,才可以理解的更加深刻。