討論一個(gè)技術(shù)問題,大模型流式返回 原創(chuàng)
?“ 技術(shù)上最容易犯的錯(cuò)就是經(jīng)驗(yàn)主義,以及拿來主義”
最近在對(duì)接GPT實(shí)現(xiàn)一個(gè)功能,具體功能就不說了;主要是這個(gè)功能需要流式返回,因此踩了一些坑;所以就在此記錄一下。至于什么是流式返回,不清楚的可以自己問度娘。
大模型流式返回帶來的問題
自chatGPT推出以來,其一個(gè)字一個(gè)字的出現(xiàn),就像一個(gè)打字機(jī);這效果驚艷了很多人,因此在很多場景下很多人都會(huì)選擇打字機(jī)的效果。而打字機(jī)效果背后的實(shí)現(xiàn)就是流式返回。
對(duì)技術(shù)有過了解的人應(yīng)該都知道,正常情況下接口是在所有業(yè)務(wù)處理完成之后一起返回;但流式返回是分多批次返回。簡單來說就是處理了一部分就返回一部分,不用等全部完成之后再返回。
如下圖所示就是一個(gè)典型的流式返回:
那目前流式返回所遇到的問題是什么呢?
其實(shí)從后端的角度來說,流式返回沒有任何問題;不論是使用大模型官方提供的SDK亦或者是調(diào)用他們的接口,都是正常的流式返回。但問題是,調(diào)用第三方接口的目的是為了完成業(yè)務(wù)功能,因此怎么把這個(gè)流式返回也用流式返回給前端就是一個(gè)需要思考的問題了。
從web開發(fā)的角度來說,現(xiàn)在前后端交互主要使用的是http協(xié)議;但http協(xié)議是前端向后端發(fā)起請(qǐng)求,而不能從后端向前端發(fā)起請(qǐng)求;為了解決這個(gè)問題,因此就有了websocket和SSE協(xié)議。
這兩個(gè)協(xié)議的區(qū)別是websocket是全雙工的,而SSE是半雙工的;意思就是說,websocket建立連接之后,前端可以主動(dòng)向后端發(fā)消息,后端也可以主動(dòng)向前端發(fā)消息;而SSE是只能后端向前端發(fā)消息。
但不論是websocket還是SSE協(xié)議,本質(zhì)上只是一種通訊協(xié)議,和業(yè)務(wù)沒什么具體的關(guān)系;這就類似于,搞貨運(yùn)的目的是把貨物安全的送到目的地,至于你是用汽車運(yùn),還是用火車運(yùn)都可以。
那問題出在哪里呢?
剛開始我們使用的是websocket作為流式返回的通訊工具;但再實(shí)際使用中才發(fā)現(xiàn)一個(gè)很大的問題,那就是websocket無法在短時(shí)間內(nèi)接受大量的網(wǎng)絡(luò)傳輸需求;一旦過量就會(huì)導(dǎo)致websocket緩沖區(qū)溢出,也就是TEXT_FULL_WRITRING異常;簡單來說就是,websocket為了減輕網(wǎng)絡(luò)壓力,每次發(fā)送消息都會(huì)先把緩沖區(qū)寫滿;然后再一次性發(fā)送。
但由于流式返回速度較快,有時(shí)候websocket上一條消息還沒發(fā)送出去,下一條新的數(shù)據(jù)又進(jìn)來了;因此就會(huì)導(dǎo)致websocket報(bào)錯(cuò),即使使用的是異步發(fā)送也會(huì)報(bào)錯(cuò)。
public void sendText(String text) {
for(Session session : sessions.values()){
if (session.isOpen()) {
try {
//異步發(fā)送
session.getAsyncRemote().sendText(text);
} catch (Exception e) {
log.error("發(fā)送會(huì)話異常");
}
}else{
log.error("socket 在不可發(fā)送狀態(tài)");
}
}
}
為了解決這個(gè)問題, 因此就在網(wǎng)上查了一下發(fā)現(xiàn);類似于這種流式返回,大部分人的處理方式都是用SSE協(xié)議;因?yàn)镾SE協(xié)議相對(duì)websocket更簡單,效率更高。而在java語言中,使用SSE有兩種方式,第一種就是自己手動(dòng)創(chuàng)建SSE對(duì)象,使用SseEmitter 對(duì)象來實(shí)現(xiàn)。
但這種原生的實(shí)現(xiàn)方式存在很多問題,比如需要自己去控制sse與用戶的關(guān)聯(lián)關(guān)系,sse的狀態(tài)判斷,自動(dòng)重連等等。
因此,springboot就提供了另一種方式,那就是Flux流式處理。
OpenAIClient client = new OpenAIClientBuilder().credential(new AzureKeyCredential(key)).endpoint(endPoint).buildClient();
IterableStream<ChatCompletions> stream = client.getChatCompletionsStream(modelName, new ChatCompletionsOptions(messages));
StringBuffer stringBuffer = new StringBuffer();
return Flux.<String>create(sink -> {
stream.iterator().forEachRemaining(
chatCompletions -> {
if (chatCompletions.getChoices() != null && chatCompletions.getChoices().size() > 0) {
if (chatCompletions.getChoices().get(0).getDelta() != null) {
String content = chatCompletions.getChoices().get(0).getDelta().getContent();
log.info(content);
if (content != null) {
stringBuffer.append(content);
sink.next(content);
}
}
}
}
);
sink.complete();
}).map(data -> ServerSentEvent.<String>builder().data(data).build())
// 每隔一段時(shí)間發(fā)送一個(gè)字符
.delayElements(Duration.ofMillis(10))
// 停止
.takeWhile(i -> !redisUtil.hasKey(stopKey))
// 最后執(zhí)行
.doOnComplete(() -> {
//傳輸完成 業(yè)務(wù)處理
});
如上所示,F(xiàn)lux通過sink封裝大模型的流式返回,然后調(diào)用next方法主動(dòng)把數(shù)據(jù)返回給前端,以此達(dá)到流式效果。
雖然從操作上來說,各種技術(shù)已經(jīng)逐漸成熟,我們都可以直接拿來主義,拿過來用就好了;但實(shí)際上存在的一個(gè)問題就是,當(dāng)你不知道其原理,又沒有經(jīng)驗(yàn)時(shí),你還是會(huì)踩很多坑。
?
本文轉(zhuǎn)載自公眾號(hào)AI探索時(shí)代 作者:DFires
