打字機(jī)效果
最近在搭建chat gpt代理的時(shí)候,發(fā)現(xiàn)自己的配置雖然能夠調(diào)通接口,返回?cái)?shù)據(jù),但是結(jié)果是一次性顯示出來的,不像之前的chat gpt的官網(wǎng)demo那樣實(shí)現(xiàn)了打字機(jī)效果,一個(gè)字一個(gè)字出來。

所以研究了一下chat gpt打字機(jī)效果的原理,后續(xù)如果要實(shí)現(xiàn)類似的效果可以借鑒。
純前端實(shí)現(xiàn)打字機(jī)效果
最開始我搜索打字機(jī)效果時(shí),出現(xiàn)的結(jié)果大多數(shù)是純前端的方案。其原理也很簡(jiǎn)單,通過js定時(shí)把內(nèi)容輸出到屏幕。下面是chat gpt的答案:
前端實(shí)現(xiàn)打字機(jī)效果可以通過以下步驟:
- 將文本內(nèi)容嵌入到 HTML 元素中,如 div 或 span。
- 通過 CSS 樣式設(shè)置元素的顯示方式為隱藏(如 display: none;)。
- 使用 JavaScript 獲取該元素,并逐個(gè)顯示其中的字符。
- 使用定時(shí)器(如 setInterval() 函數(shù))控制每個(gè)字符的出現(xiàn)時(shí)間間隔,從而實(shí)現(xiàn)逐個(gè)逐個(gè)顯示的效果。
- 當(dāng)所有字符都被顯示后,停止定時(shí)器以避免不必要的計(jì)算開銷。
下面是一個(gè)簡(jiǎn)單的示例代碼:
HTML:
<div id="typewriter">Hello World!</div>
CSS:
#typewriter {
display: none;
}
JS:
const element = document.getElementById('typewriter');
let i = 0;
const interval = setInterval(() => {
element.style.display = 'inline';
element.textContent = element.textContent.slice(0, i++) + '_';
if (i > element.textContent.length) {
clearInterval(interval);
element.textContent = element.textContent.slice(0, -1);
}
}, 100);
該代碼會(huì)將 id 為 typewriter 的元素中的文本逐個(gè)顯示,每個(gè)字符之間相隔 100 毫秒。最終顯示完畢后,會(huì)將最后一個(gè)字符的下劃線去除。
流式輸出
后面我抓包以及查看了chat gpt的官方文檔之后,發(fā)現(xiàn)事情并沒有這么簡(jiǎn)單。chat gpt的打字機(jī)效果并不是后端一次性返回后,純前端的樣式。而是后端通過流式輸出不斷向前端輸出內(nèi)容。
在chat gpt官方文檔中,有一個(gè)參數(shù)可以讓它實(shí)現(xiàn)流式輸出:

這是一個(gè)叫“event_stream_format”的協(xié)議規(guī)范。
event_stream_format(簡(jiǎn)稱 ESF)是一種基于 HTTP/1.1 的、用于實(shí)現(xiàn)服務(wù)器推送事件的協(xié)議規(guī)范。它定義了一種數(shù)據(jù)格式,可以將事件作為文本流發(fā)送給客戶端。ESF 的設(shè)計(jì)目標(biāo)是提供簡(jiǎn)單有效的實(shí)時(shí)通信方式,以及支持眾多平臺(tái)和編程語言。
ESF 數(shù)據(jù)由多行文本組成,每行用 \n(LF)分隔。其中,每個(gè)事件由以下三部分組成:
- 事件類型(event)
- 數(shù)據(jù)(data)
- 標(biāo)識(shí)符(id)
例如:
event: message
data: Hello, world!
id: 123
這個(gè)例子表示一個(gè)名為 message 的事件,攜帶著消息內(nèi)容 Hello, world!,并提供了一個(gè)標(biāo)識(shí)符為 123 的可選參數(shù)。
ESF 還支持以下兩種特殊事件類型:
- 注釋(comment):以冒號(hào)開頭的一行,只做為注釋使用。
- 重傳(retry):指定客戶端重連的時(shí)間間隔,以毫秒為單位。
例如:
: This is a comment
retry: 10000
event: update
data: {"status": "OK"}
ESF 協(xié)議還支持 Last-Event-ID 頭部,它允許客戶端在斷線后重新連接,并從上次連接中斷處恢復(fù)。當(dāng)客戶端連接時(shí),可以通過該頭部將上次最新的事件 ID 傳遞給服務(wù)器,以便服務(wù)器根據(jù)該 ID 繼續(xù)發(fā)送事件。
ESF 是一種簡(jiǎn)單的、輕量級(jí)的協(xié)議,適用于需要實(shí)時(shí)數(shù)據(jù)交換和多方通信的場(chǎng)景。由于其使用了標(biāo)準(zhǔn)的 HTTP/1.1 協(xié)議,因此可以輕易地在現(xiàn)有的 Web 基礎(chǔ)設(shè)施上實(shí)現(xiàn)。
抓包可以發(fā)現(xiàn)這個(gè)響應(yīng)長(zhǎng)這樣:


可以看到是data: 加上一個(gè)json,每次的流式數(shù)據(jù)在delta里面。
http response中有幾個(gè)重要的頭:

其中,keep-alive是保持客戶端和服務(wù)端的雙向通信,這個(gè)大家應(yīng)該都比較了解。下面解釋一下另外兩個(gè)頭.
這里其實(shí)openai返回的是text/event-stream,text/event-stream 是一種流媒體協(xié)議,用于在 Web 應(yīng)用程序中推送實(shí)時(shí)事件。它的內(nèi)容是文本格式的,每個(gè)事件由一個(gè)或多個(gè)字段組成,以換行符(\n)分隔。這個(gè) MIME 類型通常用于服務(wù)器到客戶端的單向通信,例如服務(wù)器推送最新的新聞、股票報(bào)價(jià)等信息給客戶端。
我這里使用的開源項(xiàng)目chatgpt-web抓的包,請(qǐng)求被nodejs包了一層,返回了application/octet-stream (不太清楚這么做的動(dòng)機(jī)是什么),它是一種 MIME 類型,通常用于指示某個(gè)資源的內(nèi)容類型為二進(jìn)制文件,也就是未知的二進(jìn)制數(shù)據(jù)流。該類型通常不會(huì)執(zhí)行任何自定義處理,并且可以由客戶端根據(jù)需要進(jìn)行下載或保存。
Transfer-Encoding: chunked 是一種 HTTP 報(bào)文傳輸編碼方式,用于指示報(bào)文主體被分為多個(gè)等大小的塊(chunks)進(jìn)行傳輸。每個(gè)塊包含一個(gè)十六進(jìn)制數(shù)字的長(zhǎng)度字段,后跟一個(gè) CRLF(回車換行符),然后是實(shí)際的數(shù)據(jù)內(nèi)容,最后以另一個(gè) CRLF 結(jié)束。
使用 chunked 編碼方式可以使服務(wù)器在發(fā)送未知大小的數(shù)據(jù)時(shí)更加靈活,同時(shí)也可以避免一些限制整個(gè)響應(yīng)主體大小的限制。當(dāng)接收端收到所有塊后,會(huì)將它們組合起來,解壓縮(如果需要),并形成原始的響應(yīng)主體。
總之,Transfer-Encoding: chunked 允許服務(wù)器在發(fā)送 HTTP 響應(yīng)時(shí),動(dòng)態(tài)地生成報(bào)文主體,而不必事先確定其大小,從而提高了通信效率和靈活性。
服務(wù)端的實(shí)現(xiàn)
作為chat gpt代理
如果寫一個(gè)golang http服務(wù)作為chat gpt的代理,只需要循環(huán)掃描chat gpt返回的每行結(jié)果,每行作為一個(gè)事件輸出給前端就行了。核心代碼如下:
// 設(shè)置Content-Type標(biāo)頭為text/event-stream
w.Header().Set("Content-Type", "text/event-stream")
// 設(shè)置緩存控制標(biāo)頭以禁用緩存
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Keep-Alive", "timeout=5")
// 循環(huán)讀取響應(yīng)體并將每行作為一個(gè)事件發(fā)送到客戶端
scanner := bufio.NewScanner(resp.Body)
for scanner.Scan() {
eventData := scanner.Text()
if eventData == "" {
continue
}
fmt.Fprintf(w, "%s\n\n", eventData)
flusher, ok := w.(http.Flusher)
if ok {
flusher.Flush()
} else {
log.Println("Flushing not supported")
}
}
自己作為服務(wù)端
這里模仿openai的數(shù)據(jù)結(jié)構(gòu),自己作為服務(wù)端,返回流式輸出:
const Text = `
proxy_cache:通過這個(gè)模塊,Nginx 可以緩存代理服務(wù)器從后端服務(wù)器請(qǐng)求到的響應(yīng)數(shù)據(jù)。當(dāng)下一個(gè)客戶端請(qǐng)求相同的資源時(shí),Nginx 可以直接從緩存中返回響應(yīng),而不必去請(qǐng)求后端服務(wù)器。這大大降低了代理服務(wù)器的負(fù)載,同時(shí)也能提高客戶端訪問速度。需要注意的是,使用 proxy_cache 模塊時(shí)需要謹(jǐn)慎配置緩存策略,避免出現(xiàn)緩存不一致或者過期的情況。
proxy_buffering:通過這個(gè)模塊,Nginx 可以將后端服務(wù)器響應(yīng)數(shù)據(jù)緩沖起來,并在完整的響應(yīng)數(shù)據(jù)到達(dá)之后再將其發(fā)送給客戶端。這種方式可以減少代理服務(wù)器和客戶端之間的網(wǎng)絡(luò)連接數(shù),提高并發(fā)處理能力,同時(shí)也可以防止后端服務(wù)器過早關(guān)閉連接,導(dǎo)致客戶端無法接收到完整的響應(yīng)數(shù)據(jù)。
綜上所述, proxy_cache 和 proxy_buffering 都可以通過緩存技術(shù)提高代理服務(wù)器性能和安全性,但需要注意合理的配置和使用,以避免潛在的緩存不一致或者過期等問題。同時(shí), proxy_buffering 還可以通過緩沖響應(yīng)數(shù)據(jù)來提高代理服務(wù)器的并發(fā)處理能力,從而更好地服務(wù)于客戶端。
`
type ChatCompletionChunk struct {
ID string `json:"id"`
Object string `json:"object"`
Created int64 `json:"created"`
Model string `json:"model"`
Choices []struct {
Delta struct {
Content string `json:"content"`
} `json:"delta"`
Index int `json:"index"`
FinishReason *string `json:"finish_reason"`
} `json:"choices"`
}
func handleSelfRequest(w http.ResponseWriter, r *http.Request) {
// 設(shè)置Content-Type標(biāo)頭為text/event-stream
w.Header().Set("Content-Type", "text/event-stream")
// 設(shè)置緩存控制標(biāo)頭以禁用緩存
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Keep-Alive", "timeout=5")
w.Header().Set("Transfer-Encoding", "chunked")
// 生成一個(gè)uuid
uid := uuid.NewString()
created := time.Now().Unix()
for i, v := range Text {
eventData := fmt.Sprintf("%c", v)
if eventData == "" {
continue
}
var finishReason *string
if i == len(Text)-1 {
temp := "stop"
finishReason = &temp
}
chunk := ChatCompletionChunk{
ID: uid,
Object: "chat.completion.chunk",
Created: created,
Model: "gpt-3.5-turbo-0301",
Choices: []struct {
Delta struct {
Content string `json:"content"`
} `json:"delta"`
Index int `json:"index"`
FinishReason *string `json:"finish_reason"`
}{
{ Delta: struct {
Content string `json:"content"`
}{
Content: eventData,
},
Index: 0,
FinishReason: finishReason,
},
},
}
fmt.Println("輸出:" + eventData)
marshal, err := json.Marshal(chunk)
if err != nil {
return
}
fmt.Fprintf(w, "data: %v\n\n", string(marshal))
flusher, ok := w.(http.Flusher)
if ok {
flusher.Flush()
} else {
log.Println("Flushing not supported")
}
if i == len(Text)-1 {
fmt.Fprintf(w, "data: [DONE]")
flusher, ok := w.(http.Flusher)
if ok {
flusher.Flush()
} else {
log.Println("Flushing not supported")
}
} time.Sleep(100 * time.Millisecond)
}
}
核心是每次寫進(jìn)一行數(shù)據(jù)data: xx \n\n,最終以data: [DONE]結(jié)尾。
前端的實(shí)現(xiàn)
前端代碼參考https://github.com/Chanzhaoyu/chatgpt-web的實(shí)現(xiàn)。
這里核心是使用了axios的onDownloadProgress鉤子,當(dāng)stream有輸出時(shí),獲取chunk內(nèi)容,更新到前端顯示。
await fetchChatAPIProcess<Chat.ConversationResponse>({
prompt: message,
options,
signal: controller.signal,
onDownloadProgress: ({ event }) => {
const xhr = event.target
const { responseText } = xhr
// Always process the final line
const lastIndex = responseText.lastIndexOf('\n')
let chunk = responseText
if (lastIndex !== -1)
chunk = responseText.substring(lastIndex)
try {
const data = JSON.parse(chunk)
updateChat(
+uuid,
dataSources.value.length - 1,
{
dateTime: new Date().toLocaleString(),
text: lastText + data.text ?? '',
inversion: false,
error: false,
loading: false,
conversationOptions: { conversationId: data.conversationId, parentMessageId: data.id },
requestOptions: { prompt: message, options: { ...options } },
},
)
if (openLongReply && data.detail.choices[0].finish_reason === 'length') {
options.parentMessageId = data.id
lastText = data.text
message = ''
return fetchChatAPIOnce()
}
scrollToBottom()
}
catch (error) {
//
}
},
})
在底層的請(qǐng)求代碼中,設(shè)置對(duì)應(yīng)的header和參數(shù),監(jiān)聽data內(nèi)容,回調(diào)onProgress函數(shù)。
const responseP = new Promise((resolve, reject) => {
const url = this._apiReverseProxyUrl;
const headers = {
...this._headers,
Authorization: `Bearer ${this._accessToken}`,
Accept: "text/event-stream",
"Content-Type": "application/json"
};
if (this._debug) {
console.log("POST", url, { body, headers });
}
fetchSSE(
url,
{
method: "POST",
headers,
body: JSON.stringify(body),
signal: abortSignal,
onMessage: (data) => {
var _a, _b, _c;
if (data === "[DONE]") {
return resolve(result);
}
try {
const convoResponseEvent = JSON.parse(data);
if (convoResponseEvent.conversation_id) {
result.conversationId = convoResponseEvent.conversation_id;
}
if ((_a = convoResponseEvent.message) == null ? void 0 : _a.id) {
result.id = convoResponseEvent.message.id;
}
const message = convoResponseEvent.message;
if (message) {
let text2 = (_c = (_b = message == null ? void 0 : message.content) == null ? void 0 : _b.parts) == null ? void 0 : _c[0];
if (text2) {
result.text = text2;
if (onProgress) {
onProgress(result);
}
}
}
} catch (err) {
}
}
},
this._fetch
).catch((err) => {
const errMessageL = err.toString().toLowerCase();
if (result.text && (errMessageL === "error: typeerror: terminated" || errMessageL === "typeerror: terminated")) {
return resolve(result);
} else {
return reject(err);
}
});
});
nginx配置
在搭建過程中,我還遇到另一個(gè)坑。因?yàn)樽约褐虚g有一層nginx代理,而「nginx默認(rèn)開啟了緩存,所以導(dǎo)致流式輸出到nginx這個(gè)地方被緩存了」,最終前端拿到的數(shù)據(jù)是緩存后一次性輸出的。同時(shí)gzip也可能有影響。
這里可以通過nginx配置,把gzip和緩存都關(guān)掉。
gzip off;
location / {
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_cache off;
proxy_cache_bypass $http_pragma;
proxy_cache_revalidate on;
proxy_http_version 1.1;
proxy_buffering off;
proxy_pass http://xxx.com:1234;
}
proxy_cache 和 proxy_buffering 是 Nginx 的兩個(gè)重要的代理模塊。它們可以顯著提高代理服務(wù)器的性能和安全性。
- proxy_cache:通過這個(gè)模塊,Nginx 可以緩存代理服務(wù)器從后端服務(wù)器請(qǐng)求到的響應(yīng)數(shù)據(jù)。當(dāng)下一個(gè)客戶端請(qǐng)求相同的資源時(shí),Nginx 可以直接從緩存中返回響應(yīng),而不必去請(qǐng)求后端服務(wù)器。這大大降低了代理服務(wù)器的負(fù)載,同時(shí)也能提高客戶端訪問速度。需要注意的是,使用 proxy_cache 模塊時(shí)需要謹(jǐn)慎配置緩存策略,避免出現(xiàn)緩存不一致或者過期的情況。
- proxy_buffering:通過這個(gè)模塊,Nginx 可以將后端服務(wù)器響應(yīng)數(shù)據(jù)緩沖起來,并在完整的響應(yīng)數(shù)據(jù)到達(dá)之后再將其發(fā)送給客戶端。這種方式可以減少代理服務(wù)器和客戶端之間的網(wǎng)絡(luò)連接數(shù),提高并發(fā)處理能力,同時(shí)也可以防止后端服務(wù)器過早關(guān)閉連接,導(dǎo)致客戶端無法接收到完整的響應(yīng)數(shù)據(jù)。
實(shí)測(cè)只配置proxy_cache沒有用,配置了proxy_buffering后流式輸出才生效。