深度解密協(xié)程鎖、信號(hào)量以及線程鎖的實(shí)現(xiàn)原理
關(guān)于什么是信號(hào)量,相信大家都知道,那么本文便從源碼的角度來看看信號(hào)量是怎么實(shí)現(xiàn)的。不過在說信號(hào)量之前,必須先剖析一下鎖,理解了鎖才能更好地理解信號(hào)量。
那什么是鎖呢?如果程序中某個(gè)部分在并發(fā)操作時(shí)會(huì)出現(xiàn)意想不到的結(jié)果(比如操作一個(gè)共享的數(shù)據(jù)結(jié)構(gòu)),那么該部分就需要通過鎖保護(hù)起來,而被鎖保護(hù)起來的部分叫做臨界區(qū)。
線程在進(jìn)入臨界區(qū)之前必須先獲取鎖,然后才能操作共享資源。而鎖一旦被獲取,那么其它線程再嘗試獲取鎖,就會(huì)陷入阻塞,直到鎖被釋放。
圖片
通過鎖,我們能確保同一時(shí)刻只能有一個(gè)線程操作共享資源,從而很好地解決資源競(jìng)爭(zhēng)問題。這里的鎖指的是互斥鎖,也被稱為排它鎖。
而在 Python 里面,鎖可以通過 asyncio 和 threading 模塊來創(chuàng)建,這兩個(gè)模塊都提供了鎖,一個(gè)是協(xié)程鎖,一個(gè)是線程鎖,當(dāng)然也包括信號(hào)量。
import asyncio
import threading
lock1 = asyncio.Lock()
lock2 = threading.Lock()
當(dāng)我們對(duì)類 Lock 實(shí)例化,便可以得到鎖,然后鎖有兩個(gè)常用方法。
- acquire():獲取鎖;
- release():釋放鎖;
API 非常簡(jiǎn)單,我們先來看看協(xié)程里面的鎖,以及信號(hào)量。
協(xié)程鎖和信號(hào)量
之前在介紹 asyncio 的 Future 和 Task 時(shí)說過,F(xiàn)uture 對(duì)象可以看作是一個(gè)容器,它保存了在未來某個(gè)時(shí)刻才會(huì)出現(xiàn)的結(jié)果。
如果 Future 對(duì)象里面還沒有結(jié)果集,那么它就處于未完成狀態(tài),否則處于已完成狀態(tài)。
import asyncio
future = asyncio.Future()
# 是否完成
print(future.done())
"""
False
"""
# 因?yàn)?future 此時(shí)還沒有結(jié)果集,所以是未完成狀態(tài)(PENDING)
# 設(shè)置結(jié)果集
future.set_result("S 老師不希望你們?yōu)榱怂鴥蓴【銈?)
# 由于設(shè)置了結(jié)果集,所以變成已完成狀態(tài)(FINISHED)
print(future.done())
"""
True
"""
# 獲取結(jié)果
print(future.result())
"""
S 老師不希望你們?yōu)榱怂鴥蓴【銈?"""
問題來了,如何在 future 完成時(shí)立刻拿到結(jié)果呢?總不能一直調(diào)用 done 方法輪詢吧。
很簡(jiǎn)單,我們可以對(duì) future 使用 await 表達(dá)式,如果 future 內(nèi)部還沒有結(jié)果集,那么 await 會(huì)處于阻塞狀態(tài),否則不會(huì)阻塞,并且還會(huì)將值取出來。
import asyncio
async def delay(future, seconds):
await asyncio.sleep(seconds)
print("給 future 設(shè)置結(jié)果集")
future.set_result(666)
async def main():
# 創(chuàng)建一個(gè) future
future = asyncio.Future()
loop = asyncio.get_running_loop()
# 創(chuàng)建一個(gè)任務(wù),扔到事件循環(huán)
loop.create_task(delay(future, 3))
print("await future 會(huì)陷入阻塞,因?yàn)樗鼉?nèi)部還沒有結(jié)果集")
# 該表達(dá)式會(huì)返回 666,因?yàn)榻o future 設(shè)置的結(jié)果是 666
await future
print(f"3 秒后結(jié)束阻塞,因?yàn)?delay 協(xié)程內(nèi)部給 future 設(shè)置了結(jié)果集")
asyncio.run(main())
"""
await future 會(huì)陷入阻塞,因?yàn)樗鼉?nèi)部還沒有結(jié)果集
給 future 設(shè)置結(jié)果集
3 秒后結(jié)束阻塞,因?yàn)?delay 協(xié)程內(nèi)部給 future 設(shè)置了結(jié)果集
"""
而協(xié)程在進(jìn)入事件循環(huán)時(shí)會(huì)自動(dòng)創(chuàng)建一個(gè) future,并將協(xié)程和 future 組合起來得到任務(wù),而 await 一個(gè)任務(wù)等價(jià)于 await future。當(dāng)協(xié)程沒有執(zhí)行完畢時(shí)會(huì)處于阻塞,而協(xié)程執(zhí)行完畢時(shí)會(huì)將返回值設(shè)置在 future 中,然后 await 表達(dá)式會(huì)拿到里面的結(jié)果。
在實(shí)際編碼中,我們一般很少手動(dòng)創(chuàng)建 Future 對(duì)象(future),但 Future 和 asyncio 的實(shí)現(xiàn)密切相關(guān),其中就包括了鎖。
當(dāng)協(xié)程在獲取鎖時(shí),如果發(fā)現(xiàn)鎖已被獲取,那么如何陷入阻塞呢?當(dāng)鎖被釋放時(shí),它又如何解除阻塞呢?答案就是通過 future。
假設(shè)協(xié)程 1 和協(xié)程 2 都要獲取鎖,它們都會(huì)調(diào)用鎖的 acquire 方法。其中協(xié)程 1 先獲取到,那么協(xié)程 2 就會(huì)創(chuàng)建一個(gè) future 并 await。由于 future 內(nèi)部還沒有結(jié)果集,因此協(xié)程 2 會(huì)處于阻塞。當(dāng)協(xié)程 1 釋放鎖時(shí),會(huì)給協(xié)程 2 創(chuàng)建的 future 設(shè)置一個(gè)結(jié)果,從而讓協(xié)程 2 解除阻塞、獲取到鎖。
我們手動(dòng)實(shí)現(xiàn)一下鎖。
import asyncio
from collections import deque
class Lock:
def __init__(self):
# 保存創(chuàng)建的 future
self._waiters = deque()
# 鎖是否已被獲取
self._locked = False
async def acquire(self):
# 如果鎖沒有被獲取,那么獲取鎖
if not self._locked:
self._locked = True
return True
# 否則說明鎖已被獲取,創(chuàng)建一個(gè) future
future = asyncio.Future()
# 將它放在雙端隊(duì)列里面
self._waiters.append(future)
# 此時(shí)獲取鎖的協(xié)程就會(huì)陷入阻塞,等待其它協(xié)程喚醒
await future
# 如果解除阻塞,意味著該協(xié)程獲取到鎖了
self._locked = True
return True
def release(self):
# 釋放鎖,如果發(fā)現(xiàn)鎖沒被獲取,說明對(duì)鎖進(jìn)行了二次釋放
if not self._locked:
raise RuntimeError("鎖沒有被獲取")
# 將鎖的狀態(tài)改成 False,表示鎖被釋放了
self._locked = False
if len(self._waiters) == 0:
return
# 從雙端隊(duì)列 deque 的左側(cè)彈出 future
# 這個(gè) future 就是某個(gè)協(xié)程在獲取不到鎖時(shí)創(chuàng)建的
# 并通過 await future 讓自身陷入阻塞狀態(tài),等待被喚醒
future = self._waiters.popleft()
# 拿到 future 之后,執(zhí)行 future.set_result(),也就是設(shè)置結(jié)果集
# 那么對(duì)應(yīng)的協(xié)程就會(huì)解除阻塞,從而獲取鎖
future.set_result(True)
# 注意:因?yàn)?future 是從右邊添加的,所以要從 deque 的左側(cè)彈出
# 因?yàn)橄全@取鎖的協(xié)程要優(yōu)先解除阻塞
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.release()
整個(gè)過程非常簡(jiǎn)單,就是在獲取不到鎖時(shí),創(chuàng)建一個(gè) Future 對(duì)象并 await,此時(shí)就會(huì)陷入阻塞。當(dāng)然獲取鎖的協(xié)程可能有很多,它們創(chuàng)建的 future 會(huì)保存在一個(gè)雙端隊(duì)列里面。
而拿到鎖的協(xié)程,在操作完臨界區(qū)并釋放鎖時(shí),會(huì)從雙端隊(duì)列的左側(cè)彈出一個(gè) future,并為其設(shè)置結(jié)果集。那么創(chuàng)建該 future 的協(xié)程就會(huì)解除阻塞,從而獲取到鎖。
因此這就是 asyncio 鎖的實(shí)現(xiàn)方式,一點(diǎn)都不神秘。當(dāng)然 asyncio 內(nèi)部還做了一些異常檢測(cè),以及檢測(cè) future 是否已取消等等,我們這里省略了。有興趣可以看一看 asyncio 內(nèi)部鎖的實(shí)現(xiàn)細(xì)節(jié),整體邏輯和我們這里基本一致,并且我們這里手動(dòng)實(shí)現(xiàn)的鎖在大部分場(chǎng)景下和 asyncio 的鎖都是等效的。
然后補(bǔ)充一點(diǎn),你在使用 asyncio 鎖的時(shí)候,一定不要以全局變量的形式創(chuàng)建。
import asyncio
lock = asyncio.Lock()
async def a():
async with lock:
print("協(xié)程 a 成功獲取了鎖, 并進(jìn)入臨界區(qū)執(zhí)行操作")
await asyncio.sleep(2)
print("協(xié)程 a 釋放了鎖")
async def b():
async with lock:
print("協(xié)程 b 成功獲取了鎖, 并進(jìn)入臨界區(qū)執(zhí)行操作")
await asyncio.sleep(2)
print("協(xié)程 b 釋放了鎖")
async def main():
await asyncio.gather(a(), b())
asyncio.run(main())
如果這樣做,很快會(huì)看到崩潰的發(fā)生,并報(bào)告多個(gè)事件循環(huán)的錯(cuò)誤:
RuntimeError: ..... attached to a different loop
這是 asyncio 庫的一個(gè)令人困惑的地方,而且這種現(xiàn)象也不是鎖特有的,asyncio 中的大多數(shù)對(duì)象在創(chuàng)建時(shí)都會(huì)提供一個(gè)可選的 loop 參數(shù),允許你指定要運(yùn)行的事件循環(huán)。
當(dāng)未提供此參數(shù)時(shí),asyncio 嘗試獲取當(dāng)前正在運(yùn)行的事件循環(huán),如果沒有,則創(chuàng)建一個(gè)新的事件循環(huán)。在上例中,創(chuàng)建鎖的同時(shí)會(huì)創(chuàng)建一個(gè)事件循環(huán),因?yàn)閯?chuàng)建鎖時(shí)還沒有事件循環(huán)。然后 asyncio.run(main()) 會(huì)創(chuàng)建第二個(gè)事件循環(huán),試圖使用鎖時(shí),這兩個(gè)獨(dú)立的事件循環(huán)就會(huì)混合在一起導(dǎo)致崩潰。
這種行為比較棘手,因此在 Python 3.10 中會(huì)移除 loop 參數(shù),這種令人困惑的行為也會(huì)消失。但在 3.10 之前,在使用全局 asyncio 變量時(shí)需要認(rèn)真考慮這些情況。
說完了鎖,再來說說信號(hào)量。鎖負(fù)責(zé)保證同一時(shí)刻只能有一個(gè)協(xié)程去操作臨界區(qū),而信號(hào)量在創(chuàng)建時(shí)會(huì)接收一個(gè)初始值 value,可以保證同一時(shí)刻最多有 value 個(gè)協(xié)程去操作臨界區(qū)。
因此可以把鎖看成是初始值 value 等于 1 的信號(hào)量,它在源碼中的實(shí)現(xiàn)和鎖基本是類似的,我們也手動(dòng)實(shí)現(xiàn)一下。
import asyncio
from collections import deque
class Semaphore:
def __init__(self, value=1):
self._waiters = deque()
# 可以把 self._value 看成是令牌的數(shù)量
# 每當(dāng)一個(gè)協(xié)程進(jìn)入臨界區(qū),令牌數(shù)減 1,離開臨界區(qū),令牌數(shù)加 1
# 如果 self._value 小于等于 0,說明令牌用光了,此時(shí)就不允許進(jìn)入臨界區(qū)
self._value = value
@property
def locked(self):
return self._value <= 0
async def acquire(self):
# 如果 self._value > 0,說明可以進(jìn)入臨界區(qū)
if not self.locked:
self._value -= 1 # self._value 要減 1
return True
# 如果 self._value <= 0,說明此時(shí)不能進(jìn)去臨界區(qū),必須等待某個(gè)協(xié)程從臨界區(qū)出來
# 那么和鎖一樣,也是創(chuàng)建一個(gè) future 并放在雙端隊(duì)列里面
future = asyncio.Future()
self._waiters.append(future)
# 此時(shí)獲取信號(hào)量的協(xié)程會(huì)陷入阻塞
await future
# 解除阻塞,意味著該協(xié)程獲取到信號(hào)量了
self._value -= 1
return True
def release(self):
# 釋放信號(hào)量,說白了就是將 self._value 加 1
self._value += 1
if len(self._waiters) == 0:
return
future = self._waiters.popleft()
future.set_result(True)
async def __aenter__(self):
await self.acquire()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
self.release()
信號(hào)量和鎖的實(shí)現(xiàn)方式是一樣的,鎖可以看成是 value 為 1 的信號(hào)量。當(dāng)協(xié)程進(jìn)入臨界區(qū),value 的值會(huì)減少 1,離開臨界區(qū) value 的值會(huì)增加 1。如果 value 為 0,那么后續(xù)協(xié)程就不允許進(jìn)入臨界區(qū)了,必須等到某個(gè)協(xié)程從臨界區(qū)出來。
說到這,再來補(bǔ)充一個(gè)有界信號(hào)量,因?yàn)樾盘?hào)量有一個(gè)問題。
import asyncio
from asyncio import Semaphore
import time
async def bar(sem: Semaphore):
async with sem:
await asyncio.sleep(3)
async def main():
# 每次允許兩個(gè)協(xié)程進(jìn)入臨界區(qū)
sem = Semaphore(2)
# 創(chuàng)建 4 個(gè)任務(wù)
task = [asyncio.create_task(bar(sem)) for _ in range(4)]
# 直接對(duì) sem 執(zhí)行 release
sem.release()
sem.release()
await asyncio.gather(*task)
start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print(f"總耗時(shí): {end - start}")
"""
總耗時(shí): 3.003426834
"""
創(chuàng)建了 4 個(gè)任務(wù),每次只允許兩個(gè)協(xié)程進(jìn)入臨界區(qū),因此總耗時(shí)應(yīng)該是 6 秒才對(duì)。但問題是我們創(chuàng)建完信號(hào)量之后,調(diào)用了兩次 release 方法,將內(nèi)部的 value 值增加了 2,此時(shí)信號(hào)量就變成了同時(shí)允許 4 個(gè)協(xié)程進(jìn)入臨界區(qū)。
因此和鎖不一樣,鎖一旦被釋放,就不能再二次釋放。而信號(hào)量被釋放,其實(shí)就是將內(nèi)部的 value 加 1,并且不會(huì)對(duì)內(nèi)部的 value 進(jìn)行檢測(cè)。
import asyncio
from asyncio import Semaphore
async def main():
sem = Semaphore(2)
print(f"before value: {sem._value}")
for _ in range(100):
sem.release()
print(f"after value: {sem._value}")
asyncio.run(main())
"""
before value: 2
after value: 102
"""
不過這個(gè)問題基本很少發(fā)生,當(dāng)然也可以使用 async with 語句,這樣獲取和釋放一定是成對(duì)出現(xiàn)的。
而有界信號(hào)量在信號(hào)量的基礎(chǔ)上做了一層檢測(cè),如果在 release 的時(shí)候發(fā)現(xiàn) value 已經(jīng)達(dá)到了初始值,那么會(huì)報(bào)錯(cuò)。
圖片
有界信號(hào)量會(huì)將初始值 value 單獨(dú)保存起來,如果釋放時(shí)發(fā)現(xiàn) value 大于等于初始值,那么報(bào)錯(cuò)。但是注意:有界信號(hào)量依舊可以多次 release,不過我們基本不會(huì)這么干,因?yàn)楂@取和釋放應(yīng)該是成對(duì)出現(xiàn)的。
以上我們就說完了協(xié)程里面的鎖和信號(hào)量,再來看看線程提供的。
線程鎖和信號(hào)量
線程鎖可以通過 threading 模塊創(chuàng)建。
import threading
lock = threading.Lock()
注意:Lock 并不是一個(gè)類,而是一個(gè)函數(shù),看一下源代碼。
Lock = _allocate_lock
# threading.Lock() 其實(shí)就是 _thread.allocate_lock()
_allocate_lock = _thread.allocate_lock
調(diào)用 _thread.allocate_lock() 時(shí)會(huì)在內(nèi)部創(chuàng)建鎖,而鎖是由 _thread 模塊實(shí)現(xiàn)的。
import threading
import _thread
lock = threading.Lock()
print(type(lock))
"""
<class '_thread.lock'>
"""
lock = _thread.allocate_lock()
print(type(lock))
"""
<class '_thread.lock'>
"""
所以線程鎖其實(shí)是一個(gè) _thread.lock 對(duì)象。
補(bǔ)充一下,Python 有很多的模塊是由 C 實(shí)現(xiàn)的,因?yàn)樗鼈兒托阅苊芮邢嚓P(guān),編譯之后會(huì)內(nèi)嵌在解釋器里面。舉個(gè)例子:
import random, _random
import re, _sre
import ssl, _ssl
import io, _io
import bisect, _bisect
import heapq, _heapq
import asyncio, _asyncio
import threading, _thread
這些 C 實(shí)現(xiàn)的模塊,名字前面一般會(huì)帶有一個(gè)下滑線,它們內(nèi)嵌在解釋器里面,你在 Lib 目錄下是找不到的。但我們不需要直接使用這些模塊,解釋器會(huì)提供相應(yīng)的 Python 模塊對(duì)其進(jìn)行封裝。
我們只需要導(dǎo)入 Python 模塊即可,在內(nèi)部會(huì)調(diào)用具體的 C 實(shí)現(xiàn),以 io 模塊為例。
圖片
這些類都是 _io 實(shí)現(xiàn)的,而 io 只是做了一層封裝,因此在實(shí)際編碼時(shí)會(huì)使用 C 實(shí)現(xiàn)的 _io 模塊里的邏輯。
再比如內(nèi)置函數(shù) open,它其實(shí)就是 io.open,而 io 里面的 open 是從 _io 導(dǎo)入進(jìn)來的。
import io
import _io
print(open is io.open is _io.open) # True
好了,說了這么多只是想表示線程鎖的具體實(shí)現(xiàn)不在 threading 里面,而是在 _thread 里面。_thread 是一個(gè) C 實(shí)現(xiàn)的模塊,我們需要到解釋器里面才能看到具體實(shí)現(xiàn)。
在 Modules/_threadmodule.c 中,有一個(gè)結(jié)構(gòu)體實(shí)例 Locktype,它便是 _thread.lock 這個(gè)類的底層實(shí)現(xiàn)。
圖片
_thread.lock 實(shí)例化后會(huì)得到鎖,鎖在底層對(duì)應(yīng)的是 lockobject 結(jié)構(gòu)體。
// _threadmodule.c
typedef struct {
PyObject_HEAD
PyThread_type_lock lock_lock;
PyObject *in_weakreflist;
char locked;
} lockobject;
// pythread.h
typedef void *PyThread_type_lock;
解釋一下這個(gè)結(jié)構(gòu)體。
PyObject_HEAD
每個(gè)對(duì)象都具備的頭部信息,它包含了對(duì)象的引用計(jì)數(shù)和類型。
lock_lock
PyThread_type_lock 是 void * 的類型別名,所以 lock_lock 是一個(gè) void * 類型的指針,該指針指向了真正的鎖,這個(gè)鎖是底層操作系統(tǒng)提供的。
和協(xié)程鎖不同,由于操作系統(tǒng)感知不到協(xié)程,因此協(xié)程鎖是基于 Future 對(duì)象實(shí)現(xiàn)的。但線程鎖則是基于操作系統(tǒng)實(shí)現(xiàn)的,當(dāng) Python 代碼創(chuàng)建鎖、獲取鎖、解鎖時(shí),會(huì)通過 lock_lock 指針將這些操作轉(zhuǎn)發(fā)到具體的鎖實(shí)現(xiàn)上。
in_weakreflist
用于創(chuàng)建弱引用,關(guān)于什么是弱引用,我在之前的文章中介紹過。
locked
用于標(biāo)記鎖狀態(tài),把它當(dāng)成 Python 的布爾值即可,值為 1 表示鎖已被獲?。ㄒ焰i定),0 表示未被獲?。ㄎ存i定)。
這幾個(gè)字段應(yīng)該很好理解,然后我們來看一下鎖的具體方法,那么方法都定義在哪呢?我們說過,實(shí)例對(duì)象有哪些行為,取決于類型對(duì)象定義了哪些操作。
因此鎖的操作都定義在 Locktype 里面,由內(nèi)部的 tp_methods 字段負(fù)責(zé)維護(hù)。
圖片
該字段被賦值為 lock_methods,所以鎖的方法都在 lock_methods 數(shù)組中。
圖片
以上就是鎖能夠使用的方法,我們來驗(yàn)證一下。
import threading
lock = threading.Lock()
# acquire_lock 和 acquire 基本是等價(jià)的
# release_lock 和 release 也基本是等價(jià)的
# 不過我們一般都會(huì)使用 acquire 和 lock
lock.acquire_lock() # 獲取鎖
lock.release_lock() # 釋放鎖
lock.acquire() # 獲取鎖
lock.release() # 釋放鎖
# 同理 locked_lock 和 locked 也是等價(jià)的
# 表示鎖是否被獲?。ㄒ焰i定),不過我們一般使用 locked
print(lock.locked_lock())
print(lock.locked())
lock.acquire()
print(lock.locked_lock())
print(lock.locked())
lock.release()
"""
False
False
True
True
"""
# 還提供了上下文管理,等價(jià)于 lock.acquire + lock.release
with lock:
pass
好了,接下來我們看看 acquire 方法,也就是鎖是怎么獲取的。
static PyObject *
lock_PyThread_acquire_lock(
lockobject *self,
PyObject *args,
PyObject *kwds
){
_PyTime_t timeout; // 超時(shí)時(shí)間
// 一個(gè)枚舉,表示鎖狀態(tài),有三個(gè)可選值
// PY_LOCK_FAILURE:表示因?yàn)殒i已被持有,而獲取失敗
// PY_LOCK_ACQUIRED:表示鎖可用,并成功獲取鎖
// PY_LOCK_INTR:表示獲取鎖的操作被中斷,比如抵達(dá)超時(shí)時(shí)間
PyLockStatus r;
// 參數(shù)解析,該方法接收一個(gè) timeout 參數(shù)
if (lock_acquire_parse_args(args, kwds, &timeout) < 0)
return NULL;
// 獲取鎖,并指定一個(gè)超時(shí)時(shí)間,不傳則表示沒有超時(shí)時(shí)間
// 那么在獲取不到鎖時(shí),會(huì)無限等待
r = acquire_timed(self->lock_lock, timeout);
// 如果返回的狀態(tài)為 PY_LOCK_INTR,說明達(dá)到超時(shí)時(shí)間
// 因此獲取鎖的操作被中斷,并且會(huì)拋出異常
if (r == PY_LOCK_INTR) {
return NULL;
}
// 如果返回的狀態(tài)為 PY_LOCK_ACQUIRED,表示鎖獲取成功
// 將鎖的 locked 字段設(shè)置為 1,表示鎖已被獲取
if (r == PY_LOCK_ACQUIRED)
self->locked = 1;
// 如果以上兩種狀態(tài)都不是,那么說明獲取失敗了
// 將 r == PY_LOCK_ACQUIRED 轉(zhuǎn)成布爾值返回
// 獲取成功返回 True,獲取失敗返回 False
return PyBool_FromLong(r == PY_LOCK_ACQUIRED);
}
整個(gè)過程仍然很簡(jiǎn)單,因此我們看到協(xié)程鎖和線程鎖的實(shí)現(xiàn)是類似的,它們都有一個(gè) locked 字段用于表示鎖是否已被獲取。
只不過協(xié)程鎖是基于 Future 對(duì)象實(shí)現(xiàn)的,當(dāng) await future 陷入阻塞時(shí),表示鎖已被其它協(xié)程獲取。當(dāng)解除阻塞時(shí),代表鎖被釋放了,自己獲取到鎖。
而線程鎖是基于操作系統(tǒng)實(shí)現(xiàn)的,它本質(zhì)上是對(duì)操作系統(tǒng)提供的鎖做了一個(gè)封裝。Python 線程在獲取鎖時(shí),底層會(huì)獲取操作系統(tǒng)的鎖。
而操作系統(tǒng)的鎖是怎么獲取的呢?在源碼中使用的是 acquire_time 函數(shù),它接收一個(gè)指針和一個(gè)超時(shí)時(shí)間。該指針便是 lockobject 的 lock_lock 字段,類型是 void *,它指向了操作系統(tǒng)提供的鎖實(shí)現(xiàn)。
圖片
acquire_time 函數(shù)做了一些參數(shù)處理后,又調(diào)用了 PyThread_acquire_lock_timed 函數(shù),顯然獲取鎖的邏輯位于該函數(shù)里面。
PyThread_acquire_lock_timed 函數(shù)在不同平臺(tái)有著不同的實(shí)現(xiàn),因?yàn)椴煌僮飨到y(tǒng)的鎖實(shí)現(xiàn)是不是一樣的,所以源碼中使用 void *。
圖片
我們以 Windows 系統(tǒng)為例:
圖片
雖然不同系統(tǒng)的函數(shù)實(shí)現(xiàn)不一樣,但參數(shù)是一致的。
- aLock:void * 指針,指向操作系統(tǒng)提供的鎖;
- microseconds:等待鎖的時(shí)間,以微妙為單位。如果值是負(fù)數(shù),表示無限等待,直到獲取鎖;
- intr_flag:如果設(shè)置為 1,那么當(dāng)?shù)却^程中出現(xiàn)了信號(hào)中斷時(shí),函數(shù)會(huì)提前返回。
函數(shù)的核心實(shí)現(xiàn)如下:
圖片
又調(diào)用了 EnterNonRecursiveMutex 函數(shù),該函數(shù)是真正獲取鎖的邏輯,參數(shù) aLock 指向了操作系統(tǒng)的互斥鎖。前面說過,不同系統(tǒng)有著不同的鎖實(shí)現(xiàn),所以具體使用時(shí)需要轉(zhuǎn)換。在 Windows 系統(tǒng)上,它被轉(zhuǎn)成了 PNRMUTEX。
typedef struct _NRMUTEX
{
// 對(duì)操作系統(tǒng)互斥鎖的封裝
PyMUTEX_T cs;
// 對(duì)條件變量的封裝,用于線程間的同步
// 允許線程在條件不滿足時(shí)等待,條件滿足時(shí)由其它線程通知等待的線程
// 條件變量一般和互斥鎖一起使用,避免競(jìng)爭(zhēng)條件和死鎖
PyCOND_T cv;
// 標(biāo)記互斥鎖是否已被獲取,1 表示已被獲取,0 表示未被獲取
int locked;
} NRMUTEX;
typedef NRMUTEX *PNRMUTEX;
所以 lockobject 的 lock_lock 指針指向的其實(shí)依舊不是 OS 互斥鎖,而是一個(gè)結(jié)構(gòu)體實(shí)例,結(jié)構(gòu)體內(nèi)部的字段 cs 封裝的才是 OS 互斥鎖。
圖片
lockobject 是線程鎖,也就是 Python 代碼中使用的鎖的底層實(shí)現(xiàn),而 NRMUTEX 則是封裝了操作系統(tǒng)提供的互斥鎖。注意這里面的兩個(gè) locked,它們都用于標(biāo)記鎖是否已被獲取。
最后來看看 EnterNonRecursiveMutex 函數(shù)的具體邏輯。
DWORD
EnterNonRecursiveMutex(PNRMUTEX mutex,
DWORD milliseconds)
{
DWORD result = WAIT_OBJECT_0;
// 對(duì) OS 互斥鎖進(jìn)行鎖定,用于保護(hù)共享數(shù)據(jù),如果鎖定失敗直接返回
if (PyMUTEX_LOCK(&mutex->cs))
return WAIT_FAILED;
// 如果鎖定成功,那么將 locked 字段設(shè)置為 1,表示互斥鎖被獲取
// 但如果發(fā)現(xiàn) locked 已經(jīng)為 1 了,則說明已有別的線程將 locked 修改為 1
// 那么當(dāng)前線程就要等待,直到 locked 不為 1(鎖被釋放)
if (milliseconds == INFINITE) {
// 無限等待
while (mutex->locked) {
if (PyCOND_WAIT(&mutex->cv, &mutex->cs)) {
result = WAIT_FAILED;
break;
}
}
} else if (milliseconds != 0) {
// 有時(shí)間限制的等待
ULONGLONG now, target = GetTickCount64() + milliseconds;
while (mutex->locked) {
if (PyCOND_TIMEDWAIT(
&mutex->cv, &mutex->cs,
(long long)milliseconds*1000) < 0)
{
result = WAIT_FAILED;
break;
}
now = GetTickCount64();
if (target <= now)
break;
milliseconds = (DWORD)(target-now);
}
}
// 在被喚醒之后,說明當(dāng)前線程獲取互斥鎖成功,于是將 locked 改成 1
if (!mutex->locked) {
mutex->locked = 1;
result = WAIT_OBJECT_0;
} else if (result == WAIT_OBJECT_0)
result = WAIT_TIMEOUT;
// 這里必須將操作系統(tǒng)的鎖釋放掉,因?yàn)閷?duì)于外界的線程而言,
// 鎖是否被獲?。ㄦi定),取決于 locked 字段是否為 1
PyMUTEX_UNLOCK(&mutex->cs);
return result;
}
代碼邏輯有一些讓人疑惑的地方,下面解釋一下。Python 里面調(diào)用 lock.acquire() 方法時(shí),表示要獲取線程鎖。但獲取線程鎖之前,要先獲取 OS 互斥鎖,如果獲取不到,那么壓根不允許進(jìn)入臨界區(qū)。
但解釋器在互斥鎖的基礎(chǔ)上又封裝了一層,如果獲取到了互斥鎖,還要將 locked 字段修改為 1。因?yàn)閺拇a邏輯上講,無論是線程鎖還是互斥鎖,只有當(dāng)它們內(nèi)部的 locked 字段為 1 時(shí),才算是獲取了鎖。
所以將互斥鎖的 locked 字段修改為 1 之后,后續(xù)還要將線程鎖的 locked 字段修改為 1,這樣才算是獲取了線程鎖。
到這里估計(jì)可能有人會(huì)產(chǎn)生一個(gè)疑問,為啥函數(shù)在一開始要獲取系統(tǒng)的互斥鎖,最后又釋放掉,這豈不是多此一舉?
if (PyMUTEX_LOCK(&mutex->cs))
return WAIT_FAILED;
//...
PyMUTEX_UNLOCK(&mutex->cs);
直接檢測(cè) locked 字段是否等于 1 不就行了嗎?其實(shí)原因有三個(gè):
- 保護(hù)共享狀態(tài):操作系統(tǒng)的互斥鎖 mutex-> cs 用于保護(hù)共享狀態(tài) mutex -> locked 的讀寫,在多線程環(huán)境中,任何對(duì)共享狀態(tài)的訪問都要同步,以防止競(jìng)態(tài)條件;
- 條件變量的同步:在使用條件變量 mutex -> cv 時(shí),通常需要結(jié)合互斥鎖使用,條件變量的等待和通知需要在互斥鎖的保護(hù)下進(jìn)行,以保證操作的原子性;
- 避免忙等待:如果只使用 mutex -> locked 進(jìn)行檢查,可能會(huì)陷入忙等待,即不斷地檢查鎖狀態(tài)而占用 CPU 資源。使用互斥鎖和條件變量可以讓線程在等待時(shí)被掛起,從而更有效地利用 CPU;
所以解釋器為 OS 互斥鎖引入了一個(gè)自定義的鎖狀態(tài) locked,OS 互斥鎖提供了對(duì) locked 的基本保護(hù),因?yàn)槎鄠€(gè)線程都要修改它。而自定義的鎖狀態(tài) locked 則用于實(shí)現(xiàn)同步邏輯,如果 locked 為 1,我們就認(rèn)為鎖被獲取了,locked 為 0,鎖就沒有被獲取。
協(xié)程鎖和線程鎖都是如此,所謂的獲取鎖、釋放鎖都是在修改 locked 字段的值。只不過在等待的時(shí)候,協(xié)程鎖使用的是 Future 對(duì)象,而線程鎖使用的是操作系統(tǒng)提供的互斥鎖和條件變量。
所以上面代碼中的 PyMUTEX_LOCK 通過之后,還要檢測(cè) locked 字段是否等于 1,代碼片段如下。
while (mutex->locked) {
if (PyCOND_WAIT(&mutex->cv, &mutex->cs)) {
result = WAIT_FAILED;
break;
}
//...
如果 locked 是 1,說明互斥鎖已經(jīng)被獲取了,當(dāng)前線程要進(jìn)行等待,直到 locked 字段的值為 0。當(dāng)其它線程釋放鎖時(shí),會(huì)將 locked 字段修改為 0,并通過條件變量喚醒當(dāng)前線程。
該線程醒來后檢測(cè)到 locked 為 0,就知道互斥鎖已被釋放,自己可以獲取了,于是再將 locked 字段修改為 1。
說完了線程鎖的獲取,再來看看線程鎖的釋放,所謂釋放,其實(shí)就是將 locked 字段修改為 0 而已。
圖片
釋放互斥鎖的邏輯最終會(huì)調(diào)用如下函數(shù):
圖片
修改 locked 是不安全的,需要加鎖保護(hù)。所以 OS 互斥鎖就是為了保護(hù) locked 變量的修改,再配合條件變量實(shí)現(xiàn)阻塞等待以及自動(dòng)喚醒,但從代碼邏輯上講,將 locked 字段設(shè)置為 0,才算是真正釋放了鎖。
這部分邏輯稍微有點(diǎn)繞,總之記住一個(gè)重點(diǎn):所謂的鎖,它的核心就是結(jié)構(gòu)體的一個(gè)字段,這里是 locked。如果字段的值為 1,表示鎖被獲取了,字段的值為 0,表示鎖沒有被獲取。
- 而獲取鎖,本質(zhì)上就是將 locked 字段修改為 1;
- 而釋放鎖,本質(zhì)上就是將 locked 字段修改為 0;
當(dāng)鎖沒有被獲取時(shí),那么線程在獲取鎖和釋放鎖時(shí)的邏輯可以簡(jiǎn)化為如下:
圖片
但實(shí)際情況會(huì)有多個(gè)線程一起競(jìng)爭(zhēng)鎖,因此為了保護(hù)這個(gè)共享字段,以及實(shí)現(xiàn)阻塞等待和自動(dòng)喚醒,解釋器使用了操作系統(tǒng)的互斥鎖和條件變量。
小結(jié)
以上我們就剖析了協(xié)程鎖、信號(hào)量以及線程鎖的實(shí)現(xiàn)原理,至于線程里面的信號(hào)量,它的原理和協(xié)程的信號(hào)量是一樣的,只是實(shí)現(xiàn)方式不一樣。
圖片
線程的信號(hào)量包含了一個(gè)初始值 value,但它在實(shí)現(xiàn)阻塞等待以及喚醒的時(shí)候用的是條件變量,而條件變量的實(shí)現(xiàn)依賴于鎖。簡(jiǎn)單來說,獲取信號(hào)量的時(shí)候,self._value 會(huì)減 1,釋放信號(hào)量的時(shí)候,self._value 會(huì)加 1。
當(dāng) self._value 為 0 時(shí),獲取信號(hào)量會(huì)陷入阻塞,而當(dāng)某個(gè)線程退出臨界區(qū)釋放信號(hào)量的時(shí)候,會(huì)通過條件變量的 notify 機(jī)制喚醒阻塞的線程。
關(guān)于條件變量,我們以后再分析,有點(diǎn)餓了。
另外進(jìn)程也有鎖和信號(hào)量,這里也先不討論了,有點(diǎn)困了。