聊聊 Python 中的同步原語,為什么有了 GIL 還需要同步原語
前言
? 在前面的文章中我們介紹了 Python 中的全局解釋器鎖 GIL,我們知道 GIL 可以保證在多線程場(chǎng)景下同一時(shí)刻只有一個(gè)線程運(yùn)行,但是并不能保證線程安全(所謂線程安全簡(jiǎn)單來說就是程序在多線程環(huán)境中運(yùn)行時(shí),線程在交替運(yùn)行時(shí)能正常的訪問共享資源,不會(huì)造成數(shù)據(jù)不一致或者死鎖,最后都能達(dá)到預(yù)期的結(jié)果),比如我們看下面的兩個(gè)例子:
對(duì) counter 進(jìn)行累加
import threading
import time
counter = 0
temp_count = 0
def increment():
global counter, temp_count
for _ in range(1000):
counter += 1
temp = temp_count
time.sleep(0.0001)
temp_count = temp + 1
start = time.time()
threads = []
for _ in range(10):
t = threading.Thread(target=increment)
threads.append(t)
t.start()
for t in threads:
t.join()
end = time.time()
print("Final counter value:", counter)
print("Final temp_count value:", temp_count)
print(f"總共耗時(shí):{end - start}")
# 運(yùn)行結(jié)果
Final counter value: 10000
Final temp_count value: 1001
總共耗時(shí):0.5465419292449951
? 上面我們對(duì) counter 做多線程累積時(shí),盡管 counter += 1 是非原子操作,但是由于 CPU 執(zhí)行太快,因此我們很難復(fù)現(xiàn)線程不安全的情況,因此我們使用 temp_count 寫法進(jìn)行手動(dòng)模擬。
賬戶取款
import threading
class BankAccount:
def __init__(self, balance):
self.balance = balance
def withdraw(self, amount):
if self.balance >= amount:
# 發(fā)生線程切換
self.balance -= amount
print(f"Withdrawal successful. Balance: {self.balance}")
else:
print("Insufficient funds")
def deposit(self, amount):
self.balance += amount
print(f"Deposit successful. Balance: {self.balance}")
if __name__ == "__main__":
account = BankAccount(1000)
# 創(chuàng)建多個(gè)線程進(jìn)行取款存款操作
threads = []
for _ in range(5):
t = threading.Thread(target=account.withdraw, args=(account, 200))
threads.append(t)
t.start()
for t in threads:
t.join()
? 上面的代碼同樣是線程不安全的,考慮這個(gè)場(chǎng)景,如果此時(shí)賬戶余額中剩余200,線程1執(zhí)行完 self.balance >= amount 后切換到線程2,線程2正常取款200,然后切換回線程1,導(dǎo)致此時(shí)余額為-2200。
使用同步原語保證線程安全
? 從上面的兩個(gè)案例中我們可以看出,GIL 并不能保證線程安全,我們需要使用同步原語來進(jìn)行線程同步保證線程安全。
locked、release 顯式獲取鎖和釋放鎖
? 在一些比較老的 python 代碼中,我們可以看到很多使用 locked、release 顯式獲取鎖和釋放鎖 的用法。
import threading
class BankAccount:
def __init__(self, balance):
self.balance = balance
self.lock = threading.Lock()
def withdraw(self, amount):
self.lock.locked()
if self.balance >= amount:
self.balance -= amount
print(f"Withdrawal successful. Balance: {self.balance}")
else:
print("Insufficient funds")
self.lock.release()
def deposit(self, amount):
self.lock.locked()
self.balance += amount
print(f"Deposit successful. Balance: {self.balance}")
self.lock.release()
if __name__ == "__main__":
account = BankAccount(1000)
# 創(chuàng)建多個(gè)線程進(jìn)行取款存款操作
threads = []
for _ in range(5):
t = threading.Thread(target=account.withdraw, args=(account, 200))
threads.append(t)
t.start()
for t in threads:
t.join()
使用 with 語句同步原理
? 相比于這種顯式調(diào)用的方法,with 語句更加優(yōu)雅,也更不容易出錯(cuò),特別是程序員可能會(huì)忘記調(diào)用 release() 方法或者程序在獲得鎖之后產(chǎn)生異常這兩種情況(使用 with 語句可以保證在這兩種情況下仍能正確釋放鎖)。
import threading
class BankAccount:
def __init__(self, balance):
self.balance = balance
self.lock = threading.Lock()
def withdraw(self, amount):
with self.lock:
if self.balance >= amount:
self.balance -= amount
print(f"Withdrawal successful. Balance: {self.balance}")
else:
print("Insufficient funds")
def deposit(self, amount):
with self.lock:
self.balance += amount
print(f"Deposit successful. Balance: {self.balance}")
if __name__ == "__main__":
account = BankAccount(1000)
# 創(chuàng)建多個(gè)線程進(jìn)行取款存款操作
threads = []
for _ in range(5):
t = threading.Thread(target=account.withdraw, args=(account, 200))
threads.append(t)
t.start()
for t in threads:
t.join()
其它支持同步原語:RLock 和 Semaphore
RLock
? 一個(gè) RLock (可重入鎖)可以被同一個(gè)線程多次獲取,主要用來實(shí)現(xiàn)基于監(jiān)測(cè)對(duì)象模式的鎖定和同步。在使用這種鎖的情況下,當(dāng)鎖被持有時(shí),只有一個(gè)線程可以使用完整的函數(shù)或者類中的方法。
import threading
class SharedCounter:
'''
A counter object that can be shared by multiple threads.
'''
_lock = threading.RLock()
def __init__(self, initial_value = 0):
self._value = initial_value
def incr(self,delta=1):
'''
Increment the counter with locking
'''
with SharedCounter._lock:
self._value += delta
def decr(self,delta=1):
'''
Decrement the counter with locking
'''
with SharedCounter._lock:
self.incr(-delta)
? 在上邊這個(gè)例子中,沒有對(duì)每一個(gè)實(shí)例中的可變對(duì)象加鎖,取而代之的是一個(gè)被所有實(shí)例共享的類級(jí)鎖。這個(gè)鎖用來同步類方法,具體來說就是,這個(gè)鎖可以保證一次只有一個(gè)線程可以調(diào)用這個(gè)類方法。不過,與一個(gè)標(biāo)準(zhǔn)的鎖不同的是,已經(jīng)持有這個(gè)鎖的方法在調(diào)用同樣使用這個(gè)鎖的方法時(shí),無需再次獲取鎖。比如 decr 方法。 這種實(shí)現(xiàn)方式的一個(gè)特點(diǎn)是,無論這個(gè)類有多少個(gè)實(shí)例都只用一個(gè)鎖。因此在需要大量使用計(jì)數(shù)器的情況下內(nèi)存效率更高。不過這樣做也有缺點(diǎn),就是在程序中使用大量線程并頻繁更新計(jì)數(shù)器時(shí)會(huì)有爭(zhēng)用鎖的問題。
Semaphore
? 信號(hào)量對(duì)象是一個(gè)建立在共享計(jì)數(shù)器基礎(chǔ)上的同步原語。如果計(jì)數(shù)器不為0,with 語句將計(jì)數(shù)器減1,線程被允許執(zhí)行。with 語句執(zhí)行結(jié)束后,計(jì)數(shù)器加1。如果計(jì)數(shù)器為0,線程將被阻塞,直到其他線程結(jié)束將計(jì)數(shù)器加1。
import urllib.request
from threading import Semaphore
# At most, five threads allowed to run at once
_fetch_url_sema = Semaphore(5)
def fetch_url(url):
with _fetch_url_sema:
return urllib.request.urlopen(url)