自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

JavaScript 中如何實現(xiàn)并發(fā)控制?

開發(fā) 前端
本文阿寶哥帶大家詳細分析async-pool 異步任務并發(fā)控制的具體實現(xiàn),同時為了讓大家能夠更好地理解 async-pool 的核心代碼。

[[391506]]

一、并發(fā)控制簡介

在日常開發(fā)過程中,你可能會遇到并發(fā)控制的場景,比如控制請求并發(fā)數。那么在 JavaScript 中如何實現(xiàn)并發(fā)控制呢?在回答這個問題之前,我們來簡單介紹一下并發(fā)控制。

假設有 6 個待辦任務要執(zhí)行,而我們希望限制同時執(zhí)行的任務個數,即最多只有 2 個任務能同時執(zhí)行。當 正在執(zhí)行任務列表 中的任何 1 個任務完成后,程序會自動從 待辦任務列表 中獲取新的待辦任務并把該任務添加到 正在執(zhí)行任務列表 中。為了讓大家能夠更直觀地理解上述的過程,阿寶哥特意畫了以下 3 張圖:

1.1 階段一

1.2 階段二

1.3 階段三

好的,介紹完并發(fā)控制之后,阿寶哥將以 Github 上 async-pool 這個庫來介紹一下異步任務并發(fā)控制的具體實現(xiàn)。

  1. https://github.com/rxaviers/async-pool 
  2.  
  3.  
  4.  
  5. Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7。 

 二、并發(fā)控制的實現(xiàn)

async-pool 這個庫提供了 ES7 和 ES6 兩種不同版本的實現(xiàn),在分析其具體實現(xiàn)之前,我們來看一下它如何使用。

2.1 asyncPool 的使用

  1. const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i)); 
  2. await asyncPool(2, [1000, 5000, 3000, 2000], timeout); 

在以上代碼中,我們使用 async-pool 這個庫提供的 asyncPool 函數來實現(xiàn)異步任務的并發(fā)控制。asyncPool 函數的簽名如下所示:

  1. function asyncPool(poolLimit, array, iteratorFn){ ... } 

 該函數接收 3 個參數:

  • poolLimit(數字類型):表示限制的并發(fā)數;
  • array(數組類型):表示任務數組;
  • iteratorFn(函數類型):表示迭代函數,用于實現(xiàn)對每個任務項進行處理,該函數會返回一個 Promise 對象或異步函數。

對于以上示例來說,在使用了 asyncPool 函數之后,對應的執(zhí)行過程如下所示:

  1. const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i)); 
  2. await asyncPool(2, [1000, 5000, 3000, 2000], timeout); 
  3. // Call iterator (i = 1000) 
  4. // Call iterator (i = 5000) 
  5. // Pool limit of 2 reached, wait for the quicker one to complete... 
  6. // 1000 finishes 
  7. // Call iterator (i = 3000) 
  8. // Pool limit of 2 reached, wait for the quicker one to complete... 
  9. // 3000 finishes 
  10. // Call iterator (i = 2000) 
  11. // Itaration is complete, wait until running ones complete... 
  12. // 5000 finishes 
  13. // 2000 finishes 
  14. // Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`. 

通過觀察以上的注釋信息,我們可以大致地了解 asyncPool 函數內部的控制流程。下面我們先來分析 asyncPool 函數的 ES7 實現(xiàn)。

2.2 asyncPool ES7 實現(xiàn)

  1. async function asyncPool(poolLimit, array, iteratorFn) { 
  2.   const ret = []; // 存儲所有的異步任務 
  3.   const executing = []; // 存儲正在執(zhí)行的異步任務 
  4.   for (const item of array) { 
  5.     // 調用iteratorFn函數創(chuàng)建異步任務 
  6.     const p = Promise.resolve().then(() => iteratorFn(item, array)); 
  7.     ret.push(p); // 保存新的異步任務 
  8.  
  9.     // 當poolLimit值小于或等于總任務個數時,進行并發(fā)控制 
  10.     if (poolLimit <= array.length) { 
  11.       // 當任務完成后,從正在執(zhí)行的任務數組中移除已完成的任務 
  12.       const e = p.then(() => executing.splice(executing.indexOf(e), 1)); 
  13.       executing.push(e); // 保存正在執(zhí)行的異步任務 
  14.       if (executing.length >= poolLimit) { 
  15.         await Promise.race(executing); // 等待較快的任務執(zhí)行完成 
  16.       } 
  17.     } 
  18.   } 
  19.   return Promise.all(ret); 

在以上代碼中,充分利用了 Promise.all 和 Promise.race 函數特點,再結合 ES7 中提供的 async await 特性,最終實現(xiàn)了并發(fā)控制的功能。利用 await Promise.race(executing); 這行語句,我們會等待 正在執(zhí)行任務列表 中較快的任務執(zhí)行完成之后,才會繼續(xù)執(zhí)行下一次循環(huán)。

asyncPool ES7 實現(xiàn)相對比較簡單,接下來我們來看一下不使用 async await 特性要如何實現(xiàn)同樣的功能。

2.3 asyncPool ES6 實現(xiàn)

  1. function asyncPool(poolLimit, array, iteratorFn) { 
  2.   let i = 0; 
  3.   const ret = []; // 存儲所有的異步任務 
  4.   const executing = []; // 存儲正在執(zhí)行的異步任務 
  5.   const enqueue = function () { 
  6.     if (i === array.length) { 
  7.       return Promise.resolve(); 
  8.     } 
  9.     const item = array[i++]; // 獲取新的任務項 
  10.     const p = Promise.resolve().then(() => iteratorFn(item, array)); 
  11.     ret.push(p); 
  12.  
  13.     let r = Promise.resolve(); 
  14.  
  15.     // 當poolLimit值小于或等于總任務個數時,進行并發(fā)控制 
  16.     if (poolLimit <= array.length) { 
  17.       // 當任務完成后,從正在執(zhí)行的任務數組中移除已完成的任務 
  18.       const e = p.then(() => executing.splice(executing.indexOf(e), 1)); 
  19.       executing.push(e); 
  20.       if (executing.length >= poolLimit) { 
  21.         r = Promise.race(executing);  
  22.       } 
  23.     } 
  24.   
  25.     // 正在執(zhí)行任務列表 中較快的任務執(zhí)行完成之后,才會從array數組中獲取新的待辦任務 
  26.     return r.then(() => enqueue()); 
  27.   }; 
  28.   return enqueue().then(() => Promise.all(ret)); 

在 ES6 的實現(xiàn)版本中,通過內部封裝的 enqueue 函數來實現(xiàn)核心的控制邏輯。當 Promise.race(executing) 返回的 Promise 對象變成已完成狀態(tài)時,才會調用 enqueue 函數,從 array 數組中獲取新的待辦任務。

三、阿寶哥有話說

在 asyncPool 這個庫的 ES7 和 ES6 的具體實現(xiàn)中,我們都使用到了 Promise.all 和 Promise.race 函數。其中手寫 Promise.all 是一道常見的面試題。剛好趁著這個機會,阿寶哥跟大家一起來手寫簡易版的 Promise.all 和 Promise.race 函數。

3.1 手寫 Promise.all

Promise.all(iterable) 方法會返回一個 promise 對象,當輸入的所有 promise 對象的狀態(tài)都變成 resolved 時,返回的 promise 對象就會以數組的形式,返回每個 promise 對象 resolve 后的結果。當輸入的任何一個 promise 對象狀態(tài)變成 rejected 時,則返回的 promise 對象會 reject 對應的錯誤信息。

  1. Promise.all = function (iterators) { 
  2.   return new Promise((resolve, reject) => { 
  3.     if (!iterators || iterators.length === 0) { 
  4.       resolve([]); 
  5.     } else { 
  6.       let count = 0; // 計數器,用于判斷所有任務是否執(zhí)行完成 
  7.       let result = []; // 結果數組 
  8.       for (let i = 0; i < iterators.length; i++) { 
  9.         // 考慮到iterators[i]可能是普通對象,則統(tǒng)一包裝為Promise對象 
  10.         Promise.resolve(iterators[i]).then
  11.           (data) => { 
  12.             result[i] = data; // 按順序保存對應的結果 
  13.             // 當所有任務都執(zhí)行完成后,再統(tǒng)一返回結果 
  14.             if (++count === iterators.length) { 
  15.               resolve(result); 
  16.             } 
  17.           }, 
  18.           (err) => { 
  19.             reject(err); // 任何一個Promise對象執(zhí)行失敗,則調用reject()方法 
  20.             return
  21.           } 
  22.         ); 
  23.       } 
  24.     } 
  25.   }); 
  26. }; 

需要注意的是對于 Promise.all 的標準實現(xiàn)來說,它的參數是一個可迭代對象,比如 Array、String 或 Set 等。

3.2 手寫 Promise.race

Promise.race(iterable) 方法會返回一個 promise 對象,一旦迭代器中的某個 promise 對象 resolved 或 rejected,返回的 promise 對象就會 resolve 或 reject 相應的值。

  1. Promise.race = function (iterators) { 
  2.   return new Promise((resolve, reject) => { 
  3.     for (const iter of iterators) { 
  4.       Promise.resolve(iter) 
  5.         .then((res) => { 
  6.           resolve(res); 
  7.         }) 
  8.         .catch((e) => { 
  9.           reject(e); 
  10.         }); 
  11.     } 
  12.   }); 
  13. }; 

本文阿寶哥帶大家詳細分析了 async-pool 異步任務并發(fā)控制的具體實現(xiàn),同時為了讓大家能夠更好地理解 async-pool 的核心代碼。最后阿寶哥還帶大家一起手寫簡易版的 Promise.all 和 Promise.race 函數。其實除了 Promise.all 函數之外,還存在另一個函數 —— Promise.allSettled,該函數用于解決 Promise.all 存在的問題,感興趣的小伙伴可以自行研究一下。

四、參考資源

  • Github - async-pool
  • MDN - Promise.all
  • MDN - Promise.race
  • MDN - Promise.allSettled

 

責任編輯:姜華 來源: 全棧修仙之路
相關推薦

2021-01-12 10:22:45

JavaScript并發(fā)控制前端

2021-06-01 05:15:36

JavaScript 前端大文件并發(fā)上傳

2009-02-09 10:06:03

并發(fā)控制Web應用悲觀鎖

2024-04-30 10:29:46

前端開發(fā)h5開發(fā)函數

2010-09-08 16:50:11

JavaScriptDOM操作

2024-03-04 00:02:00

Redis存儲令牌

2023-11-06 14:13:51

asyncio開發(fā)

2017-11-06 17:16:55

Linux設備驅動并發(fā)控制

2021-07-19 09:25:19

數據庫MySQL技術

2021-04-19 05:41:04

JavaScript大文件下載

2017-08-21 10:56:55

MySQL并發(fā)控制

2021-03-29 08:01:20

JavaScript數據結構

2024-06-17 08:40:16

2009-09-24 14:43:53

Hibernate樂觀

2022-12-12 09:07:06

Redis并發(fā)限流

2022-08-08 20:48:09

MQ消息中間件系統(tǒng)解耦

2023-02-07 13:42:44

代碼實現(xiàn)并發(fā)

2021-05-09 19:41:35

JavaScript 前端同源通信

2024-12-26 09:15:28

2021-05-12 22:07:43

并發(fā)編排任務
點贊
收藏

51CTO技術棧公眾號