在 Python 中接管鍵盤中斷信號
假設(shè)有這樣一個需求,你需要從 Redis 中持續(xù)不斷讀取數(shù)據(jù),并把這些數(shù)據(jù)寫入到 MongoDB 中。你可能會這樣寫代碼:
- import json
- import redis
- import pymongo
- client = redis.Redis()
- handler = pymongo.MongoClient().example.col
- while True:
- data_raw = client.blpop('data', timeout=300)
- if not data_raw:
- continue
- data = json.loads(data_raw[1].decode())
- handler.insert_one(data)
但這樣寫有一個問題,就是每來一條數(shù)據(jù)都要連接一次 MongoDB,大量時間浪費在了網(wǎng)絡(luò) I/O上。
于是大家會把代碼改成下面這樣:
- import json
- import redis
- import pymongo
- client = redis.Redis()
- handler = pymongo.MongoClient().example.col
- to_be_insert = []
- while True:
- data_raw = client.blpop('data', timeout=300)
- if not data_raw:
- continue
- data = json.loads(data_raw[1].decode())
- to_be_insert.append(data)
- if len(to_be_insert) >= 1000:
- handler.insert_many(to_be_insert)
- to_be_insert = []
每湊夠1000條數(shù)據(jù),批量寫入到 MongoDB 中。
現(xiàn)在又面臨另外一個問題。假設(shè)因為某種原因,我需要更新這個程序,于是我按下了鍵盤上的Ctrl + C強制關(guān)閉了這個程序。而此時to_be_insert列表里面有999條數(shù)據(jù)將會永久丟失——它們已經(jīng)被從 Redis 中刪除了,但又沒有來得及寫入 MongoDB 中。
我想實現(xiàn),當我按下 Ctrl + C 時,程序不再從 Redis 中讀取數(shù)據(jù),但會先把to_be_insert中的數(shù)據(jù)(無論有幾條)都插入 MongoDB 中。最后再關(guān)閉程序。
要實現(xiàn)這個需求,就必須在我們按下Ctrl + C時,程序還能繼續(xù)運行一段代碼。可問題是按下Ctrl + C時,程序就直接結(jié)束了,如何還能再運行一段代碼?
實際上,當我們按下鍵盤上的Ctrl + C時,Python 收到一個名為SIGINT的信號。具體規(guī)則可以閱讀官方文檔。收到信號以后,Python 會調(diào)用一個信號回調(diào)函數(shù)。只不過默認的回調(diào)函數(shù)就是讓程序拋出一個 KeyboardInterrupt異常導(dǎo)致程序關(guān)閉?,F(xiàn)在,我們可以設(shè)法讓 Python 使用我們自定義的一段函數(shù)來作為信號回調(diào)函數(shù)。
要使用信號,我們需用導(dǎo)入 Python 的signal庫。然后自定義一個信號回調(diào)函數(shù),當 Python 收到某個信號時,調(diào)用這個函數(shù)。
所以我們修改一下上面的代碼:
- import signal
- import json
- import redis
- import pymongo
- client = redis.Redis()
- handler = pymongo.MongoClient().example.col
- stop = False
- def keyboard_handler(signum, frame):
- global stop
- stop = True
- signal.signal(signal.SIGINT, keyboard_handler)
- to_be_insert = []
- while not stop:
- data_raw = client.blpop('data', timeout=300)
- if not data_raw:
- continue
- data = json.loads(data_raw[1].decode())
- to_be_insert.append(data)
- if len(to_be_insert) >= 1000:
- handler.insert_many(to_be_insert)
- to_be_insert = []
- if to_be_insert:
- handler.insert_many(to_be_insert)
我們定義了一個全局變量stop,默認為 False,所以默認情況下,while not stop所在的循環(huán)體會持續(xù)運行。
我們定義了一個函數(shù)keyboard_handler,它的作用是修改全局變量stop為 True。需要注意的是,在函數(shù)里面修改全局變量,必須先使用global 變量名聲明這個變量為全局變量。否則無法修改。
修改以后,while not stop循環(huán)停止,于是程序進入:
- if to_be_insert:
- handler.insert_many(to_be_insert)
只要列表里面有數(shù)據(jù),就會批量插入 MongoDB 中。然后程序結(jié)束。
整段代碼的關(guān)鍵就在signal.signal(signal.SIGINT, keyboard_handler)這里把信號SIGINT與函數(shù)keyboard_handler關(guān)聯(lián)上了,于是,在上面這段代碼運行的任何時候,只要按下鍵盤的Ctrl + C,程序就會進入keyboard_handler函數(shù)里面,優(yōu)先執(zhí)行這個函數(shù)里面的代碼。執(zhí)行完成以后,回到之前中斷的地方,繼續(xù)執(zhí)行之前沒有完成的代碼。而由于在函數(shù)里面我已經(jīng)修改了stop的值,所以原來的循環(huán)不能繼續(xù)執(zhí)行,于是進入最后的收尾工作。
需要注意的是,如果你的整個代碼全都是使用 Python 寫的,那么 signal可以在你程序的任何階段觸發(fā),只要你按下 Ctrl + C,立刻就會進入設(shè)置好的信號回調(diào)函數(shù)中。
但如果你的代碼中,有一部分代碼是使用 C 語言寫的,那么當你按下Ctrl + C以后,可能需要等這段C 語言的代碼運行完成以后,才會進入你設(shè)置的信號回調(diào)函數(shù)中。