Python線程安全之鎖、信號量
在Realpython看到一篇關(guān)于線程安全的文章,覺得非常哇塞,分享給大家,今天先講前半部分。
提到線程必須了解兩個(gè)術(shù)語:
? 并發(fā)Concurrency,系統(tǒng)具備處理多個(gè)任務(wù)的能力,它們的在執(zhí)行在時(shí)間上重疊,但不一定同時(shí)發(fā)生。
? 并行Parallelism:多個(gè)任務(wù)利用多核CPU真正同時(shí)執(zhí)行。
Python的線程是一個(gè)并發(fā)框架,線程并行運(yùn)行的時(shí)候,每個(gè)線程執(zhí)行代碼的一部分,Python解釋器在它們之間切換,將執(zhí)行控制權(quán)交給每個(gè)線程。
理解線程并行
先簡單舉個(gè)例子:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
def threaded_function():
for number in range(3):
print(f"Printing from {threading.current_thread().name}. {number=}")
time.sleep(0.1)
with ThreadPoolExecutor(max_workers=4, thread_name_prefix="Worker") as executor:
for _ in range(4):
executor.submit(threaded_function)
先打印輸出:
Printing from Worker_0. number=0
Printing from Worker_1. number=0
Printing from Worker_2. number=0
Printing from Worker_3. number=0
Printing from Worker_0. number=1
Printing from Worker_2. number=1
Printing from Worker_1. number=1
Printing from Worker_3. number=1
Printing from Worker_0. number=2
Printing from Worker_2. number=2
Printing from Worker_1. number=2
Printing from Worker_3. number=2
啟動(dòng)了四個(gè)線程,可以觀察到在Worker_0打印number=0之后 ,并不會立刻打印number=1,原因就在于要切換給其他線程運(yùn)行,而且各個(gè)線程的運(yùn)行順序是不一樣的。
如何做到這個(gè)的呢?因?yàn)閜ython解析器會進(jìn)行上下文切換,默認(rèn)的間隔時(shí)間如下列代碼:
import sys
sys.getswitchinterval()
0.005
5毫秒的間隔并不意味著線程會精確地每5毫秒切換一次,而是意味著解釋器會在這些間隔內(nèi)考慮切換到另一個(gè)線程,而代碼中的sleep()是為了增加了在此期間發(fā)生上下文切換的可能性。
什么是線程安全
由于上下文切換,程序在多線程環(huán)境中運(yùn)行時(shí)可能會表現(xiàn)出意外行為,這就導(dǎo)致線程不安全問題,而如果代碼在多線程環(huán)境中運(yùn)行時(shí)表現(xiàn)出確定性并產(chǎn)生期望的輸出,那么它就被認(rèn)為是線程安全的。
線程安全問題通常源于兩個(gè)原因:
? 共享可變數(shù)據(jù):線程共享父進(jìn)程的內(nèi)存,因此所有變量和數(shù)據(jù)結(jié)構(gòu)在各線程之間是共享的。對這些共享數(shù)據(jù)進(jìn)行修改時(shí)可能會引發(fā)錯(cuò)誤。
? 非原子操作:多線程環(huán)境中,涉及多個(gè)步驟的操作可能會被上下文切換中斷,尤其是在執(zhí)行過程中切換到其他線程時(shí),容易導(dǎo)致意外結(jié)果。
1:GIL
在討論P(yáng)ython線程時(shí),Python 3.12之前的GIL(Global Interpreter Lock)不可避免要提到。GIL是一個(gè)互斥鎖,目的是保護(hù)Python對象的訪問,防止多個(gè)線程同時(shí)執(zhí)行Python字節(jié)碼。它阻止了真正的線程并行,尤其在CPU密集型任務(wù)中,多線程性能會受到嚴(yán)重限制。不過,這意味著對于I/O密集型任務(wù),線程并行仍然適用。
由于GIL的存在,當(dāng)某個(gè)操作能在單個(gè)字節(jié)碼指令中完成時(shí),它是原子的。那么,Python是否因此天然線程安全?并非如此。因?yàn)镮/O操作仍然可以并行執(zhí)行,因此即使有GIL,訪問共享可變數(shù)據(jù)時(shí)依然需要鎖等同步機(jī)制確保線程安全。
GIL是否完全消除了Python的多線程并發(fā)能力?并沒有,Python支持通過多進(jìn)程來實(shí)現(xiàn)真正的并行。
值得關(guān)注的是,從Python 3.13開始,Python提供了無GIL的解釋器,實(shí)現(xiàn)了真正的線程并行。但無論是否有GIL,編寫代碼時(shí)始終建議合理地保護(hù)線程安全——也就是說,不依賴GIL,主動(dòng)保證線程安全。
2:競爭
現(xiàn)在來看看第二個(gè)核心概念,競爭條件發(fā)生在程序的結(jié)果依賴于不可控事件的順序或時(shí)間,如線程執(zhí)行順序時(shí),如果沒有適當(dāng)?shù)耐綍?dǎo)致程序出現(xiàn)不可預(yù)測的錯(cuò)誤。
下面的例子就來模擬這種情況,兩個(gè)線程同時(shí)修改一個(gè)屬性:
import time
from concurrent.futures import ThreadPoolExecutor
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
def withdraw(self, amount):
if self.balance >= amount:
new_balance = self.balance - amount
time.sleep(0.1)
self.balance = new_balance
else:
raise ValueError("Insufficient balance")
account = BankAccount(1000)
with ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(account.withdraw, 500)
executor.submit(account.withdraw, 700)
print(f"Final account balance: {account.balance}")
先猜可能的結(jié)果,代碼可能會輸出:
Final account balance: 300
也可能會輸出:
Final account balance: 500
為什么會出現(xiàn)這樣的情況呢?這就是由于線程執(zhí)行順序不一致導(dǎo)致的,如果先扣700,而第二個(gè)線程突然切換過來了,檢查余額夠,最終就扣了500,余額就變成500了,當(dāng)然結(jié)果是錯(cuò)誤的。
同步原語
為了解決線程不安全問題,Python的threading模塊提供了各種同步原語,以防止競爭條件并允許線程之間的協(xié)調(diào)。
同步原語會:
? 控制線程同時(shí)執(zhí)行代碼塊
? 使多個(gè)代碼語句對線程來說是原子的
? 限制線程的并發(fā)訪問
? 在線程之間進(jìn)行協(xié)調(diào),并根據(jù)其他線程的狀態(tài)執(zhí)行操作
接下去使用Python線程鎖和信號實(shí)現(xiàn)互斥。
使用Python線程鎖實(shí)現(xiàn)互斥
鎖是一種同步原語,可用于獨(dú)占訪問資源,一旦一個(gè)線程獲取了鎖,其他線程就不能再獲取它并繼續(xù)執(zhí)行,直到鎖被釋放,可以使用鎖來封裝應(yīng)該原子執(zhí)行的語句或語句組。
python提供兩個(gè)lock相關(guān)的函數(shù):
? 當(dāng)一個(gè)線程調(diào)用.acquire()方法時(shí),如果Lock對象已經(jīng)被另一個(gè)線程鎖定,那么調(diào)用的線程會被阻塞,直到持有鎖的線程釋放鎖。
? release() 會釋放一個(gè)被線程獲取的鎖,如果嘗試釋放一個(gè)未鎖定的鎖,會引發(fā)RuntimeError。
如果使用with語句,Lock 對象可用作上下文管理器,可以自動(dòng)獲取和釋放鎖。
為了解決上面代碼存在的問題,可以:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
self.account_lock = threading.Lock()
def withdraw(self, amount):
with self.account_lock:
if self.balance >= amount:
new_balance = self.balance - amount
print(f"Withdrawing {amount}...")
time.sleep(0.1) # Simulate a delay
self.balance = new_balance
else:
raise ValueError("Insufficient balance")
def deposit(self, amount):
with self.account_lock:
new_balance = self.balance + amount
print(f"Depositing {amount}...")
time.sleep(0.1) # Simulate a delay
self.balance = new_balance
account = BankAccount(1000)
with ThreadPoolExecutor(max_workers=3) as executor:
executor.submit(account.withdraw, 700)
executor.submit(account.deposit, 1000)
executor.submit(account.withdraw, 300)
print(f"Final account balance: {account.balance}")
上述代碼通過鎖成功保證了線性安全。
如果由于代碼中的錯(cuò)誤或疏忽導(dǎo)致鎖未正確釋放,可能會導(dǎo)致死鎖,即線程無限期地等待鎖被釋放。
死鎖的原因包括:
? 嵌套鎖獲?。喝绻粋€(gè)線程嘗試獲取它已經(jīng)持有的鎖,可能會發(fā)生死鎖,同一線程嘗試多次獲取相同的鎖會導(dǎo)致線程阻塞自身,這種情況在沒有外部干預(yù)的情況下無法解決。
? 多重鎖獲?。寒?dāng)使用多個(gè)鎖時(shí),如果線程以不一致的順序獲取這些鎖,可能會發(fā)生死鎖,如果兩個(gè)線程各自持有一個(gè)鎖并等待對方釋放鎖,那么兩個(gè)線程都無法繼續(xù),從而導(dǎo)致死鎖。
對于多重鎖可以使用可重入鎖RLock解決,當(dāng)持有線程再次請求鎖時(shí),它不會阻塞,允許線程在釋放鎖之前多次獲取鎖,這在遞歸函數(shù)或線程需要重新進(jìn)入已鎖定資源的情況下非常有用,相對來說,RLock因?yàn)橐櫷痪€程獲取鎖的次數(shù),會有性能開銷。
Semaphores信號量
在資源數(shù)量有限且多個(gè)線程嘗試訪問這些有限資源時(shí)非常有用,它使用一個(gè)計(jì)數(shù)器來限制多個(gè)線程對臨界區(qū)的訪問,每次調(diào)用.acquire() 都會將信號量的計(jì)數(shù)器減少一個(gè),當(dāng)計(jì)數(shù)器達(dá)到零時(shí),再.acquire() 調(diào)用將被阻塞。
舉一個(gè)例子,多個(gè)客戶在銀行等待有限數(shù)量的柜員服務(wù):
import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor
# Semaphore with a maximum of 2 resources (tellers)
teller_semaphore = threading.Semaphore(2)
def now():
return time.strftime("%H:%M:%S")
def serve_customer(name):
print(f"{now()}: {name} is waiting for a teller.")
with teller_semaphore:
print(f"{now()}: {name} is being served by a teller.")
time.sleep(random.randint(1, 3))
print(f"{now()}: {name} is done being served.")
customers = [
"Customer 1",
"Customer 2",
"Customer 3",
"Customer 4",
"Customer 5",
]
with ThreadPoolExecutor(max_workers=3) as executor:
for customer_name in customers:
thread = executor.submit(serve_customer, customer_name)
print(f"{now()}: All customers have been served.")
代碼很好理解,當(dāng)某個(gè)線程達(dá)到計(jì)數(shù)器上限后,它會被阻塞,直到其他線程在with語句中因?yàn)橥瓿煞?wù)而釋放,但不管怎么樣,每次只有三個(gè)客戶被服務(wù)。
參考:https://realpython.com/python-thread-lock/#using-python-threading-locks-for-mutual-exclusion