聊聊SpringAI流式輸出的底層實現(xiàn)?
在 Spring AI 中,流式輸出(Streaming Output)是一種逐步返回 AI 模型生成結(jié)果的技術(shù),允許服務(wù)器將響應(yīng)內(nèi)容分批次實時傳輸給客戶端,而不是等待全部內(nèi)容生成完畢后再一次性返回。
這種機制能顯著提升用戶體驗,尤其適用于大模型響應(yīng)較慢的場景(如生成長文本或復雜推理結(jié)果)。
技術(shù)實現(xiàn)
在 Spring AI 中流式輸出的實現(xiàn)有以下兩種方式:
- 通過 ChatModel 實現(xiàn)流式輸出。
- 通過 ChatClient 實現(xiàn)流式輸出。
ChatModel 流式輸出
Spring AI 中的流式輸出實現(xiàn)非常簡單,使用 ChatModel 中的 stream 即可實現(xiàn):
@RequestMapping(value = "/streamChat", produces = "text/event-stream")
public Flux<String> streamChat(@RequestParam(value = "msg") String msg) {
return chatModel.stream(msg);
}
ChatClient 流式輸出
ChatClient 流式輸出實現(xiàn)也很簡單,也是調(diào)用 stream().content() 返回 Flux 對象即可:
@RequestMapping("/stream")
public Flux<String> stream(String question) {
return chatClient.prompt(question)
.stream()
.content();
}
底層實現(xiàn)
那么問題來了流式輸出的底層實現(xiàn)究竟是啥呢?
根據(jù)以往的經(jīng)驗我們知道,流式輸出的實現(xiàn)技術(shù)基本有兩種:
- Spring MVC(Servlet)+ SSE 實現(xiàn)流式輸出。
- Spring WebFlux Reactor 模型實現(xiàn)流式輸出。
SSE 介紹
SSE(Server-Sent Events)是一種允許服務(wù)器向瀏覽器或其他客戶端推送實時更新的技術(shù)。它是一種單向通信機制,服務(wù)器可以主動向客戶端發(fā)送數(shù)據(jù),而客戶端無需頻繁輪詢服務(wù)器請求數(shù)據(jù)。SSE 是基于 HTTP 協(xié)議的,使用標準的 text/event-stream
MIME 類型來傳輸數(shù)據(jù)。
SSE 主要特點
- 單向通信:SSE 僅支持服務(wù)器到客戶端的單向通信,客戶端不能向服務(wù)器發(fā)送消息。如果需要雙向通信,可以結(jié)合 WebSocket 或其他技術(shù)。
- 基于 HTTP:SSE 使用標準的 HTTP 協(xié)議,不需要額外的協(xié)議支持,因此兼容性較好。
- 自動重連:客戶端在連接中斷后會自動嘗試重新連接。
- 數(shù)據(jù)格式:SSE 數(shù)據(jù)以特定的格式發(fā)送,每條消息以 data: 開頭,以兩個換行符 \n\n 結(jié)尾。
- 事件類型:可以為每條消息指定事件類型,客戶端可以通過監(jiān)聽特定事件類型來處理不同的消息。
Spring MVC(Spring Web)底層是基于 Servlet 實現(xiàn)的,它是使用 SseEmitter 技術(shù)實現(xiàn) SSE 協(xié)議實現(xiàn)流式輸出的。
SseEmitter 基本用法
這里提供一個 SseEmitter 的簡單使用案例,實現(xiàn)流式輸出,讓大家更好的理解這個技術(shù)點:
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
@RestController
public class SseDemoController {
@GetMapping(value = "/sse-demo", produces = "text/event-stream")
public SseEmitter streamData() {
// 設(shè)置超時時間(單位:毫秒)
SseEmitter emitter = new SseEmitter(30_000L); // 30秒超時
// 異步任務(wù)模擬流式輸出
new Thread(() -> {
try {
for (int i = 1; i <= 5; i++) {
String message = "第 " + i + " 條消息";
emitter.send(message);
Thread.sleep(1000); // 每秒發(fā)送一次
}
emitter.complete(); // 完成推送
} catch (IOException | InterruptedException e) {
emitter.completeWithError(e); // 異常處理
}
}).start();
return emitter;
}
}
Spring WebFlux 介紹
Spring WebFlux 是 Spring Framework 5 引入的響應(yīng)式 Web 框架,旨在解決高并發(fā)場景下傳統(tǒng)同步阻塞模型(如 Spring MVC)的性能瓶頸。其核心目標是通過非阻塞異步編程模型提升系統(tǒng)吞吐量,適用于 I/O 密集型任務(wù)(如微服務(wù)通信、實時數(shù)據(jù)流處理)。
Spring WebFlux 與 Spring MVC 不同,它基于 Reactive Streams 規(guī)范實現(xiàn)的,支持背壓機制(Backpressure),防止數(shù)據(jù)生產(chǎn)者壓垮消費者。
“
背壓機制:通過訂閱者主動控制數(shù)據(jù)流速,避免內(nèi)存溢出。例如,消費者可動態(tài)調(diào)整請求量,生產(chǎn)者根據(jù)反饋調(diào)整數(shù)據(jù)生成速度.
Spring AI 流式輸出
說完了前置知識,咱們回到主題:Spring AI 是如何實現(xiàn)流式輸出的?
要搞清楚這個問題,我們需要看流式輸出對象 Flux 的實現(xiàn)源碼:
查看 Flux 源碼我們發(fā)現(xiàn)它是屬于 reactor.core.publisher 包下的抽象類:
并且看類注釋和類所在的 jar 包我們就明白了:
Spring AI 中的流式輸出是通過 Reactor Streams 模型實現(xiàn)的,和 Spring WebFlux 的底層實現(xiàn)是一樣的技術(shù)。
Reactor 介紹
Reactor 是一種事件驅(qū)動的高性能網(wǎng)絡(luò)編程模型,主要用于處理高并發(fā)的網(wǎng)絡(luò) I/O 請求。其核心思想是通過一個或多個線程監(jiān)聽事件,并將事件分發(fā)給相應(yīng)的處理程序,從而實現(xiàn)高效的并發(fā)處理。
Reactor 模型的主要特征如下:
- 事件驅(qū)動:所有 I/O 操作都由事件觸發(fā)并處理。
- 非阻塞:操作不會因為 I/O 而掛起,避免了線程等待的開銷。
- 高效資源利用:通過少量線程處理大量并發(fā)連接,提升性能。
- 組件分離:將事件監(jiān)聽(Reactor)、事件分發(fā)(Dispatcher)和事件處理(Handler)解耦,使代碼結(jié)構(gòu)更清晰。
Reactor 實現(xiàn)方式有三種:
- 單線程 Reactor 模型:所有操作在一個線程完成,適用于低并發(fā)場景。
- 多線程 Reactor 模型:主線程處理連接,子線程池處理 I/O 和業(yè)務(wù)。
- 主從 Reactor 模型:主線程池處理連接,子線程池處理 I/O(進一步優(yōu)化資源分配)。
生產(chǎn)級別使用的 Reactor 基本都是主從 Reactor 模型,它的執(zhí)行流程如下:
小結(jié)
Spring AI 中的流式輸出有兩種實現(xiàn),而通過查看這兩種流式輸出的實現(xiàn)源碼可知,Spring AI 中的流式輸出是通過 Reactor Streams 技術(shù)實現(xiàn)的,和 Spring WebFlux 的底層實現(xiàn)技術(shù)一樣。