聊聊項目實戰(zhàn)中的異步設計
場景切入
??先來看一個日常生活快遞寄件場景,從寄件人(寄件)到收件人(收件),全流程如下:
圖片
當你準備寄送一個包裹時,通常你可以有兩種寄件方式:
??方案一、你親自前往快遞服務點,填寫寄件單、交付包裹、等待工作人員處理,最后得到一張寄送單據(jù)。你必須在服務點等待直到所有步驟都完成。這個過程是同步的。
??方案二、你可以選擇在線預約快遞上門取件服務,填寫相關(guān)信息后,你的請求就被提交給系統(tǒng)。此時,你可以繼續(xù)進行其他事情,而不需要等待快遞員到達。系統(tǒng)會在后臺異步處理你的請求,安排合適的快遞員前來取件。這樣,你就可以在等待的過程中做其他事情,無需阻塞在快遞服務點。
??這種寄件方式提高了效率,讓用戶可以更加靈活地安排自己的時間。在后臺系統(tǒng)中,快遞公司可以通過合理的任務調(diào)度,處理多個異步請求,提高寄件服務的整體吞吐量。這種方式類似于在后端異步處理任務,而用戶無需等待任務完成,可以繼續(xù)進行其他操作,提高了整個寄件過程的并發(fā)性和響應性。這個過程就是異步。
同步和異步
我們通過這個例子抽象出同步模型和異步模型:
圖片
小結(jié)
同步模型:一個任務做完做下一個任務,阻塞
異步模型:做當前任務,只需要開啟而不需要關(guān)心另一個任務如何執(zhí)行,非阻塞
設計理念
??有了上邊的模型,對于同步和異步的概念就有了初步的認識。事實上,在架構(gòu)設計中,異步思想是指通過異步處理來提高系統(tǒng)的性能、可伸縮性和響應速度。
以下是SpringColud微服務架構(gòu)的基本套件:
圖片
在架構(gòu)設計中,異步思想可以應用在多個方面。常見的異步實踐包括:
- 消息隊列:通過消息隊列實現(xiàn)異步通信,將消息發(fā)送到隊列中,然后由消費者異步地處理這些消息。這種方式可以實現(xiàn)解耦和削峰填谷的效果。
- 事件驅(qū)動:系統(tǒng)中的各個組件通過事件進行通信,當事件發(fā)生時,系統(tǒng)中的其他組件可以異步地響應這些事件,從而實現(xiàn)松耦合和高內(nèi)聚。
- 非阻塞I/O:在網(wǎng)絡編程中,采用非阻塞I/O可以使系統(tǒng)在等待I/O操作完成的同時繼續(xù)處理其他任務,提高系統(tǒng)的并發(fā)能力和吞吐量。
......
場景應用
??接下來,我們針對實際項目中的異步設計逐個探究??赡茏霾坏矫婷婢愕?,但是可以為真實的場景中的方案設計打開思路。
場景一、基于異步非阻塞模型的業(yè)務網(wǎng)關(guān)Spring Cloud Gateway
??Spring Cloud Gateway基于Project Reactor反應式編程和WebFlux框架,通過路由、過濾器、事件等機制實現(xiàn)了靈活的網(wǎng)關(guān)服務。它適用于構(gòu)建微服務架構(gòu)中的業(yè)務網(wǎng)關(guān),具有高性能、可擴展性和豐富的功能。
官網(wǎng)地址:https://spring.io/projects/spring-cloud-gateway/
圖片
性能比較
??對 Zuul/Spring Cloud Gateway 的一些性能分析可以參考 Spring Cloud Gateway 作者 Spencer Gibb 提供的項目:https://github.com/spencergibb/spring-cloud-gateway-bench。
圖片
摘自SpringCloud GateWay作者spencergibb 提供的一個壓測報告
??總的來說,Gateway在處理IO密集型請求場景下有著更大的優(yōu)勢。原因是: 隨著Spring 5 推出的WebFlux,它是完全異步且非阻塞的,底層也是基于Netty實現(xiàn)的。我們分別對Reactor模型和Netty做一個簡單介紹。
- 基于Reactor的反應式編程:
圖片
??其中:mainReactor主要負責連接處理(不參與數(shù)據(jù)處理),而subReactor負責數(shù)據(jù)的讀?。ú粎⑴c連接). 不再是單線程模型那樣,接收請求和處理數(shù)據(jù)都是在一個Reactor下進行。
- WebFlux框架:
核心主要是基于NIO的Netty框架,原理說明如下:
組件關(guān)系:
圖片
概念說明:
- Bootstrap(啟動引導類): 可用于連接遠端服務器,只綁定一個 EventLoopGroup ( Boss) ServerBootStrap 用于服務端啟動綁定本地端口,綁定兩個 EventLoopGroup (Worker)
- channel(通道): 是網(wǎng)絡通信的載體,提供了基本的API用于網(wǎng)絡I/0 操作如register、bind、connect、read、write、flush 等Netty自己實現(xiàn)的 Channel是以JDK NIO Channel為基礎(chǔ)的。
- Handler(處理器): 處理輸入輸出數(shù)據(jù)的邏輯組件。它負責實際處理數(shù)據(jù)的業(yè)務邏輯
- EventLoop(事件循環(huán)): 是 Netty 中處理所有事件的線程,負責處理連接的生命周期中發(fā)生的各種事件。
- Pipeline(管道): 包含了一個 Channel 的處理器鏈。通過 Pipeline 可以將多個處理器按順序組織起來,形成一個處理流程。
每個服務器中都會有一個 Boss(老板),會有一群做事情的WorkerBoss(員工) 會不停地接收新的連接,將連接分配給一個個 Worker 處理連接
執(zhí)行過程:
圖片
Netty 執(zhí)行過程:
- 啟動引導(Bootstrap): 創(chuàng)建并配置一個新的 Netty 應用。設置線程模型、Channel 類型、處理器等。
- 創(chuàng)建 EventLoopGroup: 創(chuàng)建 EventLoopGroup 對象,它包含一個或多個 EventLoop,用于處理連接的生命周期中發(fā)生的各種事件。
- 配置 Channel: 配置 Channel 類型、處理器等,并將 Channel 注冊到 EventLoop 中。
- 創(chuàng)建 ChannelPipeline: 每個 Channel 都有一個與之關(guān)聯(lián)的 ChannelPipeline,用于管理和執(zhí)行所有的 ChannelHandler。
- 添加 ChannelHandler: 將業(yè)務邏輯處理器添加到 ChannelPipeline 中,形成處理鏈。
- 綁定端口: 調(diào)用 bind 方法將 Channel 綁定到指定的端口,開始監(jiān)聽客戶端的連接。
- 接收連接: 當有客戶端連接請求到達時,EventLoop 將會通知 ChannelPipeline 中的第一個 ChannelHandler,從而開始處理連接。
- 數(shù)據(jù)讀寫: 當有數(shù)據(jù)讀寫事件發(fā)生時,ChannelPipeline 中的處理器鏈將被觸發(fā),依次處理數(shù)據(jù)。
- 關(guān)閉連接: 當連接關(guān)閉時,Netty 會釋放相關(guān)的資源,包括關(guān)閉連接、關(guān)閉 EventLoopGroup 等。
關(guān)于SpringCloud GateWay的使用,請自行查閱官網(wǎng)。這里只介紹如何體現(xiàn)NIO異步非阻塞原理的。
場景二、基于消息隊列-數(shù)據(jù)同步
場景分析:
??比如:商城首頁菜單樹。一般這種場景我們允許在一定時間數(shù)據(jù)不一致性。那么就可以使用定時任務+消息隊列。如每隔5分鐘同步一次,達到數(shù)據(jù)最終一致。
圖片
注意事項:
??這種數(shù)據(jù)同步方案主要適用于數(shù)據(jù)實時性要求不高的場景,因為:定時任務處理存在一定時間間隔,會有同步延時。同時在時間窗口期數(shù)據(jù)可能發(fā)生變更。還有就是數(shù)據(jù)最終一致性的保證,主要取決于MQ的可靠性。
場景三、基于消息隊列-數(shù)據(jù)交互
場景分析:
??三方平臺交互,上游系統(tǒng)(A)的數(shù)據(jù)和下游系統(tǒng)(B)的數(shù)據(jù)進行接口規(guī)范轉(zhuǎn)化。此處可能涉及到很多業(yè)務轉(zhuǎn)到同一個平臺或者不同平臺。而我們接口轉(zhuǎn)化的功能是一致的。當然你可以使用Feign直接調(diào)用。但是流量增加、網(wǎng)絡阻塞時可能會出現(xiàn)調(diào)用失敗,導致未能成功送達下游。因此我們可以這樣設計:
注意事項:
??這種異步設計一方面為了系統(tǒng)內(nèi)部服務之間解耦,另一方面起到了削峰填谷的作用。但是引入消息隊列和轉(zhuǎn)化服務,增加了系統(tǒng)的復雜性。因為鏈路較長,出現(xiàn)問題時排查起來比較困難。因此要在數(shù)據(jù)庫中盡可能存留記錄明細,方便審查。另外,也可能出現(xiàn)消息積壓等問題。當然這是消息隊列存在的共性問題。
場景四、基于消息隊列-短信功能
場景分析:
??日常我們會遇到很多這種發(fā)短信的情況。比如,
- 手機訂購流量套餐,發(fā)短信提醒生效日期
- 快遞達到指定地點,短信同步收件信息
- 銀行轉(zhuǎn)賬成功,提醒交易明細
- 上班掃碼刷地鐵,通知扣費情況
- 會員注冊滿一年,會在前一個月發(fā)短信到期提醒。
......
那么對于短信場景,我們?nèi)绾卧O計呢?
圖片
注意事項:這種異步設計一方面為了將發(fā)送短信的功能獨立出來。
場景五、基于消息隊列-日志采集
場景分析:
??在業(yè)務系統(tǒng)中,一般我們會進行日志采集和可視化展示。ELK 是由 Elasticsearch、Logstash 和 Kibana 組成的一套日志管理和分析解決方案。結(jié)合 Kafka 使用時,通常用于搭建一個高效的日志處理系統(tǒng)。
圖片
ELK 工作流程并結(jié)合 Kafka 的工作流程描述:
- 生產(chǎn)者發(fā)送日志到 Kafka:
- 應用程序或系統(tǒng)生成日志,并通過 Kafka 生產(chǎn)者發(fā)送日志消息到 Kafka 集群。
- Kafka 主題(Topic)通常用于組織和分類不同類型的日志數(shù)據(jù)。
- Logstash 消費 Kafka 中的日志:
Logstash 作為 Kafka 消費者,通過 Kafka Input 插件訂閱一個或多個 Kafka 主題。
Logstash 接收到 Kafka 中的日志消息后,可以進行多種操作,如解析日志、添加字段、過濾、轉(zhuǎn)換格式等。
Logstash 處理日志并發(fā)送到 Elasticsearch:
Logstash 通過 Elasticsearch Output 插件將處理后的日志數(shù)據(jù)發(fā)送到 Elasticsearch 集群。
Logstash 可以將日志數(shù)據(jù)根據(jù)配置的索引模式(Index Pattern)劃分到不同的索引中,以便更好地管理和查詢。
Elasticsearch 存儲和索引日志數(shù)據(jù):
Elasticsearch 接收 Logstash 發(fā)送過來的日志數(shù)據(jù),并將其存儲在分布式索引中。
Elasticsearch 提供了強大的全文搜索和分析功能,支持對大量的日志數(shù)據(jù)進行高效的查詢和分析。
Kibana 可視化和查詢:
Kibana 作為 Elasticsearch 的前端界面,提供了豐富的可視化工具和查詢界面。
用戶可以使用 Kibana 創(chuàng)建儀表板、圖表,執(zhí)行復雜的查詢,實時監(jiān)控日志數(shù)據(jù)等。
整個工作流程如下:
+----------------------+ +----------------------+ +----------------------+
| Producer | ----> | Kafka | ----> | Logstash |
| (Log Generator) | | (Message Broker) | | |
+----------------------+ +----------------------+ +----------+-----------+
|
|
v
+----------------------+
| Elasticsearch |
| (Log Storage) |
+----------------------+
|
|
v
+----------------------+
| Kibana |
| (Visualization Tool)|
+----------------------+
注意事項:
- Kafka 作為消息隊列中介,實現(xiàn)了解耦,使生產(chǎn)者與消費者之間的依賴性降低。
- Logstash 提供了靈活的數(shù)據(jù)處理能力,可以根據(jù)具體需求進行配置,包括過濾、解析、字段添加等操作。
- Elasticsearch 提供了高效的全文搜索和分析功能,以及分布式存儲,適用于處理大量的日志數(shù)據(jù)。
- Kibana 提供了直觀的可視化工具,幫助用戶更好地理解和分析日志數(shù)據(jù)。
??整個 ELK + Kafka 的架構(gòu)可以幫助實現(xiàn)高效的日志收集、處理和可視化,適用于大規(guī)模分布式系統(tǒng)中的日志管理。
場景六、基于CompletableFuture 異步多線程批處理任務
??當使用多線程和 CompletableFuture 來執(zhí)行批處理任務時,可以通過將任務分成多個子任務,并使用 CompletableFuture 來異步執(zhí)行這些子任務。主要思想如下:
圖片
假設我們有一個批處理任務,需要對一組數(shù)據(jù)進行處理:
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
public class BatchProcessingExample {
public static void main(String[] args) {
// 模擬一組數(shù)據(jù)
List<Integer> data = generateData(10);
// 定義線程池
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);
// 將數(shù)據(jù)分成多個子任務進行處理
List<CompletableFuture<Void>> futures = new ArrayList<>();
int batchSize = 3;
for (int i = 0; i < data.size(); i += batchSize) {
List<Integer> batch = data.subList(i, Math.min(i + batchSize, data.size()));
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 在這里執(zhí)行批處理的具體邏輯
processBatch(batch);
}, executor);
futures.add(future);
}
// 等待所有子任務完成
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
// 在所有子任務完成后關(guān)閉線程池
allOf.thenRun(executor::shutdown);
try {
// 等待所有任務完成
allOf.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
private static List<Integer> generateData(int size) {
List<Integer> data = new ArrayList<>();
for (int i = 1; i <= size; i++) {
data.add(i);
}
return data;
}
private static void processBatch(List<Integer> batch) {
// 模擬批處理邏輯
for (Integer value : batch) {
System.out.println(Thread.currentThread().getName() + " - Processing: " + value);
}
}
}
??以上我們模擬了一組數(shù)據(jù),然后將數(shù)據(jù)分成多個批次,每個批次使用 CompletableFuture 異步執(zhí)行。CompletableFuture.allOf 用于等待所有子任務完成。
在這個示例中,主要體現(xiàn)了以下異步的思想和操作:
- CompletableFuture 異步執(zhí)行:使用CompletableFuture.runAsync方法,將任務異步提交給CompletableFuture,該任務會在一個線程池中異步執(zhí)行。這允許程序繼續(xù)執(zhí)行而不必等待子任務完成。
CompletableFuture.runAsync(() -> {
// 執(zhí)行異步任務的邏輯
}, executor);
- 等待所有異步任務完成:使用CompletableFuture.allOf方法等待所有的子任務完成。這個方法會返回一個新的CompletableFuture,當所有輸入的CompletableFuture都完成時,這個新的CompletableFuture也會完成。
CompletableFuture<Void> allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
- 在所有異步任務完成后執(zhí)行操作:使用thenRun方法,當所有子任務完成后,執(zhí)行指定的操作。在這個示例中,用于關(guān)閉線程池。
allOf.thenRun(executor::shutdown);
??這些異步操作幫助提高程序的并發(fā)性和響應性,特別在處理批量任務時,可以更有效地利用系統(tǒng)資源。異步編程模型能夠允許程序在等待某些操作完成的同時繼續(xù)執(zhí)行其他操作,從而提高系統(tǒng)的效率。
總結(jié)
??異步設計在處理并發(fā)和提高系統(tǒng)性能方面具有優(yōu)勢,但也帶來了一些可能的問題。以上提供的場景和方案僅供參考。使用過程中應當根據(jù)業(yè)務特征合理選擇具體方案。
優(yōu)勢:
- 并發(fā)性和響應性: 異步設計可以提高系統(tǒng)的并發(fā)性和響應性,允許系統(tǒng)在等待某些操作完成的同時繼續(xù)執(zhí)行其他操作,從而更有效地利用資源。
- 性能提升: 異步操作允許系統(tǒng)并發(fā)地執(zhí)行多個任務,減少了等待時間,提高了系統(tǒng)的性能。尤其在 I/O 密集型任務中,異步操作能夠更充分地利用 CPU。
- 可伸縮性: 異步設計有助于構(gòu)建可伸縮的系統(tǒng),能夠更好地處理大量并發(fā)請求,適應系統(tǒng)負載的變化。
- 降低資源占用: 異步操作可以減少線程或進程的創(chuàng)建和銷毀開銷,從而降低系統(tǒng)資源的占用。
可能產(chǎn)生的問題:
- 數(shù)據(jù)一致性: 異步操作可能導致數(shù)據(jù)一致性的問題,特別是在涉及到多個異步任務的場景。需要采取額外的手段,如事務或事件溯源,來保障數(shù)據(jù)的一致性。
- 冪等性: 異步操作的重試機制可能引入冪等性問題。如果一個操作不是冪等的,重試可能導致不正確的結(jié)果。需要確保異步操作是冪等的,或者采用冪等性保障措施。
- 消息丟失: 在消息傳遞的異步系統(tǒng)中,由于網(wǎng)絡故障或系統(tǒng)故障,消息可能會丟失。需要實施消息確認、重試和持久化等機制,以防止消息丟失。
- 異步調(diào)用鏈的復雜性: 復雜的異步調(diào)用鏈可能使代碼難以理解和維護,需要謹慎設計和文檔化。