自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

高效定時(shí)任務(wù)處理:深入學(xué)習(xí)Python中APScheduler庫的奧秘

開發(fā)
本文將從入門到精通地介紹APScheduler庫的使用方法,帶你掌握在Python中實(shí)現(xiàn)定時(shí)任務(wù)的技巧。

APScheduler是Python中一個(gè)強(qiáng)大的第三方庫,用于在后臺(tái)執(zhí)行定時(shí)任務(wù)。它允許我們根據(jù)設(shè)定的時(shí)間間隔、日期規(guī)則或特定時(shí)間來執(zhí)行任務(wù),適用于定時(shí)執(zhí)行腳本、定時(shí)發(fā)送郵件、定時(shí)處理數(shù)據(jù)等場景。APScheduler的功能使得在Python中實(shí)現(xiàn)定時(shí)任務(wù)變得非常簡單和高效。本文將從入門到精通地介紹APScheduler庫的使用方法,帶你掌握在Python中實(shí)現(xiàn)定時(shí)任務(wù)的技巧。

1. 安裝和導(dǎo)入

首先,我們需要安裝APScheduler庫??梢允褂胮ip命令進(jìn)行安裝:

pip install apscheduler

安裝完成后,我們可以在Python代碼中導(dǎo)入APScheduler:

from apscheduler.schedulers.background import BackgroundScheduler

2. 創(chuàng)建定時(shí)任務(wù)

APScheduler提供了BackgroundScheduler和BlockingScheduler兩種類型的調(diào)度器,用于創(chuàng)建定時(shí)任務(wù)。BackgroundScheduler在后臺(tái)運(yùn)行,不會(huì)阻塞主線程;而BlockingScheduler會(huì)阻塞主線程直到所有任務(wù)完成。

from apscheduler.schedulers.background import BackgroundScheduler
import time

# 創(chuàng)建后臺(tái)調(diào)度器
scheduler = BackgroundScheduler()

# 定義任務(wù)函數(shù)
def job():
    print("定時(shí)任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S"))

    # 添加定時(shí)任務(wù),每隔5秒執(zhí)行一次
scheduler.add_job(job, 'interval', seconds=5)

# 啟動(dòng)調(diào)度器
scheduler.start()

# 主線程等待一段時(shí)間后結(jié)束
time.sleep(20)

# 關(guān)閉調(diào)度器
scheduler.shutdown()

print("主線程結(jié)束")

在上述代碼中,我們首先創(chuàng)建了一個(gè)后臺(tái)調(diào)度器scheduler,然后定義了一個(gè)名為job的任務(wù)函數(shù),在其中打印當(dāng)前時(shí)間。使用scheduler.add_job()添加了一個(gè)定時(shí)任務(wù),設(shè)置為每隔5秒執(zhí)行一次。然后,我們啟動(dòng)了調(diào)度器scheduler,讓定時(shí)任務(wù)在后臺(tái)執(zhí)行。主線程等待20秒后結(jié)束,并調(diào)用scheduler.shutdown()關(guān)閉調(diào)度器。

3. 定時(shí)任務(wù)觸發(fā)器

APScheduler提供了多種觸發(fā)器類型,用于設(shè)置定時(shí)任務(wù)的觸發(fā)條件。 interval觸發(fā)器: 按照設(shè)定的時(shí)間間隔來觸發(fā)任務(wù)。

from apscheduler.schedulers.background import BackgroundScheduler
import time

# 創(chuàng)建后臺(tái)調(diào)度器
scheduler = BackgroundScheduler()

# 定義任務(wù)函數(shù)
def job():
    print("定時(shí)任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S"))

    # 添加定時(shí)任務(wù),每隔5秒執(zhí)行一次
scheduler.add_job(job, 'interval', seconds=5)

# 啟動(dòng)調(diào)度器
scheduler.start()

# 主線程等待一段時(shí)間后結(jié)束
time.sleep(20)

# 關(guān)閉調(diào)度器
scheduler.shutdown()

print("主線程結(jié)束")

在上述代碼中,我們使用'interval'觸發(fā)器,設(shè)置任務(wù)每隔5秒執(zhí)行一次。 cron觸發(fā)器: 使用類似于Linux中cron表達(dá)式的規(guī)則來觸發(fā)任務(wù),可以精確到秒。

from apscheduler.schedulers.background import BackgroundScheduler
import time

# 創(chuàng)建后臺(tái)調(diào)度器
scheduler = BackgroundScheduler()

# 定義任務(wù)函數(shù)
def job():
    print("定時(shí)任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S"))

    # 添加定時(shí)任務(wù),每天的13點(diǎn)30分觸發(fā)任務(wù)
scheduler.add_job(job, 'cron', hour=13, minute=30)

# 啟動(dòng)調(diào)度器
scheduler.start()

# 主線程等待一段時(shí)間后結(jié)束
time.sleep(60)

# 關(guān)閉調(diào)度器
scheduler.shutdown()

print("主線程結(jié)束")

在上述代碼中,我們使用'cron'觸發(fā)器,設(shè)置任務(wù)每天的13點(diǎn)30分觸發(fā)。 date觸發(fā)器: 在指定的時(shí)間點(diǎn)觸發(fā)任務(wù)。

from apscheduler.schedulers.background import BackgroundScheduler
import time

# 創(chuàng)建后臺(tái)調(diào)度器
scheduler = BackgroundScheduler()

# 定義任務(wù)函數(shù)
def job():
    print("定時(shí)任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S"))

    # 添加定時(shí)任務(wù),設(shè)置任務(wù)在2023年7月31日10點(diǎn)30分觸發(fā)
scheduler.add_job(job, 'date', run_date='2023-07-31 10:30:00')

# 啟動(dòng)調(diào)度器
scheduler.start()

# 主線程等待一段時(shí)間后結(jié)束
time.sleep(60)

# 關(guān)閉調(diào)度器
scheduler.shutdown()

print("主線程結(jié)束")

在上述代碼中,我們使用'date'觸發(fā)器,設(shè)置任務(wù)在2023年7月31日10點(diǎn)30分觸發(fā)。

4. 任務(wù)存儲(chǔ)

APScheduler支持將任務(wù)存儲(chǔ)在不同的后端存儲(chǔ)中,如內(nèi)存、數(shù)據(jù)庫等。默認(rèn)情況下,任務(wù)是存儲(chǔ)在內(nèi)存中的。

from apscheduler.schedulers.background import BackgroundScheduler
import time

# 創(chuàng)建后臺(tái)調(diào)度器
scheduler = BackgroundScheduler()

# 定義任務(wù)函數(shù)
def job():
    print("定時(shí)任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S"))

    # 添加定時(shí)任務(wù),每隔5秒執(zhí)行一次
scheduler.add_job(job, 'interval', seconds=5)

# 啟動(dòng)調(diào)度器
scheduler.start()

# 主線程等待一段時(shí)間后結(jié)束
time.sleep(20)

# 關(guān)閉調(diào)度器
scheduler.shutdown()

print("主線程結(jié)束")

在上述代碼中,我們使用默認(rèn)的內(nèi)存存儲(chǔ)來存儲(chǔ)任務(wù)。 如果需要將任務(wù)存儲(chǔ)在數(shù)據(jù)庫中,可以使用jobstores參數(shù)來設(shè)置。

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
import time

# 創(chuàng)建后臺(tái)調(diào)度器
scheduler = BackgroundScheduler()

# 創(chuàng)建數(shù)據(jù)庫存儲(chǔ)
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

# 定義任務(wù)函數(shù)
def job():
    print("定時(shí)任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S"))

    # 添加定時(shí)任務(wù),每隔5秒執(zhí)行一次
scheduler.add_job(job, 'interval', seconds=5)

# 啟動(dòng)調(diào)度器
scheduler.start()

# 主線程等待一段時(shí)間后結(jié)束
time.sleep(20)

# 關(guān)閉調(diào)度器
scheduler.shutdown()

print("主線程結(jié)束")

在上述代碼中,我們使用了SQLAlchemyJobStore來將任務(wù)存儲(chǔ)在SQLite數(shù)據(jù)庫中。

5. 并發(fā)執(zhí)行

默認(rèn)情況下,APScheduler會(huì)將任務(wù)串行執(zhí)行,也就是說一個(gè)任務(wù)結(jié)束后才會(huì)執(zhí)行下一個(gè)任務(wù)。如果希望并發(fā)執(zhí)行多個(gè)任務(wù),可以使用max_instances參數(shù)來設(shè)置。

from apscheduler.schedulers.background import BackgroundScheduler
import time

# 創(chuàng)建后臺(tái)調(diào)度器
scheduler = BackgroundScheduler()

# 定義任務(wù)函數(shù)
def job(index):
    print(f"定時(shí)任務(wù){(diào)index}執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S"))

    # 添加定時(shí)任務(wù),每隔5秒執(zhí)行一次,最多并發(fā)3個(gè)任務(wù)
scheduler.add_job(job, 'interval', seconds=5, args=[1], max_instances=3)
scheduler.add_job(job, 'interval', seconds=5, args=[2], max_instances=3)
scheduler.add_job(job, 'interval', seconds=5, args=[3], max_instances=3)

# 啟動(dòng)調(diào)度器
scheduler.start()

# 主線程等待一段時(shí)間后結(jié)束
time.sleep(20)

# 關(guān)閉調(diào)度器
scheduler.shutdown()

print("主線程結(jié)束")

在上述代碼中,我們使用了args參數(shù)傳遞參數(shù)給任務(wù)函數(shù),并使用max_instances參數(shù)設(shè)置最多并發(fā)3個(gè)任務(wù)。

6. 阻塞和非阻塞

APScheduler提供了阻塞和非阻塞兩種調(diào)度器類型。 阻塞調(diào)度器: 在調(diào)度器啟動(dòng)后,會(huì)阻塞主線程直到所有任務(wù)完成。

from apscheduler.schedulers.blocking import BlockingScheduler
import time

# 創(chuàng)建阻塞調(diào)度器
scheduler = BlockingScheduler()

# 定義任務(wù)函數(shù)
def job():
    print("定時(shí)任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S"))

    # 添加定時(shí)任務(wù),每隔5秒執(zhí)行一次
scheduler.add_job(job, 'interval', seconds=5)

# 啟動(dòng)調(diào)度器
scheduler.start()

print("主線程結(jié)束")

非阻塞調(diào)度器: 在調(diào)度器啟動(dòng)后,不會(huì)阻塞主線程。

from apscheduler.schedulers.background import BackgroundScheduler
import time

# 創(chuàng)建后臺(tái)調(diào)度器
scheduler = BackgroundScheduler()

# 定義任務(wù)函數(shù)
def job():
    print("定時(shí)任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S"))

    # 添加定時(shí)任務(wù),每隔5秒執(zhí)行一次
scheduler.add_job(job, 'interval', seconds=5)

# 啟動(dòng)調(diào)度器
scheduler.start()

# 主線程等待一段時(shí)間后結(jié)束
time.sleep(20)

# 關(guān)閉調(diào)度器
scheduler.shutdown()

print("主線程結(jié)束")

在上述代碼中,我們分別使用BlockingScheduler和BackgroundScheduler創(chuàng)建了阻塞和非阻塞調(diào)度器。

7. 錯(cuò)誤處理

在任務(wù)執(zhí)行過程中,可能會(huì)出現(xiàn)異常。APScheduler提供了異常處理機(jī)制,我們可以通過try...except...捕獲任務(wù)函數(shù)中的異常,并進(jìn)行相應(yīng)的處理。

from apscheduler.schedulers.background import BackgroundScheduler
import time

# 創(chuàng)建后臺(tái)調(diào)度器
scheduler = BackgroundScheduler()

# 定義任務(wù)函數(shù)
def job():
    try:
        print("定時(shí)任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S"))
        # 拋出一個(gè)異常
        raise ValueError("任務(wù)出現(xiàn)異常")
    except Exception as e:
        print("任務(wù)執(zhí)行過程中發(fā)生異常:", str(e))

        # 添加定時(shí)任務(wù),每隔5秒執(zhí)行一次
scheduler.add_job(job, 'interval', seconds=5)

# 啟動(dòng)調(diào)度器
scheduler.start()

# 主線程等待一段時(shí)間后結(jié)束
time.sleep(20)

# 關(guān)閉調(diào)度器
scheduler.shutdown()

print("主線程結(jié)束")

在上述代碼中,我們在任務(wù)函數(shù)中拋出了一個(gè)ValueError異常,并通過try...except...捕獲并輸出了異常信息。

8. 立即執(zhí)行任務(wù)

有時(shí)候我們可能需要立即執(zhí)行一個(gè)任務(wù),而不是等到下次觸發(fā)時(shí)間。APScheduler提供了run_job方法來立即執(zhí)行任務(wù)。

from apscheduler.schedulers.background import BackgroundScheduler
import time

# 創(chuàng)建后臺(tái)調(diào)度器
scheduler = BackgroundScheduler()

# 定義任務(wù)函數(shù)
def job():
    print("定時(shí)任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S"))

    # 添加定時(shí)任務(wù),每隔5秒執(zhí)行一次
scheduler.add_job(job, 'interval', seconds=5)

# 啟動(dòng)調(diào)度器
scheduler.start()

# 立即執(zhí)行任務(wù)
scheduler.run_job(job)

# 主線程等待一段時(shí)間后結(jié)束
time.sleep(20)

# 關(guān)閉調(diào)度器
scheduler.shutdown()

print("主線程結(jié)束")

在上述代碼中,我們使用scheduler.run_job(job)方法立即執(zhí)行了任務(wù)。

9. 調(diào)度器持久化

在實(shí)際應(yīng)用中,我們可能需要將調(diào)度器的配置保存到文件中,以便在下次啟動(dòng)時(shí)恢復(fù)。

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
import time

# 創(chuàng)建數(shù)據(jù)庫存儲(chǔ)
jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}

# 創(chuàng)建后臺(tái)調(diào)度器,并指定jobstores參數(shù)
scheduler = BackgroundScheduler(jobstores=jobstores)

# 定義任務(wù)函數(shù)
def job():
    print("定時(shí)任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S"))

    # 添加定時(shí)任務(wù),每隔5秒執(zhí)行一次
scheduler.add_job(job, 'interval', seconds=5)

# 啟動(dòng)調(diào)度器
scheduler.start()

# 主線程等待一段時(shí)間后結(jié)束
time.sleep(20)

# 關(guān)閉調(diào)度器
scheduler.shutdown()

print("主線程結(jié)束")

在上述代碼中,我們創(chuàng)建了一個(gè)數(shù)據(jù)庫存儲(chǔ)jobstores,并在創(chuàng)建后臺(tái)調(diào)度器時(shí)指定了jobstores參數(shù)。這樣,在調(diào)度器運(yùn)行過程中,任務(wù)的配置將會(huì)被持久化到數(shù)據(jù)庫中。

10. 任務(wù)監(jiān)聽器

APScheduler提供了任務(wù)監(jiān)聽器,用于監(jiān)聽任務(wù)的狀態(tài)變化。我們可以通過add_listener方法添加監(jiān)聽器,并在任務(wù)狀態(tài)發(fā)生變化時(shí)進(jìn)行相應(yīng)的處理。

from apscheduler.schedulers.background import BackgroundScheduler
import time

# 創(chuàng)建后臺(tái)調(diào)度器
scheduler = BackgroundScheduler()

# 定義任務(wù)函數(shù)
def job():
    print("定時(shí)任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S"))

    # 添加定時(shí)任務(wù),每隔5秒執(zhí)行一次
scheduler.add_job(job, 'interval', seconds=5)

# 定義任務(wù)監(jiān)聽器
def my_listener(event):
    if event.exception:
        print("任務(wù)執(zhí)行過程中發(fā)生異常:", str(event.exception))
    else:
        print("任務(wù)執(zhí)行成功")

        # 添加任務(wù)監(jiān)聽器
scheduler.add_listener(my_listener, mask='all')

# 啟動(dòng)調(diào)度器
scheduler.start()

# 主線程等待一段時(shí)間后結(jié)束
time.sleep(20)

# 關(guān)閉調(diào)度器
scheduler.shutdown()

print("主線程結(jié)束")

在上述代碼中,我們創(chuàng)建了一個(gè)任務(wù)監(jiān)聽器my_listener,并在任務(wù)執(zhí)行過程中通過if...else...判斷是否出現(xiàn)異常。然后通過scheduler.add_listener(my_listener, mask='all')方法添加了監(jiān)聽器。

11. 移除定時(shí)任務(wù)

如果我們希望在調(diào)度器運(yùn)行過程中移除某個(gè)定時(shí)任務(wù),可以使用scheduler.remove_job(job_id)方法。

from apscheduler.schedulers.background import BackgroundScheduler
import time

# 創(chuàng)建后臺(tái)調(diào)度器
scheduler = BackgroundScheduler()

# 定義任務(wù)函數(shù)
def job():
    print("定時(shí)任務(wù)執(zhí)行:", time.strftime("%Y-%m-%d %H:%M:%S"))

    # 添加定時(shí)任務(wù),每隔5秒執(zhí)行一次,并獲取任務(wù)ID
job_id = scheduler.add_job(job, 'interval', seconds=5).id

# 啟動(dòng)調(diào)度器
scheduler.start()

# 主線程等待一段時(shí)間后移除定時(shí)任務(wù)
time.sleep(10)
scheduler.remove_job(job_id)

# 主線程等待一段時(shí)間后結(jié)束
time.sleep(10)

# 關(guān)閉調(diào)度器
scheduler.shutdown()

print("主線程結(jié)束")

在上述代碼中,我們通過scheduler.add_job(job, 'interval', seconds=5).id獲取了定時(shí)任務(wù)的ID,并使用scheduler.remove_job(job_id)移除了定時(shí)任務(wù)。

總結(jié)

通過本文的介紹,我們學(xué)習(xí)了APScheduler庫的基本用法,包括創(chuàng)建定時(shí)任務(wù)、定時(shí)任務(wù)觸發(fā)器、任務(wù)存儲(chǔ)、并發(fā)執(zhí)行、阻塞和非阻塞調(diào)度器、錯(cuò)誤處理、立即執(zhí)行任務(wù)、調(diào)度器持久化、任務(wù)監(jiān)聽器和移除定時(shí)任務(wù)等。APScheduler為Python開發(fā)者提供了一個(gè)強(qiáng)大的定時(shí)任務(wù)調(diào)度框架,使得在Python中實(shí)現(xiàn)定時(shí)任務(wù)變得非常簡單和高效。掌握APScheduler的使用將為我們的項(xiàng)目和程序帶來很大的便利。

責(zé)任編輯:趙寧寧 來源: 子午Python
相關(guān)推薦

2023-12-19 08:09:06

Python定時(shí)任務(wù)Cron表達(dá)式

2024-11-04 16:01:01

2023-12-08 14:42:17

Python開發(fā)

2010-01-07 13:38:41

Linux定時(shí)任務(wù)

2023-11-16 09:30:27

系統(tǒng)任務(wù)

2023-11-07 07:47:35

Topic線程PUSH

2017-03-13 09:12:00

TCP數(shù)據(jù)結(jié)構(gòu)請求包

2024-01-03 10:15:59

Python函數(shù)

2024-05-31 13:07:29

.NET Core定時(shí)任務(wù)編程

2010-03-10 15:47:58

crontab定時(shí)任務(wù)

2024-09-09 08:11:12

2020-04-01 16:10:02

PythonAPScheduler調(diào)度

2021-04-16 13:20:41

ZeitLinux工具

2024-05-13 09:49:30

.NETQuartz庫Cron表達(dá)式

2024-01-31 08:38:57

Python定時(shí)任務(wù)函數(shù)

2021-11-22 12:35:40

Python命令定時(shí)任務(wù)

2020-08-05 07:37:29

任務(wù)系統(tǒng)定時(shí)

2009-07-14 17:30:21

Jython語法學(xué)習(xí)

2010-10-09 10:10:55

JavaScriptFunction對象

2009-11-17 14:13:34

PHP配置
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號