說一說 JavaScript 異步迭代器
你知道嗎?除了像Promise.finally這樣的 API 之外,ECMAScript 2018還為我們帶來了另一種處理迭代器的方式——異步迭代器。
問題
假設(shè)現(xiàn)在我們正處于這樣一個(gè)情景:需要使用Node.js逐行讀取文件。Node有個(gè)API叫做readLine,它是一個(gè)包裝器,可用于逐行從輸入流中讀取數(shù)據(jù),不需要分析輸入緩沖區(qū)、也不需要將文本分解為小塊。
你可以像這樣監(jiān)聽:
const fs = require('fs')
const readline = require('readline')
const reader = readline.createInterface({
input: fs.createReadStream('./file.txt'),
crlfDelay: Infinity
})
reader.on('line', (line) => console.log(line))
假設(shè)有這樣一個(gè)簡單的文件:
line 1
line 2
line 3
如果我們在創(chuàng)建的文件上運(yùn)行代碼,那么就能在控制臺上逐行輸出。但是,使用事件并不是編寫可維護(hù)代碼的最佳方法之一,因?yàn)槭录峭耆惒降?,可能?huì)中斷代碼流——因?yàn)槭菬o序觸發(fā)的,并且只能通過偵聽器分配操作。
解決方案
除了事件 API之外,readline還有async iterator?,F(xiàn)在我們可以不通過line事件中的偵聽器讀取,而是通過for關(guān)鍵字來讀取。
舉幾個(gè)使用for循環(huán)的例子。第一個(gè)是最常見的使用計(jì)數(shù)器和條件:
for (let x = 0; x < array.length; x++) {
// Code here
}
我們也可以使用for … in表示法讀取數(shù)組索引:
const a = [1,2,3,4,5,6]
for (let index in a) {
console.log(a[index])
}
在前一種情況下,console.log輸出從1到6的數(shù)字,但是如果我們使用console.log (index),那么記錄的是數(shù)組的索引,從0到5的數(shù)字。
下面,我們使用for … of表示法,直接獲取數(shù)組的可枚舉屬性,即它的直接值:
const a = [1,2,3,4,5,6]
for (let item of a) {
console.log(item)
}
注意,這些方法都是同步的。那么,如果我們有一系列promise,這時(shí)該如何按順序讀取呢?
假設(shè)我們還有另一個(gè)接口,它總是返回一個(gè)Promise。為了按順序解析promise,我們需要這樣做:
async function readLine (files) {
for (const file of files) {
const line = await readFile(file) // Imagine readFile is our cursor
console.log(line)
}
}
而現(xiàn)在,多虧異步可迭代對象(如readline)的魔力,我們可以執(zhí)行以下操作:
const fs = require('fs')
const readline = require('readline')
const reader = readline.createInterface({
input: fs.createReadStream('./xpto.txt'),
crlfDelay: Infinity
})
async function read () {
for await (const line of reader) {
console.log(line)
}
}
read()
注意,我們現(xiàn)在使用的是for、for await (const x of y)的新定義。
對于await和node.js
從10.x版開始,Node.js運(yùn)行時(shí)原生支持for await表示法。如果你使用的是8.x或9.x版本,則需要使用--harmony_async_iteration標(biāo)志啟動(dòng)Javascript文件。遺憾的是,Node.js的版本6和版本7不支持異步迭代器。
為了理解異步迭代器的概念,首先我們需要知道迭代器的本質(zhì)。簡而言之,迭代器是一個(gè)對象,公開next()函數(shù),此函數(shù)返回另一個(gè)對象,其中{value: any, done: boolean}表示當(dāng)前迭代的值,done表示序列中是否還有其他值。
遍歷數(shù)組中所有項(xiàng)的迭代器示例如下:
const array = [1,2,3]
let index = 0
const iterator = {
next: () => {
if (index >= array.length) return { done: true }
return {
value: array[index++],
done: false
}
}
}
就其本身而言,迭代器沒有實(shí)際用途,那么怎么辦呢?為此,我們需要iterable。iterable是一個(gè)對象,它有一個(gè)Symbol.iterator鍵,該鍵返回的函數(shù)返回迭代器:
// ... Iterator code here ...
const iterable = {
[Symbol.iterator]: () => iterator
}
現(xiàn)在我們可以正常使用迭代器了,通過for (const x of iterable),我們可以一個(gè)一個(gè)地迭代array中的所有值。
在后臺,所有數(shù)組和對象都有Symbol.iterator,這樣就可以執(zhí)行for (let x of [1,2,3])并返回我們想要的值。
我們可以看到,異步迭代器與迭代器完全相同,不同之處在于iterable擁有的是Symbol.asyncIterator,而不是Symbol.iterator,擁有的是解析為具有相同簽名對象的Promise,而不是返回{value, done}的對象。
讓我們把上面的迭代器變成一個(gè)異步迭代器:
const array = [1,2,3]
let index = 0
const asyncIterator = {
next: () => {
if (index >= array.length) return Promise.resolve({done: true})
return Promise.resolve({value: array[index++], done: false})
}
}
const asyncIterable = {
[Symbol.asyncIterator]: () => asyncIterator
}
異步迭代
我們可以通過調(diào)用next()函數(shù)來手動(dòng)迭代迭代器:
// ... Async iterator Code here ...
async function manual () {
const promise = asyncIterator.next() // Promise
await p // Object { value: 1, done: false }
await asyncIterator.next() // Object { value: 2, done: false }
await asyncIterator.next() // Object { value: 3, done: false }
await asyncIterator.next() // Object { done: true }
}
為了遍歷異步迭代器,我們需要使用for await,但請記住,關(guān)鍵字await只能在異步函數(shù)中使用,所以我們需要有類似這樣的代碼:
// ... Code above ...
async function iterate () {
for await (const num of asyncIterable) console.log(num)
}
iterate() // 1, 2, 3
但是,由于Node 8.x和9.x這樣的老版本不支持異步迭代器,為了在這些版本中使用異步迭代器,我們可以簡單地從對象中提取next并手動(dòng)遍歷:
// ... Async Iterator Code here ...
async function iterate () {
const {next} = asyncIterable[Symbol.asyncIterator]() // we take the next iterator function
for (let {value, done} = await next(); !done; {value, done} = await next()) {
console.log(value)
}
}
注意,for await更干凈、更簡潔,因?yàn)樗男袨轭愃朴诔R?guī)循環(huán),而且,除了更易于理解之外,還可以通過done鍵自行檢查迭代器的結(jié)束。
處理錯(cuò)誤
如果promise在迭代器中被拒絕,會(huì)發(fā)生什么?好吧,和任何被拒絕的promise一樣,通過簡單的try/catch就可以來捕獲錯(cuò)誤(因?yàn)槲覀兪褂玫氖莂wait):
const asyncIterator = { next: () => Promise.reject('Error') }
const asyncIterable = { [Symbol.asyncIterator]: () => asyncIterator
async function iterate () {
try {
for await (const num of asyncIterable) {}
} catch (e) {
console.log(e.message)
}
}
iterate()
回退
關(guān)于異步迭代器,非常有趣的一點(diǎn)是,它們有Symbol.iterator的回退,這意味著你也可以將它與常規(guī)迭代器一起使用,例如,有這樣一個(gè)promise數(shù)組:
const promiseArray = [
fetch('https://lsantos.dev'),
fetch('https://lsantos.me')
]
async function iterate () {
for await (const response of promiseArray) console.log(response.status)
}
iterate() // 200, 200
異步生成器
在大多數(shù)情況下,迭代器和異步迭代器可以創(chuàng)建自生成器。
生成器是允許暫停和恢復(fù)執(zhí)行的函數(shù),因此可以操作執(zhí)行,然后通過next()函數(shù)獲取下一個(gè)值。
異步生成器的行為類似于異步迭代器,但你必須手動(dòng)實(shí)現(xiàn)停止機(jī)制,例如,這里我們構(gòu)建一個(gè)用于git提交的隨機(jī)消息生成器:
async function* gitCommitMessageGenerator () {
const url = 'https://whatthecommit.com/index.txt'
while (true) {
const response = await fetch(url)
yield await response.text() // We return the value
}
}
注意,在任何時(shí)候都不會(huì)返回{value, done}對象,因此循環(huán)無法知道執(zhí)行何時(shí)完成。這時(shí)我們可以實(shí)現(xiàn)這樣的函數(shù):
// Previous Code
async function getCommitMessages (times) {
let execution = 1
for await (const message of gitCommitMessageGenerator()) {
console.log(message)
if (execution++ >= times) break
}
}
getCommitMessages(5)
// I'll explain this when I'm sober .. or revert it
// Never before had a small typo like this one caused so much damage.
// For real, this time.
// Too lazy to write descriptive message
// Ugh. Bad rebase.
用例
再來一個(gè)更有趣的示例,為一個(gè)真實(shí)用例構(gòu)建異步迭代器。目前,適用于Node.js的Oracle數(shù)據(jù)庫驅(qū)動(dòng)程序支持resultSet API,因此可以在數(shù)據(jù)庫上執(zhí)行查詢并返回,而數(shù)據(jù)流則可以通過getRow()方法逐個(gè)讀取。
要?jiǎng)?chuàng)建resultSet,我們需要在數(shù)據(jù)庫中執(zhí)行查詢,如下所示:
const oracle = require('oracledb')
const options = {
user: 'example',
password: 'example123',
connectString: 'string'
}
async function start () {
const connection = await oracle.getConnection(options)
const { resultSet } = await connection.execute('query', [], { outFormat: oracle.OBJECT, resultSet: true })
return resultSet
}
start().then(console.log)
resultSet有一個(gè)名為getRow()的方法,這個(gè)方法從數(shù)據(jù)庫中返回要獲取的下一行的Promise。我們可以創(chuàng)建一個(gè)逐行返回此resultSet的光標(biāo)。下面讓我們創(chuàng)建Cursor類:
class Cursor {
constructor(resultSet) {
this.resultSet = resultSet
}
getIterable() {
return {
[Symbol.asyncIterator]: () => this._buildIterator()
}
}
_buildIterator() {
return {
next: () => this.resultSet.getRow().then((row) => ({ value: row, done: row === undefined }))
}
}
}
module.exports = Cursor
查看光標(biāo)是否接收到它應(yīng)該處理的resultSet,并將其存儲(chǔ)在當(dāng)前狀態(tài)。因此,我們需要更改之前的方法,以便返回光標(biāo)而不是resultSet:
const oracle = require('oracledb')
const options = {
user: 'example',
password: 'example123',
connectString: 'string'
}
async function getResultSet() {
const connection = await oracle.getConnection(options)
const { resultSet } = await connection.execute('query', [], { outFormat: oracle.OBJECT, resultSet: true })
return resultSet
}
async function start() {
const resultSet = await getResultSet()
const cursor = new Cursor(resultSet)
for await (const row of cursor.getIterable()) {
console.log(row)
}
}
start()
這樣,我們就可以遍歷所有返回的行,不需要單獨(dú)的Promise解析。
結(jié)論
異步迭代器非常強(qiáng)大,尤其是在Javascript等動(dòng)態(tài)和異步語言中。有了它們,我們就可以將復(fù)雜的執(zhí)行變成簡單的代碼,從而向用戶隱藏復(fù)雜性,增加友好的用戶體驗(yàn)。