高并發(fā)編程/消息傳遞機(jī)制避免鎖提高并發(fā)效率,不懂的趕緊進(jìn)來(lái)(設(shè)計(jì)篇)
在現(xiàn)代軟件開發(fā)中,隨著多核處理器的普及和分布式系統(tǒng)的擴(kuò)展,傳統(tǒng)的基于共享內(nèi)存的并發(fā)模型正面臨越來(lái)越多的挑戰(zhàn)。消息傳遞機(jī)制作為一種替代方案,以其獨(dú)特的異步通信和無(wú)共享狀態(tài)的特性,為構(gòu)建高效、可擴(kuò)展和健壯的系統(tǒng)提供了新的思路。它通過(guò)將數(shù)據(jù)操作封裝在消息中,允許系統(tǒng)組件以松耦合的方式進(jìn)行交互,從而減少了鎖的需求和競(jìng)態(tài)條件的風(fēng)險(xiǎn)。本文將深入探討消息傳遞機(jī)制的原理、優(yōu)勢(shì)以及如何在實(shí)際應(yīng)用中實(shí)現(xiàn)這一模式,幫助讀者理解其在解決并發(fā)問(wèn)題中的重要作用。
1、并發(fā)問(wèn)題
1.1 問(wèn)題描述
在并發(fā)環(huán)境中,兩個(gè)線程同時(shí)對(duì)計(jì)數(shù)器進(jìn)行操作,線程1減少2,線程2減少9。由于缺乏同步,兩個(gè)線程都認(rèn)為計(jì)數(shù)器值大于需要減少的值,最終導(dǎo)致計(jì)數(shù)器變?yōu)?1,這違反了業(yè)務(wù)規(guī)則,因?yàn)閹?kù)存不能為負(fù)數(shù),表示過(guò)度分配。
1.2 解決方案
- 使用原子操作鎖定檢查和遞減步驟,確保操作的原子性。
因?yàn)閭鹘y(tǒng)并發(fā)模式中,共享內(nèi)存是傾向于強(qiáng)一致性弱隔離性的,例如悲觀鎖同步的方式就是使用強(qiáng)一致性的方式控制并發(fā),
- 采用消息傳遞機(jī)制代替共享內(nèi)存,減少鎖的使用。
使用共享數(shù)據(jù)的并發(fā)編程面臨的最大問(wèn)題是數(shù)據(jù)條件競(jìng)爭(zhēng) data race,消息傳遞機(jī)制最大的優(yōu)勢(shì)在于不會(huì)產(chǎn)生數(shù)據(jù)競(jìng)爭(zhēng)狀態(tài)。而實(shí)現(xiàn)消息傳遞有兩種常見類型:基于 channel的消息傳遞、基于 Actor的消息傳遞。
1.3 為什么消息傳遞機(jī)制能減少鎖
消息傳遞機(jī)制能夠減少或消除對(duì)鎖的需求,主要是因?yàn)樗淖兞瞬l(fā)編程的范式,從直接操作共享狀態(tài)轉(zhuǎn)變?yōu)橥ㄟ^(guò)消息傳遞來(lái)協(xié)調(diào)操作。以下是消息傳遞機(jī)制如何實(shí)現(xiàn)這一點(diǎn)的幾個(gè)關(guān)鍵點(diǎn):
- 分解任務(wù):
在消息傳遞模型中,復(fù)雜的任務(wù)被分解成一系列更小的、可以獨(dú)立處理的任務(wù)單元(消息)。這些任務(wù)單元被發(fā)送到消息隊(duì)列中,而不是直接操作共享狀態(tài)。
- 無(wú)共享狀態(tài):
每個(gè)線程或進(jìn)程處理自己的任務(wù)單元,而不直接訪問(wèn)或修改共享狀態(tài)。這樣,就避免了多個(gè)線程同時(shí)修改同一共享變量的情況,從而減少了鎖的需求。
消費(fèi)者處理:
消費(fèi)者線程從消息隊(duì)列中取出任務(wù)單元進(jìn)行處理。由于每個(gè)任務(wù)單元是獨(dú)立的,消費(fèi)者之間不需要同步,因?yàn)樗鼈儾粫?huì)同時(shí)處理同一個(gè)任務(wù)單元。
線程安全:
消息隊(duì)列本身是線程安全的,它保證了消息的順序性和原子性,確保了消息的正確傳遞和處理。
并發(fā)性:
由于任務(wù)單元是獨(dú)立的,多個(gè)消費(fèi)者可以并發(fā)地從消息隊(duì)列中取出任務(wù)單元進(jìn)行處理,提高了系統(tǒng)的并發(fā)性和吞吐量。
解耦合:
消息傳遞機(jī)制使得生產(chǎn)者和消費(fèi)者之間的耦合度降低,它們不需要知道對(duì)方的具體實(shí)現(xiàn),只需要知道如何發(fā)送和接收消息。
容錯(cuò)性:
如果某個(gè)消費(fèi)者處理任務(wù)單元失敗,這不會(huì)影響其他消費(fèi)者處理其他任務(wù)單元。這種機(jī)制提高了系統(tǒng)的容錯(cuò)性。
1.4 消息傳遞機(jī)制的類型
基于Channel的消息傳遞:在Go語(yǔ)言中廣泛使用,通過(guò)channel實(shí)現(xiàn)goroutine之間的通信。
基于Actor的消息傳遞:在Akka框架中實(shí)現(xiàn),每個(gè)Actor是一個(gè)并發(fā)執(zhí)行的實(shí)體,通過(guò)消息傳遞進(jìn)行通信。
1.5 消息傳遞機(jī)制避免鎖模型圖
圖片
說(shuō)明:
- 生產(chǎn)者(Producer) :在業(yè)務(wù)邏輯中,當(dāng)需要減少庫(kù)存時(shí),生產(chǎn)者將減少庫(kù)存的請(qǐng)求封裝成一條消息,并發(fā)送到消息隊(duì)列中,而不是直接操作共享庫(kù)存狀態(tài)。
- 消息隊(duì)列(Message Queue) :消息隊(duì)列是生產(chǎn)者和消費(fèi)者之間的中介,它負(fù)責(zé)存儲(chǔ)和傳遞消息。在這個(gè)例子中,消息隊(duì)列確保了消息的順序性和獨(dú)立性,使得每個(gè)減少庫(kù)存的請(qǐng)求都是獨(dú)立的。
- 消費(fèi)者(Consumer) :消費(fèi)者從消息隊(duì)列中取出消息,并根據(jù)消息內(nèi)容執(zhí)行相應(yīng)的操作(在這個(gè)例子中是減少庫(kù)存)。由于每個(gè)消息都是獨(dú)立的,消費(fèi)者不需要與生產(chǎn)者或其他消費(fèi)者同步,因此避免了鎖的使用。
優(yōu)勢(shì):
- 無(wú)共享狀態(tài):庫(kù)存狀態(tài)不再被多個(gè)線程共享,每個(gè)減少庫(kù)存的操作都是通過(guò)消息傳遞來(lái)協(xié)調(diào)的。
- 線程安全:由于消費(fèi)者處理的是消息隊(duì)列中的消息,而不是直接操作共享狀態(tài),因此不需要使用鎖來(lái)保證線程安全。
- 并發(fā)性:多個(gè)生產(chǎn)者可以并發(fā)地發(fā)送消息,多個(gè)消費(fèi)者也可以并發(fā)地從消息隊(duì)列中取出和處理消息,提高了系統(tǒng)的并發(fā)處理能力。
1.6 消息傳遞機(jī)制避免鎖設(shè)計(jì)案例
業(yè)務(wù):庫(kù)存管理
假設(shè)我們有一個(gè)在線商店,需要管理商品的庫(kù)存。在高并發(fā)環(huán)境下,多個(gè)客戶可能同時(shí)嘗試購(gòu)買同一件商品,這就要求我們確保庫(kù)存的減少是線程安全的,以避免庫(kù)存變?yōu)樨?fù)數(shù)。
傳統(tǒng)解決方案(使用鎖)
在傳統(tǒng)的解決方案中,我們可能會(huì)使用一個(gè)共享的庫(kù)存計(jì)數(shù)器,并在減少庫(kù)存的方法上加上同步鎖:
public class Inventory {
private int stock = 100;
public synchronized void reduceStock(int amount) {
if (stock >= amount) {
stock -= amount;
} else {
throw new IllegalArgumentException("庫(kù)存不足");
}
}
public synchronized int getStock() {
return stock;
}
}
在這個(gè)例子中, reduceStock 和 getStock 方法都被聲明為 synchronized,確保了在同一時(shí)間只有一個(gè)線程可以修改或讀取庫(kù)存。
使用消息傳遞機(jī)制的解決方案
現(xiàn)在,讓我們使用消息傳遞機(jī)制來(lái)重構(gòu)這個(gè)庫(kù)存管理的業(yè)務(wù)邏輯,避免使用鎖:
import java.util.concurrent.ConcurrentLinkedQueue;
public class InventoryManager {
private final ConcurrentLinkedQueue<InventoryCommand> commandQueue = new ConcurrentLinkedQueue<>();
public void processCommands() {
while (!Thread.currentThread().isInterrupted()) {
InventoryCommand command = commandQueue.poll();
if (command != null) {
command.execute();
}
}
}
public void reduceStock(int amount) {
commandQueue.offer(new InventoryCommand(amount));
}
private static class InventoryCommand {
private final int amount;
private int stock = 100; // 每個(gè)命令有自己的庫(kù)存副本
public InventoryCommand(int amount) {
this.amount = amount;
}
public void execute() {
if (stock >= amount) {
stock -= amount;
System.out.println("庫(kù)存減少 " + amount + ",當(dāng)前庫(kù)存 " + stock);
} else {
System.out.println("庫(kù)存不足,無(wú)法減少 " + amount);
}
}
}
}
public class Main {
public static void main(String[] args) {
InventoryManager manager = new InventoryManager();
Thread commandProcessor = new Thread(manager::processCommands);
commandProcessor.start();
// 模擬多個(gè)線程減少庫(kù)存
for (int i = 0; i < 5; i++) {
int finalI = i;
new Thread(() -> manager.reduceStock(20)).start();
}
// 等待命令處理
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
commandProcessor.interrupt();
}
}
解釋
在這個(gè)改進(jìn)的例子中:
- InventoryCommand 是一個(gè)包含庫(kù)存減少邏輯的類,每個(gè)命令都有自己的庫(kù)存副本。這意味著每個(gè)命令處理自己的庫(kù)存狀態(tài),而不是共享一個(gè)全局的庫(kù)存狀態(tài)。
- reduceStock 方法將減少庫(kù)存的操作封裝為一個(gè) InventoryCommand 對(duì)象,并將其添加到命令隊(duì)列中。
- processCommands 方法從隊(duì)列中取出命令并執(zhí)行,由于每個(gè)命令處理自己的庫(kù)存副本,因此不需要使用鎖。
- 這里 privateintstock=100;定義在 InventoryCommand類中,使得每個(gè) InventoryCommand對(duì)象都有自己的庫(kù)存副本,這樣做的主要目的是為了避免鎖的使用,并實(shí)現(xiàn)以下幾個(gè)關(guān)鍵點(diǎn):
- 在消息傳遞模型中,每個(gè)消息(命令)的處理是獨(dú)立的,一個(gè)命令的失敗不會(huì)影響到其他命令的執(zhí)行,從而提高了系統(tǒng)的容錯(cuò)性。
- 避免使用鎖可以減少線程間的協(xié)調(diào)開銷,提高系統(tǒng)的吞吐量和響應(yīng)性。在多核處理器上,無(wú)鎖的設(shè)計(jì)可以更好地利用硬件資源,提高并行處理能力。
- 在傳統(tǒng)的并發(fā)編程中,通常需要使用鎖(如 synchronized塊或 ReentrantLock)來(lái)保護(hù)對(duì)共享資源的訪問(wèn)。通過(guò)為每個(gè)任務(wù)提供獨(dú)立的數(shù)據(jù)副本,可以避免這些復(fù)雜的并發(fā)控制機(jī)制,簡(jiǎn)化編程模型。
- 由于每個(gè)命令操作的是自己的庫(kù)存副本,不存在多個(gè)線程同時(shí)修改同一共享變量的情況,從而避免了并發(fā)修改導(dǎo)致的數(shù)據(jù)不一致問(wèn)題,也就不需要使用鎖來(lái)保證線程安全。
- 每個(gè) InventoryCommand對(duì)象管理自己的庫(kù)存狀態(tài),不依賴于全局共享的庫(kù)存狀態(tài)。這意味著不同的消息(命令)之間不會(huì)直接競(jìng)爭(zhēng)或沖突,因?yàn)樗鼈兏髯圆僮髯约旱臄?shù)據(jù)副本。
- 無(wú)共享狀態(tài):
- 線程安全:
- 簡(jiǎn)化并發(fā)控制:
- 提高性能和可擴(kuò)展性:
- 容錯(cuò)性:
替代方案:使用不可變對(duì)象
另一種避免鎖的方法是使用不可變對(duì)象。不可變對(duì)象一旦創(chuàng)建,其狀態(tài)就不能被改變,因此天生是線程安全的,不需要使用鎖。例如,我們可以定義一個(gè)不可變的庫(kù)存命令對(duì)象:
public final class InventoryCommand {
private final int amount;
private final int newStock;
public InventoryCommand(int amount, int currentStock) {
this.amount = amount;
this.newStock = currentStock - amount;
}
public int getNewStock() {
return newStock;
}
public int getAmount() {
return amount;
}
}
在這個(gè)版本中, InventoryCommand對(duì)象在創(chuàng)建時(shí)就計(jì)算了新的庫(kù)存值,并且這個(gè)值是不可變的。處理命令時(shí),我們只需讀取命令的屬性,而不需要修改它:
public void processCommands() {
while (!Thread.currentThread().isInterrupted()) {
InventoryCommand command = commandQueue.poll();
if (command != null) {
int newStock = command.getNewStock();
System.out.println("庫(kù)存減少 " + command.getAmount() + ",當(dāng)前庫(kù)存 " + newStock);
}
}
}
這種方法進(jìn)一步簡(jiǎn)化了設(shè)計(jì),因?yàn)槊顚?duì)象本身不包含任何可變狀態(tài),從而完全避免了鎖的需求。
1.7. 結(jié)論
消息傳遞機(jī)制通過(guò)改變并發(fā)編程的范式,從直接操作共享狀態(tài)轉(zhuǎn)變?yōu)橥ㄟ^(guò)消息傳遞來(lái)協(xié)調(diào)操作,從而減少了鎖的使用,提高了系統(tǒng)的并發(fā)性和容錯(cuò)性。這種機(jī)制特別適用于需要高吞吐量和高可靠性的分布式系統(tǒng)。