幾百個數(shù)據(jù),Promise.all 沒做控制并發(fā)?那你心可真大啊!
需求
我最近在做一個需求:批量去往數(shù)據(jù)庫里存儲一些東西,數(shù)量可能一次性達到幾百個,也就意味著我需要一次性往數(shù)據(jù)庫里存儲幾百次,我是這么做的:
const save = (data) => {
// 數(shù)據(jù)庫操作(Promise)
return insert(data)
}
const datas = [幾百個數(shù)據(jù)]
// 進行存儲
Promise.all(datas.map(save))
被呵斥
正當我覺得這個需求很輕松的時候。。在 Code Review 上,我被團隊的大佬們給呵斥了一頓,理由是:存儲的操作發(fā)生在服務器,服務器是很脆弱的,你一次性存儲幾百個,服務器崩了怎么辦?
隨后大佬們提出解決方案:控制并發(fā),大佬們是真的強,感覺這種東西已經(jīng)是大佬們的常規(guī)操作了。。
控制Promise.all并發(fā)
意思就是,比如我有幾百個存儲操作,我不能一次性去全部執(zhí)行,而是要控制一次性只能執(zhí)行10個操作,10個中有一個執(zhí)行完了,就拿還沒執(zhí)行的操作補上去,就這樣一直到這幾百個操作全部執(zhí)行完為止。。
其實很簡單,可以直接用庫,比如async-pool、es6-promise-pool、p-limit,只要是能用庫的,我建議不要自己去寫,因為不定因素很多,你自己寫的肯定沒有庫寫的好,你說呢~
簡單實現(xiàn)
看到一位兄弟實現(xiàn)的挺不錯的,鏈接:https://segmentfault.com/a/1190000016389127
這是async-pool這個庫的核心源碼:
function asyncPool(poolLimit, array, iteratorFn) {
let i = 0;
const ret = [];
const executing = [];
const enqueue = function () {
// 邊界處理,array為空數(shù)組
if (i === array.length) {
return Promise.resolve();
}
// 每調一次enqueue,初始化一個promise
const item = array[i++];
const p = Promise.resolve().then(() => iteratorFn(item, array));
// 放入promises數(shù)組
ret.push(p);
// promise執(zhí)行完畢,從executing數(shù)組中刪除
const e = p.then(() => executing.splice(executing.indexOf(e), 1));
// 插入executing數(shù)字,表示正在執(zhí)行的promise
executing.push(e);
// 使用Promise.rece,每當executing數(shù)組中promise數(shù)量低于poolLimit,就實例化新的promise并執(zhí)行
let r = Promise.resolve();
if (executing.length >= poolLimit) {
r = Promise.race(executing);
}
// 遞歸,直到遍歷完array
return r.then(() => enqueue());
};
return enqueue().then(() => Promise.all(ret));
}
大概的邏輯可以總結為:
- 從array第1個元素開始,初始化promise對象,同時用一個executing數(shù)組保存正在執(zhí)行的promise
- 不斷初始化promise,直到達到poolLimt
- 使用Promise.race,獲得executing中promise的執(zhí)行情況,當有一個promise執(zhí)行完畢,繼續(xù)初始化promise并放入executing中
- 所有promise都執(zhí)行完了,調用Promise.all返回
使用方式:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
return asyncPool(2, [1000, 5000, 3000, 2000], timeout).then(results => {
});