Python 升級(jí)之路( Lv29 ) 并發(fā)編程三劍客(上)
大家好,我是了不起。
今天我們將學(xué)習(xí)并發(fā)編程三劍客之一的進(jìn)程, 了解其創(chuàng)建方式, 通信方式以及管理方式。
今日冒險(xiǎn)片段上:
走出暗影迷宮后, 二人便來(lái)到了暗精靈圣地熔巖穴, 熔巖穴與火焰圣地卡崖克火山的地脈相連, 據(jù)說(shuō)很久前,那里曾出現(xiàn)過(guò)一種怪異的炙熱熔巖,它不僅融化了各種生命體,還使融化后的巖漿變成了有生命的怪物. 現(xiàn)在地底壓力噴出的熔巖,使那里的盜尸者和僵尸出現(xiàn)了奇怪的異變.??而了不起他們也在小心翼翼的躲避著這些怪物, 并同時(shí)注意腳下隨時(shí)可能噴薄而出的火山熔巖. 但是在即將通過(guò)時(shí), 還是被這片區(qū)域的領(lǐng)主所阻攔. 而且領(lǐng)主不只有一個(gè), 而是有三個(gè), 分別是:泰坦, 歌利亞, 阿特拉斯. 這三個(gè)是熔巖生命體, 不僅身形巨大, 而且身體外表溫度很高. 這讓二人犯了難...
一、進(jìn)程
書寫上文, 我們可以知道: 進(jìn)程是資源(CPU、內(nèi)存等)分配的基本單位,它是程序執(zhí)行時(shí)的一個(gè)實(shí)例. 程序運(yùn)行時(shí)系統(tǒng)就會(huì)創(chuàng)建一個(gè)進(jìn)程,并為它分配資源,然后把該進(jìn)程放入進(jìn)程就緒隊(duì)列,進(jìn)程調(diào)度器選中它的時(shí)候就會(huì)為它分配CPU時(shí)間,程序開始真正運(yùn)行. 進(jìn)程切換需要的資源很最大,效率低。
二、創(chuàng)建方式
進(jìn)程創(chuàng)建有兩種方式:方法包裝和類包裝。
熟悉Java的人可能會(huì)發(fā)現(xiàn), 類包裝更符合我們?cè)瓉?lái)的書寫習(xí)慣創(chuàng)建進(jìn)程后, 我們使用start() 來(lái)啟動(dòng)進(jìn)程。
1. 類包裝
(1) 主要步驟:
- 定義一個(gè)進(jìn)程類, 并修改初始化構(gòu)造, 改為有參構(gòu)造
- 創(chuàng)建進(jìn)程時(shí), 傳入初始化方法中添加的參數(shù)即可
(2) 實(shí)操代碼
import time
from multiprocessing import Process
class MyProcess(Process):
"""進(jìn)程的創(chuàng)建方式: 2.類包裝"""
def __init__(self, name):
Process.__init__(self)
self.name = name
def run(self):
print(f"進(jìn)程{self.name} 啟動(dòng)")
time.sleep(3)
print(f"進(jìn)程{self.name} 結(jié)束")
if __name__ == "__main__":
print("創(chuàng)建進(jìn)程")
p1 = MyProcess("p1")
p2 = MyProcess("p2")
p1.start()
p2.start()
(3) 執(zhí)行結(jié)果
2. 方法包裝
(1) 主要步驟
在創(chuàng)建進(jìn)程時(shí): 已默認(rèn)值參數(shù)的方式聲明目標(biāo)函數(shù), 以及傳入目標(biāo)函數(shù)的參數(shù)(元組的方式)
(2) 實(shí)操代碼
import os
import time
from multiprocessing import Process
def function(name):
"""進(jìn)程的創(chuàng)建方式: 1.方法包裝"""
print("當(dāng)前進(jìn)程ID:", os.getpid())
print("父進(jìn)程ID", os.getppid())
print(f"Process:{name} start")
time.sleep(3)
print(f"Process:{name} end")
if __name__ == "__main__":
print("當(dāng)前main進(jìn)程ID: ", os.getppid())
# 創(chuàng)建進(jìn)程
p1 = Process(target=function, args=("p1",))
p2 = Process(target=function, args=("p2",))
p1.start()
p2.start()
(3) 執(zhí)行結(jié)果
(4) 多說(shuō)一句
元組中如果只有一個(gè)元素, 是需要加逗號(hào)的!!! 這是因?yàn)槔ㄌ?hào)( )既可以表示tuple,又可以表示數(shù)學(xué)公式中的小括號(hào), 所以如果沒有加逗號(hào),那你里面放什么類型的數(shù)據(jù)那么類型就會(huì)是什么。
三、進(jìn)程間通信方式
進(jìn)程間通信有兩種方式:Queue隊(duì)列和Pipe管道方式。
1. Queue隊(duì)列
要實(shí)現(xiàn)進(jìn)程間通信,需要使用 multiprocessing 模塊中的 Queue 類。
進(jìn)程間通信的方式,就是使用了操作系統(tǒng)給開辟的一個(gè)隊(duì)列空間,各個(gè)進(jìn)程可以把數(shù)據(jù)放到該隊(duì)列中,當(dāng)然也可以從隊(duì)列中把自己需要的信息取走。
實(shí)現(xiàn)核心:
- 這里利用類包裝的方式, 并且添加了一個(gè)參數(shù)mq
- 主函數(shù)聲明一個(gè)Queue隊(duì)列, 放入需要通信的消息
- 在需要調(diào)用時(shí), 利用mq,get 獲取當(dāng)前進(jìn)程所傳入的值
實(shí)操代碼:
from multiprocessing import Process, Queue
class MyProcess(Process):
def __init__(self, name, mq):
Process.__init__(self)
self.name = name
self.mq = mq
def run(self):
print("Process {} started".format(self.name))
print("===Queue", self.mq.get(), "===")
self.mq.put(self.name)
print("Process {} end".format(self.name))
if __name__ == "__main__":
# 創(chuàng)建進(jìn)程列表
t_list = []
mq = Queue()
mq.put("1")
mq.put("2")
mq.put("3")
# 利用range序列重復(fù)創(chuàng)建進(jìn)程
for i in range(3):
t = MyProcess("p{}".format(i), mq)
t.start()
t_list.append(t)
# 等待進(jìn)程結(jié)束
for t in t_list:
t.join()
print(mq.get())
print(mq.get())
print(mq.get())
執(zhí)行結(jié)果:
2. Pipe管道
Pipe 直譯過(guò)來(lái)的意思是“管”或“管道”,和實(shí)際生活中的管(管道)是非常類似的. Pipe方法返回(conn1, conn2)代表一個(gè)管道的兩個(gè)端.
- Pipe方法有duplex參數(shù),如果duplex參數(shù)為True(默認(rèn)值),那么這個(gè)參數(shù)是全雙工模式,也就是說(shuō)conn1和conn2均可收發(fā)
- 若duplex為False,conn1只負(fù)責(zé)接收消息,conn2只負(fù)責(zé)發(fā)送消息. send和recv方法分別是發(fā)送和接受消息的方法. 例如,在全雙工模式下,可以調(diào)用conn1.send發(fā)送消息,conn1.recv接收消息.
- 如果沒有消息可接收,recv方法會(huì)一直阻塞. 如果管道已經(jīng)被關(guān)閉,那么recv方法會(huì)拋出EOFError
實(shí)現(xiàn)核心:
- 主函數(shù)聲明管道的兩端 conn1, conn2 = multiprocessing.Pipe()
- 以方法包裝方式創(chuàng)建進(jìn)程后, 在對(duì)應(yīng)方法中調(diào)用管道的兩端調(diào)用消息收發(fā)的方法 conn1.send/conn1.recv
實(shí)操代碼:
import multiprocessing
import time
def fun1(conn1):
"""
管道結(jié)構(gòu)
進(jìn)程<==>conn1(管道頭)==pipe==conn2(管道尾)<==>進(jìn)程2
"""
sub_info = "進(jìn)程向conn1發(fā)送消息, 管道另一頭conn2 可以接收到消息"
print(f"進(jìn)程1--{multiprocessing.current_process().pid}發(fā)送數(shù)據(jù):{sub_info}")
time.sleep(1)
conn1.send(sub_info) # 調(diào)用conn1.send發(fā)送消息, 發(fā)送的消息會(huì)被管道的另一頭接收
print(f"conn1接收消息:{conn1.recv()}") # conn1.recv接收消息, 如果沒有消息可接收, recv方法會(huì)一直阻塞. 如果管道已經(jīng)被關(guān)閉,那么recv方法會(huì)拋出EOFError
time.sleep(1)
def fun2(conn2):
sub_info = "進(jìn)程向conn2發(fā)送消息, 管道另一頭conn1 可以接收到消息"
print(f"進(jìn)程2--{multiprocessing.current_process().pid}發(fā)送數(shù)據(jù):{sub_info}")
time.sleep(1)
conn2.send(sub_info)
print(f"conn2接收消息:{conn2.recv()}")
time.sleep(1)
if __name__ == "__main__":
# 創(chuàng)建管道
# Pipe方法返回(conn1, conn2)代表一個(gè)管道的兩個(gè)端.如果conn1帶表頭, conn2代表尾, conn1發(fā)送的消息只會(huì)被conn2接收, 同理conn2發(fā)送的消息也只會(huì)被conn1接收
conn1, conn2 = multiprocessing.Pipe()
# 創(chuàng)建子進(jìn)程
# Python中,圓括號(hào)意味著調(diào)用函數(shù). 在沒有圓括號(hào)的情況下,Python會(huì)把函數(shù)當(dāng)做普通對(duì)象
process1 = multiprocessing.Process(target=fun1, args=(conn1,))
process2 = multiprocessing.Process(target=fun2, args=(conn2,))
# 啟動(dòng)子進(jìn)程
process1.start()
process2.start()
四、Manager管理器
管理器提供了一種創(chuàng)建共享數(shù)據(jù)的方法,從而可以在不同進(jìn)程中共享。
實(shí)現(xiàn)核心:
- 創(chuàng)建進(jìn)程
- 利用Manager創(chuàng)建字典, 列表等對(duì)象, 傳入進(jìn)程
- 在各進(jìn)程所對(duì)應(yīng)的方法中修改上面創(chuàng)建的對(duì)象
實(shí)操代碼:
from multiprocessing import Manager, Process
def func1(name,m_list,m_dict):
m_dict['area'] = '羅布泊'
m_list.append('錢三強(qiáng)')
def func2(name, m_list, m_dict):
m_dict['work'] = '造核彈'
m_list.append('鄧稼先')
if __name__ == "__main__":
with Manager() as mgr:
m_list = mgr.list()
m_dict = mgr.dict()
m_list.append("錢學(xué)森")
# 兩個(gè)進(jìn)程不能直接互相使用對(duì)象,需要互相傳遞
p1 = Process(target=func1, args=('p1', m_list, m_dict))
p1.start()
p1.join() # 等p1進(jìn)程結(jié)束,主進(jìn)程繼續(xù)執(zhí)行
print(m_list)
print(m_dict)
p2 = Process(target=func2, args=('p1', m_list, m_dict))
p2.start()
p2.join() # 等p2進(jìn)程結(jié)束,主進(jìn)程繼續(xù)執(zhí)行
print(m_list)
print(m_dict)
執(zhí)行結(jié)果太過(guò)機(jī)密, 不予展示。
五、進(jìn)程池(Pool)
進(jìn)程池可以提供指定數(shù)量的進(jìn)程給用戶使用,即當(dāng)有新的請(qǐng)求提交到進(jìn)程池中時(shí),如果池未滿,則會(huì)創(chuàng)建一個(gè)新的進(jìn)程用來(lái)執(zhí)行該請(qǐng)求;反之,如果池中的進(jìn)程數(shù)已經(jīng)達(dá)到規(guī)定最大值,那么該請(qǐng)求就會(huì)等待,只要池中有進(jìn)程空閑下來(lái),該請(qǐng)求就能得到執(zhí)行 使用進(jìn)程池的好處就是可以節(jié)約內(nèi)存空間, 提高資源利用率。
進(jìn)程池相關(guān)方法:
類/方法 | 功能 | 參數(shù) |
Pool(processes) | 創(chuàng)建進(jìn)程池對(duì)象 | processes表示進(jìn)程池中有多少進(jìn)程 |
pool.apply_async(func,args,kwds) | 異步執(zhí)行;將事件放入到進(jìn)程池隊(duì)列 | func 事件函數(shù) args 以元組形式給func傳參kwds 以字典形式給func傳參返回值:返回一個(gè)代表進(jìn)程池事件的對(duì)象,通過(guò)返回值的get方法可以得到事件函數(shù)的返回值 |
pool.apply(func,args,kwds) | 同步執(zhí)行;將事件放入到進(jìn)程池隊(duì)列 | func 事件函數(shù) args 以元組形式給func傳參kwds 以字典形式給func傳參 |
pool.close() | 關(guān)閉進(jìn)程池 | |
pool.join() | 關(guān)閉進(jìn)程池 | |
pool.map(func,iter) | 類似于python的map函數(shù),將要做的事件放入進(jìn)程池 | func 要執(zhí)行的函數(shù) iter 迭代對(duì)象 |
實(shí)現(xiàn)核心:
- 創(chuàng)建和初始化進(jìn)程池
- 以方法包裝的方式傳入相關(guān)參數(shù), 并調(diào)用相關(guān)api
實(shí)操代碼:
from multiprocessing import Pool
import os
from time import sleep
def func1(name):
print(f"方法1輸出: 當(dāng)前進(jìn)程的ID:{os.getpid()},{name}")
sleep(2)
return name
def func2(args):
print("方法2輸出: ", args)
if __name__ == "__main__":
pool = Pool(5)
pool.apply_async(func=func1, args=('進(jìn)程1',), callback=func2)
pool.apply_async(func=func1, args=('進(jìn)程2',), callback=func2)
pool.apply_async(func=func1, args=('進(jìn)程3',), callback=func2)
pool.apply_async(func=func1, args=('進(jìn)程4',))
pool.apply_async(func=func1, args=('進(jìn)程5',))
pool.apply_async(func=func1, args=('進(jìn)程6',))
pool.apply_async(func=func1, args=('進(jìn)程7',))
pool.close()
pool.join()
執(zhí)行結(jié)果:
1. 使用with管理進(jìn)程池
使用with 方法, 可以進(jìn)行優(yōu)雅的進(jìn)行資源管理. 在這里是可以幫助我們優(yōu)雅的關(guān)閉線程池。
關(guān)于with方法:
with所求值的對(duì)象必須有一個(gè)enter()方法,一個(gè)exit()方法. 緊跟with后面的語(yǔ)句被求值后,返回對(duì)象的__enter__()方法被調(diào)用, 這個(gè)方法的返回值將被賦值給as后面的變量。當(dāng)with后面的代碼塊全部被執(zhí)行完之后,將調(diào)用前面返回對(duì)象的exit()方法。
實(shí)操代碼:
from multiprocessing import Pool
import os
from time import sleep
def func1(name):
print(f"方法1輸出: 當(dāng)前進(jìn)程的ID:{os.getpid()},{name}")
sleep(2)
return name
if __name__ == "__main__":
with Pool(5) as pool:
args = pool.map(func1, ('進(jìn)程1,', '進(jìn)程2,', '進(jìn)程3,', '進(jìn)程4,', '進(jìn)程5,', '進(jìn)程6,', '進(jìn)程7,', '進(jìn)程8,'))
for a in args:
print(a)
今日冒險(xiǎn)片段下:
這時(shí), 米斯特建議我們應(yīng)該先集中精力先擊敗一只, 然后在2v2時(shí)利用速度優(yōu)勢(shì)將其余兩只擊敗. 了不起覺得可行, 然后利用魔劍奧義首先將武器附上冰屬性. 通過(guò)冰屬性攻擊使其行動(dòng)減緩之后, 二人便使用技能將將泰坦擊敗. 隨后又花了小半天時(shí)間又將剩下的歌利亞, 阿特拉斯擊敗. 就這樣了不起也成功的升級(jí)到lv30. 同時(shí)領(lǐng)主也掉落了物品熔巖核, 一種含有大量火屬性元素的珠子. 這個(gè)珠子現(xiàn)在看起來(lái)沒什么作用, 但在將來(lái)的某一天可能會(huì)是其救命稻草...