分布式任務(wù)隊列 Celery 的實踐
筆者在近期工作中有接觸到 Celery,這是一個開源的分布式任務(wù)隊列(Distributed Task Queue),在 Github 上現(xiàn)有 18k star,主要可以用于實現(xiàn)應(yīng)用中的異步任務(wù)和定時任務(wù),雖然是用 Python 編寫,但協(xié)議可以用任何語言實現(xiàn),現(xiàn)已有 gocelery、nodecelery 和 celery-php 等。
筆者寫下此文總結(jié)對 Celery 的了解和在工作中的使用。本文的大概內(nèi)容如下:
- 任務(wù)隊列是什么;
- Celery 做了什么;
- Celery 在工作中的實踐。
任務(wù)隊列是什么
“消息隊列(Message Queue)”,后端同學應(yīng)該都有了解,常見的有 RabbitMQ、RocketMQ、Kafka。而“任務(wù)隊列(Task Queue)”,筆者在接觸 Celery 之前是沒有聽過的。任務(wù)隊列是什么,而任務(wù)隊列和消息隊列,這兩者之間有何關(guān)系。帶著問題,先看看 Celery 的架構(gòu):
Celery
在 Celery 的架構(gòu)中,可看出由多臺 Server 發(fā)起異步任務(wù)(Async Task),發(fā)送任務(wù)到 Broker 的隊列中,其中的 Celery Beat 進程可負責發(fā)起定時任務(wù)。當 Task 到達 Broker 后,會將其分發(fā)給相應(yīng)的 Celery Worker 進行處理。當 Task 處理完成后,其結(jié)果存儲至 Backend。
在上述過程中的 Broker 和 Backend,Celery 沒有實現(xiàn),而是使用了現(xiàn)有開源實現(xiàn),例如 RabbitMQ 作為 Broker 提供消息隊列服務(wù),Redis 作為 Backend 提供結(jié)果存儲服務(wù)。Celery 就像是抽象了消息隊列架構(gòu)中 Producer、Consumer 的實現(xiàn),將消息隊列中基本單位“消息”抽象成了任務(wù)隊列中的“任務(wù)”,并將異步、定時任務(wù)的發(fā)起和結(jié)果存儲等操作進行了封裝,讓開發(fā)者可以忽略 AMQP、RabbitMQ 等實現(xiàn)細節(jié),為開發(fā)帶來便利。
綜上所述,Celery 作為任務(wù)隊列是基于消息隊列的進一步封裝,其實現(xiàn)依賴消息隊列。
接下來,通過一個簡單的應(yīng)用來具體了解 Celery 做了什么。
Celery 做了什么
在應(yīng)用開發(fā)中,為了保證響應(yīng)速度,耗時且不影響流程的操作通常被做異步處理。例如在用戶注冊的處理過程中,通常會異步發(fā)送郵件通知用戶,下面看看 Celery 是如何實現(xiàn)該異步操作。
在 task.py 中聲明了發(fā)送郵件的方法 send_mail,并為其加上 Celery 提供的 @app.task 裝飾器。通過該裝飾器,可以將 send_mail 函數(shù)變成一個 celery.app.task:Task 實例對象。而該 Task 實例可提供了兩個核心功能:
- 將消息發(fā)送給隊列;
- 聲明 Worker 接收到消息后需要執(zhí)行的具體函數(shù)。
- from celery import Celery
- app = Celery('tasks', broker='amqp://guest@localhost//')
- @app.task
- def send_mail(email):
- print("send mail to ", email)
- import time
- time.sleep(5)
- return "success"
Task 已經(jīng)定義完成,若要發(fā)起異步任務(wù),可通過調(diào)用 Task 的 delay 方法,該方法會將消息發(fā)送至隊列,例如在用戶注冊完成時,發(fā)起發(fā)郵件的異步任務(wù):
- # user.py
- from tasks import send_mail
- def register():
- print("1. 插入記錄到數(shù)據(jù)庫")
- print("2. 通過celery異步發(fā)郵件")
- send_mail.delay("chaycao@gmail.com")
- print("3. 告訴用戶注冊成功")
- if __name__ =='__main__':
- register()
運行以上程序后,消息已經(jīng)發(fā)送至 RabbitMQ 的隊列中,可觀察到其消息格式如下:
Task in RabbitMQ
可看出 Celery 封裝后的消息包含了 task 標識和運行參數(shù)等內(nèi)容。
接著,啟動 Worker 消費 RabbitMQ 中的消息:
- celery -A tasks worker --loglevel=info
Worker 啟動后,可以看到下面打印信息:
Worker Start
首先是 Worker 的配置信息,然后是 Worker 所執(zhí)行的 Task 列表,接著是從 RabbitMQ 中成功獲取消息并執(zhí)行相應(yīng)的 Task。
通過以上示例,可以進一步明白 Celery 作為任務(wù)隊列框架所做的工作,而“分布式任務(wù)隊列”中的”分布式“指的則是 Producer、Consumer 可以有多個,即多個進程向 Broker 發(fā)送任務(wù),多個 Worker 從 Broker 中獲取 Task 并執(zhí)行。
以上只是一個簡單的示例,接著再看下筆者在工作中所接觸到的關(guān)于 Celery 使用的一些實踐經(jīng)驗。
Celery 在工作中的實踐
根據(jù)業(yè)務(wù)場景劃分隊列
在筆者所工作的項目中,Celery 用于處理下單、解析軌跡、推送上游等異步任務(wù)和定時任務(wù)。根據(jù)每個 Task 的業(yè)務(wù)場景,可為其指定對應(yīng)的隊列,例如:
- DEFAULT_CELERY_ROUTES = {
- 'celery_task.pending_create': {'queue': 'create'},
- 'celery_task.multi_create': {'queue': 'create'},
- 'celery_task.pull_tracking': {'queue': 'pull'},
- 'celery_task.pull_branch': {'queue': 'pull'},
- 'celery_task.push_tracking': {'queue': 'push'},
- 'celery_task.push_weight': {'queue': 'push'},
- }
- CELERY_ROUTES = {
- DEFAULT_CELERY_ROUTES
- }
根據(jù)業(yè)務(wù)場景,在 DEFAULT_CELERY_ROUTES 配置中指定 6 個 Task 對應(yīng)的 Queue,共有 3 個隊列 create、pull、push,并將該路由規(guī)則加入到 CELERY_ROUTES 中以生效。這樣設(shè)計的目的是為了不同場景彼此之間互不影響,例如解析任務(wù)阻塞不應(yīng)該影響下單任務(wù)。
進一步劃分隊列
在根據(jù)業(yè)務(wù)場景粗略劃分后,對于某個場景,可能需要更細致的劃分,例如在向上游推送時,為了避免一個上游的阻塞影響向其他上游推送,需要做到不同上游彼此之間互不影響。所以需要針對不同上游使用不同隊列,例如:
- CLIENT_CELERY_ROUTES = {
- # {0} 為 client 的占位符,在 ClientRouter 中進行格式化
- 'celery_task.push_tracking_retry': {'queue': 'push_tracking_retry_{0}'},
- 'celery_task.push_weight_retry': {'queue': 'push_weight_retry_{0}'},
- }
- class ClientRouter(object):
- def route_for_task(self, task, args=None, kwargs=None):
- if task not in CLIENT_CELERY_ROUTES:
- return None
- client_id = kwargs('client_id')
- # 根據(jù) client_id 獲取隊列名
- queue_name = CLIENT_CELERY_ROUTES[task]['queue'].format(client_id)
- return {'queue': queue_name}
- CELERY_ROUTES = {
- 'ClientRouter'
- DEFAULT_CELERY_ROUTES,
- }
在 CLIENT_CELERY_ROUTES 中指定了需要根據(jù) Client 隔離隊列的 Task 和其對應(yīng)的 Queue 名稱格式,隊列名中含有一個占位符,為的是根據(jù)不同 Client 得到不同的隊列名。
接著實現(xiàn)了一個路由器 ClientRouter ,其中定義了 router_for_task 方法,其作用是為 task 指定對應(yīng)的隊列名。可看出其中的邏輯是如果 task 在 CLIENT_CELERY_ROUTES 中,將會用 kwargs 中的 client_id 格式化隊列名,得到最終發(fā)送消息的隊列名,達到根據(jù)入?yún)?client_id 來決定具體使用的隊列,從而起到隔離不同 Client 使用不同隊列的效果。
除了在 Client 的維度上劃分,若需要在其他維度進一步劃分隊列以達到隔離的效果,也可參考該方法來設(shè)計路由規(guī)則。
動態(tài)隊列
再來說說動態(tài)隊列,其本質(zhì)是預(yù)備隊列,其目的是為了在線上環(huán)境減輕某些隊列消息堆積的壓力,起到快速支援的作用。通過配置來定義動態(tài)隊列需要支援哪些隊列,例如當 push 隊列的壓力較大,可配置 json 如下,將 push_tracking 和 push_weight 兩個 Task 路由到預(yù)備的動態(tài)隊列中。
- celery_dynamic_router 配置
- {
- "celery_task.push_tracking": {
- "dynamic_queue": [1,2],
- "dynamic_percentage": 0.7,
- },
- "celery_task.push_weight": {
- "dynamic_queue": [3,4],
- "dynamic_percentage": 0.7,
- }
- }
上述配置的作用是將 70% 的 celery_task.push_tracking Task 路由到動態(tài)隊列 1、2 上,70% 的 celery_task.push_weight Task 路由到動態(tài)隊列 3、4 上。
動態(tài)隊列的路由器 DynamicRouter 大致實現(xiàn)如下:
- class DynamicRouter(object):
- def route_for_task(self, task, args=None, kwargs=None):
- # 獲取配置
- task_config = get_conf_dict('celery_dynamic_router').get(task, None)
- # task如果沒在配置中,則直接返回
- if not task_config:
- return None
- # 獲取task對應(yīng)的動態(tài)隊列配置
- dynamic_queue = task_config.get('dynamic_queue', [])
- dynamic_percentage = task_config.get('dynamic_percentage', 0.0)
- # 將一定比例的task路由到動態(tài)隊列中
- if random.random() <= dynamic_percentage:
- # 決定使用哪個動態(tài)隊列
- queue_name = router_load_balance(dynamic_queue, task_name)
- log.data('get_router| task_name:%s, queue:%s', task_name, queue_name)
- return {'queue': queue_name}
- else:
- return None
動態(tài)配置的定時任務(wù)
前文提到 Celery 不僅能實現(xiàn)異步任務(wù),還能通過 Celery Beat 實現(xiàn)定時任務(wù),首先看一個例子:
- from celery.schedules import crontab
- app.conf.beat_schedule = {
- # 每30秒發(fā)送一次郵件
- 'sendmail-every-30-seconds': {
- 'task': 'asks.send_mail',
- 'schedule': 30.0,
- 'args': ['chaycao@gmail.com']
- },
- }
完成上述配置后,執(zhí)行 Celery Beat 命令:
celery beat
即根據(jù)配置每 30 秒執(zhí)行一次 send_email 任務(wù)。
上述示例是在代碼中配置定時任務(wù)。而在筆者的工作中使用了 djcelery 提供的數(shù)據(jù)庫調(diào)度模型,通過結(jié)合 django 提供的 ORM 功能來動態(tài)設(shè)置,更為方便。下面敘述如何實現(xiàn),首先在 Celery 配置中新增:
- CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
設(shè)置使用 DatabaseScheduler,然后再生成定時任務(wù)的配置表:
- python manage.py migrate
可以看到數(shù)據(jù)庫中多出了以下表:
- | celery_taskmeta |
- | celery_tasksetmeta |
- | djcelery_crontabschedule |
- | djcelery_intervalschedule |
- | djcelery_periodictask |
- | djcelery_periodictasks |
- | djcelery_taskstate |
- | djcelery_workerstate |
完成以上操作,最后只用執(zhí)行 Celery Beat 命令,則會去數(shù)據(jù)庫中讀取配置發(fā)起定時任務(wù)。這樣的好處是可以通過修改數(shù)據(jù)庫中的記錄來實現(xiàn)動態(tài)配置定時任務(wù),例如調(diào)整任務(wù)的周期或者參數(shù)。
以上便是筆者在工作中接觸到 Celery 所收獲的內(nèi)容,如果有需要實現(xiàn)異步任務(wù)、定時任務(wù)的場景,可以考慮使用 Celery。
我是草捏子,一只熱愛技術(shù)和生活的草魚,我們下期見!
參考
Message Queue vs Task Queue difference (https://newbedev.com/message-queue-vs-task-queue-difference)
高性能異步框架Celery入坑指南 (https://juejin.cn/post/6844903689103081480)
分布式任務(wù)隊列 Celery—深入 Task (https://www.cnblogs.com/jmilkfan-fanguiju/p/10589779.html)