輕松完成異步任務,一文搞懂Python Celery
雖然現代的網絡應用比以往任何時候都更快速、更便捷,但仍有許多情況下,需要把繁重的任務轉移到系統的其他部分執(zhí)行,而不是在主線程上進行工作。
這些情況中的示例如下:
- 周期性任務 —— 計劃在特定時間間隔內運行的工作。例如,每日、每月的報告生成。
- 第三方工具 —— 應用程序應該快速向用戶返回響應,而不是等待其他任務先完成。例如,發(fā)送電子郵件、通知,將更新進度傳遞給內部工具。
- 長時間運行的工作 —— 執(zhí)行復雜或資源昂貴的工作,并且用戶需要等待工作完成。例如。DAG工作流、基于Map-Reduce的任務、長時間運行的Spark作業(yè)等。
那么,如何處理這些情況呢?這時,Celery就派上用場了。
什么是Celery?
Celery是一個開源的任務隊列實現,通常與基于Python的網絡框架(如Flask和Django)相結合,在典型的請求-響應周期之外異步執(zhí)行任務。
因此,Celery本質上是一個基于分布式消息傳遞的任務隊列。執(zhí)行單元或任務在一個或多個worker上使用多處理、gevent或Eventlet同時執(zhí)行。這些任務可以同步執(zhí)行(即等到準備就緒)或異步執(zhí)行(即在后臺)。
Celery是如何工作的?
Celery是一個分布式任務隊列,基于生產者-消費者模式。
任務隊列是用于跨線程和機器分配工作的機制,本質上是生產者(Web應用程序)和消費者(Celery工作者)之間的消息中介。
Celery通過消息進行交互,代理(broker)在客戶(生產者)和工作者(消費者)之間充當中間人。為了啟動任務,客戶端將消息推送到隊列中,然后代理將該消息傳遞給工作者。
Celery系統可以由多個worker和broker組成,這為高可用性和橫向擴展提供了可能。
簡而言之,Celery客戶端是生產者,它通過消息代理向隊列中添加新的任務。然后,Celery工作者同樣通過消息代理從隊列中獲取新的任務。一旦處理完畢,結果就會存儲在結果后端。
工作實例
下面的例子將使用RedisMQ作為消息代理。
設置Redis
在linux/macOS系統上,通過以下命令在本地運行Redis服務器:
設置好Redis后,通過執(zhí)行以下命令運行Redis服務器:
該服務器在默認的6379端口運行。
設置應用程序
首先,在本地設置Python項目。
Celery可以通過標準工具如pip或easy_install來安裝。通過以下命令安裝Celery和Redis:
現在需要一個Celery實例來運行應用程序,Celery實現任何任務都是以實例開始,比如創(chuàng)建和管理任務等。
在項目中創(chuàng)建一個文件tasks.py:
這里定義了一個簡單的任務add(),返回兩個數字的總和。
運行Celery Worker
在終端上,切換到項目位置并用以下命令運行Celery worker:
關于Celery worker命令行的詳細信息,可以使用help:
調用任務
在Celery中,使用delay()方法來調用任務。
打開項目的另一個終端窗口并運行以下命令:
這將打開Python命令行。
這將返回一個AsyncResult實例,可以用來檢查任務狀態(tài),獲得其返回值,等待任務完成,也可以在失敗時獲得異常和回溯。
運行add.delay()命令后,任務會被推送到隊列中,然后被worker獲取。這可以在Celery worker終端上進行驗證,可以清楚地看到任務被接收,之后任務成功完成。