前端開(kāi)發(fā)中大并發(fā)量如何控制并發(fā)數(shù)
寫(xiě)在前面
最近在進(jìn)行移動(dòng)端h5開(kāi)發(fā),首頁(yè)需要加載的資源很多,一個(gè)lottie動(dòng)效需要請(qǐng)求70多張圖片,但是遇到安卓webview限制請(qǐng)求并發(fā)數(shù),導(dǎo)致部分圖片請(qǐng)求失敗破圖。當(dāng)然圖片資源可以做閑時(shí)加載和預(yù)加載,可以減輕播放動(dòng)效時(shí)資源未加載的問(wèn)題。
同樣的,業(yè)務(wù)開(kāi)發(fā)也會(huì)遇到需要異步請(qǐng)求幾十個(gè)接口,如果同時(shí)并發(fā)請(qǐng)求瀏覽器會(huì)進(jìn)行限制請(qǐng)求數(shù),也會(huì)給后端造成請(qǐng)求壓力。
場(chǎng)景說(shuō)明
現(xiàn)在有個(gè)場(chǎng)景:
請(qǐng)你實(shí)現(xiàn)一個(gè)并發(fā)請(qǐng)求函數(shù)concurrencyRequest(urls, maxNum),要求如下:
- 要求最大并發(fā)數(shù) maxNum。
- 每當(dāng)有一個(gè)請(qǐng)求返回,就留下一個(gè)空位,可以增加新的請(qǐng)求。
- 所有請(qǐng)求完成后,結(jié)果按照 urls 里面的順序依次打出(發(fā)送請(qǐng)求的函數(shù)可以直接使用fetch即可)。
初始實(shí)現(xiàn):
const preloadManger = (urls, maxCount = 5) => {
let count = 0; // 計(jì)數(shù) -- 用于控制并發(fā)數(shù)
const createTask = () => {
if (count < maxCount) {
const url = urls.pop(); // 從請(qǐng)求數(shù)組中取值
if (url) {
// 無(wú)論請(qǐng)求是否成功,都要執(zhí)行taskFinish
loader(url).finally(taskFinish);
// 添加下一個(gè)請(qǐng)求
count++;
createTask();
}
}
};
const taskFinish = () => {
count--;
createTask();
};
createTask();
};
// 進(jìn)行異步請(qǐng)求
const loader = async (url) => {
const res = await fetch(url).then(res=>res.json());
console.log("res",res);
return res
}
const urls = [];
for (let i = 1; i <= 20; i++) {
urls.push(`https://jsonplaceholder.typicode.com/todos/${i}`);
}
preloadManger(urls, 5)
請(qǐng)求狀態(tài):
可以看到上面的請(qǐng)求是每五個(gè)一組進(jìn)行請(qǐng)求,當(dāng)一個(gè)請(qǐng)求無(wú)論返回成功或是失敗,都會(huì)從請(qǐng)求數(shù)組中再取一個(gè)請(qǐng)求進(jìn)行補(bǔ)充。
設(shè)計(jì)思路
那么,我們可以考慮使用隊(duì)列去請(qǐng)求大量接口。
思路如下:
假定最大并發(fā)數(shù)是maxNum=5,圖中對(duì)接口進(jìn)行了定義編號(hào),當(dāng)請(qǐng)求隊(duì)列池中有一個(gè)請(qǐng)求返回后,就向池子中新增一個(gè)接口進(jìn)行請(qǐng)求,依次直到最后一個(gè)請(qǐng)求執(zhí)行完畢。
當(dāng)然,要保證程序的健壯性,需要考慮一些邊界情況,如下:
- 當(dāng)初始請(qǐng)求數(shù)組urls的長(zhǎng)度為0時(shí),此時(shí)請(qǐng)求結(jié)果數(shù)組results是個(gè)空數(shù)組。
- 最大并發(fā)數(shù)maxNums>urls的長(zhǎng)度時(shí),請(qǐng)求數(shù)為urls的長(zhǎng)度。
- 需要定義計(jì)數(shù)器count去判斷是否全部請(qǐng)求完畢。
- 無(wú)論請(qǐng)求成功與否,都應(yīng)該將結(jié)果存在結(jié)果數(shù)組results中。
- 結(jié)果數(shù)組results和urls數(shù)組的順序保持一致,方便存取。
代碼實(shí)現(xiàn)
在前面的初始實(shí)現(xiàn)的代碼中,雖然都能滿足基本需求,但是并沒(méi)有考慮一些邊界條件,對(duì)此需要根據(jù)上面設(shè)計(jì)思路重新實(shí)現(xiàn)得到:
// 并發(fā)請(qǐng)求函數(shù)
const concurrencyRequest = (urls, maxNum) => {
return new Promise((resolve) => {
if (urls.length === 0) {
resolve([]);
return;
}
const results = [];
let index = 0; // 下一個(gè)請(qǐng)求的下標(biāo)
let count = 0; // 當(dāng)前請(qǐng)求完成的數(shù)量
// 發(fā)送請(qǐng)求
async function request() {
if (index === urls.length) return;
const i = index; // 保存序號(hào),使result和urls相對(duì)應(yīng)
const url = urls[index];
index++;
console.log(url);
try {
const resp = await fetch(url);
// resp 加入到results
results[i] = resp;
} catch (err) {
// err 加入到results
results[i] = err;
} finally {
count++;
// 判斷是否所有的請(qǐng)求都已完成
if (count === urls.length) {
console.log('完成了');
resolve(results);
}
request();
}
}
// maxNum和urls.length取最小進(jìn)行調(diào)用
const times = Math.min(maxNum, urls.length);
for(let i = 0; i < times; i++) {
request();
}
})
}
測(cè)試代碼:
const urls = [];
for (let i = 1; i <= 20; i++) {
urls.push(`https://jsonplaceholder.typicode.com/todos/${i}`);
}
concurrencyRequest(urls, 5).then(res => {
console.log(res);
})
請(qǐng)求結(jié)果:
上面代碼基本實(shí)現(xiàn)了前端并發(fā)請(qǐng)求的需求,也基本滿足需求,在生產(chǎn)中其實(shí)有很多已經(jīng)封裝好的庫(kù)可以直接使用。比如:p-limit【https://github.com/sindresorhus/p-limit】
閱讀p-limit源碼
import Queue from 'yocto-queue';
import {AsyncResource} from '#async_hooks';
export default function pLimit(concurrency) {
// 判斷這個(gè)參數(shù)是否是一個(gè)大于0的整數(shù),如果不是就拋出一個(gè)錯(cuò)誤
if (
!((Number.isInteger(concurrency)
|| concurrency === Number.POSITIVE_INFINITY)
&& concurrency > 0)
) {
throw new TypeError('Expected `concurrency` to be a number from 1 and up');
}
// 創(chuàng)建隊(duì)列 -- 用于存取請(qǐng)求
const queue = new Queue();
// 計(jì)數(shù)
let activeCount = 0;
// 用來(lái)處理并發(fā)數(shù)的函數(shù)
const next = () => {
activeCount--;
if (queue.size > 0) {
// queue.dequeue()可以理解為[].shift(),取出隊(duì)列中的第一個(gè)任務(wù),由于確定里面是一個(gè)函數(shù),所以直接執(zhí)行就可以了;
queue.dequeue()();
}
};
// run函數(shù)就是用來(lái)執(zhí)行異步并發(fā)任務(wù)
const run = async (function_, resolve, arguments_) => {
// activeCount加1,表示當(dāng)前并發(fā)數(shù)加1
activeCount++;
// 執(zhí)行傳入的異步函數(shù),將結(jié)果賦值給result,注意:現(xiàn)在的result是一個(gè)處在pending狀態(tài)的Promise
const result = (async () => function_(...arguments_))();
// resolve函數(shù)就是enqueue函數(shù)中返回的Promise的resolve函數(shù)
resolve(result);
// 等待result的狀態(tài)發(fā)生改變,這里使用了try...catch,因?yàn)閞esult可能會(huì)出現(xiàn)異常,所以需要捕獲異常;
try {
await result;
} catch {}
next();
};
// 將run函數(shù)添加到請(qǐng)求隊(duì)列中
const enqueue = (function_, resolve, arguments_) => {
queue.enqueue(
// 將run函數(shù)綁定到AsyncResource上,不需要立即執(zhí)行,對(duì)此添加了一個(gè)bind方法
AsyncResource.bind(run.bind(undefined, function_, resolve, arguments_)),
);
// 立即執(zhí)行一個(gè)異步函數(shù),等待下一個(gè)微任務(wù)(注意:因?yàn)閍ctiveCount是異步更新的,所以需要等待下一個(gè)微任務(wù)執(zhí)行才能獲取新的值)
(async () => {
// This function needs to wait until the next microtask before comparing
// `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
// when the run function is dequeued and called. The comparison in the if-statement
// needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
await Promise.resolve();
// 判斷activeCount是否小于concurrency,并且隊(duì)列中有任務(wù),如果滿足條件就會(huì)將隊(duì)列中的任務(wù)取出來(lái)執(zhí)行
if (activeCount < concurrency && queue.size > 0) {
// 注意:queue.dequeue()()執(zhí)行的是run函數(shù)
queue.dequeue()();
}
})();
};
// 接收一個(gè)函數(shù)fn和參數(shù)args,然后返回一個(gè)Promise,執(zhí)行出隊(duì)操作
const generator = (function_, ...arguments_) => new Promise(resolve => {
enqueue(function_, resolve, arguments_);
});
// 向外暴露當(dāng)前的并發(fā)數(shù)和隊(duì)列中的任務(wù)數(shù),并且手動(dòng)清空隊(duì)列
Object.defineProperties(generator, {
// 當(dāng)前并發(fā)數(shù)
activeCount: {
get: () => activeCount,
},
// 隊(duì)列中的任務(wù)數(shù)
pendingCount: {
get: () => queue.size,
},
// 清空隊(duì)列
clearQueue: {
value() {
queue.clear();
},
},
});
return generator;
}
整個(gè)庫(kù)只有短短71行代碼,在代碼中導(dǎo)入了yocto-queue庫(kù),它是一個(gè)微型的隊(duì)列數(shù)據(jù)結(jié)構(gòu)。
手寫(xiě)源碼
在進(jìn)行手撕源碼時(shí),可以借助數(shù)組進(jìn)行簡(jiǎn)易的實(shí)現(xiàn):
class PLimit {
constructor(concurrency) {
this.concurrency = concurrency;
this.activeCount = 0;
this.queue = [];
return (fn, ...args) => {
return new Promise(resolve => {
this.enqueue(fn, resolve, args);
});
}
}
enqueue(fn, resolve, args) {
this.queue.push(this.run.bind(this, fn, resolve, args));
(async () => {
await Promise.resolve();
if (this.activeCount < this.concurrency && this.queue.length > 0) {
this.queue.shift()();
}
})();
}
async run(fn, resolve, args) {
this.activeCount++;
const result = (async () => fn(...args))();
resolve(result);
try {
await result;
} catch {
}
this.next();
}
next() {
this.activeCount--;
if (this.queue.length > 0) {
this.queue.shift()();
}
}
}
小結(jié)
在這篇文章中,簡(jiǎn)要介紹了為什么要進(jìn)行并發(fā)請(qǐng)求,闡述了使用請(qǐng)求池隊(duì)列實(shí)現(xiàn)并發(fā)請(qǐng)求的設(shè)計(jì)思路,簡(jiǎn)要實(shí)現(xiàn)代碼。
此外,還閱讀分析了p-limit的源碼,并使用數(shù)組進(jìn)行簡(jiǎn)要的源碼編寫(xiě),以實(shí)現(xiàn)要求。
參考文章
- 【源碼共讀】大并發(fā)量如何控制并發(fā)數(shù)https://juejin.cn/post/7179220832575717435?searchId=20240430092814392DC2208C545E691A26
- 前端實(shí)現(xiàn)并發(fā)控制網(wǎng)絡(luò)請(qǐng)求https://mp.weixin.qq.com/s/9uq2SqkcMSSWjks0x7RQJg。