基于Celery、Redis和Florence 2實(shí)戰(zhàn)異步機(jī)器學(xué)習(xí)推理 原創(chuàng)
本文將通過(guò)一個(gè)最小但功能強(qiáng)大的實(shí)例教程,引導(dǎo)你進(jìn)入異步機(jī)器學(xué)習(xí)推理開(kāi)發(fā)領(lǐng)域。
簡(jiǎn)介
大多數(shù)機(jī)器學(xué)習(xí)服務(wù)教程都專注于實(shí)時(shí)同步服務(wù)的介紹,這允許對(duì)預(yù)測(cè)請(qǐng)求做出即時(shí)響應(yīng)。然而,這種方法可能難以應(yīng)對(duì)流量激增,對(duì)于長(zhǎng)時(shí)間運(yùn)行的任務(wù)來(lái)說(shuō)并不理想。因此,類似于這樣的任務(wù)還需要更強(qiáng)大的機(jī)器來(lái)快速響應(yīng);否則,一旦客戶端或服務(wù)器發(fā)生故障,預(yù)測(cè)結(jié)果通常會(huì)丟失。
在本文中,我們將演示如何使用分布式任務(wù)調(diào)度框架Celery和開(kāi)源分布式鍵值對(duì)數(shù)據(jù)庫(kù)Redis作為異步工作線程來(lái)運(yùn)行機(jī)器學(xué)習(xí)模型。試驗(yàn)中,我們將使用微軟開(kāi)源的統(tǒng)一視覺(jué)基礎(chǔ)模型Florence 2,這是一種以其令人印象深刻的性能而聞名的視覺(jué)語(yǔ)言模型。本教程將提供一個(gè)最小但功能強(qiáng)大的示例;當(dāng)然,您可以根據(jù)自己的實(shí)戰(zhàn)場(chǎng)景進(jìn)一步進(jìn)行調(diào)整和擴(kuò)展。
您可以在下面鏈接處查看該應(yīng)用程序的演示:
?https://coral-app-qdy8z.ondigitalocean.app/?
總體來(lái)看,我們提供的解決方案的核心基于Celery框架,這是一個(gè)支持我們實(shí)現(xiàn)客戶端/工作線程邏輯的Python庫(kù)。它允許我們將計(jì)算工作分配給許多工作線程,從而提高機(jī)器學(xué)習(xí)推理應(yīng)用場(chǎng)景對(duì)高負(fù)載和不可預(yù)測(cè)負(fù)載的可擴(kuò)展性。
總體運(yùn)行流程如下:
- 客戶端向代理管理的隊(duì)列(在我們的示例中為Redis)提交一個(gè)帶有一些參數(shù)的任務(wù)。
- 由一個(gè)(或多個(gè)工作線程)持續(xù)監(jiān)控隊(duì)列,并在任務(wù)到來(lái)時(shí)接收任務(wù)。然后,它執(zhí)行它們并將結(jié)果保存在后端存儲(chǔ)中。
- 客戶端可以通過(guò)輪詢后端或訂閱任務(wù)的通道,使用其id獲取任務(wù)的結(jié)果。
簡(jiǎn)化實(shí)例
讓我們從一個(gè)簡(jiǎn)化的例子開(kāi)始:
圖片由作者本人提供
首先,通過(guò)如下命令運(yùn)行Redis:
Docker run -p 6379:6379 redis
下面給出的是工作線程代碼:
from celery import Celery
#配置Celery以使用Redis作為代理和后端
app = Celery(
"tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0"
)
# 定義一個(gè)簡(jiǎn)單的任務(wù)
@app.task
def add(x, y):
return x + y
if __name__ == "__main__":
app.worker_main(["worker", "--loglevel=info"])
相應(yīng)的客戶端代碼如下:
from celery import Celery
app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")
print(f"{app.control.inspect().active()=}")
task_name = "tasks.add"
add = app.signature(task_name)
print("Gotten Task")
#向工作線程發(fā)送一個(gè)任務(wù)
result = add.delay(4, 6)
print("Waiting for Task")
result.wait()
#得到結(jié)果
print(f"Result: {result.result}")
運(yùn)行上面代碼,將給出了我們期望的結(jié)果:“Result: 10”。
實(shí)戰(zhàn)案例
下面,我們繼續(xù)討論構(gòu)建一個(gè)真正的基于Florence 2模型服務(wù)的實(shí)用型案例。
具體地說(shuō),我們將構(gòu)建一個(gè)多容器圖像字幕應(yīng)用程序,該應(yīng)用程序使用Redis進(jìn)行任務(wù)排隊(duì),使用Celery進(jìn)行任務(wù)分發(fā),并使用本地卷或谷歌云存儲(chǔ)實(shí)現(xiàn)潛在的圖像存儲(chǔ)。該應(yīng)用程序的設(shè)計(jì)包含幾個(gè)核心組件:模型推理、任務(wù)分配、客戶端交互和文件存儲(chǔ)。
架構(gòu)概述
圖片由作者本人提供
各組件分工如下:
- 客戶端(Client):通過(guò)將圖像字幕請(qǐng)求發(fā)送給工作線程(通過(guò)代理)來(lái)發(fā)起圖像字幕請(qǐng)求。
- 工作線程(Worker):接收請(qǐng)求,下載圖像,使用預(yù)訓(xùn)練的模型進(jìn)行推理,并返回結(jié)果。
- 分布式鍵值對(duì)數(shù)據(jù)庫(kù)Redis:充當(dāng)消息代理,促進(jìn)客戶端和工作線程之間的通信。
- 文件存儲(chǔ):圖像文件的臨時(shí)存儲(chǔ)。
接下來(lái),我們進(jìn)行各組件功能的更具體的剖析。
1.模型推理(Model.py)
首先,實(shí)現(xiàn)依賴關(guān)系和初始化:
import os
from io import BytesIO
import requests
from google.cloud import storage
from loguru import logger
from modeling_florence2 import Florence2ForConditionalGeneration
from PIL import Image
from processing_florence2 import Florence2Processor
model = Florence2ForConditionalGeneration.from_pretrained(
"microsoft/Florence-2-base-ft"
)
processor = Florence2Processor.from_pretrained("microsoft/Florence-2-base-ft")
上面代碼完成的任務(wù)如下:
- 導(dǎo)入圖像處理、Web請(qǐng)求、谷歌云存儲(chǔ)交互和日志記錄所需的庫(kù)。
- 初始化預(yù)訓(xùn)練的Florence-2模型和處理器以生成圖像字幕。
然后,進(jìn)行圖像下載(Download_Image):
def download_image(url):
if url.startswith("http://") or url.startswith("https://"):
#處理HTTP/HTTPS URL
#…(從URL下載圖像的代碼)…
elif url.startswith("gs://"):
#處理谷歌云存儲(chǔ)路徑
#…(從GCS下載圖像的代碼)。
else:
#處理本地文件路徑
# ... (code to open image from local path) ...
歸納一下的話,上面代碼完成的任務(wù)如下:
- 從提供的URL下載圖像。
- 支持HTTP/HTTPS URL、谷歌云存儲(chǔ)路徑(gs://)和本地文件路徑。
接下來(lái),執(zhí)行推理(run_Inference):
def run_inference(url, task_prompt):
# …(使用donan_image函數(shù)下載圖像的代碼)。
try:
# …(打開(kāi)和處理圖像的代碼)。
inputs = processor(text=task_prompt, images=image, return_tensors="pt")
except ValueError:
#錯(cuò)誤處理
# …(使用模型生成字幕的代碼)。
generated_ids = model.generate(
input_ids=inputs["input_ids"],
pixel_values=inputs["pixel_values"],
#……(模型生成參數(shù))。
)
#…(解碼生成的字幕的代碼)。
generated_text = processor.batch_decode(generated_ids, skip_special_tokens=False)[0]
#…(后處理生成的字幕的代碼)。
parsed_answer = processor.post_process_generation(
generated_text, task=task_prompt, image_size=(image.width, image.height)
)
return parsed_answer
上面代碼實(shí)現(xiàn)了編排圖像字幕的過(guò)程,具體實(shí)現(xiàn)如下:
- 使用download_image下載圖像。
- 為模型準(zhǔn)備圖像和任務(wù)提示。
- 使用加載的Florence-2模型生成字幕。
- 對(duì)生成的字幕進(jìn)行解碼和后處理。
- 返回最終字幕。
2.任務(wù)分配(worker.py)
首先,進(jìn)行Celery設(shè)置:
import os
from celery import Celery
# ... 其他導(dǎo)入...
#從環(huán)境變量中獲取Redis URL或使用默認(rèn)值
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# 將Celery配置為使用Redis,作為代理和后端
app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)
# ... (Celery配置) ...
這段代碼完成的任務(wù)是:將Celery設(shè)置為使用Redis作為任務(wù)分發(fā)的消息代理。
接下來(lái),定義任務(wù)(inference_task):
@app.task(bind=True, max_retries=3)
def inference_task(self, url, task_prompt):
#……(日志記錄和錯(cuò)誤處理)。
return run_inference(url, task_prompt)
上面代碼具體實(shí)現(xiàn)了:
l 定義將由Celery工作線程執(zhí)行的推理任務(wù)。
l 此任務(wù)從model.py調(diào)用run_inference函數(shù)。
最后,執(zhí)行工作線程:
if __name__ == "__main__":
app.worker_main(["worker", "--loglevel=info", "--pool=solo"])
啟動(dòng)一個(gè)監(jiān)聽(tīng)并執(zhí)行任務(wù)的Celery工作線程。
3.客戶端交互(Client.py)
首先,實(shí)現(xiàn)Celery連接:
import os
from celery import Celery
#從環(huán)境變量中獲取Redis URL或使用默認(rèn)值
REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")
#將Celery配置為使用Redis作為代理和后端
app = Celery("tasks", broker=REDIS_URL, backend=REDIS_URL)
使用Redis作為消息代理建立與Celery的連接。
接下來(lái),進(jìn)行任務(wù)提交(send_inference_Task):
def send_inference_task(url, task_prompt):
task = inference_task.delay(url, task_prompt)
print(f"Task sent with ID: {task.id}")
# 等待結(jié)果
result = task.get(timeout=120)
return result
上述代碼完成了兩項(xiàng)任務(wù):
- 向Celery工作線程發(fā)送圖像字幕任務(wù)(推理任務(wù))。
- 等待工作線程完成任務(wù)并檢索結(jié)果。
再接下來(lái),實(shí)現(xiàn)Docker集成(Docker compose.yml)。
這一步主要是使用Docker Compose定義多容器設(shè)置:
- redis:運(yùn)行redis服務(wù)器進(jìn)行消息代理。
- model:構(gòu)建和部署模型推理工作線程。
- app:構(gòu)建和部署客戶端應(yīng)用程序。
此處花朵圖片由RoonZ nl在Unsplash(https://unsplash.com/photos/yellow-and-blue-petaled-flower-vjDbHCjHlEY?utm_content=creditCopyText&utm_medium=referral&utm_source=unsplash)上提供
- flower:運(yùn)行一個(gè)基于Web的Celery任務(wù)監(jiān)控工具。
圖片由作者本人提供
其實(shí),您可以使用以下一句命令運(yùn)行上面完整的棧操作:
docker-compose up
小結(jié)
至此,整個(gè)任務(wù)完成!歸納一下,我們剛剛探索了使用Celery、Redis和Florence 2構(gòu)建異步機(jī)器學(xué)習(xí)推理系統(tǒng)的全過(guò)程。具體地說(shuō),本文演示了如何有效地使用Celery進(jìn)行任務(wù)分配,使用Redis進(jìn)行消息代理,使用Florence 2模型進(jìn)行圖像字幕處理。通過(guò)采用異步工作流方案,您可以處理大量請(qǐng)求,提高性能,并增強(qiáng)ML推理應(yīng)用程序的整體彈性。最后,我們提供的Docker Compose設(shè)置允許您使用單個(gè)命令來(lái)自行運(yùn)行整個(gè)系統(tǒng)。
準(zhǔn)備好下一步操作了嗎?將本文介紹的這種架構(gòu)部署到云端可能會(huì)遇到一系列挑戰(zhàn)。
項(xiàng)目源碼地址:https://github.com/CVxTz/celery_ml_deploy
項(xiàng)目演示地址: https://coral-app-qdy8z.ondigitalocean.app/
譯者介紹
朱先忠,51CTO社區(qū)編輯,51CTO專家博客、講師,濰坊一所高校計(jì)算機(jī)教師,自由編程界老兵一枚。
原文標(biāo)題:Asynchronous Machine Learning Inference with Celery, Redis, and Florence 2,作者:Youness Mansar
鏈接:??https://towardsdatascience.com/asynchronous-machine-learning-inference-with-celery-redis-and-florence-2-be18ebc0fbab?。
