Python 實(shí)現(xiàn)定時(shí)任務(wù)的八種方案!
在日常工作中,我們常常會(huì)用到需要周期性執(zhí)行的任務(wù),一種方式是采用 Linux 系統(tǒng)自帶的 crond[1] 結(jié)合命令行實(shí)現(xiàn)。另外一種方式是直接使用 Python。接下里整理的是常見(jiàn)的 Python 定時(shí)任務(wù)的實(shí)現(xiàn)方式。
利用 while True: + sleep() 實(shí)現(xiàn)定時(shí)任務(wù)
位于 time 模塊中的 sleep(secs) 函數(shù),可以實(shí)現(xiàn)令當(dāng)前執(zhí)行的線程暫停 secs 秒后再繼續(xù)執(zhí)行。所謂暫停,即令當(dāng)前線程進(jìn)入阻塞狀態(tài),當(dāng)達(dá)到 sleep() 函數(shù)規(guī)定的時(shí)間后,再由阻塞狀態(tài)轉(zhuǎn)為就緒狀態(tài),等待 CPU 調(diào)度。
基于這樣的特性我們可以通過(guò) while 死循環(huán)+sleep() 的方式實(shí)現(xiàn)簡(jiǎn)單的定時(shí)任務(wù)。
代碼示例:
- import datetime
- import time
- def time_printer():
- now = datetime.datetime.now()
- ts = now.strftime('%Y-%m-%d %H:%M:%S')
- print('do func time :', ts)
- def loop_monitor():
- while True:
- time_printer()
- time.sleep(5) # 暫停 5 秒
- if __name__ == "__main__":
- loop_monitor()
主要缺點(diǎn):
- 只能設(shè)定間隔,不能指定具體的時(shí)間,比如每天早上 8:00
- sleep 是一個(gè)阻塞函數(shù),也就是說(shuō) sleep 這一段時(shí)間,程序什么也不能操作。
使用 Timeloop 庫(kù)運(yùn)行定時(shí)任務(wù)
Timeloop[2] 是一個(gè)庫(kù),可用于運(yùn)行多周期任務(wù)。這是一個(gè)簡(jiǎn)單的庫(kù),它使用 decorator 模式在線程中運(yùn)行標(biāo)記函數(shù)。
示例代碼:
- import time
- from timeloop import Timeloop
- from datetime import timedelta
- tl = Timeloop()
- @tl.job(interval=timedelta(seconds=2))
- def sample_job_every_2s():
- print "2s job current time : {}".format(time.ctime())
- @tl.job(interval=timedelta(seconds=5))
- def sample_job_every_5s():
- print "5s job current time : {}".format(time.ctime())
- @tl.job(interval=timedelta(seconds=10))
- def sample_job_every_10s():
- print "10s job current time : {}".format(time.ctime())
利用 threading.Timer 實(shí)現(xiàn)定時(shí)任務(wù)
threading 模塊中的 Timer 是一個(gè)非阻塞函數(shù),比 sleep 稍好一點(diǎn),timer 最基本理解就是定時(shí)器,我們可以啟動(dòng)多個(gè)定時(shí)任務(wù),這些定時(shí)器任務(wù)是異步執(zhí)行,所以不存在等待順序執(zhí)行問(wèn)題。
Timer(interval, function, args=[ ], kwargs={ })
- interval: 指定的時(shí)間
- function: 要執(zhí)行的方法
- args/kwargs: 方法的參數(shù)
代碼示例:
- import datetime
- from threading import Timer
- def time_printer():
- now = datetime.datetime.now()
- ts = now.strftime('%Y-%m-%d %H:%M:%S')
- print('do func time :', ts)
- loop_monitor()
- def loop_monitor():
- t = Timer(5, time_printer)
- t.start()
- if __name__ == "__main__":
- loop_monitor()
備注:Timer 只能執(zhí)行一次,這里需要循環(huán)調(diào)用,否則只能執(zhí)行一次
利用內(nèi)置模塊 sched 實(shí)現(xiàn)定時(shí)任務(wù)
sched 模塊實(shí)現(xiàn)了一個(gè)通用事件調(diào)度器,在調(diào)度器類使用一個(gè)延遲函數(shù)等待特定的時(shí)間,執(zhí)行任務(wù)。同時(shí)支持多線程應(yīng)用程序,在每個(gè)任務(wù)執(zhí)行后會(huì)立刻調(diào)用延時(shí)函數(shù),以確保其他線程也能執(zhí)行。
class sched.scheduler(timefunc, delayfunc) 這個(gè)類定義了調(diào)度事件的通用接口,它需要外部傳入兩個(gè)參數(shù),timefunc 是一個(gè)沒(méi)有參數(shù)的返回時(shí)間類型數(shù)字的函數(shù)(常用使用的如 time 模塊里面的 time),delayfunc 應(yīng)該是一個(gè)需要一個(gè)參數(shù)來(lái)調(diào)用、與 timefunc 的輸出兼容、并且作用為延遲多個(gè)時(shí)間單位的函數(shù)(常用的如 time 模塊的 sleep)。
代碼示例:
- import datetime
- import time
- import sched
- def time_printer():
- now = datetime.datetime.now()
- ts = now.strftime('%Y-%m-%d %H:%M:%S')
- print('do func time :', ts)
- loop_monitor()
- def loop_monitor():
- s = sched.scheduler(time.time, time.sleep) # 生成調(diào)度器
- s.enter(5, 1, time_printer, ())
- s.run()
- if __name__ == "__main__":
- loop_monitor()
scheduler 對(duì)象主要方法:
- enter(delay, priority, action, argument),安排一個(gè)事件來(lái)延遲 delay 個(gè)時(shí)間單位。
- cancel(event):從隊(duì)列中刪除事件。如果事件不是當(dāng)前隊(duì)列中的事件,則該方法將跑出一個(gè) ValueError。
- run():運(yùn)行所有預(yù)定的事件。這個(gè)函數(shù)將等待(使用傳遞給構(gòu)造函數(shù)的 delayfunc() 函數(shù)),然后執(zhí)行事件,直到不再有預(yù)定的事件。
個(gè)人點(diǎn)評(píng):比 threading.Timer 更好,不需要循環(huán)調(diào)用。
利用調(diào)度模塊 schedule 實(shí)現(xiàn)定時(shí)任務(wù)
schedule[3] 是一個(gè)第三方輕量級(jí)的任務(wù)調(diào)度模塊,可以按照秒,分,小時(shí),日期或者自定義事件執(zhí)行時(shí)間。schedule[4] 允許用戶使用簡(jiǎn)單、人性化的語(yǔ)法以預(yù)定的時(shí)間間隔定期運(yùn)行 Python 函數(shù)(或其它可調(diào)用函數(shù))。
先來(lái)看代碼,是不是不看文檔就能明白什么意思?
- mport schedule
- import time
- def job():
- print("I'm working...")
- schedule.every(10).seconds.do(job)
- schedule.every(10).minutes.do(job)
- schedule.every().hour.do(job)
- schedule.every().day.at("10:30").do(job)
- schedule.every(5).to(10).minutes.do(job)
- schedule.every().monday.do(job)
- schedule.every().wednesday.at("13:15").do(job)
- schedule.every().minute.at(":17").do(job)
- while True:
- schedule.run_pending()
- time.sleep(1)
裝飾器:通過(guò) @repeat() 裝飾靜態(tài)方法
- import time
- from schedule import every, repeat, run_pending
- @repeat(every().second)
- def job():
- print('working...')
- while True:
- run_pending()
- time.sleep(1)
傳遞參數(shù):
- import schedule
- def greet(name):
- print('Hello', name)
- schedule.every(2).seconds.do(greet, name='Alice')
- schedule.every(4).seconds.do(greet, name='Bob')
- while True:
- schedule.run_pending()
裝飾器同樣能傳遞參數(shù):
- from schedule import every, repeat, run_pending
- @repeat(every().second, 'World')
- @repeat(every().minute, 'Mars')
- def hello(planet):
- print('Hello', planet)
- while True:
- run_pending()
取消任務(wù):
- import schedule
- i = 0
- def some_task():
- global i
- i += 1
- print(i)
- if i == 10:
- schedule.cancel_job(job)
- print('cancel job')
- exit(0)
- job = schedule.every().second.do(some_task)
- while True:
- schedule.run_pending()
運(yùn)行一次任務(wù):
- import time
- import schedule
- def job_that_executes_once():
- print('Hello')
- return schedule.CancelJob
- schedule.every().minute.at(':34').do(job_that_executes_once)
- while True:
- schedule.run_pending()
- time.sleep(1)
根據(jù)標(biāo)簽檢索任務(wù):
- # 檢索所有任務(wù):schedule.get_jobs()
- import schedule
- def greet(name):
- print('Hello {}'.format(name))
- schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
- schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
- schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
- schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')
- friends = schedule.get_jobs('friend')
- print(friends)
根據(jù)標(biāo)簽取消任務(wù):
- # 取消所有任務(wù):schedule.clear()
- import schedule
- def greet(name):
- print('Hello {}'.format(name))
- if name == 'Cancel':
- schedule.clear('second-tasks')
- print('cancel second-tasks')
- schedule.every().second.do(greet, 'Andrea').tag('second-tasks', 'friend')
- schedule.every().second.do(greet, 'John').tag('second-tasks', 'friend')
- schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
- schedule.every(5).seconds.do(greet, 'Cancel').tag('daily-tasks', 'guest')
- while True:
- schedule.run_pending()
運(yùn)行任務(wù)到某時(shí)間:
- import schedule
- from datetime import datetime, timedelta, time
- def job():
- print('working...')
- schedule.every().second.until('23:59').do(job) # 今天 23:59 停止
- schedule.every().second.until('2030-01-01 18:30').do(job) # 2030-01-01 18:30 停止
- schedule.every().second.until(timedelta(hours=8)).do(job) # 8 小時(shí)后停止
- schedule.every().second.until(time(23, 59, 59)).do(job) # 今天 23:59:59 停止
- schedule.every().second.until(datetime(2030, 1, 1, 18, 30, 0)).do(job) # 2030-01-01 18:30 停止
- while True:
- schedule.run_pending()
馬上運(yùn)行所有任務(wù)(主要用于測(cè)試):
- import schedule
- def job():
- print('working...')
- def job1():
- print('Hello...')
- schedule.every().monday.at('12:40').do(job)
- schedule.every().tuesday.at('16:40').do(job1)
- schedule.run_all()
- schedule.run_all(delay_seconds=3) # 任務(wù)間延遲 3 秒
并行運(yùn)行:使用 Python 內(nèi)置隊(duì)列實(shí)現(xiàn):
- import threading
- import time
- import schedule
- def job1():
- print("I'm running on thread %s" % threading.current_thread())
- def job2():
- print("I'm running on thread %s" % threading.current_thread())
- def job3():
- print("I'm running on thread %s" % threading.current_thread())
- def run_threaded(job_func):
- job_thread = threading.Thread(target=job_func)
- job_thread.start()
- schedule.every(10).seconds.do(run_threaded, job1)
- schedule.every(10).seconds.do(run_threaded, job2)
- schedule.every(10).seconds.do(run_threaded, job3)
- while True:
- schedule.run_pending()
- time.sleep(1)
利用任務(wù)框架 APScheduler 實(shí)現(xiàn)定時(shí)任務(wù)
APScheduler[5](advanceded python scheduler)基于 Quartz 的一個(gè) Python 定時(shí)任務(wù)框架,實(shí)現(xiàn)了 Quartz 的所有功能,使用起來(lái)十分方便。提供了基于日期、固定時(shí)間間隔以及 crontab 類型的任務(wù),并且可以持久化任務(wù)。基于這些功能,我們可以很方便的實(shí)現(xiàn)一個(gè) Python 定時(shí)任務(wù)系統(tǒng)。
它有以下三個(gè)特點(diǎn):
- 類似于 Liunx Cron 的調(diào)度程序(可選的開(kāi)始/結(jié)束時(shí)間)
- 基于時(shí)間間隔的執(zhí)行調(diào)度(周期性調(diào)度,可選的開(kāi)始/結(jié)束時(shí)間)
- 一次性執(zhí)行任務(wù)(在設(shè)定的日期/時(shí)間運(yùn)行一次任務(wù))
APScheduler 有四種組成部分:
- 觸發(fā)器 (trigger) 包含調(diào)度邏輯,每一個(gè)作業(yè)有它自己的觸發(fā)器,用于決定接下來(lái)哪一個(gè)作業(yè)會(huì)運(yùn)行。除了他們自己初始配置意外,觸發(fā)器完全是無(wú)狀態(tài)的。
作業(yè)存儲(chǔ) (job store) 存儲(chǔ)被調(diào)度的作業(yè),默認(rèn)的作業(yè)存儲(chǔ)是簡(jiǎn)單地把作業(yè)保存在內(nèi)存中,其他的作業(yè)存儲(chǔ)是將作業(yè)保存在數(shù)據(jù)庫(kù)中。一個(gè)作業(yè)的數(shù)據(jù)講在保存在持久化作業(yè)存儲(chǔ)時(shí)被序列化,并在加載時(shí)被反序列化。調(diào)度器不能分享同一個(gè)作業(yè)存儲(chǔ)。
- 執(zhí)行器 (executor) 處理作業(yè)的運(yùn)行,他們通常通過(guò)在作業(yè)中提交制定的可調(diào)用對(duì)象到一個(gè)線程或者進(jìn)城池來(lái)進(jìn)行。當(dāng)作業(yè)完成時(shí),執(zhí)行器將會(huì)通知調(diào)度器。
- 調(diào)度器 (scheduler) 是其他的組成部分。你通常在應(yīng)用只有一個(gè)調(diào)度器,應(yīng)用的開(kāi)發(fā)者通常不會(huì)直接處理作業(yè)存儲(chǔ)、調(diào)度器和觸發(fā)器,相反,調(diào)度器提供了處理這些的合適的接口。配置作業(yè)存儲(chǔ)和執(zhí)行器可以在調(diào)度器中完成,例如添加、修改和移除作業(yè)?!⊥ㄟ^(guò)配置 executor、jobstore、trigger,使用線程池 (ThreadPoolExecutor 默認(rèn)值 20) 或進(jìn)程池 (ProcessPoolExecutor 默認(rèn)值 5) 并且默認(rèn)最多 3 個(gè) (max_instances) 任務(wù)實(shí)例同時(shí)運(yùn)行,實(shí)現(xiàn)對(duì) job 的增刪改查等調(diào)度控制
示例代碼:
- from apscheduler.schedulers.blocking import BlockingScheduler
- from datetime import datetime
- # 輸出時(shí)間
- def job():
- print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
- # BlockingScheduler
- sched = BlockingScheduler()
- sched.add_job(my_job, 'interval', seconds=5, id='my_job_id')
- sched.start()
APScheduler 中的重要概念
Job 作業(yè)
Job 作為 APScheduler 最小執(zhí)行單位。創(chuàng)建 Job 時(shí)指定執(zhí)行的函數(shù),函數(shù)中所需參數(shù),Job 執(zhí)行時(shí)的一些設(shè)置信息。
構(gòu)建說(shuō)明:
- id:指定作業(yè)的唯一 ID
- name:指定作業(yè)的名字
- trigger:apscheduler 定義的觸發(fā)器,用于確定 Job 的執(zhí)行時(shí)間,根據(jù)設(shè)置的 trigger 規(guī)則,計(jì)算得到下次執(zhí)行此 job 的時(shí)間, 滿足時(shí)將會(huì)執(zhí)行
- executor:apscheduler 定義的執(zhí)行器,job 創(chuàng)建時(shí)設(shè)置執(zhí)行器的名字,根據(jù)字符串你名字到 scheduler 獲取到執(zhí)行此 job 的 執(zhí)行器,執(zhí)行 job 指定的函數(shù)
- max_instances:執(zhí)行此 job 的最大實(shí)例數(shù),executor 執(zhí)行 job 時(shí),根據(jù) job 的 id 來(lái)計(jì)算執(zhí)行次數(shù),根據(jù)設(shè)置的最大實(shí)例數(shù)來(lái)確定是否可執(zhí)行
- next_run_time:Job 下次的執(zhí)行時(shí)間,創(chuàng)建 Job 時(shí)可以指定一個(gè)時(shí)間 [datetime], 不指定的話則默認(rèn)根據(jù) trigger 獲取觸發(fā)時(shí)間
- misfire_grace_time:Job 的延遲執(zhí)行時(shí)間,例如 Job 的計(jì)劃執(zhí)行時(shí)間是 21:00:00,但因服務(wù)重啟或其他原因?qū)е?21:00:31 才執(zhí)行,如果設(shè)置此 key 為 40, 則該 job 會(huì)繼續(xù)執(zhí)行,否則將會(huì)丟棄此 job
- coalesce:Job 是否合并執(zhí)行,是一個(gè) bool 值。例如 scheduler 停止 20s 后重啟啟動(dòng),而 job 的觸發(fā)器設(shè)置為 5s 執(zhí)行一次,因此此 job 錯(cuò)過(guò)了 4 個(gè)執(zhí)行時(shí)間,如果設(shè)置為是,則會(huì)合并到一次執(zhí)行,否則會(huì)逐個(gè)執(zhí)行
- func:Job 執(zhí)行的函數(shù)
- args:Job 執(zhí)行函數(shù)需要的位置參數(shù)
- kwargs:Job 執(zhí)行函數(shù)需要的關(guān)鍵字參數(shù)
Trigger 觸發(fā)器
Trigger 綁定到 Job,在 scheduler 調(diào)度篩選 Job 時(shí),根據(jù)觸發(fā)器的規(guī)則計(jì)算出 Job 的觸發(fā)時(shí)間,然后與當(dāng)前時(shí)間比較確定此 Job 是否會(huì)被執(zhí)行,總之就是根據(jù) trigger 規(guī)則計(jì)算出下一個(gè)執(zhí)行時(shí)間。
目前 APScheduler 支持觸發(fā)器:
- 指定時(shí)間的 DateTrigger
- 指定間隔時(shí)間的 IntervalTrigger
- 像 Linux 的 crontab 一樣的 CronTrigger。
觸發(fā)器參數(shù):date
date 定時(shí),作業(yè)只執(zhí)行一次。
- run_date (datetime|str) – the date/time to run the job at
- timezone (datetime.tzinfo|str) – time zone for run_date if it doesn’t have one already
- sched.add_job(my_job, 'date', run_date=date(2009, 11, 6), args=['text'])
- sched.add_job(my_job, 'date', run_date=datetime(2019, 7, 6, 16, 30, 5), args=['text'])
觸發(fā)器參數(shù):interval
interval 間隔調(diào)度
- weeks (int) – 間隔幾周
- days (int) – 間隔幾天
- hours (int) – 間隔幾小時(shí)
- minutes (int) – 間隔幾分鐘
- seconds (int) – 間隔多少秒
- start_date (datetime|str) – 開(kāi)始日期
- end_date (datetime|str) – 結(jié)束日期
- timezone (datetime.tzinfo|str) – 時(shí)區(qū)
- sched.add_job(job_function, 'interval', hours=2)
觸發(fā)器參數(shù):cron
cron 調(diào)度
- (int|str) 表示參數(shù)既可以是 int 類型,也可以是 str 類型
- (datetime | str) 表示參數(shù)既可以是 datetime 類型,也可以是 str 類型
- year (int|str) – 4-digit year -(表示四位數(shù)的年份,如 2008 年)
- month (int|str) – month (1-12) -(表示取值范圍為 1-12 月)
- day (int|str) – day of the (1-31) -(表示取值范圍為 1-31 日)
- week (int|str) – ISO week (1-53) -(格里歷 2006 年 12 月 31 日可以寫(xiě)成 2006 年-W52-7(擴(kuò)展形式)或 2006W527(緊湊形式))
- day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) – (表示一周中的第幾天,既可以用 0-6 表示也可以用其英語(yǔ)縮寫(xiě)表示)
- hour (int|str) – hour (0-23) – (表示取值范圍為 0-23 時(shí))
- minute (int|str) – minute (0-59) – (表示取值范圍為 0-59 分)
- second (int|str) – second (0-59) – (表示取值范圍為 0-59 秒)
- start_date (datetime|str) – earliest possible date/time to trigger on (inclusive) – (表示開(kāi)始時(shí)間)
- end_date (datetime|str) – latest possible date/time to trigger on (inclusive) – (表示結(jié)束時(shí)間)
- timezone (datetime.tzinfo|str) – time zone to use for the date/time calculations (defaults to scheduler timezone) -(表示時(shí)區(qū)取值)
CronTrigger 可用的表達(dá)式:
表達(dá)式 | 參數(shù)類型 | 描述 |
---|---|---|
* | 所有 | 通配符。例:minutes=*即每分鐘觸發(fā) |
* / a | 所有 | 每隔時(shí)長(zhǎng) a 執(zhí)行一次。例:minutes=”* / 3″ 即每隔 3 分鐘執(zhí)行一次 |
a – b | 所有 | a – b 的范圍內(nèi)觸發(fā)。例:minutes=“2-5”。即 2 到 5 分鐘內(nèi)每分鐘執(zhí)行一次 |
a – b / c | 所有 | a – b 范圍內(nèi),每隔時(shí)長(zhǎng) c 執(zhí)行一次。 |
xth y | 日 | 第幾個(gè)星期幾觸發(fā)。x 為第幾個(gè),y 為星期幾 |
last x | 日 | 一個(gè)月中,最后一個(gè)星期的星期幾觸發(fā) |
last | 日 | 一個(gè)月中的最后一天觸發(fā) |
x, y, z | 所有 | 組合表達(dá)式,可以組合確定值或上述表達(dá)式 |
# 6-8,11-12 月第三個(gè)周五 00:00, 01:00, 02:00, 03:00 運(yùn)行- sched.add_job(job_function, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
- # 每周一到周五運(yùn)行 直到 2024-05-30 00:00:00
- sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2024-05-30'
Executor 執(zhí)行器
Executor 在 scheduler 中初始化,另外也可通過(guò) scheduler 的 add_executor 動(dòng)態(tài)添加 Executor。每個(gè) executor 都會(huì)綁定一個(gè) alias,這個(gè)作為唯一標(biāo)識(shí)綁定到 Job,在實(shí)際執(zhí)行時(shí)會(huì)根據(jù) Job 綁定的 executor 找到實(shí)際的執(zhí)行器對(duì)象,然后根據(jù)執(zhí)行器對(duì)象執(zhí)行 Job。Executor 的種類會(huì)根據(jù)不同的調(diào)度來(lái)選擇,如果選擇 AsyncIO 作為調(diào)度的庫(kù),那么選擇 AsyncIOExecutor,如果選擇 tornado 作為調(diào)度的庫(kù),選擇 TornadoExecutor,如果選擇啟動(dòng)進(jìn)程作為調(diào)度,選擇 ThreadPoolExecutor 或者 ProcessPoolExecutor 都可以。Executor 的選擇需要根據(jù)實(shí)際的 scheduler 來(lái)選擇不同的執(zhí)行器。目前 APScheduler 支持的 Executor:
- executors.asyncio:同步 io,阻塞
- executors.gevent:io 多路復(fù)用,非阻塞
- executors.pool: 線程 ThreadPoolExecutor 和進(jìn)程 ProcessPoolExecutor
- executors.twisted:基于事件驅(qū)動(dòng)
Jobstore 作業(yè)存儲(chǔ)
Jobstore 在 scheduler 中初始化,另外也可通過(guò) scheduler 的 add_jobstore 動(dòng)態(tài)添加 Jobstore。每個(gè) jobstore 都會(huì)綁定一個(gè) alias,scheduler 在 Add Job 時(shí),根據(jù)指定的 jobstore 在 scheduler 中找到相應(yīng)的 jobstore,并將 job 添加到 jobstore 中。作業(yè)存儲(chǔ)器決定任務(wù)的保存方式, 默認(rèn)存儲(chǔ)在內(nèi)存中(MemoryJobStore),重啟后就沒(méi)有了。APScheduler 支持的任務(wù)存儲(chǔ)器有:
- jobstores.memory:內(nèi)存
- jobstores.mongodb:存儲(chǔ)在 mongodb
- jobstores.redis:存儲(chǔ)在 redis
- jobstores.rethinkdb:存儲(chǔ)在 rethinkdb
- jobstores.sqlalchemy:支持 sqlalchemy 的數(shù)據(jù)庫(kù)如 mysql,sqlite 等
- jobstores.zookeeper:zookeeper
不同的任務(wù)存儲(chǔ)器可以在調(diào)度器的配置中進(jìn)行配置(見(jiàn)調(diào)度器)
Event 事件
Event 是 APScheduler 在進(jìn)行某些操作時(shí)觸發(fā)相應(yīng)的事件,用戶可以自定義一些函數(shù)來(lái)監(jiān)聽(tīng)這些事件,當(dāng)觸發(fā)某些 Event 時(shí),做一些具體的操作。常見(jiàn)的比如。Job 執(zhí)行異常事件 EVENT_JOB_ERROR。Job 執(zhí)行時(shí)間錯(cuò)過(guò)事件 EVENT_JOB_MISSED。
目前 APScheduler 定義的 Event:
- EVENT_SCHEDULER_STARTED
- EVENT_SCHEDULER_START
- EVENT_SCHEDULER_SHUTDOWN
- EVENT_SCHEDULER_PAUSED
- EVENT_SCHEDULER_RESUMED
- EVENT_EXECUTOR_ADDED
- EVENT_EXECUTOR_REMOVED
- EVENT_JOBSTORE_ADDED
- EVENT_JOBSTORE_REMOVED
- EVENT_ALL_JOBS_REMOVED
- EVENT_JOB_ADDED
- EVENT_JOB_REMOVED
- EVENT_JOB_MODIFIED
- EVENT_JOB_EXECUTED
- EVENT_JOB_ERROR
- EVENT_JOB_MISSED
- EVENT_JOB_SUBMITTED
- EVENT_JOB_MAX_INSTANCES
Listener 表示用戶自定義監(jiān)聽(tīng)的一些 Event,比如當(dāng) Job 觸發(fā)了 EVENT_JOB_MISSED 事件時(shí)可以根據(jù)需求做一些其他處理。
調(diào)度器
Scheduler 是 APScheduler 的核心,所有相關(guān)組件通過(guò)其定義。scheduler 啟動(dòng)之后,將開(kāi)始按照配置的任務(wù)進(jìn)行調(diào)度。除了依據(jù)所有定義 Job 的 trigger 生成的將要調(diào)度時(shí)間喚醒調(diào)度之外。當(dāng)發(fā)生 Job 信息變更時(shí)也會(huì)觸發(fā)調(diào)度。
APScheduler 支持的調(diào)度器方式如下,比較常用的為 BlockingScheduler 和 BackgroundScheduler
- BlockingScheduler:適用于調(diào)度程序是進(jìn)程中唯一運(yùn)行的進(jìn)程,調(diào)用 start 函數(shù)會(huì)阻塞當(dāng)前線程,不能立即返回。
- BackgroundScheduler:適用于調(diào)度程序在應(yīng)用程序的后臺(tái)運(yùn)行,調(diào)用 start 后主線程不會(huì)阻塞。
- AsyncIOScheduler:適用于使用了 asyncio 模塊的應(yīng)用程序。
- GeventScheduler:適用于使用 gevent 模塊的應(yīng)用程序。
- TwistedScheduler:適用于構(gòu)建 Twisted 的應(yīng)用程序。
- QtScheduler:適用于構(gòu)建 Qt 的應(yīng)用程序。
Scheduler 的工作流程
Scheduler 添加 job 流程:
Scheduler 調(diào)度流程:
使用分布式消息系統(tǒng) Celery 實(shí)現(xiàn)定時(shí)任務(wù)
Celery[6] 是一個(gè)簡(jiǎn)單,靈活,可靠的分布式系統(tǒng),用于處理大量消息,同時(shí)為操作提供維護(hù)此類系統(tǒng)所需的工具,也可用于任務(wù)調(diào)度。Celery 的配置比較麻煩,如果你只是需要一個(gè)輕量級(jí)的調(diào)度工具,Celery 不會(huì)是一個(gè)好選擇。
Celery 是一個(gè)強(qiáng)大的分布式任務(wù)隊(duì)列,它可以讓任務(wù)的執(zhí)行完全脫離主程序,甚至可以被分配到其他主機(jī)上運(yùn)行。我們通常使用它來(lái)實(shí)現(xiàn)異步任務(wù)(async task)和定時(shí)任務(wù)(crontab)。異步任務(wù)比如是發(fā)送郵件、或者文件上傳,圖像處理等等一些比較耗時(shí)的操作 ,定時(shí)任務(wù)是需要在特定時(shí)間執(zhí)行的任務(wù)。
需要注意,celery 本身并不具備任務(wù)的存儲(chǔ)功能,在調(diào)度任務(wù)的時(shí)候肯定是要把任務(wù)存起來(lái)的,因此在使用 celery 的時(shí)候還需要搭配一些具備存儲(chǔ)、訪問(wèn)功能的工具,比如:消息隊(duì)列、Redis 緩存、數(shù)據(jù)庫(kù)等。官方推薦的是消息隊(duì)列 RabbitMQ,有些時(shí)候使用 Redis 也是不錯(cuò)的選擇。
它的架構(gòu)組成如下圖:
Celery 架構(gòu),它采用典型的生產(chǎn)者-消費(fèi)者模式,主要由以下部分組成:
- Celery Beat,任務(wù)調(diào)度器,Beat 進(jìn)程會(huì)讀取配置文件的內(nèi)容,周期性地將配置中到期需要執(zhí)行的任務(wù)發(fā)送給任務(wù)隊(duì)列。
- Producer:需要在隊(duì)列中進(jìn)行的任務(wù),一般由用戶、觸發(fā)器或其他操作將任務(wù)入隊(duì),然后交由 workers 進(jìn)行處理。調(diào)用了 Celery 提供的 API、函數(shù)或者裝飾器而產(chǎn)生任務(wù)并交給任務(wù)隊(duì)列處理的都是任務(wù)生產(chǎn)者。
- Broker,即消息中間件,在這指任務(wù)隊(duì)列本身,Celery 扮演生產(chǎn)者和消費(fèi)者的角色,brokers 就是生產(chǎn)者和消費(fèi)者存放/獲取產(chǎn)品的地方(隊(duì)列)。
- Celery Worker,執(zhí)行任務(wù)的消費(fèi)者,從隊(duì)列中取出任務(wù)并執(zhí)行。通常會(huì)在多臺(tái)服務(wù)器運(yùn)行多個(gè)消費(fèi)者來(lái)提高執(zhí)行效率。
- Result Backend:任務(wù)處理完后保存狀態(tài)信息和結(jié)果,以供查詢。Celery 默認(rèn)已支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。
實(shí)際應(yīng)用中,用戶從 Web 前端發(fā)起一個(gè)請(qǐng)求,我們只需要將請(qǐng)求所要處理的任務(wù)丟入任務(wù)隊(duì)列 broker 中,由空閑的 worker 去處理任務(wù)即可,處理的結(jié)果會(huì)暫存在后臺(tái)數(shù)據(jù)庫(kù) backend 中。我們可以在一臺(tái)機(jī)器或多臺(tái)機(jī)器上同時(shí)起多個(gè) worker 進(jìn)程來(lái)實(shí)現(xiàn)分布式地并行處理任務(wù)。
Celery 定時(shí)任務(wù)實(shí)例:
- Python Celery & RabbitMQ Tutorial[7]
- Celery 配置實(shí)踐筆記[8]
使用數(shù)據(jù)流工具 Apache Airflow 實(shí)現(xiàn)定時(shí)任務(wù)
Apache Airflow[9] 是 Airbnb 開(kāi)源的一款數(shù)據(jù)流程工具,目前是 Apache 孵化項(xiàng)目。以非常靈活的方式來(lái)支持?jǐn)?shù)據(jù)的 ETL 過(guò)程,同時(shí)還支持非常多的插件來(lái)完成諸如 HDFS 監(jiān)控、郵件通知等功能。Airflow 支持單機(jī)和分布式兩種模式,支持 Master-Slave 模式,支持 Mesos 等資源調(diào)度,有非常好的擴(kuò)展性。被大量公司采用。
Airflow 使用 Python 開(kāi)發(fā),它通過(guò) DAGs(Directed Acyclic Graph, 有向無(wú)環(huán)圖)來(lái)表達(dá)一個(gè)工作流中所要執(zhí)行的任務(wù),以及任務(wù)之間的關(guān)系和依賴。比如,如下的工作流中,任務(wù) T1 執(zhí)行完成,T2 和 T3 才能開(kāi)始執(zhí)行,T2 和 T3 都執(zhí)行完成,T4 才能開(kāi)始執(zhí)行。
Airflow 提供了各種 Operator 實(shí)現(xiàn),可以完成各種任務(wù)實(shí)現(xiàn):
- BashOperator – 執(zhí)行 bash 命令或腳本。
- SSHOperator – 執(zhí)行遠(yuǎn)程 bash 命令或腳本(原理同 paramiko 模塊)。
- PythonOperator – 執(zhí)行 Python 函數(shù)。
- EmailOperator – 發(fā)送 Email。
- HTTPOperator – 發(fā)送一個(gè) HTTP 請(qǐng)求。
- MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,執(zhí)行 SQL 任務(wù)。
- DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…
除了以上這些 Operators 還可以方便的自定義 Operators 滿足個(gè)性化的任務(wù)需求。
一些情況下,我們需要根據(jù)執(zhí)行結(jié)果執(zhí)行不同的任務(wù),這樣工作流會(huì)產(chǎn)生分支。如:
這種需求可以使用 BranchPythonOperator 來(lái)實(shí)現(xiàn)。
Airflow 產(chǎn)生的背景
通常,在一個(gè)運(yùn)維系統(tǒng),數(shù)據(jù)分析系統(tǒng),或測(cè)試系統(tǒng)等大型系統(tǒng)中,我們會(huì)有各種各樣的依賴需求。包括但不限于:
- 時(shí)間依賴:任務(wù)需要等待某一個(gè)時(shí)間點(diǎn)觸發(fā)。
- 外部系統(tǒng)依賴:任務(wù)依賴外部系統(tǒng)需要調(diào)用接口去訪問(wèn)。
- 任務(wù)間依賴:任務(wù) A 需要在任務(wù) B 完成后啟動(dòng),兩個(gè)任務(wù)互相間會(huì)產(chǎn)生影響。
- 資源環(huán)境依賴:任務(wù)消耗資源非常多, 或者只能在特定的機(jī)器上執(zhí)行。
crontab 可以很好地處理定時(shí)執(zhí)行任務(wù)的需求,但僅能管理時(shí)間上的依賴。Airflow 的核心概念 DAG(有向無(wú)環(huán)圖)—— 來(lái)表現(xiàn)工作流。
- Airflow 是一種 WMS,即:它將任務(wù)以及它們的依賴看作代碼,按照那些計(jì)劃規(guī)范任務(wù)執(zhí)行,并在實(shí)際工作進(jìn)程之間分發(fā)需執(zhí)行的任務(wù)。
- Airflow 提供了一個(gè)用于顯示當(dāng)前活動(dòng)任務(wù)和過(guò)去任務(wù)狀態(tài)的優(yōu)秀 UI,并允許用戶手動(dòng)管理任務(wù)的執(zhí)行和狀態(tài)。
- Airflow 中的工作流是具有方向性依賴的任務(wù)集合。
- DAG 中的每個(gè)節(jié)點(diǎn)都是一個(gè)任務(wù),DAG 中的邊表示的是任務(wù)之間的依賴(強(qiáng)制為有向無(wú)環(huán),因此不會(huì)出現(xiàn)循環(huán)依賴,從而導(dǎo)致無(wú)限執(zhí)行循環(huán))。
Airflow 核心概念
- DAGs:即有向無(wú)環(huán)圖 (Directed Acyclic Graph),將所有需要運(yùn)行的 tasks 按照依賴關(guān)系組織起來(lái),描述的是所有 tasks 執(zhí)行順序。
- Operators:可以簡(jiǎn)單理解為一個(gè) class,描述了 DAG 中某個(gè)的 task 具體要做的事。其中,airflow 內(nèi)置了很多 operators,如 BashOperator 執(zhí)行一個(gè) bash 命令,PythonOperator 調(diào)用任意的 Python 函數(shù),EmailOperator 用于發(fā)送郵件,HTTPOperator 用于發(fā)送 HTTP 請(qǐng)求, SqlOperator 用于執(zhí)行 SQL 命令等等,同時(shí),用戶可以自定義 Operator,這給用戶提供了極大的便利性。
- Tasks:Task 是 Operator 的一個(gè)實(shí)例,也就是 DAGs 中的一個(gè) node。
- Task Instance:task 的一次運(yùn)行。Web 界面中可以看到 task instance 有自己的狀態(tài),包括”running”, “success”, “failed”, “skipped”, “up for retry”等。
- Task Relationships:DAGs 中的不同 Tasks 之間可以有依賴關(guān)系,如 Task1 >> Task2,表明 Task2 依賴于 Task2 了。通過(guò)將 DAGs 和 Operators 結(jié)合起來(lái),用戶就可以創(chuàng)建各種復(fù)雜的 工作流(workflow)。
Airflow 的架構(gòu)
在一個(gè)可擴(kuò)展的生產(chǎn)環(huán)境中,Airflow 含有以下組件:
- 元數(shù)據(jù)庫(kù):這個(gè)數(shù)據(jù)庫(kù)存儲(chǔ)有關(guān)任務(wù)狀態(tài)的信息。
- 調(diào)度器:Scheduler 是一種使用 DAG 定義結(jié)合元數(shù)據(jù)中的任務(wù)狀態(tài)來(lái)決定哪些任務(wù)需要被執(zhí)行以及任務(wù)執(zhí)行優(yōu)先級(jí)的過(guò)程。調(diào)度器通常作為服務(wù)運(yùn)行。
- 執(zhí)行器:Executor 是一個(gè)消息隊(duì)列進(jìn)程,它被綁定到調(diào)度器中,用于確定實(shí)際執(zhí)行每個(gè)任務(wù)計(jì)劃的工作進(jìn)程。有不同類型的執(zhí)行器,每個(gè)執(zhí)行器都使用一個(gè)指定工作進(jìn)程的類來(lái)執(zhí)行任務(wù)。例如,LocalExecutor 使用與調(diào)度器進(jìn)程在同一臺(tái)機(jī)器上運(yùn)行的并行進(jìn)程執(zhí)行任務(wù)。其他像 CeleryExecutor 的執(zhí)行器使用存在于獨(dú)立的工作機(jī)器集群中的工作進(jìn)程執(zhí)行任務(wù)。
- Workers:這些是實(shí)際執(zhí)行任務(wù)邏輯的進(jìn)程,由正在使用的執(zhí)行器確定。
Worker 的具體實(shí)現(xiàn)由配置文件中的 executor 來(lái)指定,airflow 支持多種 Executor:
- SequentialExecutor: 單進(jìn)程順序執(zhí)行,一般只用來(lái)測(cè)試
- LocalExecutor: 本地多進(jìn)程執(zhí)行
- CeleryExecutor: 使用 Celery 進(jìn)行分布式任務(wù)調(diào)度
- DaskExecutor:使用 Dask[10] 進(jìn)行分布式任務(wù)調(diào)度
- KubernetesExecutor: 1.10.0 新增,創(chuàng)建臨時(shí) POD 執(zhí)行每次任務(wù)
生產(chǎn)環(huán)境一般使用 CeleryExecutor 和 KubernetesExecutor。
使用 CeleryExecutor 的架構(gòu)如圖:
使用 KubernetesExecutor 的架構(gòu)如圖: