如何利用OpenAI、NATS和Streamlight徹底改變實(shí)時(shí)警報(bào)
譯文本文將介紹如何使用Streamlight、NATS和OpenAI這些非??岬墓ぞ撸赑ython中構(gòu)建一個(gè)全棧事件驅(qū)動的天氣警報(bào)聊天應(yīng)用程序。該應(yīng)用程序可以實(shí)時(shí)收集天氣信息,使用人工智能了解警報(bào)標(biāo)準(zhǔn),并將這些警報(bào)發(fā)送到用戶界面。
這段內(nèi)容和代碼示例對于那些希望了解現(xiàn)代實(shí)時(shí)警報(bào)系統(tǒng)如何與大型語言模型(LLM)協(xié)調(diào)工作以及如何實(shí)現(xiàn)的開發(fā)人員來說非常有幫助。
人們也可以采用GitHub上的源代碼自己進(jìn)行嘗試。
幕后的力量
以下了解人工智能天氣警報(bào)聊天應(yīng)用程序是如何工作的,并將原始數(shù)據(jù)轉(zhuǎn)換為可操作的警報(bào),實(shí)時(shí)了解天氣變化。應(yīng)用程序的核心是一個(gè)用Python實(shí)現(xiàn)的響應(yīng)式后端,由NATS提供支持,以確保實(shí)時(shí)數(shù)據(jù)處理和消息管理。集成OpenAI的GPT模型,使對話式人工智能能夠理解警報(bào)的性質(zhì),并響應(yīng)用戶的查詢。用戶可以使用自然語言指定他們的警報(bào)標(biāo)準(zhǔn), 然后GPT模型將對其進(jìn)行解釋。
圖1實(shí)時(shí)警報(bào)應(yīng)用架構(gòu)
實(shí)時(shí)數(shù)據(jù)采集
從后端自各種來源連續(xù)異步收集天氣數(shù)據(jù)開始。應(yīng)用程序現(xiàn)在使用api.weatherapi.com服務(wù),每10秒實(shí)時(shí)獲取一次天氣信息。這些數(shù)據(jù)包括全球各地溫度、濕度、降水等參數(shù)。這段代碼異步獲取愛沙尼亞當(dāng)前的天氣數(shù)據(jù),但應(yīng)用程序可以改進(jìn)為從用戶輸入動態(tài)設(shè)置位置:
async def fetch_weather_data():
api_url = f"http://api.weatherapi.com/v1/current.json?key={weather_api_key}&q=estonia"
try:
async with aiohttp.ClientSession() as session:
async with session.get(api_url) as response:
if response.status == 200:
return await response.json()
else:
logging.error(f"Error fetching weather data: HTTP {response.status}")
return None
except Exception as e:
logging.error(f"Error fetching weather data: {e}")
return None
NATS在數(shù)據(jù)流中的作用
backend.py文件中main()函數(shù)中的代碼段演示了NATS的集成,用于驅(qū)動消息傳遞、連續(xù)的天氣監(jiān)測和警報(bào)。使用NATS.py庫將NATS集成到Python代碼中。首先,在NATs://localhost:4222建立運(yùn)行在Docker中的NATS服務(wù)器的連接。
nats_client = await nats.connect("nats://localhost:4222")
然后,定義一個(gè)異步message_handler函數(shù)。該函數(shù)訂閱并處理聊天主題上從NATS服務(wù)器接收到的消息。如果消息以“Set Alert:”開頭(將其附加在前端),它將提取并更新用戶的警報(bào)標(biāo)準(zhǔn)。
async def message_handler(msg):
nonlocal user_alert_criteria
data = msg.data.decode()
if data.startswith("Set Alert:"):
user_alert_criteria = data[len("Set Alert:"):].strip()
logging.info(f"User alert criteria updated: {user_alert_criteria}")
await nats_client.subscribe("chat", cb=message_handler)
后端服務(wù)集成了天氣API和Open AI Chat Completion API等外部服務(wù)。如果同時(shí)存在天氣數(shù)據(jù)和用戶警報(bào)標(biāo)準(zhǔn),該應(yīng)用程序會為OpenAI的GPT模型構(gòu)建一個(gè)提示,以確定天氣是否符合用戶的標(biāo)準(zhǔn)。該提示要求人工智能根據(jù)用戶的標(biāo)準(zhǔn)分析當(dāng)前天氣,并以“是”或“否”和簡短的天氣摘要做出回應(yīng)。
一旦人工智能確定傳入的天氣數(shù)據(jù)符合用戶的警報(bào)標(biāo)準(zhǔn),它就會制作個(gè)性化的警報(bào)消息,并向NATS服務(wù)器上的chat_response主題發(fā)布天氣警報(bào),以更新前端應(yīng)用程序的最新變化。此消息包含用戶友好的通知,旨在通知和建議用戶。例如,它可能會提示,“小心!愛沙尼亞明天會下雨。別忘了帶傘!”
while True:
current_weather = await fetch_weather_data()
if current_weather and user_alert_criteria:
logging.info(f"Current weather data: {current_weather}")
prompt = f"Use the current weather: {current_weather} information and user alert criteria: {user_alert_criteria}. Identify if the weather meets these criteria and return only YES or NO with a short weather temperature info without explaining why."
response_text = await get_openai_response(prompt)
if response_text and "YES" in response_text:
logging.info("Weather conditions met user criteria.")
ai_response = f"Weather alert! Your specified conditions have been met. {response_text}"
await nats_client.publish("chat_response", payload=ai_response.encode())
else:
logging.info("Weather conditions did not meet user criteria.")
else:
logging.info("No current weather data or user alert criteria set.")await asyncio.sleep(10)
實(shí)時(shí)發(fā)送和接收警報(bào)
了解一下后端和前端之間的整體通信流程。
- 通過使用Streamlit構(gòu)建的簡單聊天界面(請參閱frontend.py文件),用戶可以使用自然語言輸入天氣警報(bào)標(biāo)準(zhǔn)并提交。
alert_criteria = st.text_input("Set your weather alert criteria", key="alert_criteria", disabled=st.session_state['alert_set'])
- Streamlit前端代碼通過NATS消息傳遞與后端服務(wù)交互。它將這些標(biāo)準(zhǔn)發(fā)布到聊天主題上的NATS服務(wù)器。
def send_message_to_nats_handler(message):
with NATSClient() as client:
client.connect()
client.publish("chat", payload=message.encode())
client.subscribe("chat_response", callback=read_message_from_nats_handler)
client.wait()
if set_alert_btn:
st.session_state['alert_set'] = True
st.success('Alert criteria set')
send_message_to_nats_handler(f"Set Alert: {alert_criteria}")
正如在前一節(jié)中看到的,后端服務(wù)監(jiān)聽聊天主題,接收標(biāo)準(zhǔn),獲取當(dāng)前天氣數(shù)據(jù),并使用人工智能來確定是否應(yīng)該觸發(fā)警報(bào)。如果滿足條件,后端服務(wù)將向chat_response主題發(fā)送警報(bào)消息。前端接收此消息并更新用戶界面(UI)以通知用戶。
def read_message_from_nats_handler(msg):
message = msg.payload.decode()
st.session_state['conversation'].append(("AI", message))
st.markdown(f"<span style='color: red;'></span> AI: {message}", unsafe_allow_html=True)
進(jìn)行嘗試
要詳細(xì)探索實(shí)時(shí)天氣警報(bào)聊天應(yīng)用程序并親自嘗試,可以訪問前面鏈接的GitHub存儲庫。該存儲庫包含所有必要的代碼、詳細(xì)的設(shè)置說明和幫助入門的附加文檔。在設(shè)置完成之后,就可以啟動Streamlit前端和Python后端。設(shè)置天氣警報(bào)標(biāo)準(zhǔn),并查看系統(tǒng)如何處理實(shí)時(shí)天氣數(shù)據(jù)以了解情況。
圖2警報(bào)應(yīng)用程序的Streamlight UI
建立流處理管道
實(shí)時(shí)天氣警報(bào)聊天應(yīng)用程序演示了NATS在分布式系統(tǒng)中用于實(shí)時(shí)消息傳遞的強(qiáng)大用例,允許在面向用戶的前端和數(shù)據(jù)處理后端之間進(jìn)行有效的通信。但是,應(yīng)該考慮幾個(gè)關(guān)鍵步驟,以確保呈現(xiàn)給用戶的信息是相關(guān)的、準(zhǔn)確的和可操作的。在應(yīng)用程序中,只是獲取實(shí)時(shí)的原始天氣數(shù)據(jù),并將其直接發(fā)送到OpenAI或前端。有時(shí),需要在數(shù)據(jù)到達(dá)外部服務(wù)之前對其進(jìn)行實(shí)時(shí)轉(zhuǎn)換,以便對其進(jìn)行過濾、豐富、聚合或規(guī)范化。需要開始考慮創(chuàng)建具有多個(gè)階段的流處理管道。
例如,并非從API獲取的所有數(shù)據(jù)都與每個(gè)用戶相關(guān),可以在初始階段過濾掉不必要的信息。此外,數(shù)據(jù)可以采用各種格式,特別是如果需要從多個(gè)API獲取信息以獲得全面警報(bào),這就需要對這些數(shù)據(jù)進(jìn)行規(guī)范化。在下一階段,使用額外的場景或原始數(shù)據(jù)的信息來豐富數(shù)據(jù),使其更有用。這可能包括將當(dāng)前天氣狀況與歷史數(shù)據(jù)進(jìn)行比較,以識別異常模式,或者使用另一個(gè)外部API添加基于位置的見解,例如針對特定地區(qū)天氣狀況的特定建議。在后期階段,可能會匯總每小時(shí)的溫度數(shù)據(jù),以給出白天的平均溫度或突出顯示白天達(dá)到的峰值溫度。
下一個(gè)步驟
當(dāng)涉及到在生產(chǎn)環(huán)境中轉(zhuǎn)換數(shù)據(jù)、部署、運(yùn)行和擴(kuò)展應(yīng)用程序時(shí),你可能希望使用Python中的專用框架(例如GlassFlow)來構(gòu)建復(fù)雜的流處理管道。GlassFlow為流處理提供了一個(gè)完全托管的無服務(wù)器基礎(chǔ)設(shè)施,不必考慮設(shè)置或維護(hù),應(yīng)用程序可以輕松處理大量數(shù)據(jù)和用戶請求。它提供了高級狀態(tài)管理功能,可以更輕松地跟蹤用戶警報(bào)標(biāo)準(zhǔn)和其他應(yīng)用程序狀態(tài)。而應(yīng)用程序可以根據(jù)其用戶群進(jìn)行擴(kuò)展,而不會影響性能。
原文標(biāo)題:Revolutionizing Real-Time Alerts With AI, NATS, and Streamlit,作者:Bobur Umurzokov
鏈接:https://dzone.com/articles/revolutionizing-real-time-alerts-with-ai-nats-and。