Spring 實現(xiàn)三種異步流式接口,干掉接口超時煩惱
如何處理比較耗時的接口?
這題我熟,直接上異步接口,使用 Callable、WebAsyncTask 和 DeferredResult、CompletableFuture等均可實現(xiàn)。
但這些方法有局限性,處理結(jié)果僅返回單個值。在某些場景下,如果需要接口異步處理的同時,還持續(xù)不斷地向客戶端響應處理結(jié)果,這些方法就不夠看了。
Spring 框架提供了多種工具支持異步流式接口,如 ResponseBodyEmitter、SseEmitter 和 StreamingResponseBody。這些工具的用法簡單,接口中直接返回相應的對象或泛型響應實體 ResponseEntity<xxxx>,如此這些接口就是異步的,且執(zhí)行耗時操作亦不會阻塞 Servlet 的請求線程,不影響系統(tǒng)的響應能力。
下面將逐一介紹每個工具的使用及其應用場景。
ResponseBodyEmitter
ResponseBodyEmitter適用于要動態(tài)生成內(nèi)容并逐步發(fā)送給客戶端的場景,例如:文件上傳進度、實時日志等,可以在任務執(zhí)行過程中逐步向客戶端發(fā)送更新。
舉個例子,經(jīng)常用GPT你會發(fā)現(xiàn)當你提問后,得到的答案并不是一次性響應呈現(xiàn)的,而是逐步動態(tài)顯示。這樣做的好處是,讓你感覺它在認真思考,交互體驗比直接返回完整答案更為生動和自然。
圖片
使用ResponseBodyEmitter來實現(xiàn)下這個效果,創(chuàng)建 ResponseBodyEmitter 發(fā)送器對象,模擬耗時操作逐步調(diào)用 send 方法發(fā)送消息。
注意:ResponseBodyEmitter 的超時時間,如果設(shè)置為 0 或 -1,則表示連接不會超時;如果不設(shè)置,到達默認的超時時間后連接會自動斷開。其他兩種工具也是同樣的用法,后邊不在贅述了
@GetMapping("/bodyEmitter")
public ResponseBodyEmitter handle() {
// 創(chuàng)建一個ResponseBodyEmitter,-1代表不超時
ResponseBodyEmitter emitter = new ResponseBodyEmitter(-1L);
// 異步執(zhí)行耗時操作
CompletableFuture.runAsync(() -> {
try {
// 模擬耗時操作
for (int i = 0; i < 10000; i++) {
System.out.println("bodyEmitter " + i);
// 發(fā)送數(shù)據(jù)
emitter.send("bodyEmitter " + i + " @ " + new Date() + "\n");
Thread.sleep(2000);
}
// 完成
emitter.complete();
} catch (Exception e) {
// 發(fā)生異常時結(jié)束接口
emitter.completeWithError(e);
}
});
return emitter;
}
實現(xiàn)代碼非常簡單。通過模擬每2秒響應一次結(jié)果,請求接口時可以看到頁面數(shù)據(jù)在動態(tài)生成。效果與 GPT 回答基本一致。
圖片
SseEmitter
SseEmitter 是 ResponseBodyEmitter 的一個子類,它同樣能夠?qū)崿F(xiàn)動態(tài)內(nèi)容生成,不過主要將它用在服務器向客戶端推送實時數(shù)據(jù),如實時消息推送、狀態(tài)更新等場景。在我之前的一篇文章 我有 7種 實現(xiàn)web實時消息推送的方案 中詳細介紹了 Server-Sent Events (SSE) 技術(shù),感興趣的可以回顧下。
圖片
SSE在服務器和客戶端之間打開一個單向通道,服務端響應的不再是一次性的數(shù)據(jù)包而是text/event-stream類型的數(shù)據(jù)流信息,在有數(shù)據(jù)變更時從服務器流式傳輸?shù)娇蛻舳恕?/p>
圖片
整體的實現(xiàn)思路有點類似于在線視頻播放,視頻流會連續(xù)不斷的推送到瀏覽器,你也可以理解成,客戶端在完成一次用時很長(網(wǎng)絡(luò)不暢)的下載。
客戶端JS實現(xiàn),通過一次 HTTP 請求建立連接后,等待接收消息。此時,服務端為每個連接創(chuàng)建一個 SseEmitter 對象,通過這個通道向客戶端發(fā)送消息。
<body>
<div id="content" style="text-align: center;">
<h1>SSE 接收服務端事件消息數(shù)據(jù)</h1>
<div id="message">等待連接...</div>
</div>
<script>
let source = null;
let userId = 7777
function setMessageInnerHTML(message) {
const messageDiv = document.getElementById("message");
const newParagraph = document.createElement("p");
newParagraph.textContent = message;
messageDiv.appendChild(newParagraph);
}
if (window.EventSource) {
// 建立連接
source = new EventSource('http://127.0.0.1:9033/subSseEmitter/'+userId);
setMessageInnerHTML("連接用戶=" + userId);
/**
* 連接一旦建立,就會觸發(fā)open事件
* 另一種寫法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
setMessageInnerHTML("建立連接。。。");
}, false);
/**
* 客戶端收到服務器發(fā)來的數(shù)據(jù)
* 另一種寫法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
setMessageInnerHTML(e.data);
});
} else {
setMessageInnerHTML("你的瀏覽器不支持SSE");
}
</script>
</body>
在服務端,我們將 SseEmitter 發(fā)送器對象進行持久化,以便在消息產(chǎn)生時直接取出對應的 SseEmitter 發(fā)送器,并調(diào)用 send 方法進行推送。
private static final Map<String, SseEmitter> EMITTER_MAP = new ConcurrentHashMap<>();
@GetMapping("/subSseEmitter/{userId}")
public SseEmitter sseEmitter(@PathVariable String userId) {
log.info("sseEmitter: {}", userId);
SseEmitter emitterTmp = new SseEmitter(-1L);
EMITTER_MAP.put(userId, emitterTmp);
CompletableFuture.runAsync(() -> {
try {
SseEmitter.SseEventBuilder event = SseEmitter.event()
.data("sseEmitter" + userId + " @ " + LocalTime.now())
.id(String.valueOf(userId))
.name("sseEmitter");
emitterTmp.send(event);
} catch (Exception ex) {
emitterTmp.completeWithError(ex);
}
});
return emitterTmp;
}
@GetMapping("/sendSseMsg/{userId}")
public void sseEmitter(@PathVariable String userId, String msg) throws IOException {
SseEmitter sseEmitter = EMITTER_MAP.get(userId);
if (sseEmitter == null) {
return;
}
sseEmitter.send(msg);
}
接下來向 userId=7777 的用戶發(fā)送消息,127.0.0.1:9033/sendSseMsg/7777?msg=歡迎關(guān)注-->程序員小富,該消息可以在頁面上實時展示。
圖片
而且SSE有一點比較好,客戶端與服務端一旦建立連接,即便服務端發(fā)生重啟,也可以做到自動重連。
圖片
StreamingResponseBody
StreamingResponseBody 與其他響應處理方式略有不同,主要用于處理大數(shù)據(jù)量或持續(xù)數(shù)據(jù)流的傳輸,支持將數(shù)據(jù)直接寫入OutputStream。
例如,當我們需要下載一個超大文件時,使用 StreamingResponseBody 可以避免將文件數(shù)據(jù)一次性加載到內(nèi)存中,而是持續(xù)不斷的把文件流發(fā)送給客戶端,從而解決下載大文件時常見的內(nèi)存溢出問題。
接口實現(xiàn)直接返回 StreamingResponseBody 對象,將數(shù)據(jù)寫入輸出流并刷新,調(diào)用一次flush就會向客戶端寫入一次數(shù)據(jù)。
@GetMapping("/streamingResponse")
public ResponseEntity<StreamingResponseBody> handleRbe() {
StreamingResponseBody stream = out -> {
String message = "streamingResponse";
for (int i = 0; i < 1000; i++) {
try {
out.write(((message + i) + "\r\n").getBytes());
out.write("\r\n".getBytes());
//調(diào)用一次flush就會像前端寫入一次數(shù)據(jù)
out.flush();
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
return ResponseEntity.ok().contentType(MediaType.TEXT_HTML).body(stream);
}
demo這里輸出的是簡單的文本流,如果是下載文件那么轉(zhuǎn)換成文件流效果是一樣的。
圖片
總結(jié)
這篇介紹三種實現(xiàn)異步流式接口的工具,算是 Spring 知識點的掃盲。使用起來比較簡單,沒有什么難點,但它們在實際業(yè)務中的應用場景還是很多的,通過這些工具,可以有效提高系統(tǒng)的性能和響應能力。
文中 Demo Github 地址:https://github.com/chengxy-nds/Springboot-Notebook/tree/master/springboot101/%E9%80%9A%E7%94%A8%E5%8A%9F%E8%83%BD/springboot-streaming