Node.js中的異步Generator函數(shù)和Websockets
異步 generator 函數(shù)是 ES2018 中新增的特性。Node.js 從 v10 版本增加了對異步 generator 函數(shù)的支持。異步 generator 函數(shù)看似一個相當小眾特性特性,但是卻為 node.js websocket 框架提供了一個靈巧的使用機會。
在這篇文章中,我將說明 Node.js websocket 框架將如何使用異步 generator 函數(shù)。
HTTP 框架分類
首先,想一下 Express 或 Hapi 之類的 HTTP 服務(wù)器框架。一般來說,大多數(shù) HTTP 服務(wù)器框架都屬于以下三種之一:
1. 顯式響應(yīng)。 在 Express 中發(fā)送一個 HTTP 響應(yīng),你必須調(diào)用 res.end(),res.json() 或者 res 對象上的一些其他方法。換句話說,你必須顯式調(diào)用一個方法來發(fā)送一個響應(yīng)。
2. 使用 return 隱式響應(yīng)。 另一方面,Hapi 在 v17 中明確地刪除了 reply() 函數(shù),也就是說 Hapi 沒有等同于 res 的方式。如果需要發(fā)送一個響應(yīng)。你只需在請求的處理方法中 return 一個返回值。之后 Hapi 就會將 return 的值封裝進一個 HTTP 響應(yīng)中。
3. 在適當?shù)奈恢眯薷捻憫?yīng)。 Koa 使用了一種混合了以上兩種實現(xiàn)的獨特處理方式。你將以修改 ctx 對象的方式,替代調(diào)用 res 對象的方法來構(gòu)建響應(yīng)。
換句話說,一些 HTTP 框架要求你顯式調(diào)用方法來發(fā)送 HTTP 響應(yīng),另一些框架會提供給你一個可更改的 HTTP 響應(yīng)對象,還有一些框架僅需要處理函數(shù)中 return 一個值。
Websockets 和 HTTP 的區(qū)別在于,Websockets 服務(wù)器可以在任何時間向 socket 推送消息,不管是不是基于某條消息的響應(yīng)。也就是說,初級的 websocket 框架,例如 ws, 看起來很像 “顯式響應(yīng)” 模式:你需要顯式調(diào)用一個方法用于發(fā)送一條消息。
然而,是否可以在保持允許消息多發(fā)這個優(yōu)點的同時,使 websockets 可以實現(xiàn)隱式響應(yīng)?這就是異步 generator 產(chǎn)生的原因。
從服務(wù)器上讀取大塊數(shù)據(jù)
假設(shè)你有一個一次讀取一堆文檔的 Mongoose 指針,并且你希望用 websocket 在每一個文檔讀出時盡快將它發(fā)送出去。這種方式有助于在任何時刻都使服務(wù)器的內(nèi)存使用量保持在最?。嚎蛻舳丝梢垣@取所有的數(shù)據(jù),而服務(wù)器卻不用為此在內(nèi)存中一次保存所有的數(shù)據(jù)。舉個例子,這是使用 async/await 方式讀取一個指針的實現(xiàn):
- const User = mongoose.model('User', mongoose.Schema({ name: String }));
- const cursor = Model.find().cursor();
- for await (const doc of cursor) {
- console.log(doc.name); // Print user names 1 by 1.
- }
使 generator 函數(shù)變得有趣的地方在于,在一個函數(shù)中 yield 方法可以被調(diào)用多次,并且在上次停止的地方繼續(xù)運行,除了這點以外,yield 方法和 return 方法類似。
- const User = mongoose.model('User', mongoose.Schema({ name: String }));
- async function* streamUsers() {
- const cursor = Model.find().cursor();
- for await (const doc of cursor) {
- // Yielding each doc behaves like multiple implicit responses, if you have
- // a framework that supports it.
- yield doc;
- }
- }
以下是如何使用 Node.js 編寫一個 Websocket 服務(wù)器:
- const WebSocket = require('ws');
- const server = new WebSocket.Server({
- port: 8080
- });
- server.on('connection', function(socket) {
- socket.on('message', function(msg) {
- // Handle message
- });
- });
至此,接下來要做的是為 websocket 服務(wù)器添加 streamUsers() 方法。假設(shè)收到的每條消息都是有效的 JSON,并且都有屬性 action 和 id。當 action === 'streamUsers'時,streamUsers() 就會被執(zhí)行,并且基于 socket 向外發(fā)送每個被 Mongoose cursor 查詢出來的用戶。
- const WebSocket = require('ws');
- const server = new WebSocket.Server({
- port: 8080
- });
- server.on('connection', function(socket) {
- socket.on('message', function(msg) {
- msg = JSON.parse(msg);
- if (msg.action === 'streamUsers') {
- void async function() {
- // Send 1 message per user, as opposed to loading all users and then
- // sending them all in 1 message.
- for await (const doc of streamUsers()) {
- socket.send(JSON.stringify({ id: msg.id, doc }));
- }
- }().catch(err => socket.send(JSON.stringify({ id: msg.id, error: err.message })));
- }
- });
- });
以下是如何通過 websocket 客戶端調(diào)用 streamUsers() 方法:
- const client = new WebSocket('ws://localhost:8080');
- // Will print each user doc 1 at a time.
- client.on('message', msg => console.log(msg));
- await new Promise(resolve => client.once('open', resolve));
- client.send(JSON.stringify({ action: 'streamUsers', id: 1 }));
后續(xù)
異步 generator 函數(shù)提供了一種創(chuàng)建更高級的,如同一些 HTTP 框架(例如 Hapi 和 Fastify)那樣,基于隱式響應(yīng)的 websocket 框架的機會。而隱式響應(yīng)的主要優(yōu)勢就在于,你在業(yè)務(wù)邏輯中不需要關(guān)注框架是通過 websocket,HTTP 輪詢或是其他某種方式來發(fā)送結(jié)果??蚣茏杂墒?Javascript 編程更輕便并且更容易測試。
通過將所有產(chǎn)生的值存放在一個數(shù)組中,或者讓客戶端發(fā)起多次請求對一個指針進行迭代,streamUsers() 方法就可以很容易的在一個 HTTP 框架,或者是一個使用輪詢的 HTTP 框架中重用。沒有異步 generator 函數(shù),所有這些都是不能實現(xiàn)的。