如何實現(xiàn)Nodejs進程間通信
本文轉載自微信公眾號「編程雜技」,作者theanarkh 。轉載本文請聯(lián)系編程雜技公眾號。
對于有繼承關系的進程,nodejs本身為我們提供了進程間通信的方式,但是對于沒有繼承關系的進程,比如兄弟進程,想要通信最簡單的方式就是通過主進程中轉,類似前端框架中子組件通過更新父組件的數據,然后父通知其他子組件。因為nodejs內置的進程間通信需要經過序列化和反序列化,所以這種方式可能會帶來一定的性能損耗,而且在實現(xiàn)上也比較麻煩。今天介紹的是實現(xiàn)兄弟進程通信的另外一種方式,在windows上使用命名管道,在非windows上使用unix域,另外本文還會介紹基于tcp的遠程進程通信的實現(xiàn)。下面具體介紹一下設計和實現(xiàn)。
1 IPC的實現(xiàn)
ipc的實現(xiàn)比較簡單,主要是對nodejs提供的功能進行封裝。首先我們需要處理一下path,因為在命名管道和unix域中他的格式是不一樣的。
- const os = require('os');
- module.exports = {
- path: os.platform() === 'win32' ? '\\\\?\\pipe\\ipc' : '/tmp/unix.sock',
- };
接著我們看看客戶端和服務器的實現(xiàn)。
1.1 IPCClient的實現(xiàn)
- const net = require('net');
- const { EventEmitter } = require('events');
- const { path } = require('../config');
- class Client extends EventEmitter {
- constructor(options) {
- super();
- this.options = { path, ...options };
- const socket = net.connect(this.options);
- socket.on('error', (error) => {
- console.error(error);
- });
- return socket;
- }
- }
- module.exports = {
- Client,
- };
1.2 IPCServer的實現(xiàn)
- const fs = require('fs');
- const net = require('net');
- const { EventEmitter } = require('events');
- const { path } = require('../config');
- class Server extends EventEmitter {
- constructor(options, connectionListener) {
- super();
- if (typeof options === 'function') {
- options = {
- connectionListener: options,
- };
- } else {
- options = { ...options, connectionListener };
- }
- try {
- fs.existsSync(options.path) && fs.unlinkSync(options.path);
- } catch(e) {
- }
- this.options = { path, ...options };
- return net.createServer({allowHalfOpen: this.options.allowHalfOpen, pauseOnConnect: this.options.pauseOnConnect}, (client) => {
- client.on('error', (error) => {
- console.error(error);
- });
- typeof this.options.connectionListener === 'function' && this.options.connectionListener(client);
- }).listen(this.options);
- }
- }
- module.exports = {
- Server,
- };
2 RPC的實現(xiàn)
我們知道tcp是面向流的服務,他本身只負責傳輸數據,不負責數據的解析和解釋。通過tcp傳輸數據時,需要自己解析數據,我們需要從一串字節(jié)流中解析出一個個數據包。這就涉及到協(xié)議的設計。所以首先我們要定義一個應用層協(xié)議。
2.1 應用層協(xié)議的設計和實現(xiàn)
null應用層協(xié)議的設計非常簡單
1 總長度是除了開頭標記之外的其他數據長度。因為數據部分是變長的,所以我們需要一個總長度來判斷后續(xù)的數據長度是多少。
2 序列號是用于關聯(lián)請求和響應,因為我們在一個連接上可能會串行發(fā)送多個數據包,當我們收到一個回包的時候,我們不知道是來自哪個請求的響應,通過響應體中的seq,我們就知道是來自哪個請求的響應。設計了通信協(xié)議后,我們就需要對協(xié)議進行封包解包。首先我們看一下封包邏輯。
- function seq() {
- return ~~(Math.random() * Math.pow(2, 31))
- }
- function packet(data, sequnce) {
- // 轉成buffer
- const bufferData = Buffer.from(data, 'utf-8');
- // 開始標記長度
- const startFlagLength = Buffer.from([PACKET_START]).byteLength;
- // 序列號
- const _seq = sequnce || seq();
- // 分配一個buffer存儲數據
- let buffer = Buffer.allocUnsafe(startFlagLength + TOTAL_LENGTH + SEQ_LEN);
- // 設計開始標記
- buffer[0] = 0x3;
- // 寫入總長度字段的值
- buffer.writeUIntBE(TOTAL_LENGTH + SEQ_LEN + bufferData.byteLength, 1, TOTAL_LENGTH);
- // 寫入序列號的值
- buffer.writeUIntBE(_seq, startFlagLength + TOTAL_LENGTH, SEQ_LEN);
- // 把協(xié)議元數據和數據組裝到一起
- buffer = Buffer.concat([buffer, bufferData], buffer.byteLength + bufferData.byteLength);
- return buffer;
- }
接著我們看一下解包的邏輯,因為數據的傳輸是字節(jié)流,所以有可能多個數據包的數據會粘在一起,所以我們第一步首先要根據協(xié)議解析出一個個數據包,然后再解析每一個數據包。我們通過有限狀態(tài)機實現(xiàn)數據的解析。下面是狀態(tài)機的狀態(tài)集。
- const PARSE_STATE = {
- PARSE_INIT: 0,
- PARSE_HEADER: 1,
- PARSE_DATA: 2,
- PARSE_END: 3,
- };
接著我們定義狀態(tài)集的轉換規(guī)則。
- class StateSwitcher {
- constructor(options) {
- this.options = options;
- }
- [PARSE_STATE.PARSE_INIT](data) {
- // 數據不符合預期
- if (data[0] !== PACKET_START) {
- // 跳過部分數據,找到開始標記
- const position = data.indexOf(PACKET_START);
- // 沒有開始標記,說明這部分數據無效,丟棄
- if (position === -1) {
- return [NEED_MORE_DATA, null];
- }
- // 否則返回有效數據部分,繼續(xù)解析
- return [PARSE_STATE.PACKET_START, data.slice(position)];
- }
- // 保存當前正在解析的數據包
- this.packet = new Packet();
- // 跳過開始標記的字節(jié)數,進入解析協(xié)議頭階段
- return [PARSE_STATE.PARSE_HEADER, data.slice(Buffer.from([PACKET_START]).byteLength)];
- }
- [PARSE_STATE.PARSE_HEADER](data) {
- // 數據不夠頭部的大小則等待數據到來
- if (data.length < TOTAL_LENGTH + SEQ_LEN) {
- return [NEED_MORE_DATA, data];
- }
- // 有效數據包的長度 = 整個數據包長度 - 頭部長度
- this.packet.set('length', data.readUInt32BE() - (TOTAL_LENGTH + SEQ_LEN));
- // 序列號
- this.packet.set('seq', data.readUInt32BE(TOTAL_LENGTH));
- // 解析完頭部了,跳過去
- data = data.slice(TOTAL_LENGTH + SEQ_LEN);
- // 進入解析數據階段
- return [PARSE_STATE.PARSE_DATA, data];
- }
- [PARSE_STATE.PARSE_DATA](data) {
- const len = this.packet.get('length');
- // 數據部分的長度小于協(xié)議頭中定義的長度,則繼續(xù)等待
- if (data.length < len) {
- return [NEED_MORE_DATA, data];
- }
- // 截取數據部分
- this.packet.set('data', data.slice(0, len));
- // 解析完數據了,完成一個包的解析,跳過數據部分
- data = data.slice(len);
- typeof this.options.cb === 'function' && this.options.cb(this.packet);
- this.packet = null;
- // 解析完一個數據包,進入結束標記階段
- return [PARSE_STATE.PARSE_INIT, data];
- }
- }
我們再看一下狀態(tài)機的實現(xiàn)
- class FSM {
- constructor(options) {
- this.options = options;
- // 狀態(tài)處理機,定義了狀態(tài)轉移集合
- this.stateSwitcher = new StateSwitcher({cb: options.cb});
- // 當前狀態(tài)
- this.state = PARSE_STATE.PARSE_INIT;
- // 結束狀態(tài)
- this.endState = PARSE_STATE.PARSE_END;
- // 當前待解析的數據
- this.buffer = null;
- }
- run(data) {
- // 沒有數據或者解析結束了直接返回
- if (this.state === this.endState || !data || !data.length) {
- return;
- }
- // 保存待解析的數據
- this.buffer = this.buffer ? Buffer.concat([this.buffer, data]) : data;
- // 還沒結束,并且還有數據可以處理則繼續(xù)執(zhí)行
- while(this.state !== this.endState && this.buffer && this.buffer.length) {
- // 執(zhí)行狀態(tài)處理函數,返回[下一個狀態(tài), 剩下的數據]
- const result = this.stateSwitcher[this.state](this.buffer);
- // 如果下一個狀態(tài)是NEED_MORE_DATA則說明需要更多的數據才能繼續(xù)解析,并保持當前狀態(tài)
- if (result[0] === NEED_MORE_DATA) {
- return;
- }
- // 記錄下一個狀態(tài)和數據
- [this.state, this.buffer] = result;
- }
- }
- }
狀態(tài)機就是對開始狀態(tài)、結束狀態(tài)、狀態(tài)轉換集的封裝。實現(xiàn)了協(xié)議的封包和解析后我們看一下如何使用。
2.2 RPC客戶端實現(xiàn)
- const net = require('net');
- const { EventEmitter } = require('events');
- const { FSM } = require('tiny-application-layer-protocol');
- class Client extends EventEmitter {
- constructor(options) {
- super();
- this.options = { ...options };
- const socket = net.connect(this.options);
- socket.on('error', (error) => {
- console.error(error);
- });
- const fsm = new FSM({
- cb: (packet) => {
- socket.emit('message', packet);
- }
- });
- socket.on('data', fsm.run.bind(fsm));
- return socket;
- }
- }
- module.exports = {
- Client,
- };
我們做的事情主要是負責數據的解析。
2.3 RPC服務器實現(xiàn)
- const fs = require('fs');
- const net = require('net');
- const { EventEmitter } = require('events')
- const { FSM } = require('tiny-application-layer-protocol');
- class Server extends EventEmitter {
- constructor(options, connectionListener) {
- super();
- if (typeof options === 'function') {
- options = {
- connectionListener: options,
- };
- } else {
- options = { ...options, connectionListener };
- }
- this.options = { ...options };
- return net.createServer({allowHalfOpen: this.options.allowHalfOpen, pauseOnConnect: this.options.pauseOnConnect}, (client) => {
- const fsm = new FSM({
- cb: function(packet) {
- client.emit('message', packet);
- }
- })
- client.on('data', fsm.run.bind(fsm));
- client.on('error', (error) => {
- console.error(error);
- });
- typeof this.options.connectionListener === 'function' && this.options.connectionListener(client);
- }).listen(this.options);
- }
- }
- module.exports = {
- Server,
- };
同樣,服務器也是負責數據的解析
3 使用
接下來我們看一下如何使用。
3.1 ipc的使用
server.js
- const { IPCServer } = require('../../src');
- const { packet } = require('tiny-application-layer-protocol');
- new IPCServer(function(client) {
- console.log(1)
- client.on('data', (data) => {
- console.log('receive', data);
- client.write(packet('world', data.seq));
- });
- });
client.js
- const { IPCClient } = require('../../src');
- const { packet, seq } = require('tiny-application-layer-protocol');
- const client = new IPCClient();
- client.write(packet('hello', seq()));
- client.on('data', function(res) {
- console.log('receive', res);
- })
服務器輸出
客戶端輸出
3.2 RPC的使用
server.js
- const { RPCServer } = require('../../src');
- const { packet } = require('tiny-application-layer-protocol');
- new RPCServer({host: '127.0.0.1', port: 80}, function(client) {
- client.on('message', (data) => {
- console.log('receive', data);
- client.write(packet('world', data.seq));
- });
- });
client.js
- const { RPCClient } = require('../../src');
- const { packet, seq } = require('tiny-application-layer-protocol');
- const client = new RPCClient({host: '127.0.0.1', port: 80});
- client.write(packet('hello', seq()));
- client.on('message', function(res) {
- console.log('receive', res);
- })
服務器輸出
客戶端輸出
4 RPC拓展
我們實現(xiàn)了數據的傳輸和解析,但是如何我們希望數據的請求和響應是一一對應的怎么辦呢?比如像http在tcp上可以并發(fā)發(fā)起多個請求一樣,響應是否可以亂序返回,我們又如何知道某個響應對應的是哪個請求?接下來介紹如何解決這個問題。首先我們實現(xiàn)一個請求管理的類。
- class RequestManager {
- constructor(options) {
- this.options = { timeout: 10000, ...options };
- this.map = {};
- this.timerId = null;
- this.startPollTimeout();
- }
- set(key, context) {
- if (typeof context.cb !== 'function') {
- throw new Error('cb is required');
- }
- this.map[key] = {
- startTime: Date.now(),
- ...context,
- };
- }
- get(key) {
- return this.map[key];
- }
- del(key) {
- return delete this.map[key];
- }
- // 執(zhí)行上下文
- exec(key, data) {
- const context = this.get(key);
- if (context) {
- this.del(key);
- context.cb(data);
- }
- }
- execAll(data) {
- for (const [key] of Object.entries(this.map)) {
- this.exec(key, data);
- }
- }
- // 定時輪詢是否超時
- startPollTimeout() {
- this.timerId = setTimeout(() => {
- if (!this.timerId) {
- return;
- }
- const nextMap = {};
- for (const [key, context] of Object.entries(this.map)) {
- if (Date.now() - context.startTime < (context.timeout || this.options.timeout)) {
- nextMap[key] = context;
- } else {
- context.cb(new Error('timeout'));
- }
- }
- this.map = nextMap;
- this.startPollTimeout();
- }, 1000);
- }
- }
該類的邏輯主要是請求的seq保存對應的上下文,然后收到響應的時候,我們根據響應的seq拿到對應的上下文,從而執(zhí)行對應的回調。我們看看如何使用該類。
server.js
- const { RPCServer } = require('../../src');
- const { packet } = require('tiny-application-layer-protocol');
- new RPCServer({host: '127.0.0.1', port: 80}, function(client) {
- client.on('message', (data) => {
- console.log('receive', data);
- client.end(packet('world', data.seq));
- });
- client.on('end', (data) => {
- client.end();
- });
- });
client.js
- const { RPCClient, RequestManager } = require('../../src');
- const { packet, seq } = require('tiny-application-layer-protocol');
- const requestManager = new RequestManager({timeout: 3000});
- const client = new RPCClient({host: '127.0.0.1', port: 80});
- const _seq = seq();
- requestManager.set(_seq, {
- cb: function() {
- console.log(...arguments);
- }
- })
- client.write(packet('hello', _seq));
- client.on('message', function(packet) {
- requestManager.exec(packet.seq, packet);
- })
輸出 服務器輸出
客戶端輸出null
github倉庫:https://github.com/theanarkh/nodejs-ipc
github倉庫:https://github.com/theanarkh/tiny-application-layer-protocol
npm install nodejs-i-p-c(ipc和rpc庫,依賴tiny-application-layer-protocol)
npm install tiny-application-layer-protocol(基于tcp的小型應用層協(xié)議,包含協(xié)議的定義、封包、解包功能)