自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

前端開(kāi)發(fā)中大并發(fā)量如何控制并發(fā)數(shù)

開(kāi)發(fā) 前端
在這篇文章中,簡(jiǎn)要介紹了為什么要進(jìn)行并發(fā)請(qǐng)求,闡述了使用請(qǐng)求池隊(duì)列實(shí)現(xiàn)并發(fā)請(qǐng)求的設(shè)計(jì)思路,簡(jiǎn)要實(shí)現(xiàn)代碼。此外,還閱讀分析了p-limit的源碼,并使用數(shù)組進(jìn)行簡(jiǎn)要的源碼編寫(xiě),以實(shí)現(xiàn)要求。

寫(xiě)在前面

最近在進(jìn)行移動(dòng)端h5開(kāi)發(fā),首頁(yè)需要加載的資源很多,一個(gè)lottie動(dòng)效需要請(qǐng)求70多張圖片,但是遇到安卓webview限制請(qǐng)求并發(fā)數(shù),導(dǎo)致部分圖片請(qǐng)求失敗破圖。當(dāng)然圖片資源可以做閑時(shí)加載和預(yù)加載,可以減輕播放動(dòng)效時(shí)資源未加載的問(wèn)題。

同樣的,業(yè)務(wù)開(kāi)發(fā)也會(huì)遇到需要異步請(qǐng)求幾十個(gè)接口,如果同時(shí)并發(fā)請(qǐng)求瀏覽器會(huì)進(jìn)行限制請(qǐng)求數(shù),也會(huì)給后端造成請(qǐng)求壓力。

場(chǎng)景說(shuō)明

現(xiàn)在有個(gè)場(chǎng)景:

請(qǐng)你實(shí)現(xiàn)一個(gè)并發(fā)請(qǐng)求函數(shù)concurrencyRequest(urls, maxNum),要求如下:

  • 要求最大并發(fā)數(shù) maxNum。
  • 每當(dāng)有一個(gè)請(qǐng)求返回,就留下一個(gè)空位,可以增加新的請(qǐng)求。
  • 所有請(qǐng)求完成后,結(jié)果按照 urls 里面的順序依次打出(發(fā)送請(qǐng)求的函數(shù)可以直接使用fetch即可)。

初始實(shí)現(xiàn):

const preloadManger = (urls, maxCount = 5) => {
  let count = 0; // 計(jì)數(shù) -- 用于控制并發(fā)數(shù)
  const createTask = () => {
    if (count < maxCount) {
      const url = urls.pop(); // 從請(qǐng)求數(shù)組中取值
      if (url) {
        // 無(wú)論請(qǐng)求是否成功,都要執(zhí)行taskFinish
        loader(url).finally(taskFinish);
        // 添加下一個(gè)請(qǐng)求
        count++;
        createTask();
      }
    }
  };

  const taskFinish = () => {
    count--;
    createTask();
  };

  createTask();
};

// 進(jìn)行異步請(qǐng)求
const loader = async (url) => {
  const res = await fetch(url).then(res=>res.json());
  console.log("res",res);
  return res
}

const urls = [];
for (let i = 1; i <= 20; i++) {
    urls.push(`https://jsonplaceholder.typicode.com/todos/${i}`);
}

preloadManger(urls, 5)

請(qǐng)求狀態(tài):

可以看到上面的請(qǐng)求是每五個(gè)一組進(jìn)行請(qǐng)求,當(dāng)一個(gè)請(qǐng)求無(wú)論返回成功或是失敗,都會(huì)從請(qǐng)求數(shù)組中再取一個(gè)請(qǐng)求進(jìn)行補(bǔ)充。

設(shè)計(jì)思路

那么,我們可以考慮使用隊(duì)列去請(qǐng)求大量接口。

思路如下:

假定最大并發(fā)數(shù)是maxNum=5,圖中對(duì)接口進(jìn)行了定義編號(hào),當(dāng)請(qǐng)求隊(duì)列池中有一個(gè)請(qǐng)求返回后,就向池子中新增一個(gè)接口進(jìn)行請(qǐng)求,依次直到最后一個(gè)請(qǐng)求執(zhí)行完畢。

當(dāng)然,要保證程序的健壯性,需要考慮一些邊界情況,如下:

  • 當(dāng)初始請(qǐng)求數(shù)組urls的長(zhǎng)度為0時(shí),此時(shí)請(qǐng)求結(jié)果數(shù)組results是個(gè)空數(shù)組。
  • 最大并發(fā)數(shù)maxNums>urls的長(zhǎng)度時(shí),請(qǐng)求數(shù)為urls的長(zhǎng)度。
  • 需要定義計(jì)數(shù)器count去判斷是否全部請(qǐng)求完畢。
  • 無(wú)論請(qǐng)求成功與否,都應(yīng)該將結(jié)果存在結(jié)果數(shù)組results中。
  • 結(jié)果數(shù)組results和urls數(shù)組的順序保持一致,方便存取。

代碼實(shí)現(xiàn)

在前面的初始實(shí)現(xiàn)的代碼中,雖然都能滿足基本需求,但是并沒(méi)有考慮一些邊界條件,對(duì)此需要根據(jù)上面設(shè)計(jì)思路重新實(shí)現(xiàn)得到:

// 并發(fā)請(qǐng)求函數(shù)
const concurrencyRequest = (urls, maxNum) => {
    return new Promise((resolve) => {
        if (urls.length === 0) {
            resolve([]);
            return;
        }
        const results = [];
        let index = 0; // 下一個(gè)請(qǐng)求的下標(biāo)
        let count = 0; // 當(dāng)前請(qǐng)求完成的數(shù)量

        // 發(fā)送請(qǐng)求
        async function request() {
            if (index === urls.length) return;
            const i = index; // 保存序號(hào),使result和urls相對(duì)應(yīng)
            const url = urls[index];
            index++;
            console.log(url);
            try {
                const resp = await fetch(url);
                // resp 加入到results
                results[i] = resp;
            } catch (err) {
                // err 加入到results
                results[i] = err;
            } finally {
                count++;
                // 判斷是否所有的請(qǐng)求都已完成
                if (count === urls.length) {
                    console.log('完成了');
                    resolve(results);
                }
                request();
            }
        }

        // maxNum和urls.length取最小進(jìn)行調(diào)用
        const times = Math.min(maxNum, urls.length);
        for(let i = 0; i < times; i++) {
            request();
        }
    })
}

測(cè)試代碼:

const urls = [];
for (let i = 1; i <= 20; i++) {
    urls.push(`https://jsonplaceholder.typicode.com/todos/${i}`);
}
concurrencyRequest(urls, 5).then(res => {
    console.log(res);
})

請(qǐng)求結(jié)果:

上面代碼基本實(shí)現(xiàn)了前端并發(fā)請(qǐng)求的需求,也基本滿足需求,在生產(chǎn)中其實(shí)有很多已經(jīng)封裝好的庫(kù)可以直接使用。比如:p-limit【https://github.com/sindresorhus/p-limit】

閱讀p-limit源碼

import Queue from 'yocto-queue';
import {AsyncResource} from '#async_hooks';

export default function pLimit(concurrency) {
 // 判斷這個(gè)參數(shù)是否是一個(gè)大于0的整數(shù),如果不是就拋出一個(gè)錯(cuò)誤
 if (
  !((Number.isInteger(concurrency)
  || concurrency === Number.POSITIVE_INFINITY)
  && concurrency > 0)
 ) {
  throw new TypeError('Expected `concurrency` to be a number from 1 and up');
 }

 // 創(chuàng)建隊(duì)列 -- 用于存取請(qǐng)求
 const queue = new Queue();
 // 計(jì)數(shù)
 let activeCount = 0;

 // 用來(lái)處理并發(fā)數(shù)的函數(shù)
 const next = () => {
  activeCount--;

  if (queue.size > 0) {
   // queue.dequeue()可以理解為[].shift(),取出隊(duì)列中的第一個(gè)任務(wù),由于確定里面是一個(gè)函數(shù),所以直接執(zhí)行就可以了;
   queue.dequeue()();
  }
 };

 // run函數(shù)就是用來(lái)執(zhí)行異步并發(fā)任務(wù)
 const run = async (function_, resolve, arguments_) => {
  // activeCount加1,表示當(dāng)前并發(fā)數(shù)加1
  activeCount++;

  // 執(zhí)行傳入的異步函數(shù),將結(jié)果賦值給result,注意:現(xiàn)在的result是一個(gè)處在pending狀態(tài)的Promise
  const result = (async () => function_(...arguments_))();

  // resolve函數(shù)就是enqueue函數(shù)中返回的Promise的resolve函數(shù)
  resolve(result);

  // 等待result的狀態(tài)發(fā)生改變,這里使用了try...catch,因?yàn)閞esult可能會(huì)出現(xiàn)異常,所以需要捕獲異常;
  try {
   await result;
  } catch {}

  next();
 };

 // 將run函數(shù)添加到請(qǐng)求隊(duì)列中
 const enqueue = (function_, resolve, arguments_) => {
  queue.enqueue(
   // 將run函數(shù)綁定到AsyncResource上,不需要立即執(zhí)行,對(duì)此添加了一個(gè)bind方法
   AsyncResource.bind(run.bind(undefined, function_, resolve, arguments_)),
  );

  // 立即執(zhí)行一個(gè)異步函數(shù),等待下一個(gè)微任務(wù)(注意:因?yàn)閍ctiveCount是異步更新的,所以需要等待下一個(gè)微任務(wù)執(zhí)行才能獲取新的值)
  (async () => {
   // This function needs to wait until the next microtask before comparing
   // `activeCount` to `concurrency`, because `activeCount` is updated asynchronously
   // when the run function is dequeued and called. The comparison in the if-statement
   // needs to happen asynchronously as well to get an up-to-date value for `activeCount`.
   await Promise.resolve();

   // 判斷activeCount是否小于concurrency,并且隊(duì)列中有任務(wù),如果滿足條件就會(huì)將隊(duì)列中的任務(wù)取出來(lái)執(zhí)行
   if (activeCount < concurrency && queue.size > 0) {
    // 注意:queue.dequeue()()執(zhí)行的是run函數(shù)
    queue.dequeue()();
   }
  })();
 };

 // 接收一個(gè)函數(shù)fn和參數(shù)args,然后返回一個(gè)Promise,執(zhí)行出隊(duì)操作
 const generator = (function_, ...arguments_) => new Promise(resolve => {
  enqueue(function_, resolve, arguments_);
 });

 // 向外暴露當(dāng)前的并發(fā)數(shù)和隊(duì)列中的任務(wù)數(shù),并且手動(dòng)清空隊(duì)列
 Object.defineProperties(generator, {
  // 當(dāng)前并發(fā)數(shù)
  activeCount: {
   get: () => activeCount,
  },
  // 隊(duì)列中的任務(wù)數(shù)
  pendingCount: {
   get: () => queue.size,
  },
  // 清空隊(duì)列
  clearQueue: {
   value() {
    queue.clear();
   },
  },
 });

 return generator;
}

整個(gè)庫(kù)只有短短71行代碼,在代碼中導(dǎo)入了yocto-queue庫(kù),它是一個(gè)微型的隊(duì)列數(shù)據(jù)結(jié)構(gòu)。

手寫(xiě)源碼

在進(jìn)行手撕源碼時(shí),可以借助數(shù)組進(jìn)行簡(jiǎn)易的實(shí)現(xiàn):

class PLimit {
    constructor(concurrency) {
        this.concurrency = concurrency;
        this.activeCount = 0;
        this.queue = [];
        
        return (fn, ...args) => {
            return new Promise(resolve => {
               this.enqueue(fn, resolve, args);
            });
        }
    }
    
    enqueue(fn, resolve, args) {
        this.queue.push(this.run.bind(this, fn, resolve, args));

        (async () => {
            await Promise.resolve();
            if (this.activeCount < this.concurrency && this.queue.length > 0) {
                this.queue.shift()();
            }
        })();
    }
    
    async run(fn, resolve, args) {
        this.activeCount++;

        const result = (async () => fn(...args))();

        resolve(result);

        try {
            await result;
        } catch {
        }

        this.next();
    }
    
    next() {
        this.activeCount--;

        if (this.queue.length > 0) {
            this.queue.shift()();
        }
    }
}

小結(jié)

在這篇文章中,簡(jiǎn)要介紹了為什么要進(jìn)行并發(fā)請(qǐng)求,闡述了使用請(qǐng)求池隊(duì)列實(shí)現(xiàn)并發(fā)請(qǐng)求的設(shè)計(jì)思路,簡(jiǎn)要實(shí)現(xiàn)代碼。

此外,還閱讀分析了p-limit的源碼,并使用數(shù)組進(jìn)行簡(jiǎn)要的源碼編寫(xiě),以實(shí)現(xiàn)要求。

參考文章

  • 【源碼共讀】大并發(fā)量如何控制并發(fā)數(shù)https://juejin.cn/post/7179220832575717435?searchId=20240430092814392DC2208C545E691A26
  • 前端實(shí)現(xiàn)并發(fā)控制網(wǎng)絡(luò)請(qǐng)求https://mp.weixin.qq.com/s/9uq2SqkcMSSWjks0x7RQJg。
責(zé)任編輯:姜華 來(lái)源: 宇宙一碼平川
相關(guān)推薦

2017-08-21 10:56:55

MySQL并發(fā)控制

2021-01-12 10:22:45

JavaScript并發(fā)控制前端

2021-04-07 06:00:18

JavaScript 前端并發(fā)控制

2024-06-17 08:40:16

2009-09-24 14:43:53

Hibernate樂(lè)觀

2022-12-12 09:07:06

Redis并發(fā)限流

2009-06-09 09:32:48

LinuxApache帶寬控制

2023-11-20 08:01:38

并發(fā)處理數(shù)Tomcat

2010-11-08 10:57:05

SQL Server的

2021-06-29 23:40:19

Golang語(yǔ)言并發(fā)

2021-02-25 22:17:19

開(kāi)發(fā)技術(shù)編程

2009-11-25 11:41:56

IIS最大并發(fā)數(shù)

2017-02-28 17:46:15

Linux驅(qū)動(dòng)技術(shù)并發(fā)控制

2009-02-09 10:06:03

并發(fā)控制Web應(yīng)用悲觀鎖

2020-01-13 10:20:30

架構(gòu)聊天架構(gòu)百萬(wàn)并發(fā)量

2024-08-26 13:23:26

2017-11-06 17:16:55

Linux設(shè)備驅(qū)動(dòng)并發(fā)控制

2023-01-30 15:41:10

Channel控制并發(fā)

2025-02-28 00:03:22

高并發(fā)TPS系統(tǒng)

2025-02-26 03:00:00

點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)