自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

深度解密協(xié)程鎖、信號(hào)量以及線程鎖的實(shí)現(xiàn)原理

開發(fā) 前端
實(shí)際情況會(huì)有多個(gè)線程一起競(jìng)爭(zhēng)鎖,因此為了保護(hù)這個(gè)共享字段,以及實(shí)現(xiàn)阻塞等待和自動(dòng)喚醒,解釋器使用了操作系統(tǒng)的互斥鎖和條件變量。

關(guān)于什么是信號(hào)量,相信大家都知道,那么本文便從源碼的角度來看看信號(hào)量是怎么實(shí)現(xiàn)的。不過在說信號(hào)量之前,必須先剖析一下鎖,理解了鎖才能更好地理解信號(hào)量。

那什么是鎖呢?如果程序中某個(gè)部分在并發(fā)操作時(shí)會(huì)出現(xiàn)意想不到的結(jié)果(比如操作一個(gè)共享的數(shù)據(jù)結(jié)構(gòu)),那么該部分就需要通過鎖保護(hù)起來,而被鎖保護(hù)起來的部分叫做臨界區(qū)。

線程在進(jìn)入臨界區(qū)之前必須先獲取鎖,然后才能操作共享資源。而鎖一旦被獲取,那么其它線程再嘗試獲取鎖,就會(huì)陷入阻塞,直到鎖被釋放。

圖片圖片

通過鎖,我們能確保同一時(shí)刻只能有一個(gè)線程操作共享資源,從而很好地解決資源競(jìng)爭(zhēng)問題。這里的鎖指的是互斥鎖,也被稱為排它鎖。

而在 Python 里面,鎖可以通過 asyncio 和 threading 模塊來創(chuàng)建,這兩個(gè)模塊都提供了鎖,一個(gè)是協(xié)程鎖,一個(gè)是線程鎖,當(dāng)然也包括信號(hào)量。

import asyncio
import threading

lock1 = asyncio.Lock()
lock2 = threading.Lock()

當(dāng)我們對(duì)類 Lock 實(shí)例化,便可以得到鎖,然后鎖有兩個(gè)常用方法。

  • acquire():獲取鎖;
  • release():釋放鎖;

API 非常簡(jiǎn)單,我們先來看看協(xié)程里面的鎖,以及信號(hào)量。

協(xié)程鎖和信號(hào)量

之前在介紹 asyncio 的 Future 和 Task 時(shí)說過,F(xiàn)uture 對(duì)象可以看作是一個(gè)容器,它保存了在未來某個(gè)時(shí)刻才會(huì)出現(xiàn)的結(jié)果。

如果 Future 對(duì)象里面還沒有結(jié)果集,那么它就處于未完成狀態(tài),否則處于已完成狀態(tài)。

import asyncio

future = asyncio.Future()
# 是否完成
print(future.done())
"""
False
"""
# 因?yàn)?future 此時(shí)還沒有結(jié)果集,所以是未完成狀態(tài)(PENDING)
# 設(shè)置結(jié)果集
future.set_result("S 老師不希望你們?yōu)榱怂鴥蓴【銈?)
# 由于設(shè)置了結(jié)果集,所以變成已完成狀態(tài)(FINISHED)
print(future.done())
"""
True
"""
# 獲取結(jié)果
print(future.result())
"""
S 老師不希望你們?yōu)榱怂鴥蓴【銈?"""

問題來了,如何在 future 完成時(shí)立刻拿到結(jié)果呢?總不能一直調(diào)用 done 方法輪詢吧。

很簡(jiǎn)單,我們可以對(duì) future 使用 await 表達(dá)式,如果 future 內(nèi)部還沒有結(jié)果集,那么 await 會(huì)處于阻塞狀態(tài),否則不會(huì)阻塞,并且還會(huì)將值取出來。

import asyncio

async def delay(future, seconds):
    await asyncio.sleep(seconds)
    print("給 future 設(shè)置結(jié)果集")
    future.set_result(666)

async def main():
    # 創(chuàng)建一個(gè) future
    future = asyncio.Future()
    loop = asyncio.get_running_loop()
    # 創(chuàng)建一個(gè)任務(wù),扔到事件循環(huán)
    loop.create_task(delay(future, 3))
    print("await future 會(huì)陷入阻塞,因?yàn)樗鼉?nèi)部還沒有結(jié)果集")
    # 該表達(dá)式會(huì)返回 666,因?yàn)榻o future 設(shè)置的結(jié)果是 666
    await future
    print(f"3 秒后結(jié)束阻塞,因?yàn)?delay 協(xié)程內(nèi)部給 future 設(shè)置了結(jié)果集")

asyncio.run(main())
"""
await future 會(huì)陷入阻塞,因?yàn)樗鼉?nèi)部還沒有結(jié)果集
給 future 設(shè)置結(jié)果集
3 秒后結(jié)束阻塞,因?yàn)?delay 協(xié)程內(nèi)部給 future 設(shè)置了結(jié)果集
"""

而協(xié)程在進(jìn)入事件循環(huán)時(shí)會(huì)自動(dòng)創(chuàng)建一個(gè) future,并將協(xié)程和 future 組合起來得到任務(wù),而 await 一個(gè)任務(wù)等價(jià)于 await future。當(dāng)協(xié)程沒有執(zhí)行完畢時(shí)會(huì)處于阻塞,而協(xié)程執(zhí)行完畢時(shí)會(huì)將返回值設(shè)置在 future 中,然后 await 表達(dá)式會(huì)拿到里面的結(jié)果。

在實(shí)際編碼中,我們一般很少手動(dòng)創(chuàng)建 Future 對(duì)象(future),但 Future 和 asyncio 的實(shí)現(xiàn)密切相關(guān),其中就包括了鎖。

當(dāng)協(xié)程在獲取鎖時(shí),如果發(fā)現(xiàn)鎖已被獲取,那么如何陷入阻塞呢?當(dāng)鎖被釋放時(shí),它又如何解除阻塞呢?答案就是通過 future。

假設(shè)協(xié)程 1 和協(xié)程 2 都要獲取鎖,它們都會(huì)調(diào)用鎖的 acquire 方法。其中協(xié)程 1 先獲取到,那么協(xié)程 2 就會(huì)創(chuàng)建一個(gè) future 并 await。由于 future 內(nèi)部還沒有結(jié)果集,因此協(xié)程 2 會(huì)處于阻塞。當(dāng)協(xié)程 1 釋放鎖時(shí),會(huì)給協(xié)程 2 創(chuàng)建的 future 設(shè)置一個(gè)結(jié)果,從而讓協(xié)程 2 解除阻塞、獲取到鎖。

我們手動(dòng)實(shí)現(xiàn)一下鎖。

import asyncio
from collections import deque

class Lock:

    def __init__(self):
        # 保存創(chuàng)建的 future
        self._waiters = deque()
        # 鎖是否已被獲取
        self._locked = False

    async def acquire(self):
        # 如果鎖沒有被獲取,那么獲取鎖
        if not self._locked:
            self._locked = True
            return True
        # 否則說明鎖已被獲取,創(chuàng)建一個(gè) future
        future = asyncio.Future()
        # 將它放在雙端隊(duì)列里面
        self._waiters.append(future)
        # 此時(shí)獲取鎖的協(xié)程就會(huì)陷入阻塞,等待其它協(xié)程喚醒
        await future
        # 如果解除阻塞,意味著該協(xié)程獲取到鎖了
        self._locked = True
        return True

    def release(self):
        # 釋放鎖,如果發(fā)現(xiàn)鎖沒被獲取,說明對(duì)鎖進(jìn)行了二次釋放
        if not self._locked:
            raise RuntimeError("鎖沒有被獲取")
        # 將鎖的狀態(tài)改成 False,表示鎖被釋放了
        self._locked = False
        if len(self._waiters) == 0:
            return
        # 從雙端隊(duì)列 deque 的左側(cè)彈出 future
        # 這個(gè) future 就是某個(gè)協(xié)程在獲取不到鎖時(shí)創(chuàng)建的
        # 并通過 await future 讓自身陷入阻塞狀態(tài),等待被喚醒
        future = self._waiters.popleft()
        # 拿到 future 之后,執(zhí)行 future.set_result(),也就是設(shè)置結(jié)果集
        # 那么對(duì)應(yīng)的協(xié)程就會(huì)解除阻塞,從而獲取鎖
        future.set_result(True)
        # 注意:因?yàn)?future 是從右邊添加的,所以要從 deque 的左側(cè)彈出
        # 因?yàn)橄全@取鎖的協(xié)程要優(yōu)先解除阻塞

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.release()

整個(gè)過程非常簡(jiǎn)單,就是在獲取不到鎖時(shí),創(chuàng)建一個(gè) Future 對(duì)象并 await,此時(shí)就會(huì)陷入阻塞。當(dāng)然獲取鎖的協(xié)程可能有很多,它們創(chuàng)建的 future 會(huì)保存在一個(gè)雙端隊(duì)列里面。

而拿到鎖的協(xié)程,在操作完臨界區(qū)并釋放鎖時(shí),會(huì)從雙端隊(duì)列的左側(cè)彈出一個(gè) future,并為其設(shè)置結(jié)果集。那么創(chuàng)建該 future 的協(xié)程就會(huì)解除阻塞,從而獲取到鎖。

因此這就是 asyncio 鎖的實(shí)現(xiàn)方式,一點(diǎn)都不神秘。當(dāng)然 asyncio 內(nèi)部還做了一些異常檢測(cè),以及檢測(cè) future 是否已取消等等,我們這里省略了。有興趣可以看一看 asyncio 內(nèi)部鎖的實(shí)現(xiàn)細(xì)節(jié),整體邏輯和我們這里基本一致,并且我們這里手動(dòng)實(shí)現(xiàn)的鎖在大部分場(chǎng)景下和 asyncio 的鎖都是等效的。

然后補(bǔ)充一點(diǎn),你在使用 asyncio 鎖的時(shí)候,一定不要以全局變量的形式創(chuàng)建。

import asyncio

lock = asyncio.Lock()

async def a():
    async with lock:
        print("協(xié)程 a 成功獲取了鎖, 并進(jìn)入臨界區(qū)執(zhí)行操作")
        await asyncio.sleep(2)
    print("協(xié)程 a 釋放了鎖")

async def b():
    async with lock:
        print("協(xié)程 b 成功獲取了鎖, 并進(jìn)入臨界區(qū)執(zhí)行操作")
        await asyncio.sleep(2)
    print("協(xié)程 b 釋放了鎖")

async def main():
    await asyncio.gather(a(), b())

asyncio.run(main())

如果這樣做,很快會(huì)看到崩潰的發(fā)生,并報(bào)告多個(gè)事件循環(huán)的錯(cuò)誤:

RuntimeError: ..... attached to a different loop

這是 asyncio 庫的一個(gè)令人困惑的地方,而且這種現(xiàn)象也不是鎖特有的,asyncio 中的大多數(shù)對(duì)象在創(chuàng)建時(shí)都會(huì)提供一個(gè)可選的 loop 參數(shù),允許你指定要運(yùn)行的事件循環(huán)。

當(dāng)未提供此參數(shù)時(shí),asyncio 嘗試獲取當(dāng)前正在運(yùn)行的事件循環(huán),如果沒有,則創(chuàng)建一個(gè)新的事件循環(huán)。在上例中,創(chuàng)建鎖的同時(shí)會(huì)創(chuàng)建一個(gè)事件循環(huán),因?yàn)閯?chuàng)建鎖時(shí)還沒有事件循環(huán)。然后 asyncio.run(main()) 會(huì)創(chuàng)建第二個(gè)事件循環(huán),試圖使用鎖時(shí),這兩個(gè)獨(dú)立的事件循環(huán)就會(huì)混合在一起導(dǎo)致崩潰。

這種行為比較棘手,因此在 Python 3.10 中會(huì)移除 loop 參數(shù),這種令人困惑的行為也會(huì)消失。但在 3.10 之前,在使用全局 asyncio 變量時(shí)需要認(rèn)真考慮這些情況。

說完了鎖,再來說說信號(hào)量。鎖負(fù)責(zé)保證同一時(shí)刻只能有一個(gè)協(xié)程去操作臨界區(qū),而信號(hào)量在創(chuàng)建時(shí)會(huì)接收一個(gè)初始值 value,可以保證同一時(shí)刻最多有 value 個(gè)協(xié)程去操作臨界區(qū)。

因此可以把鎖看成是初始值 value 等于 1 的信號(hào)量,它在源碼中的實(shí)現(xiàn)和鎖基本是類似的,我們也手動(dòng)實(shí)現(xiàn)一下。

import asyncio
from collections import deque

class Semaphore:

    def __init__(self, value=1):
        self._waiters = deque()
        # 可以把 self._value 看成是令牌的數(shù)量
        # 每當(dāng)一個(gè)協(xié)程進(jìn)入臨界區(qū),令牌數(shù)減 1,離開臨界區(qū),令牌數(shù)加 1
        # 如果 self._value 小于等于 0,說明令牌用光了,此時(shí)就不允許進(jìn)入臨界區(qū)
        self._value = value

    @property
    def locked(self):
        return self._value <= 0

    async def acquire(self):
        # 如果 self._value > 0,說明可以進(jìn)入臨界區(qū)
        if not self.locked:
            self._value -= 1  # self._value 要減 1
            return True
        # 如果 self._value <= 0,說明此時(shí)不能進(jìn)去臨界區(qū),必須等待某個(gè)協(xié)程從臨界區(qū)出來
        # 那么和鎖一樣,也是創(chuàng)建一個(gè) future 并放在雙端隊(duì)列里面
        future = asyncio.Future()
        self._waiters.append(future)
        # 此時(shí)獲取信號(hào)量的協(xié)程會(huì)陷入阻塞
        await future
        # 解除阻塞,意味著該協(xié)程獲取到信號(hào)量了
        self._value -= 1
        return True

    def release(self):
        # 釋放信號(hào)量,說白了就是將 self._value 加 1
        self._value += 1
        if len(self._waiters) == 0:
            return
        future = self._waiters.popleft()
        future.set_result(True)

    async def __aenter__(self):
        await self.acquire()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.release()

信號(hào)量和鎖的實(shí)現(xiàn)方式是一樣的,鎖可以看成是 value 為 1 的信號(hào)量。當(dāng)協(xié)程進(jìn)入臨界區(qū),value 的值會(huì)減少 1,離開臨界區(qū) value 的值會(huì)增加 1。如果 value 為 0,那么后續(xù)協(xié)程就不允許進(jìn)入臨界區(qū)了,必須等到某個(gè)協(xié)程從臨界區(qū)出來。

說到這,再來補(bǔ)充一個(gè)有界信號(hào)量,因?yàn)樾盘?hào)量有一個(gè)問題。

import asyncio
from asyncio import Semaphore
import time

async def bar(sem: Semaphore):
    async with sem:
        await asyncio.sleep(3)

async def main():
    # 每次允許兩個(gè)協(xié)程進(jìn)入臨界區(qū)
    sem = Semaphore(2)
    # 創(chuàng)建 4 個(gè)任務(wù)
    task = [asyncio.create_task(bar(sem)) for _ in range(4)]
    # 直接對(duì) sem 執(zhí)行 release
    sem.release()
    sem.release()
    await asyncio.gather(*task)

start = time.perf_counter()
asyncio.run(main())
end = time.perf_counter()
print(f"總耗時(shí): {end - start}")
"""
總耗時(shí): 3.003426834
"""

創(chuàng)建了 4 個(gè)任務(wù),每次只允許兩個(gè)協(xié)程進(jìn)入臨界區(qū),因此總耗時(shí)應(yīng)該是 6 秒才對(duì)。但問題是我們創(chuàng)建完信號(hào)量之后,調(diào)用了兩次 release 方法,將內(nèi)部的 value 值增加了 2,此時(shí)信號(hào)量就變成了同時(shí)允許 4 個(gè)協(xié)程進(jìn)入臨界區(qū)。

因此和鎖不一樣,鎖一旦被釋放,就不能再二次釋放。而信號(hào)量被釋放,其實(shí)就是將內(nèi)部的 value 加 1,并且不會(huì)對(duì)內(nèi)部的 value 進(jìn)行檢測(cè)。

import asyncio
from asyncio import Semaphore

async def main():
    sem = Semaphore(2)
    print(f"before value: {sem._value}")
    for _ in range(100):
        sem.release()
    print(f"after value: {sem._value}")

asyncio.run(main())
"""
before value: 2
after value: 102
"""

不過這個(gè)問題基本很少發(fā)生,當(dāng)然也可以使用 async with 語句,這樣獲取和釋放一定是成對(duì)出現(xiàn)的。

而有界信號(hào)量在信號(hào)量的基礎(chǔ)上做了一層檢測(cè),如果在 release 的時(shí)候發(fā)現(xiàn) value 已經(jīng)達(dá)到了初始值,那么會(huì)報(bào)錯(cuò)。

圖片圖片

有界信號(hào)量會(huì)將初始值 value 單獨(dú)保存起來,如果釋放時(shí)發(fā)現(xiàn) value 大于等于初始值,那么報(bào)錯(cuò)。但是注意:有界信號(hào)量依舊可以多次 release,不過我們基本不會(huì)這么干,因?yàn)楂@取和釋放應(yīng)該是成對(duì)出現(xiàn)的。

以上我們就說完了協(xié)程里面的鎖和信號(hào)量,再來看看線程提供的。

線程鎖和信號(hào)量

線程鎖可以通過 threading 模塊創(chuàng)建。

import threading

lock = threading.Lock()

注意:Lock 并不是一個(gè)類,而是一個(gè)函數(shù),看一下源代碼。

Lock = _allocate_lock
# threading.Lock() 其實(shí)就是 _thread.allocate_lock()
_allocate_lock = _thread.allocate_lock

調(diào)用 _thread.allocate_lock() 時(shí)會(huì)在內(nèi)部創(chuàng)建鎖,而鎖是由 _thread 模塊實(shí)現(xiàn)的。

import threading
import _thread

lock = threading.Lock()
print(type(lock))
"""
<class '_thread.lock'>
"""
lock = _thread.allocate_lock()
print(type(lock))
"""
<class '_thread.lock'>
"""

所以線程鎖其實(shí)是一個(gè) _thread.lock 對(duì)象。

補(bǔ)充一下,Python 有很多的模塊是由 C 實(shí)現(xiàn)的,因?yàn)樗鼈兒托阅苊芮邢嚓P(guān),編譯之后會(huì)內(nèi)嵌在解釋器里面。舉個(gè)例子:

import random, _random
import re, _sre
import ssl, _ssl
import io, _io
import bisect, _bisect
import heapq, _heapq
import asyncio, _asyncio
import threading, _thread

這些 C 實(shí)現(xiàn)的模塊,名字前面一般會(huì)帶有一個(gè)下滑線,它們內(nèi)嵌在解釋器里面,你在 Lib 目錄下是找不到的。但我們不需要直接使用這些模塊,解釋器會(huì)提供相應(yīng)的 Python 模塊對(duì)其進(jìn)行封裝。

我們只需要導(dǎo)入 Python 模塊即可,在內(nèi)部會(huì)調(diào)用具體的 C 實(shí)現(xiàn),以 io 模塊為例。

圖片圖片

這些類都是 _io 實(shí)現(xiàn)的,而 io 只是做了一層封裝,因此在實(shí)際編碼時(shí)會(huì)使用 C 實(shí)現(xiàn)的 _io 模塊里的邏輯。

再比如內(nèi)置函數(shù) open,它其實(shí)就是 io.open,而 io 里面的 open 是從 _io 導(dǎo)入進(jìn)來的。

import io
import _io

print(open is io.open is _io.open)  # True

好了,說了這么多只是想表示線程鎖的具體實(shí)現(xiàn)不在 threading 里面,而是在 _thread 里面。_thread 是一個(gè) C 實(shí)現(xiàn)的模塊,我們需要到解釋器里面才能看到具體實(shí)現(xiàn)。

在 Modules/_threadmodule.c 中,有一個(gè)結(jié)構(gòu)體實(shí)例 Locktype,它便是 _thread.lock 這個(gè)類的底層實(shí)現(xiàn)。

圖片圖片

_thread.lock 實(shí)例化后會(huì)得到鎖,鎖在底層對(duì)應(yīng)的是 lockobject 結(jié)構(gòu)體。

// _threadmodule.c
typedef struct {
    PyObject_HEAD
    PyThread_type_lock lock_lock;
    PyObject *in_weakreflist;
    char locked;
} lockobject;
// pythread.h
typedef void *PyThread_type_lock;

解釋一下這個(gè)結(jié)構(gòu)體。

PyObject_HEAD

每個(gè)對(duì)象都具備的頭部信息,它包含了對(duì)象的引用計(jì)數(shù)和類型。

lock_lock

PyThread_type_lock 是 void * 的類型別名,所以 lock_lock 是一個(gè) void * 類型的指針,該指針指向了真正的鎖,這個(gè)鎖是底層操作系統(tǒng)提供的。

和協(xié)程鎖不同,由于操作系統(tǒng)感知不到協(xié)程,因此協(xié)程鎖是基于 Future 對(duì)象實(shí)現(xiàn)的。但線程鎖則是基于操作系統(tǒng)實(shí)現(xiàn)的,當(dāng) Python 代碼創(chuàng)建鎖、獲取鎖、解鎖時(shí),會(huì)通過 lock_lock 指針將這些操作轉(zhuǎn)發(fā)到具體的鎖實(shí)現(xiàn)上。

in_weakreflist

用于創(chuàng)建弱引用,關(guān)于什么是弱引用,我在之前的文章中介紹過。

locked

用于標(biāo)記鎖狀態(tài),把它當(dāng)成 Python 的布爾值即可,值為 1 表示鎖已被獲?。ㄒ焰i定),0 表示未被獲?。ㄎ存i定)。

這幾個(gè)字段應(yīng)該很好理解,然后我們來看一下鎖的具體方法,那么方法都定義在哪呢?我們說過,實(shí)例對(duì)象有哪些行為,取決于類型對(duì)象定義了哪些操作。

因此鎖的操作都定義在 Locktype 里面,由內(nèi)部的 tp_methods 字段負(fù)責(zé)維護(hù)。

圖片圖片

該字段被賦值為 lock_methods,所以鎖的方法都在 lock_methods 數(shù)組中。

圖片圖片

以上就是鎖能夠使用的方法,我們來驗(yàn)證一下。

import threading

lock = threading.Lock()

# acquire_lock 和 acquire 基本是等價(jià)的
# release_lock 和 release 也基本是等價(jià)的
# 不過我們一般都會(huì)使用 acquire 和 lock
lock.acquire_lock()  # 獲取鎖
lock.release_lock()  # 釋放鎖

lock.acquire()  # 獲取鎖
lock.release()  # 釋放鎖

# 同理 locked_lock 和 locked 也是等價(jià)的
# 表示鎖是否被獲?。ㄒ焰i定),不過我們一般使用 locked
print(lock.locked_lock())
print(lock.locked())
lock.acquire()
print(lock.locked_lock())
print(lock.locked())
lock.release()
"""
False
False
True
True
"""
# 還提供了上下文管理,等價(jià)于 lock.acquire + lock.release
with lock:
    pass

好了,接下來我們看看 acquire 方法,也就是鎖是怎么獲取的。

static PyObject *
lock_PyThread_acquire_lock(
    lockobject *self, 
    PyObject *args, 
    PyObject *kwds
){
    _PyTime_t timeout;  // 超時(shí)時(shí)間
    // 一個(gè)枚舉,表示鎖狀態(tài),有三個(gè)可選值
    // PY_LOCK_FAILURE:表示因?yàn)殒i已被持有,而獲取失敗
    // PY_LOCK_ACQUIRED:表示鎖可用,并成功獲取鎖
    // PY_LOCK_INTR:表示獲取鎖的操作被中斷,比如抵達(dá)超時(shí)時(shí)間
    PyLockStatus r;
    
    // 參數(shù)解析,該方法接收一個(gè) timeout 參數(shù)
    if (lock_acquire_parse_args(args, kwds, &timeout) < 0)
        return NULL;
    
    // 獲取鎖,并指定一個(gè)超時(shí)時(shí)間,不傳則表示沒有超時(shí)時(shí)間
    // 那么在獲取不到鎖時(shí),會(huì)無限等待
    r = acquire_timed(self->lock_lock, timeout);
    // 如果返回的狀態(tài)為 PY_LOCK_INTR,說明達(dá)到超時(shí)時(shí)間
    // 因此獲取鎖的操作被中斷,并且會(huì)拋出異常
    if (r == PY_LOCK_INTR) {
        return NULL;
    }
    // 如果返回的狀態(tài)為 PY_LOCK_ACQUIRED,表示鎖獲取成功
    // 將鎖的 locked 字段設(shè)置為 1,表示鎖已被獲取
    if (r == PY_LOCK_ACQUIRED)
        self->locked = 1;
    // 如果以上兩種狀態(tài)都不是,那么說明獲取失敗了
    // 將 r == PY_LOCK_ACQUIRED 轉(zhuǎn)成布爾值返回
    // 獲取成功返回 True,獲取失敗返回 False
    return PyBool_FromLong(r == PY_LOCK_ACQUIRED);
}

整個(gè)過程仍然很簡(jiǎn)單,因此我們看到協(xié)程鎖和線程鎖的實(shí)現(xiàn)是類似的,它們都有一個(gè) locked 字段用于表示鎖是否已被獲取。

只不過協(xié)程鎖是基于 Future 對(duì)象實(shí)現(xiàn)的,當(dāng) await future 陷入阻塞時(shí),表示鎖已被其它協(xié)程獲取。當(dāng)解除阻塞時(shí),代表鎖被釋放了,自己獲取到鎖。

而線程鎖是基于操作系統(tǒng)實(shí)現(xiàn)的,它本質(zhì)上是對(duì)操作系統(tǒng)提供的鎖做了一個(gè)封裝。Python 線程在獲取鎖時(shí),底層會(huì)獲取操作系統(tǒng)的鎖。

而操作系統(tǒng)的鎖是怎么獲取的呢?在源碼中使用的是 acquire_time 函數(shù),它接收一個(gè)指針和一個(gè)超時(shí)時(shí)間。該指針便是 lockobject 的 lock_lock 字段,類型是 void *,它指向了操作系統(tǒng)提供的鎖實(shí)現(xiàn)。

圖片圖片

acquire_time 函數(shù)做了一些參數(shù)處理后,又調(diào)用了 PyThread_acquire_lock_timed  函數(shù),顯然獲取鎖的邏輯位于該函數(shù)里面。

PyThread_acquire_lock_timed 函數(shù)在不同平臺(tái)有著不同的實(shí)現(xiàn),因?yàn)椴煌僮飨到y(tǒng)的鎖實(shí)現(xiàn)是不是一樣的,所以源碼中使用 void *。

圖片圖片

我們以 Windows 系統(tǒng)為例:

圖片圖片

雖然不同系統(tǒng)的函數(shù)實(shí)現(xiàn)不一樣,但參數(shù)是一致的。

  • aLock:void * 指針,指向操作系統(tǒng)提供的鎖;
  • microseconds:等待鎖的時(shí)間,以微妙為單位。如果值是負(fù)數(shù),表示無限等待,直到獲取鎖;
  • intr_flag:如果設(shè)置為 1,那么當(dāng)?shù)却^程中出現(xiàn)了信號(hào)中斷時(shí),函數(shù)會(huì)提前返回。

函數(shù)的核心實(shí)現(xiàn)如下:

圖片圖片

又調(diào)用了 EnterNonRecursiveMutex 函數(shù),該函數(shù)是真正獲取鎖的邏輯,參數(shù) aLock 指向了操作系統(tǒng)的互斥鎖。前面說過,不同系統(tǒng)有著不同的鎖實(shí)現(xiàn),所以具體使用時(shí)需要轉(zhuǎn)換。在 Windows 系統(tǒng)上,它被轉(zhuǎn)成了 PNRMUTEX。

typedef struct _NRMUTEX
{   
    // 對(duì)操作系統(tǒng)互斥鎖的封裝
    PyMUTEX_T cs;
    // 對(duì)條件變量的封裝,用于線程間的同步
    // 允許線程在條件不滿足時(shí)等待,條件滿足時(shí)由其它線程通知等待的線程
    // 條件變量一般和互斥鎖一起使用,避免競(jìng)爭(zhēng)條件和死鎖
    PyCOND_T cv;
    // 標(biāo)記互斥鎖是否已被獲取,1 表示已被獲取,0 表示未被獲取
    int locked;
} NRMUTEX;
typedef NRMUTEX *PNRMUTEX;

所以 lockobject 的 lock_lock 指針指向的其實(shí)依舊不是 OS 互斥鎖,而是一個(gè)結(jié)構(gòu)體實(shí)例,結(jié)構(gòu)體內(nèi)部的字段 cs 封裝的才是 OS 互斥鎖。

圖片圖片

lockobject 是線程鎖,也就是 Python 代碼中使用的鎖的底層實(shí)現(xiàn),而 NRMUTEX 則是封裝了操作系統(tǒng)提供的互斥鎖。注意這里面的兩個(gè) locked,它們都用于標(biāo)記鎖是否已被獲取。

最后來看看 EnterNonRecursiveMutex 函數(shù)的具體邏輯。

DWORD
EnterNonRecursiveMutex(PNRMUTEX mutex, 
                       DWORD milliseconds)
{
    
    DWORD result = WAIT_OBJECT_0;
    // 對(duì) OS 互斥鎖進(jìn)行鎖定,用于保護(hù)共享數(shù)據(jù),如果鎖定失敗直接返回
    if (PyMUTEX_LOCK(&mutex->cs))
        return WAIT_FAILED;
    // 如果鎖定成功,那么將 locked 字段設(shè)置為 1,表示互斥鎖被獲取
    // 但如果發(fā)現(xiàn) locked 已經(jīng)為 1 了,則說明已有別的線程將 locked 修改為 1
    // 那么當(dāng)前線程就要等待,直到 locked 不為 1(鎖被釋放)
    if (milliseconds == INFINITE) {
        // 無限等待
        while (mutex->locked) {
            if (PyCOND_WAIT(&mutex->cv, &mutex->cs)) {
                result = WAIT_FAILED;
                break;
            }
        }
    } else if (milliseconds != 0) {
        // 有時(shí)間限制的等待
        ULONGLONG now, target = GetTickCount64() + milliseconds;
        while (mutex->locked) {
            if (PyCOND_TIMEDWAIT(
                &mutex->cv, &mutex->cs, 
                (long long)milliseconds*1000) < 0) 
            {
                result = WAIT_FAILED;
                break;
            }
            now = GetTickCount64();
            if (target <= now)
                break;
            milliseconds = (DWORD)(target-now);
        }
    }
    // 在被喚醒之后,說明當(dāng)前線程獲取互斥鎖成功,于是將 locked 改成 1
    if (!mutex->locked) {
        mutex->locked = 1;
        result = WAIT_OBJECT_0;
    } else if (result == WAIT_OBJECT_0)
        result = WAIT_TIMEOUT;
    // 這里必須將操作系統(tǒng)的鎖釋放掉,因?yàn)閷?duì)于外界的線程而言,
    // 鎖是否被獲?。ㄦi定),取決于 locked 字段是否為 1
    PyMUTEX_UNLOCK(&mutex->cs); 
    return result;
}

代碼邏輯有一些讓人疑惑的地方,下面解釋一下。Python 里面調(diào)用 lock.acquire() 方法時(shí),表示要獲取線程鎖。但獲取線程鎖之前,要先獲取 OS 互斥鎖,如果獲取不到,那么壓根不允許進(jìn)入臨界區(qū)。

但解釋器在互斥鎖的基礎(chǔ)上又封裝了一層,如果獲取到了互斥鎖,還要將 locked 字段修改為 1。因?yàn)閺拇a邏輯上講,無論是線程鎖還是互斥鎖,只有當(dāng)它們內(nèi)部的 locked 字段為 1 時(shí),才算是獲取了鎖。

所以將互斥鎖的 locked 字段修改為 1 之后,后續(xù)還要將線程鎖的 locked 字段修改為 1,這樣才算是獲取了線程鎖。

到這里估計(jì)可能有人會(huì)產(chǎn)生一個(gè)疑問,為啥函數(shù)在一開始要獲取系統(tǒng)的互斥鎖,最后又釋放掉,這豈不是多此一舉?

if (PyMUTEX_LOCK(&mutex->cs))
        return WAIT_FAILED;
    //...
    PyMUTEX_UNLOCK(&mutex->cs);

直接檢測(cè) locked 字段是否等于 1 不就行了嗎?其實(shí)原因有三個(gè):

  • 保護(hù)共享狀態(tài):操作系統(tǒng)的互斥鎖 mutex-> cs 用于保護(hù)共享狀態(tài) mutex -> locked 的讀寫,在多線程環(huán)境中,任何對(duì)共享狀態(tài)的訪問都要同步,以防止競(jìng)態(tài)條件;
  • 條件變量的同步:在使用條件變量 mutex -> cv 時(shí),通常需要結(jié)合互斥鎖使用,條件變量的等待和通知需要在互斥鎖的保護(hù)下進(jìn)行,以保證操作的原子性;
  • 避免忙等待:如果只使用 mutex -> locked 進(jìn)行檢查,可能會(huì)陷入忙等待,即不斷地檢查鎖狀態(tài)而占用 CPU 資源。使用互斥鎖和條件變量可以讓線程在等待時(shí)被掛起,從而更有效地利用 CPU;

所以解釋器為 OS 互斥鎖引入了一個(gè)自定義的鎖狀態(tài) locked,OS 互斥鎖提供了對(duì) locked 的基本保護(hù),因?yàn)槎鄠€(gè)線程都要修改它。而自定義的鎖狀態(tài) locked 則用于實(shí)現(xiàn)同步邏輯,如果 locked 為 1,我們就認(rèn)為鎖被獲取了,locked 為 0,鎖就沒有被獲取。

協(xié)程鎖和線程鎖都是如此,所謂的獲取鎖、釋放鎖都是在修改 locked 字段的值。只不過在等待的時(shí)候,協(xié)程鎖使用的是 Future 對(duì)象,而線程鎖使用的是操作系統(tǒng)提供的互斥鎖和條件變量。

所以上面代碼中的 PyMUTEX_LOCK 通過之后,還要檢測(cè) locked 字段是否等于 1,代碼片段如下。

while (mutex->locked) {
            if (PyCOND_WAIT(&mutex->cv, &mutex->cs)) {
                result = WAIT_FAILED;
                break;
            }
        //...

如果 locked 是 1,說明互斥鎖已經(jīng)被獲取了,當(dāng)前線程要進(jìn)行等待,直到 locked 字段的值為 0。當(dāng)其它線程釋放鎖時(shí),會(huì)將 locked 字段修改為 0,并通過條件變量喚醒當(dāng)前線程。

該線程醒來后檢測(cè)到 locked 為 0,就知道互斥鎖已被釋放,自己可以獲取了,于是再將 locked 字段修改為 1。

說完了線程鎖的獲取,再來看看線程鎖的釋放,所謂釋放,其實(shí)就是將 locked 字段修改為 0 而已。

圖片圖片

釋放互斥鎖的邏輯最終會(huì)調(diào)用如下函數(shù):

圖片圖片

修改 locked 是不安全的,需要加鎖保護(hù)。所以 OS 互斥鎖就是為了保護(hù) locked 變量的修改,再配合條件變量實(shí)現(xiàn)阻塞等待以及自動(dòng)喚醒,但從代碼邏輯上講,將 locked 字段設(shè)置為 0,才算是真正釋放了鎖。

這部分邏輯稍微有點(diǎn)繞,總之記住一個(gè)重點(diǎn):所謂的鎖,它的核心就是結(jié)構(gòu)體的一個(gè)字段,這里是 locked。如果字段的值為 1,表示鎖被獲取了,字段的值為 0,表示鎖沒有被獲取。

  • 而獲取鎖,本質(zhì)上就是將 locked 字段修改為 1;
  • 而釋放鎖,本質(zhì)上就是將 locked 字段修改為 0;

當(dāng)鎖沒有被獲取時(shí),那么線程在獲取鎖和釋放鎖時(shí)的邏輯可以簡(jiǎn)化為如下:

圖片圖片

但實(shí)際情況會(huì)有多個(gè)線程一起競(jìng)爭(zhēng)鎖,因此為了保護(hù)這個(gè)共享字段,以及實(shí)現(xiàn)阻塞等待和自動(dòng)喚醒,解釋器使用了操作系統(tǒng)的互斥鎖和條件變量。

小結(jié)

以上我們就剖析了協(xié)程鎖、信號(hào)量以及線程鎖的實(shí)現(xiàn)原理,至于線程里面的信號(hào)量,它的原理和協(xié)程的信號(hào)量是一樣的,只是實(shí)現(xiàn)方式不一樣。

圖片圖片

線程的信號(hào)量包含了一個(gè)初始值 value,但它在實(shí)現(xiàn)阻塞等待以及喚醒的時(shí)候用的是條件變量,而條件變量的實(shí)現(xiàn)依賴于鎖。簡(jiǎn)單來說,獲取信號(hào)量的時(shí)候,self._value 會(huì)減 1,釋放信號(hào)量的時(shí)候,self._value 會(huì)加 1。

當(dāng) self._value 為 0 時(shí),獲取信號(hào)量會(huì)陷入阻塞,而當(dāng)某個(gè)線程退出臨界區(qū)釋放信號(hào)量的時(shí)候,會(huì)通過條件變量的 notify 機(jī)制喚醒阻塞的線程。

關(guān)于條件變量,我們以后再分析,有點(diǎn)餓了。

另外進(jìn)程也有鎖和信號(hào)量,這里也先不討論了,有點(diǎn)困了。

責(zé)任編輯:武曉燕 來源: 古明地覺的編程教室
相關(guān)推薦

2024-10-29 15:23:45

Python線程安全

2023-12-05 13:46:09

解密協(xié)程線程隊(duì)列

2016-11-23 16:08:24

Python處理器分布式系統(tǒng)

2025-04-16 08:50:00

信號(hào)量隔離線程池隔離并發(fā)控制

2024-07-25 11:53:53

2009-12-08 12:14:43

2020-11-10 15:25:26

SemaphoreLinux翻譯

2010-07-15 15:32:10

Perl線程

2010-04-21 16:25:13

Unix信號(hào)量

2010-04-21 16:42:48

Unix信號(hào)量

2017-05-11 14:05:25

Consul分布式信號(hào)量

2010-03-16 17:52:27

Java多線程信號(hào)量

2020-09-22 07:35:06

Linux線程進(jìn)程

2021-04-13 09:20:15

鴻蒙HarmonyOS應(yīng)用開發(fā)

2010-04-21 16:50:31

Unix信號(hào)量

2020-11-05 09:59:24

Linux內(nèi)核信號(hào)量

2023-12-08 07:40:07

并發(fā)控制

2025-04-23 11:00:00

Hystrix隔離模式信號(hào)量

2021-09-07 07:53:42

Semaphore 信號(hào)量源碼

2017-04-13 10:51:09

Consul分布式
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)