Nodejs線程池的設(shè)計(jì)與實(shí)現(xiàn)
本文轉(zhuǎn)載自微信公眾號(hào)「編程雜技」,作者theanarkh。轉(zhuǎn)載本文請(qǐng)聯(lián)系編程雜技公眾號(hào)。
前言:之前的版本不方便開(kāi)放,重新設(shè)計(jì)了一版nodejs的線程池庫(kù),本文介紹該庫(kù)的一些設(shè)計(jì)和實(shí)現(xiàn)。
nodejs雖然提供了線程的能力,但是很多時(shí)候,往往不能直接使用線程或者無(wú)限制地創(chuàng)建線程,比如我們有一個(gè)功能是cpu密集型的,如果一個(gè)請(qǐng)求就開(kāi)一個(gè)線程,這很明顯不是最好的實(shí)踐,這時(shí)候,我們需要使用池化的技術(shù),本文介紹在nodejs線程模塊的基礎(chǔ)上,如何設(shè)計(jì)和實(shí)現(xiàn)一個(gè)線程池庫(kù)(https://github.com/theanarkh/nodejs-threadpool或npm i nodejs-threadpool )。下面是線程池的總體架構(gòu)。
設(shè)計(jì)一個(gè)線程池,在真正寫(xiě)代碼之前,有很多設(shè)計(jì)需要考慮,大概如下:
1任務(wù)隊(duì)列的設(shè)計(jì),一個(gè)隊(duì)列,多個(gè)線程互斥訪問(wèn),或者每個(gè)線程一個(gè)隊(duì)列,不需要互斥訪問(wèn)。
2 線程退出的設(shè)計(jì),可以由主線程檢測(cè)空閑線程,然后使子線程退出。或者子線程退出,通知主線程??臻e不一定是沒(méi)有任務(wù)就退出,可以設(shè)計(jì)空閑時(shí)間達(dá)到閾值后退出,因?yàn)閯?chuàng)建線程是有時(shí)間開(kāi)銷(xiāo)的。
3 任務(wù)數(shù)的設(shè)計(jì),每個(gè)線程可以有個(gè)任務(wù)數(shù),還可以增加一個(gè)總?cè)蝿?wù)數(shù),即全部線程任務(wù)數(shù)加起來(lái)
4 選擇線程的設(shè)計(jì),選擇任務(wù)數(shù)最少的線程。
5 線程類(lèi)型的設(shè)計(jì),可以區(qū)分核心線程和預(yù)備線程,任務(wù)少的時(shí)候,核心線程處理就行。任務(wù)多也創(chuàng)建預(yù)備線程幫忙處理。
6 線程池類(lèi)型的設(shè)計(jì),cpu密集型的,線程數(shù)等于核數(shù),否則自定義線程數(shù)就行。
7 支持任務(wù)的取消和超時(shí)機(jī)制,防止一個(gè)任務(wù)時(shí)間過(guò)長(zhǎng)或者死循環(huán)。
本文介紹的線程池具體設(shè)計(jì)思想如下(參考java):
1 主線程維護(hù)一個(gè)隊(duì)列,子線程的任務(wù)由子線程負(fù)責(zé)分發(fā),不需要互斥訪問(wèn),子線程也不需要維護(hù)自己的隊(duì)列。
2 線程退出的設(shè)計(jì),主線程負(fù)責(zé)檢查子線程空閑時(shí)間是否達(dá)到閾值,是則使子線程退出。
3 任務(wù)數(shù)的設(shè)計(jì),主線程負(fù)責(zé)管理任務(wù)個(gè)數(shù)并應(yīng)有相應(yīng)的策略。
4 選擇線程的設(shè)計(jì),選擇任務(wù)數(shù)最少的線程。
5 線程類(lèi)型的設(shè)計(jì),區(qū)分核心線程和預(yù)備線程,任務(wù)少的時(shí)候,核心線程處理就行。任務(wù)多也創(chuàng)建預(yù)備線程幫忙處理。
6 線程池類(lèi)型的設(shè)計(jì),cpu密集型的,線程數(shù)等于核數(shù),否則自定義線程數(shù)就行。
7 支持任務(wù)的取消和超時(shí)機(jī)制,超時(shí)或者取消的時(shí)候,主線程判斷任務(wù)是待執(zhí)行還是正在執(zhí)行,如果是待執(zhí)行則從任務(wù)隊(duì)列中刪除,如果是正在執(zhí)行則殺死對(duì)應(yīng)的子線程。下面我們看一下具體的設(shè)計(jì)。
1 主線程和子線程通信的數(shù)據(jù)結(jié)構(gòu)
- // 任務(wù)類(lèi),一個(gè)任務(wù)對(duì)應(yīng)一個(gè)id
- class Work {
- constructor({workId, filename, options}) {
- // 任務(wù)id
- this.workId = workId;
- // 任務(wù)邏輯,字符串或者js文件路徑
- this.filename = filename;
- // 任務(wù)返回的結(jié)果
- this.data = null;
- // 任務(wù)返回的錯(cuò)誤
- this.error = null;
- // 執(zhí)行任務(wù)時(shí)傳入的參數(shù),用戶定義
- this.options = options;
- }
- }
主線程給子線程分派一個(gè)任務(wù)的時(shí)候,就給子線程發(fā)送一個(gè)Work對(duì)象。在nodejs中線程間通信需要經(jīng)過(guò)序列化和反序列化,所以通信的數(shù)據(jù)結(jié)構(gòu)包括的信息不能過(guò)多。
2 子線程處理任務(wù)邏輯
- const { parentPort } = require('worker_threads');
- const vm = require('vm');
- const { isFunction, isJSFile } = require('./utils');
- // 監(jiān)聽(tīng)主線程提交過(guò)來(lái)的任務(wù)
- parentPort.on('message', async (work) => {
- try {
- const { filename, options } = work;
- let aFunction;
- if (isJSFile(filename)) {
- aFunction = require(filename);
- } else {
- aFunction = vm.runInThisContext(`(${filename})`);
- }
- if (!isFunction(aFunction)) {
- throw new Error('work type error: js file or string');
- }
- work.data = await aFunction(options);
- parentPort.postMessage({event: 'done', work});
- } catch (error) {
- work.error = error.toString();
- parentPort.postMessage({event: 'error', work});
- }
- });
- process.on('uncaughtException', (...rest) => {
- console.error(...rest);
- });
- process.on('unhandledRejection', (...rest) => {
- console.error(...rest);
- });
子線程的邏輯比較簡(jiǎn)單,就是監(jiān)聽(tīng)主線程分派過(guò)來(lái)的任務(wù),然后執(zhí)行任務(wù),執(zhí)行完之后通知主線程。任務(wù)支持js文件和字符串代碼的形式。需要返回一個(gè)Promise或者async函數(shù)。用于用于通知主線程任務(wù)已經(jīng)完成。
3 線程池和業(yè)務(wù)的通信
- // 提供給用戶側(cè)的接口
- class UserWork extends EventEmitter {
- constructor({ workId }) {
- super();
- // 任務(wù)id
- this.workId = workId;
- // 支持超時(shí)取消任務(wù)
- this.timer = null;
- // 任務(wù)狀態(tài)
- this.state = WORK_STATE.PENDDING;
- }
- // 超時(shí)后取消任務(wù)
- setTimeout(timeout) {
- this.timer = setTimeout(() => {
- this.timer && this.cancel() && this.emit('timeout');
- }, ~~timeout);
- }
- // 取消之前設(shè)置的定時(shí)器
- clearTimeout() {
- clearTimeout(this.timer);
- this.timer = null;
- }
- // 直接取消任務(wù),如果執(zhí)行完了就不能取消了,this.terminate是動(dòng)態(tài)設(shè)置的
- cancel() {
- if (this.state === WORK_STATE.END || this.state === WORK_STATE.CANCELED) {
- return false;
- } else {
- this.terminate();
- return true;
- }
- }
- // 修改任務(wù)狀態(tài)
- setState(state) {
- this.state = state;
- }
- }
業(yè)務(wù)提交一個(gè)任務(wù)給線程池的時(shí)候,線程池會(huì)返回一個(gè)UserWork類(lèi),業(yè)務(wù)側(cè)通過(guò)UserWork類(lèi)和線程池通信。
4 管理子線程的數(shù)據(jù)結(jié)構(gòu)
- // 管理子線程的數(shù)據(jù)結(jié)構(gòu)
- class Thread {
- constructor({ worker }) {
- // nodejs的Worker對(duì)象,nodejs的worker_threads模塊的Worker
- this.worker = worker;
- // 線程狀態(tài)
- this.state = THREAD_STATE.IDLE;
- // 上次工作的時(shí)間
- this.lastWorkTime = Date.now();
- }
- // 修改線程狀態(tài)
- setState(state) {
- this.state = state;
- }
- // 修改線程最后工作時(shí)間
- setLastWorkTime(time) {
- this.lastWorkTime = time;
- }
- }
線程池中維護(hù)了多個(gè)子線程,Thread類(lèi)用于管理子線程的信息。
5 線程池 線程池的實(shí)現(xiàn)是核心,我們分為幾個(gè)部分講。
5.1 支持的配置
- constructor(options = {}) {
- this.options = options;
- // 子線程隊(duì)列
- this.workerQueue = [];
- // 核心線程數(shù)
- this.coreThreads = ~~options.coreThreads || config.CORE_THREADS;
- // 線程池最大線程數(shù),如果不支持動(dòng)態(tài)擴(kuò)容則最大線程數(shù)等于核心線程數(shù)
- this.maxThreads = options.expansion !== false ? Math.max(this.coreThreads, config.MAX_THREADS) : this.coreThreads;
- // 超過(guò)任務(wù)隊(duì)列長(zhǎng)度時(shí)的處理策略
- this.discardPolicy = options.discardPolicy ? options.discardPolicy : DISCARD_POLICY.NOT_DISCARD;
- // 是否預(yù)創(chuàng)建子線程
- this.preCreate = options.preCreate === true;
- // 線程最大空閑時(shí)間,達(dá)到后自動(dòng)退出
- this.maxIdleTime = ~~options.maxIdleTime || config.MAX_IDLE_TIME;
- // 是否預(yù)創(chuàng)建線程池
- this.preCreate && this.preCreateThreads();
- // 保存線程池中任務(wù)對(duì)應(yīng)的UserWork
- this.workPool = {};
- // 線程池中當(dāng)前可用的任務(wù)id,每次有新任務(wù)時(shí)自增1
- this.workId = 0;
- // 線程池中的任務(wù)隊(duì)列
- this.queue = [];
- // 線程池總?cè)蝿?wù)數(shù)
- this.totalWork = 0;
- // 支持的最大任務(wù)數(shù)
- this.maxWork = ~~options.maxWork || config.MAX_WORK;
- // 處理任務(wù)的超時(shí)時(shí)間,全局配置
- this.timeout = ~~options.timeout;
- this.pollIdle();
- }
上面的代碼列出了線程池所支持的能力。
5.2 創(chuàng)建線程
- newThread() {
- const worker = new Worker(workerPath);
- const thread = new Thread({worker});
- this.workerQueue.push(thread);
- const threadId = worker.threadId;
- worker.on('exit', () => {
- // 找到該線程對(duì)應(yīng)的數(shù)據(jù)結(jié)構(gòu),然后刪除該線程的數(shù)據(jù)結(jié)構(gòu)
- const position = this.workerQueue.findIndex(({worker}) => {
- return worker.threadId === threadId;
- });
- const exitedThread = this.workerQueue.splice(position, 1);
- // 退出時(shí)狀態(tài)是BUSY說(shuō)明還在處理任務(wù)(非正常退出)
- this.totalWork -= exitedThread.state === THREAD_STATE.BUSY ? 1 : 0;
- });
- // 和子線程通信
- worker.on('message', (result) => {
- const {
- work,
- event,
- } = result;
- const { data, error, workId } = work;
- // 通過(guò)workId拿到對(duì)應(yīng)的userWork
- const userWork = this.workPool[workId];
- // 不存在說(shuō)明任務(wù)被取消了
- if (!userWork) {
- return;
- }
- // 修改線程池?cái)?shù)據(jù)結(jié)構(gòu)
- this.endWork(userWork);
- // 修改線程數(shù)據(jù)結(jié)構(gòu)
- thread.setLastWorkTime(Date.now());
- // 還有任務(wù)則通知子線程處理,否則修改子線程狀態(tài)為空閑
- if (this.queue.length) {
- // 從任務(wù)隊(duì)列拿到一個(gè)任務(wù)交給子線程
- this.submitWorkToThread(thread, this.queue.shift());
- } else {
- thread.setState(THREAD_STATE.IDLE);
- }
- switch(event) {
- case 'done':
- // 通知用戶,任務(wù)完成
- userWork.emit('done', data);
- break;
- case 'error':
- // 通知用戶,任務(wù)出錯(cuò)
- if (EventEmitter.listenerCount(userWork, 'error')) {
- userWork.emit('error', error);
- }
- break;
- default: break;
- }
- });
- worker.on('error', (...rest) => {
- console.error(...rest);
- });
- return thread;
- }
創(chuàng)建線程,并保持線程對(duì)應(yīng)的數(shù)據(jù)結(jié)構(gòu)、退出、通信管理、任務(wù)分派。子線程執(zhí)行完任務(wù)后,會(huì)通知線程池,主線程通知用戶。
5.3 選擇線程
- selectThead() {
- // 找出空閑的線程,把任務(wù)交給他
- for (let i = 0; i < this.workerQueue.length; i++) {
- if (this.workerQueue[i].state === THREAD_STATE.IDLE) {
- return this.workerQueue[i];
- }
- }
- // 沒(méi)有空閑的則隨機(jī)選擇一個(gè)
- return this.workerQueue[~~(Math.random() * this.workerQueue.length)];
- }
當(dāng)用戶給線程池提交一個(gè)任務(wù)時(shí),線程池會(huì)選擇一個(gè)空閑的線程處理該任務(wù)。如果沒(méi)有可用線程則任務(wù)插入待處理隊(duì)列等待處理。
5.4 提交任務(wù)
- // 給線程池提交一個(gè)任務(wù)
- submit(filename, options = {}) {
- return new Promise(async (resolve, reject) => {
- let thread;
- // 沒(méi)有線程則創(chuàng)建一個(gè)
- if (this.workerQueue.length) {
- thread = this.selectThead();
- // 該線程還有任務(wù)需要處理
- if (thread.state === THREAD_STATE.BUSY) {
- // 子線程個(gè)數(shù)還沒(méi)有達(dá)到核心線程數(shù),則新建線程處理
- if (this.workerQueue.length < this.coreThreads) {
- thread = this.newThread();
- } else if (this.totalWork + 1 > this.maxWork){
- // 總?cè)蝿?wù)數(shù)已達(dá)到閾值,還沒(méi)有達(dá)到線程數(shù)閾值,則創(chuàng)建
- if(this.workerQueue.length < this.maxThreads) {
- thread = this.newThread();
- } else {
- // 處理溢出的任務(wù)
- switch(this.discardPolicy) {
- case DISCARD_POLICY.ABORT:
- return reject(new Error('queue overflow'));
- case DISCARD_POLICY.CALLER_RUN:
- const workId = this.generateWorkId();
- const userWork = new UserWork({workId});
- userWork.setState(WORK_STATE.RUNNING);
- userWork.terminate = () => {
- userWork.setState(WORK_STATE.CANCELED);
- };
- this.timeout && userWork.setTimeout(this.timeout);
- resolve(userWork);
- try {
- let aFunction;
- if (isJSFile(filename)) {
- aFunction = require(filename);
- } else {
- aFunction = vm.runInThisContext(`(${filename})`);
- }
- if (!isFunction(aFunction)) {
- throw new Error('work type error: js file or string');
- }
- const result = await aFunction(options);
- // 延遲通知,讓用戶有機(jī)會(huì)取消或者注冊(cè)事件
- setImmediate(() => {
- if (userWork.state !== WORK_STATE.CANCELED) {
- userWork.setState(WORK_STATE.END);
- userWork.emit('done', result);
- }
- });
- } catch (error) {
- setImmediate(() => {
- if (userWork.state !== WORK_STATE.CANCELED) {
- userWork.setState(WORK_STATE.END);
- userWork.emit('error', error.toString());
- }
- });
- }
- return;
- case DISCARD_POLICY.OLDEST_DISCARD:
- const work = this.queue.shift();
- // maxWork為1時(shí),work會(huì)為空
- if (work && this.workPool[work.workId]) {
- this.cancelWork(this.workPool[work.workId]);
- } else {
- return reject(new Error('no work can be discarded'));
- }
- break;
- case DISCARD_POLICY.DISCARD:
- return reject(new Error('discard'));
- case DISCARD_POLICY.NOT_DISCARD:
- break;
- default:
- break;
- }
- }
- }
- }
- } else {
- thread = this.newThread();
- }
- // 生成一個(gè)任務(wù)id
- const workId = this.generateWorkId();
- // 新建一個(gè)UserWork
- const userWork = new UserWork({workId});
- this.timeout && userWork.setTimeout(this.timeout);
- // 新建一個(gè)work
- const work = new Work({ workId, filename, options });
- // 修改線程池?cái)?shù)據(jù)結(jié)構(gòu),把UserWork和Work關(guān)聯(lián)起來(lái)
- this.addWork(userWork);
- // 選中的線程正在處理任務(wù),則先緩存到任務(wù)隊(duì)列
- if (thread.state === THREAD_STATE.BUSY) {
- this.queue.push(work);
- userWork.terminate = () => {
- this.cancelWork(userWork);
- this.queue = this.queue.filter((node) => {
- return node.workId !== work.workId;
- });
- }
- } else {
- this.submitWorkToThread(thread, work);
- }
- resolve(userWork);
- })
- }
- submitWorkToThread(thread, work) {
- const userWork = this.workPool[work.workId];
- userWork.setState(WORK_STATE.RUNNING);
- // 否則交給線程處理,并修改狀態(tài)和記錄該線程當(dāng)前處理的任務(wù)id
- thread.setState(THREAD_STATE.BUSY);
- thread.worker.postMessage(work);
- userWork.terminate = () => {
- this.cancelWork(userWork);
- thread.setState(THREAD_STATE.DEAD);
- thread.worker.terminate();
- }
- }
- addWork(userWork) {
- userWork.setState(WORK_STATE.PENDDING);
- this.workPool[userWork.workId] = userWork;
- this.totalWork++;
- }
- endWork(userWork) {
- delete this.workPool[userWork.workId];
- this.totalWork--;
- userWork.setState(WORK_STATE.END);
- userWork.clearTimeout();
- }
- cancelWork(userWork) {
- delete this.workPool[userWork.workId];
- this.totalWork--;
- userWork.setState(WORK_STATE.CANCELED);
- userWork.emit('cancel');
- }
提交任務(wù)是線程池暴露給用戶側(cè)的接口,主要處理的邏輯包括,根據(jù)當(dāng)前的策略判斷是否需要新建線程、選擇線程處理任務(wù)、排隊(duì)任務(wù)等,如果任務(wù)數(shù)達(dá)到閾值,則根據(jù)丟棄策略處理該任務(wù)。
5.5 空閑處理
- pollIdle() {
- setTimeout(() => {
- for (let i = 0; i < this.workerQueue.length; i++) {
- const node = this.workerQueue[i];
- if (node.state === THREAD_STATE.IDLE && Date.now() - node.lastWorkTime > this.maxIdleTime) {
- node.worker.terminate();
- }
- }
- this.pollIdle();
- }, 1000);
- }
當(dāng)子線程空閑時(shí)間達(dá)到閾值后,主線程會(huì)殺死子線程,避免浪費(fèi)系統(tǒng)資源??偨Y(jié),這就是線程池具體的設(shè)計(jì)和實(shí)現(xiàn),另外創(chuàng)建線程失敗會(huì)導(dǎo)致主線程掛掉,所以使用線程的時(shí)候,最后新開(kāi)一個(gè)子進(jìn)程來(lái)管理該線程池。