Nodejs多線程的探索和實(shí)踐
本文轉(zhuǎn)載自微信公眾號「編程雜技」,作者theanarkh 。轉(zhuǎn)載本文請聯(lián)系編程雜技公眾號。
1 背景
需求中有以下場景
1 對稱解密、非對稱解密
2 壓縮、解壓
3 大量文件的增刪改查
4 處理大量的字符串,解析協(xié)議
上面的場景都是非常耗時(shí)間的,解密、壓縮、文件操作,nodejs使用了內(nèi)置的線程池支持了異步。但是處理字符串和解析協(xié)議是單純消耗cpu的操作。而且nodejs對解密的支持似乎不是很好。我使用了純js的解密庫,所以無法在nodejs主線程里處理。尤其rsa解密,非常耗時(shí)間。
所以這時(shí)候就要探索解決方案,nodejs提供了多線程的能力。所以自然就選擇了這種方案。但是這只是初步的想法和方案。因?yàn)閚odejs雖然提供了多線程能力,但是沒有提供一個(gè)應(yīng)用層的線程池。所以如果我們單純地使用多線程,一個(gè)請求一個(gè)線程,這顯然不現(xiàn)實(shí)。我們不得不實(shí)現(xiàn)自己的線程池。本文分享的內(nèi)容是這個(gè)線程池的實(shí)現(xiàn)。
線程池的設(shè)計(jì)涉及到很多方面,對于純cpu型的任務(wù),線程數(shù)和cpu核數(shù)要相等才能達(dá)到最優(yōu)的性能,否則過多的線程引起的上下文切換反而會導(dǎo)致性能下降。而對于io型的任務(wù),更多的線程理論上是會更好,因?yàn)榭梢愿绲亟o硬盤發(fā)出命令,磁盤會優(yōu)化并持續(xù)地處理請求,想象一下,如果發(fā)出一個(gè)命令,硬盤處理一個(gè),然后再發(fā)下一個(gè)命令,再處理一個(gè),這樣顯然效率很低。當(dāng)然,線程數(shù)也不是越多越好。線程過多會引起系統(tǒng)負(fù)載過高,過多上下文切換也會帶來性能的下降。下面看一下線程池的實(shí)現(xiàn)方案。
2 設(shè)計(jì)思路
首先根據(jù)配置創(chuàng)建多個(gè)線程(分為預(yù)創(chuàng)建和懶創(chuàng)建),然后對用戶暴露提交任務(wù)的接口,由調(diào)度中心負(fù)責(zé)接收任務(wù),然后根據(jù)策略選擇處理該任務(wù)的線程。子線程一直在輪詢是否有任務(wù)需要處理。處理完通知調(diào)度中心。
下面看一下具體的實(shí)現(xiàn)
2.1 和用戶通信的數(shù)據(jù)結(jié)構(gòu)
- class UserWork extends EventEmitter {
- constructor({ workId, threadId }) {
- super();
- this.workId = workId;
- this.threadId = threadId;
- workPool[workId] = this;
- }
- }
用戶提交任務(wù)的時(shí)候,調(diào)度中心返回一個(gè)UserWork對象。用戶可以使用該對象和調(diào)度中心通信。
2.2 調(diào)度中心的實(shí)現(xiàn)
調(diào)度中心的實(shí)現(xiàn)大致分為以下幾個(gè)邏輯。
2.2.1 初始化
- constructor(options = {}) {
- this.options = options;
- // 線程池總?cè)蝿?wù)數(shù)
- this.totalWork = 0;
- // 子線程隊(duì)列
- this.workerQueue = [];
- // 核心線程數(shù)
- this.coreThreads = ~~options.coreThreads || config.CORE_THREADS;
- // 線程池最大線程數(shù),如果不支持動態(tài)擴(kuò)容則最大線程數(shù)等于核心線程數(shù)
- this.maxThreads = options.expansion !== false ? Math.max(this.coreThreads, config.MAX_THREADS) : this.coreThreads;
- // 工作線程處理任務(wù)的模式
- this.sync = options.sync !== false;
- // 超過任務(wù)隊(duì)列長度時(shí)的處理策略
- this.discardPolicy = options.discardPolicy ? options.discardPolicy : DISCARD_POLICY.NOT_DISCARD;
- // 是否預(yù)創(chuàng)建子線程
- this.preCreate = options.preCreate === true;
- this.maxIdleTime = ~~options.maxIdleTime || config.MAX_IDLE_TIME;
- this.pollIntervalTime = ~~options.pollIntervalTime || config.POLL_INTERVAL_TIME;
- this.maxWork = ~~options.maxWork || config.MAX_WORK;
- // 是否預(yù)創(chuàng)建線程池
- this.preCreate && this.preCreateThreads();
- }
從初始化代碼中我們看到線程池大致支持的能力。
- 核心線程數(shù)
- 最大線程數(shù)
- 過載時(shí)的處理策略,和過載的閾值
- 子線程空閑退出的時(shí)間和輪詢?nèi)蝿?wù)的時(shí)間
- 是否預(yù)創(chuàng)建線程池
- 是否支持動態(tài)擴(kuò)容
核心線程數(shù)是任務(wù)數(shù)沒有達(dá)到閾值時(shí)的工作線程集合。是處理任務(wù)的主力軍。任務(wù)數(shù)達(dá)到閾值后,如果支持動態(tài)擴(kuò)容(可配置)則會創(chuàng)建新的線程去處理更多的任務(wù)。一旦負(fù)載變低,線程空閑時(shí)間達(dá)到閾值則會自動退出。如果擴(kuò)容的線程數(shù)達(dá)到閾值,還有新的任務(wù)到來,則根據(jù)丟棄策略進(jìn)行相關(guān)的處理。
2.2.2 創(chuàng)建線程
- newThread() {
- let { sync } = this;
- const worker = new Worker(workerPath, {workerData: { sync, maxIdleTime: this.maxIdleTime, pollIntervalTime: this.pollIntervalTime, }});
- const node = {
- worker,
- // 該線程處理的任務(wù)數(shù)量
- queueLength: 0,
- };
- this.workerQueue.push(node);
- const threadId = worker.threadId;
- worker.on('exit', (status) => {
- // 異常退出則補(bǔ)充線程,正常退出則不補(bǔ)充
- if (status) {
- this.newThread();
- }
- this.totalWork -= node.queueLength;
- this.workerQueue = this.workerQueue.filter((worker) => {
- return worker.threadId !== threadId;
- });
- });
- // 和子線程通信
- worker.on('message', (result) => {
- const {
- work,
- event,
- } = result;
- const { data, error, workId } = work;
- // 通過workId拿到對應(yīng)的userWorker
- const userWorker = workPool[workId];
- delete workPool[workId];
- // 任務(wù)數(shù)減一
- node.queueLength--;
- this.totalWork--;
- switch(event) {
- case 'done':
- // 通知用戶,任務(wù)完成
- userWorker.emit('done', data);
- break;
- case 'error':
- // 通知用戶,任務(wù)出錯(cuò)
- if (EventEmitter.listenerCount(userWorker, 'error')) {
- userWorker.emit('error', error);
- }
- break;
- default: break;
- }
- });
- worker.on('error', (...rest) => {
- console.log(...rest)
- });
- return node;
- }
創(chuàng)建線程主要是調(diào)用nodejs提供的模塊進(jìn)行創(chuàng)建。然后監(jiān)聽子線程的退出和message、error事件。如果是異常退出則補(bǔ)充線程。調(diào)度中心維護(hù)了一個(gè)子線程的隊(duì)列。記錄了每個(gè)子線程(worker)的實(shí)例和任務(wù)數(shù)。
2.2.3 選擇執(zhí)行任務(wù)的線程
- selectThead() {
- let min = Number.MAX_SAFE_INTEGER;
- let i = 0;
- let index = 0;
- // 找出任務(wù)數(shù)最少的線程,把任務(wù)交給他
- for (; i < this.workerQueue.length; i++) {
- const { queueLength } = this.workerQueue[i];
- if (queueLength < min) {
- index = i;
- min = queueLength;
- }
- }
- return this.workerQueue[index];
- }
選擇策略目前是選擇任務(wù)數(shù)最少的,本來還支持隨機(jī)和輪詢方式,但是貌似沒有什么場景和必要,就去掉了。
2.2.4 暴露提交任務(wù)的接口
- submit(filename, options = {}) {
- return new Promise(async (resolve, reject) => {
- let thread;
- // 沒有線程則創(chuàng)建一個(gè)
- if (this.workerQueue.length) {
- thread = this.selectThead();
- // 任務(wù)隊(duì)列非空
- if (thread.queueLength !== 0) {
- // 子線程個(gè)數(shù)還沒有達(dá)到核心線程數(shù),則新建線程處理
- if (this.workerQueue.length < this.coreThreads) {
- thread = this.newThread();
- } else if (this.totalWork + 1 > this.maxWork){
- // 總?cè)蝿?wù)數(shù)已達(dá)到閾值,還沒有達(dá)到線程數(shù)閾值,則創(chuàng)建
- if(this.workerQueue.length < this.maxThreads) {
- thread = this.newThread();
- } else {
- // 處理溢出的任務(wù)
- switch(this.discardPolicy) {
- case DISCARD_POLICY.ABORT:
- return reject(new Error('queue overflow'));
- case DISCARD_POLICY.CALLER_RUNS:
- const userWork = new UserWork({workId: this.generateWorkId(), threadId});
- try {
- const asyncFunction = require(filename);
- if (!isAsyncFunction(asyncFunction)) {
- return reject(new Error('need export a async function'));
- }
- const result = await asyncFunction(options);
- resolve(userWork);
- setImmediate(() => {
- userWork.emit('done', result);
- });
- } catch (error) {
- resolve(userWork);
- setImmediate(() => {
- userWork.emit('error', error);
- });
- }
- return;
- case DISCARD_POLICY.DISCARD_OLDEST:
- thread.worker.postMessage({cmd: 'delete'});
- break;
- case DISCARD_POLICY.DISCARD:
- return reject(new Error('discard'));
- case DISCARD_POLICY.NOT_DISCARD:
- break;
- default:
- break;
- }
- }
- }
- }
- } else {
- thread = this.newThread();
- }
- // 生成一個(gè)任務(wù)id
- const workId = this.generateWorkId();
- // 新建一個(gè)work,交給對應(yīng)的子線程
- const work = new Work({ workId, filename, options });
- const userWork = new UserWork({workId, threadId: thread.worker.threadId});
- thread.queueLength++;
- this.totalWork++;
- thread.worker.postMessage({cmd: 'add', work});
- resolve(userWork);
- })
- }
提交任務(wù)的函數(shù)比較復(fù)雜,提交一個(gè)任務(wù)的時(shí)候,調(diào)度中心會根據(jù)當(dāng)前的負(fù)載情況和線程數(shù),決定對一個(gè)任務(wù)做如何處理。如果可以處理,則把任務(wù)交給選中的子線程。最后給用戶返回一個(gè)UserWorker對象。
2.3調(diào)度中心和子線程的通信數(shù)據(jù)結(jié)構(gòu)
- class Work {
- constructor({workId, filename, options}) {
- // 任務(wù)id
- this.workId = workId;
- // 文件名
- this.filename = filename;
- // 處理結(jié)果,由用戶代碼返回
- this.data = null;
- // 執(zhí)行出錯(cuò)
- this.error = null;
- // 執(zhí)行時(shí)入?yún)?nbsp;
- this.options = options;
- }
- }
一個(gè)任務(wù)對應(yīng)一個(gè)id,目前只支持文件的執(zhí)行模式,后續(xù)會支持字符串。
2.4 子線程的實(shí)現(xiàn)
子線程的實(shí)現(xiàn)主要分為幾個(gè)部分
2.4.1 監(jiān)聽調(diào)度中心分發(fā)的命令
- parentPort.on('message', ({cmd, work}) => {
- switch(cmd) {
- case 'delete':
- return queue.shift();
- case 'add':
- return queue.push(work);
- }
- });
2.4.2 輪詢是否有任務(wù)需要處理
- function poll() {
- const now = Date.now();
- if (now - lastWorkTime > maxIdleTime && !queue.length) {
- process.exit(0);
- }
- setTimeout(async () => {
- // 處理任務(wù)
- poll();
- }
- }, pollIntervalTime);
- }
- // 輪詢判斷是否有任務(wù)
- poll();
不斷輪詢是否有任務(wù)需要處理,如果沒有并且空閑時(shí)間達(dá)到閾值則退出。
2.4.3 處理任務(wù)
處理任務(wù)模式分為同步和異步
- while(queue.length) {
- const work = queue.shift();
- try {
- const { filename, options } = work;
- const asyncFunction = require(filename);
- if (!isAsyncFunction(asyncFunction)) {
- return;
- }
- lastWorkTime = now;
- const result = await asyncFunction(options);
- work.data = result;
- parentPort.postMessage({event: 'done', work});
- } catch (error) {
- work.error = error.toString();
- parentPort.postMessage({event: 'error', work});
- }
- }
用戶需要導(dǎo)出一個(gè)async函數(shù),使用這種方案主要是為了執(zhí)行時(shí)可以給用戶傳入?yún)?shù)。并且實(shí)現(xiàn)同步。處理完后通知調(diào)度中心。下面是異步處理方式,子線程不需要同步等待用戶的代碼結(jié)果。
- const arr = [];
- while(queue.length) {
- const work = queue.shift();
- try {
- const { filename } = work;
- const asyncFunction = require(filename);
- if (!isAsyncFunction(asyncFunction)) {
- return;
- }
- arr.push({asyncFunction, work});
- } catch (error) {
- work.error = error.toString();
- parentPort.postMessage({event: 'error', work});
- }
- }
- arr.map(async ({asyncFunction, work}) => {
- try {
- const { options } = work;
- lastWorkTime = now;
- const result = await asyncFunction(options);
- work.data = result;
- parentPort.postMessage({event: 'done', work});
- } catch (e) {
- work.error = error.toString();
- parentPort.postMessage({event: 'done', work});
- }
- })
最后還有一些配置和定制化的功能。
- module.exports = {
- // 最大的線程數(shù)
- MAX_THREADS: 50,
- // 線程池最大任務(wù)數(shù)
- MAX_WORK: Infinity,
- // 默認(rèn)核心線程數(shù)
- CORE_THREADS: 10,
- // 最大空閑時(shí)間
- MAX_IDLE_TIME: 10 * 60 * 1000,
- // 子線程輪詢時(shí)間
- POLL_INTERVAL_TIME: 10,
- };
- // 丟棄策略
- const DISCARD_POLICY = {
- // 報(bào)錯(cuò)
- ABORT: 1,
- // 在主線程里執(zhí)行
- CALLER_RUNS: 2,
- // 丟棄最老的的任務(wù)
- DISCARD_OLDEST: 3,
- // 丟棄
- DISCARD: 4,
- // 不丟棄
- NOT_DISCARD: 5,
- };
支持多個(gè)類型的線程池
- class AsyncThreadPool extends ThreadPool {
- constructor(options) {
- super({...options, sync: false});
- }
- }
- class SyncThreadPool extends ThreadPool {
- constructor(options) {
- super({...options, sync: true});
- }
- }
- // cpu型任務(wù)的線程池,線程數(shù)和cpu核數(shù)一樣,不支持動態(tài)擴(kuò)容
- class CPUThreadPool extends ThreadPool {
- constructor(options) {
- super({...options, coreThreads: cores, expansion: false});
- }
- }
- // 線程池只有一個(gè)線程,類似消息隊(duì)列
- class SingleThreadPool extends ThreadPool {
- constructor(options) {
- super({...options, coreThreads: 1, expansion: false });
- }
- }
- // 線程數(shù)固定的線程池,不支持動態(tài)擴(kuò)容線程
- class FixedThreadPool extends ThreadPool {
- constructor(options) {
- super({ ...options, expansion: false });
- }
- }
這就是線程池的實(shí)現(xiàn),有很多細(xì)節(jié)還需要思考。下面是一個(gè)性能測試的例子。
3 測試
- const { MAX } = require('./constants');
- module.exports = async function() {
- let ret = 0;
- let i = 0;
- while(i++ < MAX) {
- ret++;
- Buffer.from(String(Math.random())).toString('base64');
- }
- return ret;
- }
在服務(wù)器以單線程和多線程的方式執(zhí)行以上代碼,下面是MAX為10000和100000時(shí),使用CPUThreadPool類型線程池的性能對比(具體代碼參考https://github.com/theanarkh/nodejs-threadpool)。
10000
單線程 [ 358.35, 490.93, 705.23, 982.6, 1155.72 ]
多線程 [ 379.3, 230.35, 315.52, 429.4, 496.04 ]
100000
單線程 [ 2485.5, 4454.63, 6894.5, 9173.16, 11011.16 ]
多線程 [ 1791.75, 2787.15, 3275.08, 4093.39, 3674.91 ]
我們發(fā)現(xiàn)這個(gè)數(shù)據(jù)差別非常明顯。并且隨著處理時(shí)間的增長,性能差距越明顯。