如何快速創(chuàng)建一個(gè)擁有異步任務(wù)隊(duì)列集群的 Rest Api
異步任務(wù)是 Web 后端開發(fā)中最常見的需求,非常適合多任務(wù)、高并發(fā)的場(chǎng)景。本文分享如何使用 docker-compose、FastAPI、rq 來(lái)快速創(chuàng)建一個(gè)包含異步任務(wù)隊(duì)列集群的 REST API,后端執(zhí)行任務(wù)的節(jié)點(diǎn)可以隨意擴(kuò)展。
系統(tǒng)的架構(gòu)圖:
上圖中的每一個(gè)方框都可以理解為一個(gè)服務(wù)器。
用戶請(qǐng)求 api, api 將任務(wù)放入 redis 隊(duì)列,worker 自動(dòng)去 redis 隊(duì)列取出任務(wù)并執(zhí)行,worker 節(jié)點(diǎn)可以任意水平擴(kuò)展。
接下來(lái),我們來(lái)實(shí)現(xiàn)這一架構(gòu)的 demo,你可以看到 docker 的強(qiáng)大和方便之處。
1、先創(chuàng)建一個(gè)虛擬環(huán)境,安裝依賴
依賴 fastapi,redis,rq 庫(kù),安裝后生成一個(gè) requirements.txt 文件
- mkdir myproject
- python3 -m venv env
- source env/bin/activate
- pip install rq
- pip install fastapi
- pip install redis
- pip freeze > requirements.txt
2、編碼實(shí)現(xiàn) REST API、Worker
REST 是一種風(fēng)格,這里不是重點(diǎn),我們使用 FastAPI 來(lái)快速創(chuàng)建一個(gè)接口,新建一個(gè) api.py 的文件,內(nèi)容如下:
- from fastapi import FastAPI
- from redis import Redis
- from rq import Queue
- from worker import send_captcha
- app = FastAPI()
- # 需要注意,這里的 host 是主機(jī)名,在 docker 中就是服務(wù)名,后面的 docker-compose.ymal 中的服務(wù)名稱也要是這個(gè)
- redis_conn = Redis(host='myproj_redis', port=6379, db=0)
- # 定義一個(gè)隊(duì)列,名稱是 my_queue
- q = Queue('my_queue', connection=redis_conn)
- @app.get('/hello')
- def hello():
- """Test endpoint"""
- return {'hello': 'world'}
- # Rest API 示例
- @app.post('/send_captcha/{phone_number}', status_code=201)
- def addTask(phone_number: str):
- """
- Adds tasks to worker queue.
- Expects body as dictionary matching the Group class.
- """
- job = q.enqueue(send_captcha, phone_number)
- return {'job': "tasks add done."}
這里的 send_captcha 函數(shù)就是一個(gè)異步任務(wù),從 worker.py 中導(dǎo)入,worker.py 的內(nèi)容如下:
- import time
- def send_captcha(phone_number):
- """
- 模擬一個(gè)耗時(shí)的異步任務(wù)
- """
- print(f'{time.strftime("%T")} 準(zhǔn)備發(fā)送手機(jī)驗(yàn)證碼') # in place of actual logging
- print(f'{time.strftime("%T")} 生成隨機(jī)驗(yàn)證碼并存入 redis,設(shè)置 5 分鐘過(guò)期時(shí)間')
- time.sleep(5) # simulate long running task
- print(f'{time.strftime("%T")} {phone_number}發(fā)送完成')
- return { phone_number: 'task complete'}
return { phone_number: 'task complete'}
3、構(gòu)建 Dokcer 鏡像
現(xiàn)在的目標(biāo)是實(shí)現(xiàn)一個(gè)擁有兩個(gè)執(zhí)行節(jié)點(diǎn)的集群。我們需要啟動(dòng) 4 個(gè)容器來(lái)完成一個(gè)集群部署:
- 容器1:運(yùn)行 FastAPI app
- 容器2:運(yùn)行 Redis 服務(wù)
- 容器3:運(yùn)行 worker 1 服務(wù)
- 容器4:運(yùn)行 worker 2 服務(wù)
其中容器 1、3、4 都是 Python 應(yīng)用,可以共用一個(gè) Python 鏡像。
為了方便調(diào)試,我們可以讓 1、3、4 容器共享我們的本地路徑,這樣改了代碼就不需要重新構(gòu)建鏡像,比較方便。
創(chuàng)建一個(gè)包含依賴的 Python 鏡像
現(xiàn)在我們來(lái)創(chuàng)建一個(gè)包含前文 requirements.txt 依賴的 Python 鏡像,編寫 Dockerfile,內(nèi)容如下:
- FROM python:3.8-alpine
- RUN adduser -D myproj
- WORKDIR /home/myproj
- COPY requirements.txt requirements.txt
- RUN pip install -r requirements.txt
- RUN chown -R myproj:myproj ./
- USER myproj
- CMD uvicorn api:app --host 0.0.0.0 --port 5057
內(nèi)容說(shuō)明:
FROM python:3.8-alpine
指定使用 python:3.8-alpine,這個(gè)容器已經(jīng)預(yù)裝了 Python3.8,可以在命令行執(zhí)行 docker search python 看看有哪些 Python 鏡像。
RUN adduser -D myproj
添加一個(gè)用戶 myproj,這一步的主要目的是為了生成目錄 /home/myproj
WORKDIR /home/myproj
設(shè)置程序的執(zhí)行路徑為 /home/myproj
COPY requirements.txt requirements.txt
復(fù)制當(dāng)前路徑下的 requirements.txt 到容器的 /home/myproj,這里沒有復(fù)制 .py 文件是因?yàn)楹竺嫖覀儐?dòng)容器的時(shí)候會(huì)共享本地路徑,不需要再?gòu)?fù)制了,生產(chǎn)部署時(shí)最好復(fù)制到窗口內(nèi)部,這樣容器就不會(huì)依賴本機(jī)。
RUN pip install -r requirements.txt
在容器中安裝依賴
RUN chown -R myproj:myproj ./
將 /home/myproj 路徑下的文件的擁有者和所屬組改為 myproj,這一步為了使用 myproj 用戶來(lái)啟動(dòng) fastapi 服務(wù),生產(chǎn)環(huán)境通常用 root 用戶啟動(dòng),也就不需要這個(gè)指令了。
USER myproj
切換到 myproj 用戶
CMD uvicorn api:app --host 0.0.0.0 --port 5057
容器啟動(dòng)后執(zhí)行的命令,服務(wù)端口為 5057
更多的 Dockerfile 語(yǔ)法請(qǐng)參考官方文檔,這里僅是簡(jiǎn)要說(shuō)明。
現(xiàn)在 Dockerfile 所在的目錄執(zhí)行下面的命令構(gòu)建一個(gè)鏡像:
- docker build -t myproject:latest .
創(chuàng)建完成后,可以使用 docker images 來(lái)查看:
- ❯ docker images | grep myproj
- myproject
4、啟動(dòng)集群
這里使用 Docker Compose 來(lái)啟動(dòng) 4 個(gè)容器,為什么用 Docker Compose 呢?因?yàn)榉奖悖绻挥玫脑?,需要手?dòng)一個(gè)容器一個(gè)容器啟動(dòng)。
Docker Compose 會(huì)讀取一個(gè) yaml 格式的配置文件,依據(jù)配置文件來(lái)啟動(dòng)容器,各容器共享同一網(wǎng)絡(luò)。還記得 api.py 中使用的 Redis 主機(jī)名嗎,這里就需要將 redis 服務(wù)名設(shè)置為那個(gè)主機(jī)名。
編寫一個(gè) docker-compose.yml 內(nèi)容如下:
- version: '3'
- services:
- myproj_redis:
- image: redis:4.0-alpine
- ports:
- - "6379:6379"
- volumes:
- - ./redis:/data
- myproj_api:
- image: myproject:latest
- command: uvicorn api:app --host 0.0.0.0 --port 5057
- ports:
- - "5057:5057"
- volumes:
- - ./:/home/myproj
- myproj_worker1:
- image: myproject:latest
- command: rq worker --url redis://myproj_redis:6379 my_queue
- volumes:
- - ./:/home/myproj
- myproj_worker2:
- image: myproject:latest
- command: rq worker --url redis://myproj_redis:6379 my_queue
- volumes:
- - ./:/home/myproj
第一個(gè)容器是 myproj_redis,運(yùn)行著 redis 服務(wù), redis 的數(shù)據(jù)通過(guò) volumes 方式保存在本地,因此需要在本地創(chuàng)建一個(gè) redis 目錄,來(lái)映射容器內(nèi)部的 /data 目錄。
第二個(gè)容器就是 fastapi 服務(wù),端口 5057,使用本地路徑映射為 /home/myproj
第三個(gè)容器和第四個(gè)容器是 worker 節(jié)點(diǎn),雖然也映射了本地路徑,但它僅使用 worker.py 文件。當(dāng)任務(wù)太多時(shí),worker 節(jié)點(diǎn)可以擴(kuò)展,解決負(fù)載壓力,
最終的目錄是這樣:
執(zhí)行 docker compose 命令啟動(dòng) 4 個(gè)容器:
- docker compose -f docker-compose.yml up
可以看到 4 個(gè)服務(wù)均啟動(dòng)并正常打印了日志輸出。
4、測(cè)試
現(xiàn)在來(lái)測(cè)試一下,左邊的窗口,我使用 Python 快速發(fā)送了 3 個(gè) post 請(qǐng)求:
- import subprocess
- for i in range(3):
- subprocess.run("curl -v -X POST 'http://localhost:5057/send_captcha/18012345678'",shell = True)
從右邊窗口的日志輸出可以看出 worker1 和 worker2 都執(zhí)行了任務(wù),其中 worker1 執(zhí)行了 2 個(gè),worker2 執(zhí)行了 1 個(gè)。
查看完整代碼請(qǐng)點(diǎn)擊「閱讀原文」
最后的話
本文分享了如何使用 Dockerfile 構(gòu)建一個(gè)鏡像,使用 Docker Compose 管理一個(gè)容器集群,以此為基礎(chǔ)實(shí)現(xiàn)了一個(gè)具有異步任務(wù)隊(duì)列集群的 REST API,拋磚引玉,關(guān)于 Dockerfile、docker-compose 的詳細(xì)用法,還請(qǐng)參考 Docker 官方文檔