Python線程安全之三大同步原語(yǔ)
使用同步原語(yǔ)進(jìn)行通信和協(xié)調(diào)
在這些方法中,使用事件、條件和屏障對(duì)象等synchronization原語(yǔ)可以促進(jìn)多個(gè)線程之間的通信和協(xié)調(diào)。
1:事件信號(hào)
可以使用事件對(duì)象進(jìn)行信號(hào)通信,讓一個(gè)線程向一個(gè)或多個(gè)線程通知某個(gè)操作,具體操作如下,先創(chuàng)建一個(gè)Event事件,事件對(duì)象有一個(gè)內(nèi)部標(biāo)記,默認(rèn)為False,可以使用.set()設(shè)置標(biāo)記為T(mén)rue,也可以使用.clear() 將其重置為False,當(dāng)其它線程調(diào)用.wait() 方法時(shí),它會(huì)阻塞,直到事件對(duì)象的內(nèi)部標(biāo)志被設(shè)置為T(mén)rue。
舉個(gè)例子:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
bank_open = threading.Event()
transactions_open = threading.Event()
def serve_customer(customer_data):
print(f"{customer_data['name']} 正在等待銀行開(kāi)門(mén)。")
bank_open.wait()
print(f"{customer_data['name']} 進(jìn)入了銀行")
if customer_data["type"] == "WITHDRAW_MONEY":
print(f"{customer_data['name']} 正在等待交易開(kāi)始。")
transactions_open.wait()
print(f"{customer_data['name']} 開(kāi)始交易。")
# 模擬執(zhí)行交易的時(shí)間
time.sleep(2)
print(f"{customer_data['name']} 完成交易并離開(kāi)了銀行")
else:
# 模擬其他銀行業(yè)務(wù)的時(shí)間
time.sleep(2)
print(f"{customer_data['name']} 已離開(kāi)銀行")
customers = [
{"name": "客戶 1", "type": "WITHDRAW_MONEY"},
{"name": "客戶 2", "type": "CHECK_BALANCE"},
{"name": "客戶 3", "type": "WITHDRAW_MONEY"},
{"name": "客戶 4", "type": "WITHDRAW_MONEY"},
{"name": "客戶 5", "type": "WITHDRAW_MONEY"},
{"name": "客戶 6", "type": "WITHDRAW_MONEY"},
]
with ThreadPoolExecutor(max_workers=4) as executor:
for customer_data in customers:
executor.submit(serve_customer, customer_data)
print("銀行經(jīng)理正在準(zhǔn)備開(kāi)門(mén)。")
time.sleep(2)
print("銀行現(xiàn)在開(kāi)門(mén)了!")
bank_open.set() # 發(fā)出銀行開(kāi)門(mén)的信號(hào)
time.sleep(3)
print("交易現(xiàn)在開(kāi)放!")
transactions_open.set()
print("所有客戶已完成交易。")
猜猜結(jié)果是什么:
? 事件控制:bank_open和transactions_open兩個(gè)事件標(biāo)記,控制銀行何時(shí)開(kāi)門(mén)以及交易何時(shí)開(kāi)始,所有客戶在銀行開(kāi)門(mén)前會(huì)被阻塞,等待bank_open.set(),而需取款的客戶會(huì)繼續(xù)等待transactions_open.set() 才能執(zhí)行取款操作。
? 線程池的使用:ThreadPoolExecutor限制了同時(shí)執(zhí)行的線程數(shù),最多服務(wù)4個(gè)客戶,當(dāng)一個(gè)客戶完成服務(wù)后,線程池會(huì)釋放一個(gè)線程,這樣新客戶可以繼續(xù)進(jìn)入銀行。
? CHECK_BALANC類型的客戶不需要等待transactions_open事件,因此會(huì)在銀行開(kāi)門(mén)后直接完成操作并離開(kāi)。
客戶 1 正在等待銀行開(kāi)門(mén)。
客戶 2 正在等待銀行開(kāi)門(mén)。
客戶 3 正在等待銀行開(kāi)門(mén)。
客戶 4 正在等待銀行開(kāi)門(mén)。
客戶 5 正在等待銀行開(kāi)門(mén)。
客戶 6 正在等待銀行開(kāi)門(mén)。
銀行經(jīng)理正在準(zhǔn)備開(kāi)門(mén)。
銀行現(xiàn)在開(kāi)門(mén)了!
客戶 1 進(jìn)入了銀行
客戶 2 進(jìn)入了銀行
客戶 3 進(jìn)入了銀行
客戶 4 進(jìn)入了銀行
客戶 1 正在等待交易開(kāi)始。
客戶 3 正在等待交易開(kāi)始。
客戶 4 正在等待交易開(kāi)始。
客戶 2 已離開(kāi)銀行
客戶 5 進(jìn)入了銀行
客戶 5 正在等待交易開(kāi)始。
客戶 6 進(jìn)入了銀行
客戶 6 正在等待交易開(kāi)始。
交易現(xiàn)在開(kāi)放!
客戶 1 開(kāi)始交易。
客戶 3 開(kāi)始交易。
客戶 4 開(kāi)始交易。
客戶 5 開(kāi)始交易。
客戶 1 完成交易并離開(kāi)了銀行
客戶 3 完成交易并離開(kāi)了銀行
客戶 4 完成交易并離開(kāi)了銀行
客戶 6 開(kāi)始交易。
客戶 5 完成交易并離開(kāi)了銀行
客戶 6 完成交易并離開(kāi)了銀行
所有客戶已完成交易。
在需要同時(shí)向多個(gè)等待線程發(fā)出狀態(tài)變化信號(hào)的情況下,事件對(duì)象尤其有用。
Conditions條件等待
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor
customer_available_condition = threading.Condition()
# Customers waiting to be served by the Teller
customer_queue = []
def now():
return time.strftime("%H:%M:%S")
def serve_customers():
while True:
with customer_available_condition:
# Wait for a customer to arrive
while not customer_queue:
print(f"{now()}: Teller is waiting for a customer.")
customer_available_condition.wait()
# Serve the customer
customer = customer_queue.pop(0)
print(f"{now()}: Teller is serving {customer}.")
# Simulate the time taken to serve the customer
time.sleep(random.randint(1, 5))
print(f"{now()}: Teller has finished serving {customer}.")
def add_customer_to_queue(name):
with customer_available_condition:
print(f"{now()}: {name} has arrived at the bank.")
customer_queue.append(name)
customer_available_condition.notify()
customer_names = [
"Customer 1",
"Customer 2",
"Customer 3",
"Customer 4",
"Customer 5",
]
with ThreadPoolExecutor(max_workers=6) as executor:
teller_thread = executor.submit(serve_customers)
for name in customer_names:
# Simulate customers arriving at random intervals
time.sleep(random.randint(1, 3))
executor.submit(add_customer_to_queue, name)
利用條件對(duì)象condition來(lái)協(xié)調(diào)生產(chǎn)者-消費(fèi)者模型中的線程通信,使線程在特定條件滿足時(shí)再繼續(xù)執(zhí)行,從而有效管理多線程中的執(zhí)行流程。
Condition對(duì)象(customer_available_condition)既用作鎖來(lái)保護(hù)共享資源(customer_queue),也用作線程間的通信工具。通過(guò)wait()和notify()方法,柜員可以等待客戶到來(lái),客戶到達(dá)后再通知柜員開(kāi)始服務(wù),從而避免了“忙等”。
在with上下文管理器中,condition對(duì)象確保在臨界區(qū)內(nèi)自動(dòng)加鎖和釋放鎖,保護(hù)共享資源customer_queue,serve_customers()中的無(wú)限循環(huán)讓柜員可以持續(xù)服務(wù)來(lái)訪的客戶,而在隊(duì)列為空時(shí),通過(guò)wait()等待,避免無(wú)效的資源占用,使用condition實(shí)現(xiàn)同步,使得只有在客戶隊(duì)列非空時(shí)柜員才會(huì)服務(wù),避免了資源的浪費(fèi)和繁瑣的輪詢。
可能的輸出如下:
10:15:08: Teller is waiting for a customer.
10:15:09: Customer 1 has arrived at the bank.
10:15:09: Teller is serving Customer 1.
10:15:11: Customer 2 has arrived at the bank.
10:15:12: Teller has finished serving Customer 1.
10:15:12: Teller is serving Customer 2.
10:15:13: Teller has finished serving Customer 2.
10:15:13: Teller is waiting for a customer.
10:15:14: Customer 3 has arrived at the bank.
10:15:14: Teller is serving Customer 3.
10:15:15: Customer 4 has arrived at the bank.
10:15:17: Customer 5 has arrived at the bank.
10:15:18: Teller has finished serving Customer 3.
10:15:18: Teller is serving Customer 4.
10:15:22: Teller has finished serving Customer 4.
10:15:22: Teller is serving Customer 5.
10:15:25: Teller has finished serving Customer 5.
10:15:25: Teller is waiting for a customer.
Barriers
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor
teller_barrier = threading.Barrier(3)
def now():
return time.strftime("%H:%M:%S")
def prepare_for_work(name):
print(f"{now()}: {name} is preparing their counter.")
# Simulate the delay to prepare the counter
time.sleep(random.randint(1, 3))
print(f"{now()}: {name} has finished preparing.")
# Wait for all tellers to finish preparing
teller_barrier.wait()
print(f"{now()}: {name} is now ready to serve customers.")
tellers = ["Teller 1", "Teller 2", "Teller 3"]
with ThreadPoolExecutor(max_workers=4) as executor:
for teller_name in tellers:
executor.submit(prepare_for_work, teller_name)
print(f"{now()}: All tellers are ready to serve customers.")
Barrier用于多線程場(chǎng)景中,當(dāng)多個(gè)線程都到達(dá)指定的同步點(diǎn)(即wait()方法)后,所有線程才能繼續(xù)執(zhí)行,在銀行場(chǎng)景中,Barrier確保所有柜員準(zhǔn)備就緒后才能開(kāi)始為客戶服務(wù)。
Barrier(3)指定了屏障點(diǎn)需要3個(gè)線程才能通過(guò),確保所有3個(gè)柜員必須完成準(zhǔn)備才會(huì)繼續(xù),一旦最后一個(gè)柜員完成準(zhǔn)備,所有線程(柜員)同時(shí)通過(guò)屏障,開(kāi)始為客戶服務(wù)。
總結(jié)
在多線程編碼中,因?yàn)橛缮舷挛那袚Q,所以某個(gè)代碼塊需要作為一個(gè)原子單元執(zhí)行(即不可分割),就需要使用鎖的機(jī)制來(lái)保護(hù);同時(shí)在修改共享可變數(shù)據(jù)的時(shí)候,一定也要通過(guò)鎖機(jī)制保護(hù);另外使用的第三方庫(kù)可能不是線程安全的;不確定線程安全性時(shí)使用互斥鎖是一種最佳實(shí)踐。
同步工具包括:
- ? Lock 和 RLock:用于實(shí)現(xiàn)互斥鎖,確保某段代碼在一個(gè)線程執(zhí)行時(shí)不被其他線程打斷。
- ? Semaphore:用于限制資源的并發(fā)訪問(wèn)次數(shù),可以控制同時(shí)運(yùn)行的線程數(shù)量。
- ? Event:用于在線程間發(fā)送信號(hào),通知某些條件已滿足。
- ? Condition:允許線程等待特定條件并在條件滿足時(shí)繼續(xù)執(zhí)行,常用于生產(chǎn)者-消費(fèi)者模型。
- ? Barrier:用于協(xié)調(diào)多個(gè)線程的執(zhí)行步調(diào),確保所有線程在同步點(diǎn)上會(huì)合后一起繼續(xù)。