超實(shí)用 Demo:使用 FastAPI、Celery、RabbitMQ 和 MongoDB 實(shí)現(xiàn)一個(gè)異步任務(wù)工作流
異步任務(wù),是 Web 開(kāi)發(fā)中經(jīng)常遇到的問(wèn)題,比如說(shuō)用戶提交了一個(gè)請(qǐng)求,雖然這個(gè)請(qǐng)求對(duì)應(yīng)的任務(wù)非常耗時(shí),但是不能讓用戶等在這里,通常需要立即返回結(jié)果,告訴用戶任務(wù)已提交。任務(wù)可以在后續(xù)慢慢完成,完成后再給用戶發(fā)一個(gè)完成的通知。
今天分享一份代碼,使用 Celery、RabbitMQ 和 MongoDB 實(shí)現(xiàn)一個(gè)異步任務(wù)工作流,你可以修改 task.py 來(lái)實(shí)現(xiàn)你自己的異步任務(wù)。
架構(gòu)圖如下:
其中 Celery 來(lái)執(zhí)行異步任務(wù),RabbitMQ 作為消息隊(duì)列,MongoDB 存儲(chǔ)任務(wù)執(zhí)行結(jié)果,F(xiàn)astAPI 提供 Web 接口。
以上所有模塊均可使用 Docker 一鍵部署。
下面為 Demo 使用方法:
1、確保本機(jī)已安裝 Docker、Git
2、下載源代碼:
git clone https://github.com/aarunjith/async-demo.git
3、部署并啟動(dòng):
cd async-demo
docker compose up --build
4、啟動(dòng)一個(gè)異步任務(wù):
$ curl -X POST http://localhost:8080/process
任務(wù)會(huì)發(fā)送到消息隊(duì)列,同時(shí)會(huì)立即返回一個(gè)任務(wù) id:
? curl -X POST http://localhost:8080/process
{"status":"PENDING","id":"a129c666-7b5b-45f7-ba54-9d7b96a1fe58","error":""}%
5、查詢?nèi)蝿?wù)狀態(tài):
curl -X POST http://localhost:8080/check_progress/<task_id>
任務(wù)完成后的返回結(jié)果如下:
? curl -X POST http://localhost:8080/check_progress/a129c666-7b5b-45f7-ba54-9d7b96a1fe58
{"status":"SUCEESS","data":"\"hello\""}%
代碼目錄結(jié)構(gòu)如下:
其中 app.py 如下:
from fastapi import FastAPI
from celery.result import AsyncResult
from tasks import start_processing
from loguru import logger
from pymongo import MongoClient
import uvicorn
# Lets create a connection to our backend where celery stores the results
client = MongoClient("mongodb://mongodb:27017")
# Default database and collection names that Celery create
db = client['task_results']
coll = db["celery_taskmeta"]
app = FastAPI()
@app.post('/process')
async def process_text_file():
'''
Process endpoint to trigger the start of a process
'''
try:
result = start_processing.delay()
logger.info(f'Started processing the task with id {result.id}')
return {
"status": result.state,
'id': result.id,
'error': ''
}
except Exception as e:
logger.info(f'Task Execution failed: {e}')
return {
"status": "FAILURE",
'id': None,
'error': e
}
@app.post('/check_progress/{task_id}')
async def check_async_progress(task_id: str):
'''
Endpoint to check the task progress and fetch the results if the task is
complete.
'''
try:
result = AsyncResult(task_id)
if result.ready():
data = coll.find({'_id': task_id})[0]
return {'status': 'SUCEESS', 'data': data['result']}
else:
return {"status": result.state, "error": ''}
except Exception as e:
data = coll.find({'_id': task_id})[0]
if data:
return {'status': 'SUCEESS', 'data': data['result']}
return {'status': 'Task ID invalid', 'error': e}
if __name__ == "__main__":
uvicorn.run("app:app", host='0.0.0.0', port='8080')
如果要實(shí)現(xiàn)自己的任務(wù)隊(duì)列,就修改 task.py 來(lái)添加自己的異步任務(wù),可以整合到自己的項(xiàng)目中。