譯者 | 核子可樂(lè)
審校 | 重樓
將多個(gè)大語(yǔ)言模型集成至應(yīng)用程序當(dāng)中往往是項(xiàng)艱巨的挑戰(zhàn),各類不同API及通信協(xié)議的協(xié)同處理,以及如何確保請(qǐng)求路由的復(fù)雜性難題往往令人望而生畏。
好在可以使用消息代理與路由機(jī)制更優(yōu)雅地解決此類問(wèn)題,在解決痛點(diǎn)的同時(shí)實(shí)現(xiàn)多個(gè)關(guān)鍵優(yōu)勢(shì)。
本文將向大家介紹具體操作步驟。這里以KubeMQ為例,配合代碼示例來(lái)指導(dǎo)大家逐步建立一套可與OpenAI及Anthropic Claude交互的路由體系。
使用消息代理作為大模型路由工具的主要優(yōu)勢(shì)
1. 簡(jiǎn)化集成
通過(guò)使用消息代理作為路由機(jī)制,我們可以將不同大模型API交互所涉及的復(fù)雜性抽象出來(lái),從而簡(jiǎn)化客戶端代碼并降低出錯(cuò)幾率。
2. 多模型用例
消息代理能夠?qū)崿F(xiàn)多模型或?qū)iT用于不同任務(wù)的模型間的通信(如一個(gè)模型用于摘要,另一模型用于情緒分析)。其可以確保請(qǐng)求被有效路由至適當(dāng)模型,使得應(yīng)用程序能夠利用各模型的優(yōu)勢(shì)且無(wú)需額外開銷。
3. 批處理與大規(guī)模推理
對(duì)于需要批處理或大規(guī)模推理任務(wù)的應(yīng)用程序,消息代理通過(guò)在大模型繁忙或不可用時(shí)建立請(qǐng)求隊(duì)列,從而實(shí)現(xiàn)異步處理。這將確保不會(huì)丟失任何數(shù)據(jù)或請(qǐng)求,即使是在繁重的工作負(fù)載下也能提供可靠的處理響應(yīng)。
4. 冗余與回退保證
對(duì)于特別關(guān)注正常運(yùn)行時(shí)間的用例,消息代理可確保無(wú)縫回退至替代環(huán)境。例如,如果與提供OpenAI模型的某家云服務(wù)商發(fā)生連接失敗,KubeMQ可自動(dòng)切換至另一服務(wù)商。這樣的冗余設(shè)計(jì)保證AI不間斷操作,有助于增強(qiáng)服務(wù)可靠性與客戶滿意度。
5. 處理高流量應(yīng)用程序
消息代理能夠?qū)魅氲恼?qǐng)求分發(fā)至多個(gè)大模型實(shí)例或副本,防止過(guò)載并確保平衡運(yùn)行。這種負(fù)載均衡設(shè)計(jì)對(duì)于高流量應(yīng)用程序至關(guān)重要,可使其在不影響性能的前提下有效擴(kuò)展。
使用KubeMQ建立大模型路由機(jī)制:集成OpenAI與Claude
現(xiàn)在,我們將分步了解如何使用KubeMQ設(shè)置能夠與OpenAI和Anthropic Claude交互的路由機(jī)制。
全部示例代碼均保存在KubeMQ的GitHub repo當(dāng)中(https://github.com/kubemq-io/kubemq-llm-router)。
準(zhǔn)備工作
在開始之前,請(qǐng)確保你已安裝以下內(nèi)容:
- Python 3.7或更高版本。
- 本地安裝Docker。
- 擁有有效的OpenAI和Anthropic API密鑰。
- KubeMQ令牌(可從KubeMQ官網(wǎng)處獲取)。
- kubemq-cq Python包:
Plain Text
pip install kubemq-cq
- .env文件中包含你的AIP密鑰:
Plain Text
OPENAI_API_KEY=your_openai_api_key
ANTHROPIC_API_KEY=your_anthropic_api_key
設(shè)置KubeMQ
首先,我們需要確保KubeMQ能夠正常運(yùn)行。這里使用Docker進(jìn)行部署:
Shell
docker run -d --rm \
-p 8080:8080 \
-p 50000:50000 \
-p 9090:9090 \
-e KUBEMQ_TOKEN="your_token" \
kubemq/kubemq-community:latest
端口說(shuō)明:
- 8080 – 公開KubeMQ REST API
- 50000 – 打開 gRPC端口以進(jìn)行實(shí)施意見(jiàn)-服務(wù)器通信
- 9090 – 公開KubeMQ REST網(wǎng)關(guān)
注意: 將 your_token部分替換為你的真實(shí)KubeMQ令牌。
創(chuàng)建大模型路由服務(wù)器
大模型路由將充當(dāng)客戶端與大模型之間的中介,負(fù)責(zé)監(jiān)聽特定渠道的查詢并將其路由至適當(dāng)?shù)拇竽P汀?/span>
server.py
Python
import time
from kubemq.cq import Client, QueryMessageReceived, QueryResponseMessage, QueriesSubscription, CancellationToken
from langchain.chat_models import ChatOpenAI
from langchain.llms import Anthropic
import os
from dotenv import load_dotenv
import threading
load_dotenv()
class LLMRouter:
def __init__(self):
self.openai_llm = ChatOpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
model_name="gpt-3.5-turbo"
)
self.claude_llm = Anthropic(
api_key=os.getenv("ANTHROPIC_API_KEY"),
model="claude-3"
)
self.client = Client(address="localhost:50000")
def handle_openai_query(self, request: QueryMessageReceived):
try:
message = request.body.decode('utf-8')
result = self.openai_llm(message)
response = QueryResponseMessage(
query_received=request,
is_executed=True,
body=result.encode('utf-8')
)
self.client.send_response_message(response)
except Exception as e:
self.client.send_response_message(QueryResponseMessage(
query_received=request,
is_executed=False,
error=str(e)
))
def handle_claude_query(self, request: QueryMessageReceived):
try:
message = request.body.decode('utf-8')
result = self.claude_llm(message)
response = QueryResponseMessage(
query_received=request,
is_executed=True,
body=result.encode('utf-8')
)
self.client.send_response_message(response)
except Exception as e:
self.client.send_response_message(QueryResponseMessage(
query_received=request,
is_executed=False,
error=str(e)
))
def run(self):
def on_error(err: str):
print(f"Error: {err}")
def subscribe_openai():
self.client.subscribe_to_queries(
subscription=QueriesSubscription(
channel="openai_requests",
on_receive_query_callback=self.handle_openai_query,
on_error_callback=on_error,
),
cancel=CancellationToken()
)
def subscribe_claude():
self.client.subscribe_to_queries(
subscription=QueriesSubscription(
channel="claude_requests",
on_receive_query_callback=self.handle_claude_query,
on_error_callback=on_error,
),
cancel=CancellationToken()
)
threading.Thread(target=subscribe_openai).start()
threading.Thread(target=subscribe_claude).start()
print("LLM Router running on channels: openai_requests, claude_requests")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Shutting down...")
if __name__ == "__main__":
router = LLMRouter()
router.run()
說(shuō)明:
- 初始化。
A.為API密鑰加載環(huán)境變量。
B.初始化OpenAI和Anthropic大模型的客戶端。
C.設(shè)置KubeMQ客戶端。
- 處理查詢。
A.handle_openai_query和 handle_claude_query負(fù)責(zé)解碼傳入消息,將其傳遞給相應(yīng)大模型,而后發(fā)回響應(yīng)。
B.捕捉錯(cuò)誤并將 is_executed 標(biāo)記設(shè)置為 False。
- 訂閱。
A.此路由將訂閱兩個(gè)小道:openai_requests 和 claude_requests。
B.使用線程并行處理訂閱。
- 運(yùn)行服務(wù)器。
A.run方法啟動(dòng)訂閱并保持服務(wù)器運(yùn)行,直至中斷。
開發(fā)大模型客戶端
客戶端向大模型路由發(fā)送查詢,指定要使用的模型。
client.py
Python
from kubemq.cq import Client, QueryMessage
import json
class LLMClient:
def __init__(self, address="localhost:50000"):
self.client = Client(address=address)
def send_message(self, message: str, model: str) -> dict:
channel = f"{model}_requests"
response = self.client.send_query_request(QueryMessage(
channel=channel,
body=message.encode('utf-8'),
timeout_in_seconds=30
))
if response.is_error:
return {"error": response.error}
else:
return {"response": response.body.decode('utf-8')}
if __name__ == "__main__":
client = LLMClient()
models = ["openai", "claude"]
message = input("Enter your message: ")
model = input(f"Choose model ({'/'.join(models)}): ")
if model in models:
response = client.send_message(message, model)
if "error" in response:
print(f"Error: {response['error']}")
else:
print(f"Response: {response['response']}")
else:
print("Invalid model selected")
說(shuō)明:
- 初始化。
A.設(shè)置KubeMQ客戶端。
- 發(fā)送消息。
A.send_message 方法根據(jù)所選模型構(gòu)建適當(dāng)通道。
B.向路由發(fā)送查詢消息并等待響應(yīng)。
C.處理錯(cuò)誤并解碼響應(yīng)主體。
- 用戶交互。
A.提示用戶輸入消息并選擇模型。
B.從大模型處輸出響應(yīng)。
通過(guò)REST發(fā)送和接收
對(duì)于傾向或需要RESTful通信的服務(wù)或客戶端,KubeMQ亦可提供REST端點(diǎn)。
通過(guò)REST發(fā)送請(qǐng)求
端點(diǎn):
Plain Text
POST http://localhost:9090/send/request
標(biāo)頭:
Plain Text
Content-Type: application/json
實(shí)體:
JSON
{
"RequestTypeData": 2,
"ClientID": "LLMRouter-sender",
"Channel": "openai_requests",
"BodyString": "What is the capital of France?",
"Timeout": 30000
}
負(fù)載細(xì)節(jié):
- RequestTypeData – 指定請(qǐng)求類型(查詢?yōu)?)。
- ClientID – 發(fā)送請(qǐng)求的客戶端標(biāo)識(shí)符。
- Channel – 與大模型(openai_requests或claude_requests)對(duì)應(yīng)的通道。
- BodyString – 要發(fā)送至大模型的消息。
- Timeout – 等待響應(yīng)的時(shí)間(單位為毫秒)。
接收響應(yīng)
響應(yīng)是一個(gè)包含大模型輸出或錯(cuò)誤消息的JSON對(duì)象。
總結(jié)
在消息代理(KubeMQ)的幫助下,我們建立起可擴(kuò)展且高效的路由機(jī)制,能夠與多個(gè)大模型進(jìn)行交互。此設(shè)置允許客戶端無(wú)縫向不同模型發(fā)送查詢,并可擴(kuò)展以引入更多模型或功能。
這種方法的好處包括:
- 簡(jiǎn)化集成。大家可以將與不同大模型API交互與涉及的復(fù)雜性抽象出來(lái),簡(jiǎn)化客戶端代碼并降低出錯(cuò)幾率。
- 多模型支持。有效將請(qǐng)求路由至專門用于不同任務(wù)的適當(dāng)模型。
- 可靠性。確保在大模型繁忙或不可用時(shí),數(shù)據(jù)不致丟失。
- 冗余。提供后備機(jī)制以保持不間斷操作。
- 可擴(kuò)展性。通過(guò)在多個(gè)大模型實(shí)例間分配請(qǐng)求以應(yīng)對(duì)高流量需求。