自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

解密 SSE,像 ChatGPT 一樣返回流式響應(yīng)

開發(fā) 前端
當(dāng)完成握手之后,后續(xù)傳輸?shù)臄?shù)據(jù)就不再是 HTTP 報文,而是 WebSocket 格式的二進制幀。所以這兩者完全是不同的協(xié)議,那有沒有一種辦法,我們?nèi)匀皇褂?HTTP 協(xié)議,同時還能讓服務(wù)端主動推送數(shù)據(jù)呢?

我們知道目前的 HTTP/1.1 采用的是標(biāo)準的請求-響應(yīng)模型,客戶端主動發(fā)請求,服務(wù)端被動地返回響應(yīng)。這種模型在客戶端需要實時獲取結(jié)果的場景下是不合適的,因為這意味著客戶端需要不斷地輪詢,所以最好的做法是服務(wù)端生成結(jié)果之后,主動推送給客戶端。

比如 ChatGPT,它在生成內(nèi)容時,也是生成一部分,就主動向客戶端推送一部分。而在這個過程中,客戶端不需要做任何事情,只需等待 ChatGPT 服務(wù)端返回內(nèi)容即可。

說到這兒,你肯定想到了 WebSocket,沒錯這是一種解決方案。但 WebSocket 太重了,它和 HTTP 都是基于 TCP 的應(yīng)用層傳輸協(xié)議,只不過在握手的時候搭了 HTTP 的便車,利用 HTTP 本身的協(xié)議升級特性,偽裝成 HTTP,這樣就能繞過瀏覽器沙箱、網(wǎng)絡(luò)防火墻等限制。

當(dāng)完成握手之后,后續(xù)傳輸?shù)臄?shù)據(jù)就不再是 HTTP 報文,而是 WebSocket 格式的二進制幀。所以這兩者完全是不同的協(xié)議,那有沒有一種辦法,我們?nèi)匀皇褂?HTTP 協(xié)議,同時還能讓服務(wù)端主動推送數(shù)據(jù)呢?

答案是有的,也就是本文將要介紹的 SSE 技術(shù),它的英文全稱是 Server-Sent Events(服務(wù)端推送事件)。通過 SSE 可以讓服務(wù)端即時推送數(shù)據(jù)到客戶端,而不需要客戶端輪詢服務(wù)端以獲取更新。

圖片圖片

到這你可能會問,那 WebSocket 和 SSE 有什么區(qū)別呢?

1)通信方式

WebSocket 提供全雙工通信,服務(wù)端和客戶端都可以在同一個連接上同時發(fā)送和接收數(shù)據(jù)。最重要的是,WebSocket 獨立于 HTTP 協(xié)議,盡管它開始于一個 HTTP 握手。

SSE 僅提供服務(wù)端到客戶端的單向通信,客戶端不能通過 SSE 給服務(wù)端發(fā)信息。

2)協(xié)議和實現(xiàn)

WebSocket 使用自己的協(xié)議(ws:// 或 wss://),需要服務(wù)端和客戶端都支持,并且協(xié)議比較復(fù)雜。

SSE 則是使用標(biāo)準的 HTTP 協(xié)議,實現(xiàn)起來更簡單,尤其是在服務(wù)端。

3)適用場景

WebSocket 適用于服務(wù)端和客戶端之間雙向?qū)崟r通信的場景,如在線游戲、聊天應(yīng)用等。

SSE 適用于服務(wù)端向客戶端單向推送數(shù)據(jù)的場景,如消息通知、數(shù)據(jù)更新。并且 SSE 自動支持斷線重連,而 WebSocket 則需要額外部署。

4)復(fù)雜性和資源使用

WebSocket 由于其雙向通信的能力,通常比 SSE 更復(fù)雜,可能需要更多的資源來維護和管理連接。

SSE 因為其單向性和基于 HTTP 的特性,它可以利用現(xiàn)有的網(wǎng)絡(luò)基礎(chǔ)設(shè)施,如代理服務(wù)器、負載均衡器和防火墻等等,通常更容易實現(xiàn)和維護。


相信現(xiàn)在你已經(jīng)明白 SSE 是做什么的了,它的目的就是讓服務(wù)端能夠主動推送數(shù)據(jù)給客戶端。如果不需要和服務(wù)端動態(tài)交互,只是希望服務(wù)端在有數(shù)據(jù)的時候推過來,那么 WebSocket 就有些太重了,因為這意味著要替換 HTTP 協(xié)議,而使用 SSE 無疑是更好的選擇。

SSE 是什么我們已經(jīng)知道了,那它是怎么實現(xiàn)的呢?原理是什么呢?

1)建立連接

客戶端發(fā)起一個標(biāo)準的 HTTP 請求來開啟 SSE 會話,這個請求的特殊之處在于它包含一個頭字段。

Accept: text/event-stream

相當(dāng)于客戶端告訴服務(wù)端,期望接收 SSE 消息流。而服務(wù)端在看到該字段時,也知道這是一個 SSE 請求,于是立即向客戶端返回響應(yīng)頭,注意:返回的只有響應(yīng)頭,里面會包含如下頭字段。

Content-Type: text/event-stream

響應(yīng)頭返回之后標(biāo)志著 SSE 連接成功建立,并且連接會保持開放狀態(tài),服務(wù)端后續(xù)可以隨時通過此連接向客戶端發(fā)送數(shù)據(jù)。此外當(dāng)連接不小心斷開時,客戶端也會自動進行重連。

所以在普通的 HTTP 請求中,一旦服務(wù)端返回,那么請求結(jié)束了。雖然可以將 Connection 頭字段設(shè)置為 keep-alive 保證連接不斷開,但每次訪問都包含了 HTTP 請求/響應(yīng)的完整過程。

而在 SSE 中,服務(wù)端會保持一個開放的連接,只要有新數(shù)據(jù)可用,就會直接發(fā)送給客戶端。所以服務(wù)端會將響應(yīng)以流的形式發(fā)送給客戶端,每次發(fā)送的消息都是響應(yīng)流的一部分,而不是獨立的 HTTP 響應(yīng)。

因此 SSE 的服務(wù)端在發(fā)送數(shù)據(jù)時,并不遵循傳統(tǒng)的一次請求,一次響應(yīng)模式。它在建立連接之后會保持連接開放,并通過這個持續(xù)的連接流式地發(fā)送數(shù)據(jù),這種方式就使得 SSE 非常適合實時數(shù)據(jù)推送的場景。

2)發(fā)送消息

客戶端發(fā)送請求,服務(wù)端返回響應(yīng)頭之后,SSE 連接就建立成功了。此時客戶端只需要躺平,安靜地等待服務(wù)端的輸出即可。所以現(xiàn)在的關(guān)鍵就在于服務(wù)端要返回什么格式的數(shù)據(jù)呢?很簡單,一個基本的消息由以下幾部分組成:

  • data:實際的消息數(shù)據(jù);
  • id:可選,消息的唯一標(biāo)識符,用于在連接重新建立時同步消息;
  • event:可選,定義事件類型,用于客戶端區(qū)分消息的類型;
  • retry:可選,自動重連的時間(毫秒),如果連接中斷,客戶端在自動重新連接之前,需要等待多長時間;

注意:每個消息要以兩個換行符(\n\n)結(jié)束,舉個例子,我們發(fā)送一個 Hello World。

data: Hello World\n\n

也可以發(fā)送帶有事件類型的消息:

event: userUpdate
data: {"username": "Serpen", "age": 18}\n\n

還是比較簡單的,服務(wù)端可以保持連接并隨時發(fā)送更多數(shù)據(jù)。然后客戶端在收到時會進行處理,但不需要(也不能)對服務(wù)端作出任何回應(yīng),它只需要被動地接收來自服務(wù)端的數(shù)據(jù)即可。當(dāng)服務(wù)端認為數(shù)據(jù)已經(jīng)全部發(fā)送完畢、無需再發(fā)時,那么便可以主動斷開連接。

關(guān)于 SSE 的原理我們就解釋清楚了,下面來實際編程實現(xiàn)它,這里我們先使用原生的 asyncio 實現(xiàn) SSE。

import asyncio
from asyncio import StreamReader, StreamWriter

class SSE:

    def __init__(self, host="0.0.0.0", port=9999):
        self.host = host
        self.port = port

    @staticmethod
    def parse_request_headers(data: bytes) -> dict:
        """
        此函數(shù)負責(zé)從原始字節(jié)流中解析出請求頭
        """
        headers = data.split(b"\r\n\r\n")[0].split(b"\r\n")
        header_dict = {}
        for header in headers[1:]:
            key, val = header.decode("utf-8").split(":", 1)
            header_dict[key.lower()] = val.strip()
        return header_dict

    async def handler_requests(self,
                               reader: StreamReader,
                               writer: StreamWriter):
        """
        負責(zé)處理來自客戶端的請求
        每來一個客戶端連接,就會基于此函數(shù)創(chuàng)建一個協(xié)程
        并且自動傳遞兩個參數(shù):reader 和 writer
        reader.read  負責(zé)讀取數(shù)據(jù),等價于 socket.recv
        writer.write 負責(zé)發(fā)送數(shù)據(jù),等價于 socket.send
        """
        # 獲取客戶端的請求報文,這里對請求方法、請求地址不做限制
        data = await reader.readuntil(b"\r\n\r\n")
        # 解析出請求頭
        request_headers = self.parse_request_headers(data)
        # 簡單檢測一下 accept 字段,如果不是建立 SSE,那么直接關(guān)閉連接
        if request_headers.get("accept") != "text/event-stream":
            writer.close()
            return await writer.wait_closed()
        # 如果是 SSE 連接,那么返回響應(yīng)頭
        response_header = (
            b"HTTP/1.1 200 OK\r\n"
            b"Content-Type: text/event-stream\r\n"
            b"Cache-Control: no-cache\r\n"
            b"Connection: keep-alive\r\n"
            b'Access-Control-Allow-Origin: *\r\n'
            b"\r\n"
        )
        writer.write(response_header)
        await writer.drain()

        # 然后便可以不斷地向客戶端返回數(shù)據(jù)了
        for _ in range(5):
            # 每隔 1 秒返回數(shù)據(jù)
            data = "data: 高老師總能分享出好東西\r\n\r\n".encode("utf-8")
            writer.write(data)
            await writer.drain()
            await asyncio.sleep(1)
        # 數(shù)據(jù)傳輸完畢
        writer.close()
        await writer.wait_closed()

    async def __create_server(self):
        # 創(chuàng)建服務(wù),第一個參數(shù)是一個回調(diào)函數(shù)
        # 當(dāng)連接過來的時候就會根據(jù)此函數(shù)創(chuàng)建一個協(xié)程
        # 后面是綁定的 ip 和 端口
        server = await asyncio.start_server(self.handler_requests,
                                            self.host,
                                            self.port)
        # 然后開啟無限循環(huán)
        async with server:
            await server.serve_forever()

    def run_server(self):
        loop = asyncio.get_event_loop()
        loop.run_until_complete(self.__create_server())

if __name__ == '__main__':
    sse = SSE()
    sse.run_server()

服務(wù)端代碼編寫完畢,下面編寫前端代碼。

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <style>
        #data {
            font-weight: bold;
            color: cadetblue;
            font-size: large;
        }
    </style>
</head>
<body>
    <h1>SSE Test</h1>
    <div id="data"></div>
    <script>
        document.addEventListener("DOMContentLoaded", function () {
            // 和服務(wù)端建立 SSE 連接
            var eventSource = new EventSource("http://localhost:9999");

            eventSource.onmessage = function (e) {
                // 將數(shù)據(jù)渲染在 <div id="data"></div> 的內(nèi)部
                var data = e.data + "\n";
                document.getElementById('data').innerText += data;
            };

            eventSource.onerror = function (e) {
                console.error('Error occurred:', e);
                eventSource.close();
            };
        });
    </script>
</body>
</html>

代碼編寫完畢,我們用瀏覽器打開 HTML 文件,便可看到如下效果。

圖片圖片

以上我們就簡單實現(xiàn)了 SSE,當(dāng)然為了加深印象,這里的后端是使用原生的 asyncio 編寫的,但在工作中,我們會使用現(xiàn)成的 Web 框架,比如 FastAPI,Blacksheep 等等。

需要說明的是,雖然通過 SSE 技術(shù)可以實現(xiàn)類似 ChatGPT 的效果,但 ChatGPT 內(nèi)部并沒有用到 SSE,它內(nèi)部是基于 HTTP 的分塊傳輸實現(xiàn)的。因為 SSE 只能通過 GET 請求發(fā)出,并且無法自定義請求頭。

如果想實現(xiàn) ChatGPT 的效果,需要使用 HTTP 的分塊傳輸。而像 FastAPI、BlackSheep 等框架提供的流式響應(yīng),便是基于 HTTP 的分塊傳輸實現(xiàn)的,比如 FastAPI:

import asyncio
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

async def event_generator():
    for _ in range(5):
        # 每隔 1 秒返回數(shù)據(jù)
        data = "data: 高老師總能分享出好東西\r\n\r\n".encode("utf-8")
        yield data
        await asyncio.sleep(1)

@app.get("/")
async def sse():
    return StreamingResponse(event_generator(), 
                             media_type="text/event-stream")

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=9999)

首先之前的前端代碼依舊可以正常訪問,通過修改數(shù)據(jù)格式和 Content-Type 可以讓其支持 SSE。但最正確的做法是直接訪問 localhost:9999,效果如下:

圖片圖片

所以基于 StreamingResponse 可以實現(xiàn) SSE,也可以直接訪問。而直接訪問的話,此時里面的 data: 和 \r\n 就是實體數(shù)據(jù)的一部分。并且這種方式和 ChatGPT 的工作機制是相似的,都使用了 HTTP 的分塊傳輸,支持所有的請求方法,而 SSE 只支持 GET 請求。

BlackSheep 也是類似的,它同樣也支持流式響應(yīng)。

import asyncio
from blacksheep import Application, Response, StreamedContent
import uvicorn

app = Application()
app.use_cors(
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

async def event_generator():
    for _ in range(5):
        # 每隔 1 秒返回數(shù)據(jù)
        data = "data: 高老師總能分享出好東西\r\n\r\n".encode("utf-8")
        yield data
        await asyncio.sleep(1)

@app.router.get("/")
async def sse():
    return Response(
        200,
        cnotallow=StreamedContent(b"text/event-stream", event_generator),
    )

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=9999)

可以測試一下,效果是一樣的。如果你不想實現(xiàn) SSE,只是希望固定的數(shù)據(jù)以流的形式一點一點返回,那么記得將數(shù)據(jù)中多余的 data: 和 \r\n 給去掉,并最好修改 Content-Type 為合適的類型。

所以 SSE 一般用于需要服務(wù)端推數(shù)據(jù),但數(shù)據(jù)不知道什么時候會過來,于是通過 SSE 保持連接開放。后續(xù)當(dāng)服務(wù)端有數(shù)據(jù)了,直接通過連接發(fā)送給客戶端即可。

而 FastAPI 和 BlackSheep 提供的流式響應(yīng)更像是,返回的數(shù)據(jù)比較龐大,如果全部準備好再一次性返回,會讓用戶陷入長時間的等待,造成不好的體驗。于是通過分塊傳輸,準備好一部分就返回一部分。雖然整體時間沒變,但可以讓用戶立刻獲取到數(shù)據(jù),從而提升用戶體驗。

比如 ChatGPT,當(dāng)它回答的內(nèi)容比較多的時候,那么整個過程耗費幾十秒鐘是常有的事情,假設(shè) 30 秒。相比讓用戶等待 30 秒,然后內(nèi)容一下子刷出來,顯然生成一部分返回一部分這種方式更讓人喜歡。

因此使用 SSE 還是流式響應(yīng),則取決于你當(dāng)前的業(yè)務(wù)。如果你返回的數(shù)據(jù)是確定的,只是準備的時間比較長,或者數(shù)據(jù)量比較大,那么推薦使用流式響應(yīng)。

至于 SSE,在這些現(xiàn)成的 Web 框架里面,也可以通過流式響應(yīng)來實現(xiàn),只需要將 Content-Type 設(shè)置為 text/event-stream,并將數(shù)據(jù)加上前綴 data: 和后綴 \r\n\r\n。

但說實話,如果想實現(xiàn) SSE,不建議通過流式響應(yīng)來實現(xiàn),而是使用專門的庫。以 FastAPI 為例:

from sse_starlette.sse import EventSourceResponse

FastAPI 其實就是在 starlette 的基礎(chǔ)上套了一層殼,通過安裝 sse_starlette 可以讓 FastAPI 更好地支持 SSE。

以上就是本文的內(nèi)容,如果對你有幫助,就點個贊吧。

責(zé)任編輯:武曉燕 來源: 古明地覺的編程教室
相關(guān)推薦

2021-08-12 06:08:15

CSS 技巧組件狀態(tài)

2023-04-05 14:19:07

FlinkRedisNoSQL

2013-12-17 09:02:03

Python調(diào)試

2022-12-21 15:56:23

代碼文檔工具

2023-05-23 13:59:41

RustPython程序

2013-12-31 09:19:23

Python調(diào)試

2015-03-16 12:50:44

2013-08-22 10:17:51

Google大數(shù)據(jù)業(yè)務(wù)價值

2021-05-20 08:37:32

multiprocesPython線程

2011-01-18 10:45:16

喬布斯

2012-06-08 13:47:32

Wndows 8Vista

2015-02-05 13:27:02

移動開發(fā)模塊SDK

2024-04-01 00:05:00

ChatGPTSSE

2023-02-23 15:35:14

人工智能ChatGPT聊天機器人

2012-03-21 10:15:48

RIM越獄

2021-09-07 10:29:11

JavaScript模塊CSS

2017-05-22 10:33:14

PythonJuliaCython

2021-12-14 19:40:07

Node路由Vue

2012-06-14 09:48:11

OpenStackLinux

2015-04-09 11:27:34

點贊
收藏

51CTO技術(shù)棧公眾號