SSE打扮你的AI應(yīng)用,讓它美美噠!
前言
老粉絲都知道,我們有一個(gè)文檔問答的AI產(chǎn)品,然后有一個(gè)前端要求就是模仿ChatGPT展示后端返回的數(shù)據(jù)信息(打字效果)。剛開始呢,由于問答比較簡單,只是一些簡單的文本類型,并且后端返回的結(jié)果也有限,加上工期比較緊(反正就是各種原因),我們選擇了最原始的前后端數(shù)據(jù)交互方法。
前端發(fā)送問題,后端接入模型分析數(shù)據(jù),然后將最后的結(jié)果一股腦的返回給前端。就這樣歲月靜好的度過了一段時(shí)間,但是由于需求的變更。后端返回的信息又臭又長,然后還是沿用之前的數(shù)據(jù)獲取和展示方式,就顯得捉襟見肘了。
所以,此時(shí)我們就從我們知識百寶箱中搜索,然后一眼就相中SSE。之前在寫一個(gè)類ChatGPT應(yīng)用,前后端數(shù)據(jù)交互有哪幾種文章中,我們就對其有過簡單的介紹。
今天我們就來聊聊,如何實(shí)現(xiàn)基于SSE的前后端項(xiàng)目。(我們講主要邏輯,有些細(xì)節(jié)例如樣式等就不那么考究了)
效果展示
最終,我們就會(huì)得到一個(gè)類似下面的程序。
圖片
好了,天不早了,干點(diǎn)正事哇。
我們能所學(xué)到的知識點(diǎn)
- SSE是個(gè)啥?
- 用Node實(shí)現(xiàn)一個(gè)SSE服務(wù)
- SSE前端部分(React版本)
- 實(shí)現(xiàn)一個(gè)打字組件
1. SSE是個(gè)啥?
[服務(wù)器發(fā)送事件]((https://developer.mozilla.org/zh-CN/docs/Web/API/Server-sent_events "服務(wù)器發(fā)送事件"))(Server-Sent Events,SSE)提供了一種標(biāo)準(zhǔn)方法,通過 HTTP 將服務(wù)器數(shù)據(jù)推送到客戶端。與 WebSockets 不同,SSE 專門設(shè)計(jì)用于服務(wù)器到客戶端的單向通信,使其非常適用于實(shí)時(shí)信息的更新或者那些在不向服務(wù)器發(fā)送數(shù)據(jù)的情況下實(shí)時(shí)更新客戶端的情況。
服務(wù)器發(fā)送事件 (SSE) 允許服務(wù)器在任何時(shí)候向?yàn)g覽器推送數(shù)據(jù):
- 瀏覽器仍然會(huì)發(fā)出初始請求以建立連接。
- 服務(wù)器返回一個(gè)事件流響應(yīng)并保持連接打開。
- 服務(wù)器可以使用這個(gè)連接在任何時(shí)候發(fā)送文本消息。
- 傳入的數(shù)據(jù)在瀏覽器中觸發(fā)一個(gè) JavaScript 事件。事件處理程序函數(shù)可以解析數(shù)據(jù)并更新 DOM。
?
本質(zhì)上,SSE 是一個(gè)無盡的數(shù)據(jù)流??梢詫⑵湟暈橄螺d一個(gè)無限大的文件,以小塊形式攔截和讀取。(類比我們之前講過的大文件分片上傳和分片下載)
SSE 首次實(shí)現(xiàn)于 2006 年,所有主要瀏覽器都支持這個(gè)標(biāo)準(zhǔn)。它可能不如 WebSockets[1] 知名,但SSE更簡單,使用標(biāo)準(zhǔn) HTTP,支持單向通信,并提供自動(dòng)重新連接功能。
SSE組件
我們可以將服務(wù)器發(fā)送事件視為單個(gè) HTTP 請求,其中后端不會(huì)立即發(fā)送整個(gè)主體,而是保持連接打開,并通過每次發(fā)送事件時(shí)發(fā)送單個(gè)行來逐步傳輸答復(fù)。
圖片
SSE是一個(gè)由兩個(gè)組件組成的標(biāo)準(zhǔn):
- 瀏覽器中的 EventSource 接口[2],允許客戶端訂閱事件:它提供了一種通過抽象較低級別的連接和消息處理來訂閱事件流的便捷方法。
- 事件流協(xié)議:描述服務(wù)器發(fā)送的事件必須遵循的標(biāo)準(zhǔn)純文本格式,以便 EventSource 客戶端理解和傳播它們
EventSource
作為核心的組件,EventSource的兼容性良好。
圖片
工作原理
服務(wù)端部分
服務(wù)器需要設(shè)置 HTTP 頭部 Content-Type: text/event-stream 并保持連接不斷開,以持續(xù)發(fā)送事件。典型的服務(wù)器發(fā)送事件的格式如下:
data: 這是一個(gè)事件消息
data: 這是另一個(gè)事件消息
可以包含多個(gè)字段:
id: 1234
event: customEvent
data: {"message": "這是一個(gè)自定義事件"}
retry: 10000
- id:事件 ID,客戶端會(huì)自動(dòng)保存這個(gè) ID,并在重連時(shí)發(fā)送 Last-Event-ID 頭部。
- event:事件類型,客戶端可以根據(jù)類型進(jìn)行不同處理。
- data:事件數(shù)據(jù)。
- retry:建議客戶端重新連接的時(shí)間間隔(毫秒)。
客戶端部分
客戶端使用 JavaScript 創(chuàng)建一個(gè) EventSource 對象并監(jiān)聽事件:
const eventSource = new EventSource('server-url');
eventSource.onmessage = function(event) {
console.log('收到事件數(shù)據(jù):', event.data);
};
eventSource.onerror = function(event) {
console.log('事件源連接錯(cuò)誤:', event);
};
eventSource.addEventListener('customEvent', function(event) {
console.log('收到自定義事件:', event.data);
});
更高級用法
在單個(gè)頻道上發(fā)送不同的數(shù)據(jù)
服務(wù)器發(fā)送的消息可以有一個(gè)相關(guān)的事件:在 data: 行上方傳遞,以識別特定類型的信息:
event: React
data: React is great!
event: Rust
data: { "Rust": "我很喜歡", }
event: AI
data: { "name": "OpenAI" }
這些不會(huì)觸發(fā)客戶端的 message 事件處理程序。我們必須為每種類型的事件添加處理程序。例如:
// react 消息處理程序
source.addEventListener('React', e => {
document.getElementById('React')
.textContent = e.data;
});
// Rust 消息處理程序
source.addEventListener('Rust', e => {
const r = JSON.parse(e.data);
document.getElementById('Rust')
.textContent = `${r.Rust}`;
});
// AI 消息處理程序
source.addEventListener('AI', e => {
const ai = JSON.parse(e.data);
document.getElementById(`ai`)
.textContent = `${ai.name}`;
});
使用數(shù)據(jù)標(biāo)識符
可選地,服務(wù)器也可以在 data: 行之后發(fā)送一個(gè) id::
event: React
data: React is great!
id: 42
如果連接斷開,瀏覽器會(huì)在 Last-Event-ID HTTP 頭中發(fā)送最后的 id,以便服務(wù)器可以重新發(fā)送任何丟失的消息。
最新的 ID 也可以在客戶端的事件對象的 .lastEventId 屬性中獲?。?/p>
// news 消息處理程序
source.addEventListener('React', e => {
console.log(`last ID: ${e.lastEventId}`);
document.getElementById('React')
.textContent = e.data;
});
指定重試延遲
雖然重新連接是自動(dòng)的,但我們的服務(wù)器可能知道在特定時(shí)間段內(nèi)不會(huì)有新數(shù)據(jù),因此無需保持活動(dòng)的通信通道。服務(wù)器可以發(fā)送一個(gè)包含毫秒值的 retry: 響應(yīng),無論是單獨(dú)發(fā)送還是作為最終消息的一部分。例如:
retry: 60000
data: 你很好,這段時(shí)間我們還是別聯(lián)系了!
收到后,瀏覽器會(huì)斷開 SSE 連接,并在延遲期過后嘗試重新連接。
其他事件處理程序
除了 message 和命名事件,我們還可以在客戶端 JavaScript 中創(chuàng)建 open 和 error 處理程序。
open 事件在服務(wù)器連接建立時(shí)觸發(fā)。可以用于運(yùn)行額外的配置代碼或初始化 DOM 元素:
const source = new EventSource('/sse1');
source.addEventListener('open', e => {
console.log('SSE connection established.');
});
error 事件在服務(wù)器連接失敗或終止時(shí)觸發(fā)。我們可以檢查事件對象的 .eventPhase 屬性以查看發(fā)生了什么:
source.addEventListener('error', e => {
if (e.eventPhase === EventSource.CLOSED) {
console.log('SSE connection closed');
} else {
console.log('error', e);
}
});
無需重新連接:它會(huì)自動(dòng)進(jìn)行。
終止 SSE 通信
瀏覽器可以使用 EventSource 對象的 .close() 方法終止 SSE 通信。例如:
const source = new EventSource('/sse1');
// 一小時(shí)后關(guān)閉
setTimeout(() => source.close(), 3600000);
服務(wù)器可以通過以下方式終止連接:
- 觸發(fā) res.end() 或發(fā)送一個(gè) retry: 延遲,然后
- 當(dāng)相同的瀏覽器嘗試重新連接時(shí)返回 HTTP 狀態(tài) 204。
只有瀏覽器可以通過創(chuàng)建一個(gè)新的 EventSource 對象重新建立連接。
優(yōu)點(diǎn)
優(yōu)點(diǎn) | 描述 |
簡單性 | 比 WebSocket 更簡單的 API 設(shè)計(jì) |
自動(dòng)管理重連 | 內(nèi)置的重連機(jī)制使開發(fā)更簡便 |
瀏覽器支持 | 現(xiàn)代瀏覽器普遍支持 EventSource |
缺點(diǎn)
缺點(diǎn) | 描述 |
單向通信 | 無法從客戶端向服務(wù)器發(fā)送數(shù)據(jù) |
基于 HTTP | 相比 WebSocket,SSE 在處理高頻率數(shù)據(jù)傳輸時(shí)性能可能較低 |
受限于同源策略 | 跨域通信需要額外配置 CORS(跨域資源共享) |
在講代碼前,我們來簡單說一下我們要實(shí)現(xiàn)的交互
- 前端輸入信息
- 通過Post接口傳人后端
- 后端處理請求,拼接數(shù)據(jù),返回SSE格式數(shù)據(jù)
- 前端通過EventSource事件接收數(shù)據(jù)
2. 用Node實(shí)現(xiàn)一個(gè)SSE服務(wù)
如果想了解一個(gè)事物的全貌,那就是接近它,了解它,實(shí)現(xiàn)它。
那么,我們就來自己用Node實(shí)現(xiàn)一個(gè)SSE服務(wù)。我們使用express[3]來搭建后端服務(wù)。
在我們心儀的目錄下,執(zhí)行如下命令
mkdir SSE &&
cd SSE &&
mkdir Server &&
cd Server &&
npm init
構(gòu)建一個(gè)簡單的Node項(xiàng)目。
然后,更新我們的package.json,這里就偷懶了,我直接把本地的內(nèi)容復(fù)制下來了。
{
"name": "server",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"build": "tsc && node -v",
"dev": "tsc && tsc-watch --onSuccess \"node dist/index.js\""
},
"dependencies": {
"@types/uuid": "^10.0.0",
"body-parser": "^1.20.2",
"cors": "^2.8.5",
"express": "^4.18.2",
"uuid": "^10.0.0"
},
"devDependencies": {
"@types/cors": "^2.8.13",
"@types/express": "^4.17.17",
"tsc-watch": "^6.0.4",
"typescript": "^5.1.6"
},
"author": "Front789",
"license": "ISC"
}
處理主要邏輯
我們將只要的邏輯方式在src/index.ts中。
圖片
讓我們來挑幾個(gè)重要的點(diǎn)來解釋一下:
導(dǎo)入依賴和初始化Express
import express from "express";
import cors from "cors";
import bodyParser from "body-parser";
import { v4 as uuidv4 } from "uuid";
const app = express();
app.use(cors());
app.use(bodyParser.json());
const port = 4000;
這是一段實(shí)例化Express的代碼。不做過多解釋。我們是用了兩個(gè)中間件
- app.use(cors()): 應(yīng)用 CORS 中間件,使服務(wù)器能夠處理跨域請求。
- app.use(bodyParser.json()): 應(yīng)用 Body Parser 中間件,自動(dòng)解析請求體中的 JSON 數(shù)據(jù),并將其存儲在 req.body 中。
處理SSE鏈接
// SSE連接處理
app.get('/api/events', (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders(); // 發(fā)送頭部信息到客戶端
const clientId = uuidv4();
const newClient = {
id: clientId,
res,
};
clients.push(newClient);
req.on('close', () => {
console.log(`${clientId} Connection closed`);
clients = clients.filter((client) => client.id !== clientId);
});
});
這部分其實(shí)也很簡單,但是呢,我們要特別注意一下res.setHeader()部分。
- Content-Type: text/event-stream: 設(shè)置內(nèi)容類型為 text/event-stream,表明這是一個(gè) SSE 連接。
- Cache-Control: no-cache: 禁用緩存,以確保實(shí)時(shí)數(shù)據(jù)傳輸。
- Connection: keep-alive: 保持連接不斷開。
- res.flushHeaders(): 立即將響應(yīng)頭部發(fā)送給客戶端,確保連接保持活躍狀態(tài)。
當(dāng)我們每次接收到/api/events時(shí),沒有立馬向請求方返回?cái)?shù)據(jù),而是構(gòu)建一個(gè)newClient,并且將其push到一個(gè)全局變量clients中。
當(dāng)客戶端關(guān)閉連接時(shí),從列表中移除相應(yīng)的客戶端,我們在close中執(zhí)行對應(yīng)的移除操作。
處理Post請求
// 處理POST請求
app.post('/api/message', (req, res) => {
const userInput = req.body.message;
// 模擬處理消息并推送給所有客戶端
const responses = generateChatGPTResponse(userInput);
let index = 0;
const intervalId = setInterval(() => {
if (index < responses.length) {
clients.forEach((client) => client.res.write(`data: ${JSON.stringify({ message: responses[index] })}\n\n`));
index++;
} else {
clearInterval(intervalId);
res.end();
}
}, 1000); // 每秒發(fā)送一個(gè)響應(yīng)
res.status(200).send();
});
function generateChatGPTResponse(input:string) {
// 模擬AI模型的響應(yīng),這里可以替換為實(shí)際的模型調(diào)用
return [
`你說的是: ${input}`,
"這是AI模型的第一段響應(yīng)。",
"這是AI模型的第二段響應(yīng)。",
"這是AI模型的第三段響應(yīng)。",
];
}
該段代碼代碼也是我們常見的用于處理Post請求的方法。有幾點(diǎn)需要額外注意一下
- 使用 req.body.message 獲取客戶端發(fā)送的消息內(nèi)容,這需要 body-parser 中間件來解析請求體中的 JSON 數(shù)據(jù)
- 使用 setInterval 定時(shí)器每秒推送一條消息給所有 SSE 連接的客戶端
- 在消息推送開始之前,立即向發(fā)送 POST 請求的客戶端返回一個(gè) 200 狀態(tài)碼,表示請求已成功接收。
服務(wù)啟動(dòng)
然后我們就可以使用yarn dev在port為4000的端口中啟動(dòng)一個(gè)SSE服務(wù),此時(shí)坐等對應(yīng)的請求到來即可。
3. SSE前端部分(React版本)
既然,SSE后端服務(wù)已經(jīng)有了,那么我們來在前端接入對應(yīng)的服務(wù)。
我們在SSE目錄下,使用我們的腳手架在生成一個(gè)前端服務(wù)。
npx f_cli_f create Client
然后選擇自己擅長的技術(shù)即可。然后按照對應(yīng)的提示按照并啟動(dòng)服務(wù)即可。如果對我們的腳手架還不了解,可以翻看之前的文章Rust 賦能前端-開發(fā)一款屬于你的前端腳手架
最后,我們在SSE目錄下,就會(huì)生成如下的目錄信息。
---SSE
---Client(前端項(xiàng)目)
---Server (后端服務(wù))
前端代碼邏輯
我們在Client/src/pages/新建一個(gè)ChatComponent組件。
UI部分
<div className='flex flex-col justify-center items-center w-full h-full'>
<div className='flex flex-col justify-center items-center flex-1 w-full'>
<Typewriter text={'大家好,我是柒八九。一個(gè)專注于前端開發(fā)技術(shù)/Rust及AI應(yīng)用知識分享的Coder'} delay={100} infinite={false} />
{messages.map((msg, index) => (
// <p key={index}>{msg}</p>
<Typewriter text={msg} delay={100} infinite={false} key={index}/>
))}
</div>
<Form form={form} className='w-9/12'>
<Form.Item className={styles['message-item']} name="message">
<Input.TextArea
autoSize={{ minRows: 1, maxRows: 3 }}
placeholder="輸入內(nèi)容開始對話(Shift + Enter 換行)"
onPressEnter={handleTextAreaKeyDown}
/>
</Form.Item>
</Form>
</div>
UI部分呢,我們大致分為兩部分
- 展示后端返回信息
- TextArea收集用戶輸入信息
然后,我們在TextArea的PressEnter中執(zhí)行向后端發(fā)送的操作。
注冊EventSource
我們在Effect中注冊EventSource相關(guān)事件。
useEffect(() => {
const eventSource = new EventSource('http://localhost:4000/api/events');
eventSource.onmessage = function (event) {
const data = JSON.parse(event.data);
const { message } = data;
setMessages((prevMessages) => [...prevMessages, message]);
};
return () => {
eventSource.close();
};
}, []);
有幾點(diǎn)需要說明
- 我們是在組件初始化的時(shí)候,注冊EventSource
- 由于我們在上一節(jié)中已經(jīng)在http://localhost:4000中啟用了SSE服務(wù),所以在EventSource中傳人的是對應(yīng)的SSE地址
- 在onmessage中我們解析從后端返回的數(shù)據(jù),并存入到state-message中。
當(dāng)數(shù)據(jù)返回后,對應(yīng)的state-message發(fā)生變化,那也就觸發(fā)了React的重新渲染。就可以在UI部分看到后端返回的信息。
handleTextAreaKeyDown
這部分是調(diào)用指定的后端接口,將用戶信息傳遞給后端服務(wù),用于做指定信息的處理。
const handleTextAreaKeyDown= async (event) => {
const { keyCode, shiftKey } = event;
if (keyCode == 13 && !shiftKey) {
event.preventDefault();
const message = form.getFieldValue('message');
if (message && message.trim().length > 0) {
if (message.trim().length > 0) {
await fetch('http://localhost:4000/api/message', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({ message }),
});
}
form.setFieldValue('message', '');
}
}
};
在PressEnter事件中,我們還判斷event的keyCode和shiftKey來實(shí)現(xiàn)在TextArea中換行的操作。也就是只有在單純的觸發(fā)Enter才會(huì)向后端傳遞數(shù)據(jù)。
我們之所以選擇用Post來向后端發(fā)起情況,因?yàn)槲覀冇脩糨斎氲男畔?,不單單是文本信息,也可以是PDF/Word/Text等文本資源。
最終,我們就會(huì)得到一個(gè)和本文開頭的那個(gè)效果。
圖片
求豆麻袋,好像有一個(gè)東西沒給大家展示,那就是實(shí)現(xiàn)打字效果。別著急,我們這就說。
4. 實(shí)現(xiàn)一個(gè)打字組件
其實(shí)呢,針對一個(gè)完整的應(yīng)用,我們不僅僅需要處理純文本信息,我們還需要處理類似Table/Code/Img等富文本的展示。
此時(shí),最好的后端數(shù)據(jù)返回是啥呢,MarkDown。沒錯(cuò),ChatGPT也是這種格式,只不過它在前端顯示的時(shí)候,用了針對這類信息的展示處理。
而,我們今天的主要方向是講SSE,而針對其他類型的信息展示不在此篇文章內(nèi)。如果大家有興趣了解,后面我們也可以針對此處的內(nèi)容展開聊聊。
話題扯的有點(diǎn)遠(yuǎn)了,我們現(xiàn)在進(jìn)入這節(jié)的主題,寫一個(gè)純前端的打字效果。
其實(shí)呢,針對現(xiàn)成的打字效果有很多。例如
- typed-js[4]
- react-typed[5]
但是呢,本著知識探索的精神,我們今天來實(shí)現(xiàn)一個(gè)屬于自己的打字效果。
在ChatComponent目錄下,新建一個(gè)Typewriter文件夾。
然后新建三個(gè)文件
- index.tsx:只要邏輯
- Cursor.tsx:處理光標(biāo)邏輯
- index.module.scss:存放樣式信息。
下面我們就來看看它們各自的實(shí)現(xiàn)邏輯。
index.tsx
import React, { useState, useEffect } from 'react';
import style from './index.module.scss';
import Cursor from './Cursor';
interface TypewriterProps {
text: string | React.ReactNode;
delay: number;
infinite?: boolean;
}
const Typewriter: React.FC<TypewriterProps> = ({ text, delay, infinite }) => {
const [currentText, setCurrentText] = useState<string>('');
const [currentIndex, setCurrentIndex] = useState<number>(0);
useEffect(() => {
let timeout: number;
if (currentIndex < text.length) {
timeout = setTimeout(() => {
setCurrentText((prevText) => prevText + text[currentIndex]);
setCurrentIndex((prevIndex) => prevIndex + 1);
}, delay);
} else if (infinite) {
setCurrentIndex(0);
setCurrentText('');
}
return () => clearTimeout(timeout);
}, [currentIndex, delay, infinite, text]);
return (
<span className={style['text-writer-wrapper']}>
<span dangerouslySetInnerHTML={{ __html: currentText }}></span>
{currentIndex < text.length && <Cursor />}
</span>
);
};
export default Typewriter;
其實(shí)呢,上面的邏輯很簡單。
Typewriter接收三個(gè)參數(shù)
- text:要顯示的文本,可以是字符串或 React 節(jié)點(diǎn)。
- delay:每個(gè)字符之間的延遲時(shí)間(以毫秒為單位)。
- infinite:是否無限循環(huán)顯示文本,默認(rèn)為 false。
使用 useEffect 鉤子在每次 currentIndex 改變時(shí)運(yùn)行:
- 如果 currentIndex 小于 text 的長度:
設(shè)置一個(gè) setTimeout 以延遲添加下一個(gè)字符到 currentText。
遞增 currentIndex。
- 否則如果 infinite 為 true:
重置 currentIndex 和 currentText 以開始新的循環(huán)。
返回一個(gè)清除定時(shí)器的函數(shù),以避免內(nèi)存泄漏。
然后,我們使用dangerouslySetInnerHTML來更新文本信息。
Cursor.tsx
這個(gè)組件就更簡單了,就是繪制了一個(gè)svg,用于在文本輸入過程中顯示光標(biāo)。
import style from './index.module.scss';
export default function CursorSVG() {
return (
<svg viewBox="8 4 8 16" xmlns="http://www.w3.org/2000/svg" className={style['cursor']}>
<rect x="10" y="6" width="2" height="100%" fill="black" />
</svg>
);
}
index.module.scss
.text-writer-wrapper{
@keyframes flicker {
0% {
opacity: 0;
}
50% {
opacity: 1;
}
100% {
opacity: 0;
}
}
.cursor {
display: inline-block;
width: 1ch;
animation: flicker 0.5s infinite;
}
}
這段代碼主要用于創(chuàng)建打字機(jī)效果中的光標(biāo)閃爍效果:
- @keyframes flicker 動(dòng)畫定義了光標(biāo)的閃爍效果,通過改變透明度來實(shí)現(xiàn)閃爍。
- .cursor 類應(yīng)用了閃爍動(dòng)畫,并設(shè)置了寬度,使其顯示為一個(gè)閃爍的光標(biāo)。
最終效果是在 .text-writer-wrapper 中顯示的光標(biāo)會(huì)每 0.5 秒閃爍一次,模擬文本編輯器中的光標(biāo)效果。