自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Python線程安全之鎖、信號量

開發(fā) 前端
Python的線程是一個(gè)并發(fā)框架,線程并行運(yùn)行的時(shí)候,每個(gè)線程執(zhí)行代碼的一部分,Python解釋器在它們之間切換,將執(zhí)行控制權(quán)交給每個(gè)線程。

在Realpython看到一篇關(guān)于線程安全的文章,覺得非常哇塞,分享給大家,今天先講前半部分。

提到線程必須了解兩個(gè)術(shù)語:

? 并發(fā)Concurrency,系統(tǒng)具備處理多個(gè)任務(wù)的能力,它們的在執(zhí)行在時(shí)間上重疊,但不一定同時(shí)發(fā)生。

? 并行Parallelism:多個(gè)任務(wù)利用多核CPU真正同時(shí)執(zhí)行。

Python的線程是一個(gè)并發(fā)框架,線程并行運(yùn)行的時(shí)候,每個(gè)線程執(zhí)行代碼的一部分,Python解釋器在它們之間切換,將執(zhí)行控制權(quán)交給每個(gè)線程。

理解線程并行

先簡單舉個(gè)例子:

import threading
import time
from concurrent.futures import ThreadPoolExecutor

def threaded_function():
    for number in range(3):
        print(f"Printing from {threading.current_thread().name}. {number=}")
        time.sleep(0.1)

with ThreadPoolExecutor(max_workers=4, thread_name_prefix="Worker") as executor:
    for _ in range(4):
        executor.submit(threaded_function)

先打印輸出:

Printing from Worker_0. number=0
Printing from Worker_1. number=0
Printing from Worker_2. number=0
Printing from Worker_3. number=0
Printing from Worker_0. number=1
Printing from Worker_2. number=1
Printing from Worker_1. number=1
Printing from Worker_3. number=1
Printing from Worker_0. number=2
Printing from Worker_2. number=2
Printing from Worker_1. number=2
Printing from Worker_3. number=2

啟動(dòng)了四個(gè)線程,可以觀察到在Worker_0打印number=0之后 ,并不會立刻打印number=1,原因就在于要切換給其他線程運(yùn)行,而且各個(gè)線程的運(yùn)行順序是不一樣的。

如何做到這個(gè)的呢?因?yàn)閜ython解析器會進(jìn)行上下文切換,默認(rèn)的間隔時(shí)間如下列代碼:

import sys
sys.getswitchinterval()

0.005

5毫秒的間隔并不意味著線程會精確地每5毫秒切換一次,而是意味著解釋器會在這些間隔內(nèi)考慮切換到另一個(gè)線程,而代碼中的sleep()是為了增加了在此期間發(fā)生上下文切換的可能性。

什么是線程安全

由于上下文切換,程序在多線程環(huán)境中運(yùn)行時(shí)可能會表現(xiàn)出意外行為,這就導(dǎo)致線程不安全問題,而如果代碼在多線程環(huán)境中運(yùn)行時(shí)表現(xiàn)出確定性并產(chǎn)生期望的輸出,那么它就被認(rèn)為是線程安全的。

線程安全問題通常源于兩個(gè)原因:

? 共享可變數(shù)據(jù):線程共享父進(jìn)程的內(nèi)存,因此所有變量和數(shù)據(jù)結(jié)構(gòu)在各線程之間是共享的。對這些共享數(shù)據(jù)進(jìn)行修改時(shí)可能會引發(fā)錯(cuò)誤。

? 非原子操作:多線程環(huán)境中,涉及多個(gè)步驟的操作可能會被上下文切換中斷,尤其是在執(zhí)行過程中切換到其他線程時(shí),容易導(dǎo)致意外結(jié)果。

1:GIL

在討論P(yáng)ython線程時(shí),Python 3.12之前的GIL(Global Interpreter Lock)不可避免要提到。GIL是一個(gè)互斥鎖,目的是保護(hù)Python對象的訪問,防止多個(gè)線程同時(shí)執(zhí)行Python字節(jié)碼。它阻止了真正的線程并行,尤其在CPU密集型任務(wù)中,多線程性能會受到嚴(yán)重限制。不過,這意味著對于I/O密集型任務(wù),線程并行仍然適用。

由于GIL的存在,當(dāng)某個(gè)操作能在單個(gè)字節(jié)碼指令中完成時(shí),它是原子的。那么,Python是否因此天然線程安全?并非如此。因?yàn)镮/O操作仍然可以并行執(zhí)行,因此即使有GIL,訪問共享可變數(shù)據(jù)時(shí)依然需要鎖等同步機(jī)制確保線程安全。

GIL是否完全消除了Python的多線程并發(fā)能力?并沒有,Python支持通過多進(jìn)程來實(shí)現(xiàn)真正的并行。

值得關(guān)注的是,從Python 3.13開始,Python提供了無GIL的解釋器,實(shí)現(xiàn)了真正的線程并行。但無論是否有GIL,編寫代碼時(shí)始終建議合理地保護(hù)線程安全——也就是說,不依賴GIL,主動(dòng)保證線程安全。

2:競爭

現(xiàn)在來看看第二個(gè)核心概念,競爭條件發(fā)生在程序的結(jié)果依賴于不可控事件的順序或時(shí)間,如線程執(zhí)行順序時(shí),如果沒有適當(dāng)?shù)耐綍?dǎo)致程序出現(xiàn)不可預(yù)測的錯(cuò)誤。

下面的例子就來模擬這種情況,兩個(gè)線程同時(shí)修改一個(gè)屬性:

import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance

    def withdraw(self, amount):
        if self.balance >= amount:
            new_balance = self.balance - amount
            time.sleep(0.1)  
            self.balance = new_balance
        else:
            raise ValueError("Insufficient balance")

account = BankAccount(1000)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(account.withdraw, 500)
    executor.submit(account.withdraw, 700)

print(f"Final account balance: {account.balance}")

先猜可能的結(jié)果,代碼可能會輸出:

Final account balance: 300

也可能會輸出:

Final account balance: 500

為什么會出現(xiàn)這樣的情況呢?這就是由于線程執(zhí)行順序不一致導(dǎo)致的,如果先扣700,而第二個(gè)線程突然切換過來了,檢查余額夠,最終就扣了500,余額就變成500了,當(dāng)然結(jié)果是錯(cuò)誤的。

同步原語

為了解決線程不安全問題,Python的threading模塊提供了各種同步原語,以防止競爭條件并允許線程之間的協(xié)調(diào)。

同步原語會:

? 控制線程同時(shí)執(zhí)行代碼塊

? 使多個(gè)代碼語句對線程來說是原子的

? 限制線程的并發(fā)訪問

? 在線程之間進(jìn)行協(xié)調(diào),并根據(jù)其他線程的狀態(tài)執(zhí)行操作

接下去使用Python線程鎖和信號實(shí)現(xiàn)互斥。

使用Python線程鎖實(shí)現(xiàn)互斥

鎖是一種同步原語,可用于獨(dú)占訪問資源,一旦一個(gè)線程獲取了鎖,其他線程就不能再獲取它并繼續(xù)執(zhí)行,直到鎖被釋放,可以使用鎖來封裝應(yīng)該原子執(zhí)行的語句或語句組。

python提供兩個(gè)lock相關(guān)的函數(shù):

? 當(dāng)一個(gè)線程調(diào)用.acquire()方法時(shí),如果Lock對象已經(jīng)被另一個(gè)線程鎖定,那么調(diào)用的線程會被阻塞,直到持有鎖的線程釋放鎖。

? release() 會釋放一個(gè)被線程獲取的鎖,如果嘗試釋放一個(gè)未鎖定的鎖,會引發(fā)RuntimeError。

如果使用with語句,Lock 對象可用作上下文管理器,可以自動(dòng)獲取和釋放鎖。

為了解決上面代碼存在的問題,可以:

import threading
import time
from concurrent.futures import ThreadPoolExecutor

class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance
        self.account_lock = threading.Lock()

    def withdraw(self, amount):
        with self.account_lock:
            if self.balance >= amount:
                new_balance = self.balance - amount
                print(f"Withdrawing {amount}...")
                time.sleep(0.1)  # Simulate a delay
                self.balance = new_balance
            else:
                raise ValueError("Insufficient balance")

    def deposit(self, amount):
        with self.account_lock:
            new_balance = self.balance + amount
            print(f"Depositing {amount}...")
            time.sleep(0.1)  # Simulate a delay
            self.balance = new_balance

account = BankAccount(1000)

with ThreadPoolExecutor(max_workers=3) as executor:
    executor.submit(account.withdraw, 700)
    executor.submit(account.deposit, 1000)
    executor.submit(account.withdraw, 300)

print(f"Final account balance: {account.balance}")

上述代碼通過鎖成功保證了線性安全。

如果由于代碼中的錯(cuò)誤或疏忽導(dǎo)致鎖未正確釋放,可能會導(dǎo)致死鎖,即線程無限期地等待鎖被釋放。

死鎖的原因包括:

? 嵌套鎖獲?。喝绻粋€(gè)線程嘗試獲取它已經(jīng)持有的鎖,可能會發(fā)生死鎖,同一線程嘗試多次獲取相同的鎖會導(dǎo)致線程阻塞自身,這種情況在沒有外部干預(yù)的情況下無法解決。

? 多重鎖獲?。寒?dāng)使用多個(gè)鎖時(shí),如果線程以不一致的順序獲取這些鎖,可能會發(fā)生死鎖,如果兩個(gè)線程各自持有一個(gè)鎖并等待對方釋放鎖,那么兩個(gè)線程都無法繼續(xù),從而導(dǎo)致死鎖。

對于多重鎖可以使用可重入鎖RLock解決,當(dāng)持有線程再次請求鎖時(shí),它不會阻塞,允許線程在釋放鎖之前多次獲取鎖,這在遞歸函數(shù)或線程需要重新進(jìn)入已鎖定資源的情況下非常有用,相對來說,RLock因?yàn)橐櫷痪€程獲取鎖的次數(shù),會有性能開銷。

Semaphores信號量

在資源數(shù)量有限且多個(gè)線程嘗試訪問這些有限資源時(shí)非常有用,它使用一個(gè)計(jì)數(shù)器來限制多個(gè)線程對臨界區(qū)的訪問,每次調(diào)用.acquire() 都會將信號量的計(jì)數(shù)器減少一個(gè),當(dāng)計(jì)數(shù)器達(dá)到零時(shí),再.acquire() 調(diào)用將被阻塞。

舉一個(gè)例子,多個(gè)客戶在銀行等待有限數(shù)量的柜員服務(wù):

import random
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Semaphore with a maximum of 2 resources (tellers)
teller_semaphore = threading.Semaphore(2)

def now():
    return time.strftime("%H:%M:%S")

def serve_customer(name):
    print(f"{now()}: {name} is waiting for a teller.")
    with teller_semaphore:
        print(f"{now()}: {name} is being served by a teller.")
        time.sleep(random.randint(1, 3))
        print(f"{now()}: {name} is done being served.")

customers = [
    "Customer 1",
    "Customer 2",
    "Customer 3",
    "Customer 4",
    "Customer 5",
]

with ThreadPoolExecutor(max_workers=3) as executor:
    for customer_name in customers:
        thread = executor.submit(serve_customer, customer_name)

print(f"{now()}: All customers have been served.")

代碼很好理解,當(dāng)某個(gè)線程達(dá)到計(jì)數(shù)器上限后,它會被阻塞,直到其他線程在with語句中因?yàn)橥瓿煞?wù)而釋放,但不管怎么樣,每次只有三個(gè)客戶被服務(wù)。

參考:https://realpython.com/python-thread-lock/#using-python-threading-locks-for-mutual-exclusion

責(zé)任編輯:武曉燕 來源: 虞大膽的嘰嘰喳喳
相關(guān)推薦

2016-11-23 16:08:24

Python處理器分布式系統(tǒng)

2023-11-23 08:31:51

競爭鎖共享字段

2010-07-15 15:32:10

Perl線程

2009-12-08 12:14:43

2010-03-16 17:52:27

Java多線程信號量

2020-11-10 15:25:26

SemaphoreLinux翻譯

2025-04-16 08:50:00

信號量隔離線程池隔離并發(fā)控制

2021-04-13 09:20:15

鴻蒙HarmonyOS應(yīng)用開發(fā)

2010-04-21 16:50:31

Unix信號量

2020-11-05 09:59:24

Linux內(nèi)核信號量

2025-04-23 11:00:00

Hystrix隔離模式信號量

2010-04-21 16:25:13

Unix信號量

2010-04-21 16:42:48

Unix信號量

2021-09-07 07:53:42

Semaphore 信號量源碼

2010-04-21 15:37:38

Unix信號量

2020-09-25 07:34:40

Linux系統(tǒng)編程信號量

2024-07-25 11:53:53

2023-12-08 07:40:07

并發(fā)控制

2019-11-19 09:00:38

JavaAND信號量

2010-03-17 16:36:10

Java信號量模型
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號