Websocket庫(kù)Ws原理分析
前言:本文幾基于nodejs的ws模塊分析websocket的原理。
ws服務(wù)器邏輯由websocket-server.js的WebSocketServer類實(shí)現(xiàn)。該類初始化了一些參數(shù)后就執(zhí)行以下代碼
- if (this._server) {
- // 給server注冊(cè)下面事件,返回一個(gè)注銷(xiāo)函數(shù)(用于注銷(xiāo)下面注冊(cè)的事件)
- this._removeListeners = addListeners(this._server, {
- // listen成功的回調(diào)
- listening: this.emit.bind(this, 'listening'),
- error: this.emit.bind(this, 'error'),
- // 收到協(xié)議升級(jí)請(qǐng)求的回調(diào)
- upgrade: (req, socket, head) => {
- this.handleUpgrade(req, socket, head, (ws) => {
- // 處理成功,觸發(fā)鏈接成功事件
- this.emit('connection', ws, req);
- });
- }
- });
我們看到ws監(jiān)聽(tīng)了upgrade事件,當(dāng)有websocket請(qǐng)求到來(lái)時(shí)就會(huì)執(zhí)行handleUpgrade處理升級(jí)請(qǐng)求,升級(jí)成功后觸發(fā)connection事件。我們先看handleUpgrade。handleUpgrade邏輯不多,主要是處理和校驗(yàn)升級(jí)請(qǐng)求的一些http頭。ws提供了一個(gè)校驗(yàn)的鉤子。處理完http頭后,會(huì)調(diào)verifyClient校驗(yàn)是否允許升級(jí)請(qǐng)求。如果成功則執(zhí)行completeUpgrade。顧名思義,completeUpgrade是完成升級(jí)請(qǐng)求的函數(shù),該函數(shù)返回同意協(xié)議升級(jí)并且設(shè)置一些http響應(yīng)頭。另外還有一些重要的邏輯處理。
- const ws = new WebSocket(null);
- // 設(shè)置管理socket的數(shù)據(jù)
- ws.setSocket(socket, head, this.options.maxPayload);
- // cb就是this.emit('connection', ws, req);
- cb(ws);
我們看到這里新建了一個(gè)WebSocket對(duì)象并且調(diào)用了他的setSocket函數(shù)。我們來(lái)看看他做了什么。setSocket的邏輯非常多,我們慢慢分析。
數(shù)據(jù)接收者
- class Receiver extends Writable {}
我們看到數(shù)據(jù)接收者是一個(gè)可寫(xiě)流。這就意味著我們可以往里面寫(xiě)數(shù)據(jù)。
- const receiver = new Receiver();
- receiver.write('hello');
我們看一下這時(shí)候Receiver的邏輯。
- _write(chunk, encoding, cb) {
- if (this._opcode === 0x08 && this._state == GET_INFO) return cb();
- this._bufferedBytes += chunk.length;
- this._buffers.push(chunk);
- this.startLoop(cb);
- }
首先記錄當(dāng)前數(shù)據(jù)的大小,然后把數(shù)據(jù)存起來(lái),最后執(zhí)行startLoop。
- startLoop(cb) {
- let err;
- this._loop = true;
- do {
- switch (this._state) {
- // 忽略其他case
- case GET_DATA:
- err = this.getData(cb);
- break;
- default:
- // `INFLATING`
- this._loop = false;
- return;
- }
- } while (this._loop);
- cb(err);
- }
我們知道websocket是基于tcp上層的應(yīng)用層協(xié)議,所以我們收到數(shù)據(jù)時(shí),需要解析出一個(gè)個(gè)數(shù)據(jù)包(粘包問(wèn)題),所以Receiver其實(shí)就是一個(gè)狀態(tài)機(jī),每次收到數(shù)據(jù)的時(shí)候,都會(huì)根據(jù)當(dāng)前的狀態(tài)進(jìn)行狀態(tài)流轉(zhuǎn)。比如當(dāng)前處于GET_DATA狀態(tài),那么就會(huì)進(jìn)行數(shù)據(jù)的處理。我們接著看一下數(shù)據(jù)處理的邏輯。
- getData(cb) {
- let data = EMPTY_BUFFER;
- // 提取數(shù)據(jù)部分
- if (this._payloadLength) {
- data = this.consume(this._payloadLength);
- if (this._masked) unmask(data, this._mask);
- }
- // 是控制報(bào)文則執(zhí)行controlMessage
- if (this._opcode > 0x07) return this.controlMessage(data);
- // 做了壓縮,則先解壓
- if (this._compressed) {
- this._state = INFLATING;
- this.decompress(data, cb);
- return;
- }
- // 沒(méi)有壓縮則直接處理(先存到_fragments,然后執(zhí)行dataMessage)
- if (data.length) {
- this._messageLength = this._totalPayloadLength;
- this._fragments.push(data);
- }
- return this.dataMessage();
- }
我們執(zhí)行websocket協(xié)議定義了報(bào)文的類型,比如控制報(bào)文,數(shù)據(jù)報(bào)文。我們分別看一下這兩個(gè)的邏輯。
- controlMessage(data) {
- // 連接關(guān)閉
- if (this._opcode === 0x08) {
- this._loop = false;
- if (data.length === 0) {
- this.emit('conclude', 1005, '');
- this.end();
- }
- } else if (this._opcode === 0x09) {
- this.emit('ping', data);
- } else {
- this.emit('pong', data);
- }
- this._state = GET_INFO;
- }
我們看到控制報(bào)文包括三種(conclude、ping、pong)。而數(shù)據(jù)報(bào)文只有this.emit('message', data);一種。這個(gè)就是接收者的整體邏輯。
2 數(shù)據(jù)發(fā)送者
數(shù)據(jù)發(fā)送者是對(duì)websocket協(xié)議的封裝,當(dāng)用戶調(diào)研數(shù)據(jù)發(fā)送者的send接口發(fā)送數(shù)據(jù)時(shí),數(shù)據(jù)發(fā)送者會(huì)組裝成一個(gè)websocket協(xié)議的包再發(fā)送出去。
- send(data, options, cb) {
- const buf = toBuffer(data);
- const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName];
- let opcode = options.binary ? 2 : 1;
- let rsv1 = options.compress;
- if (this._firstFragment) {
- this._firstFragment = false;
- if (rsv1 && perMessageDeflate) {
- rsv1 = buf.length >= perMessageDeflate._threshold;
- }
- this._compress = rsv1;
- } else {
- rsv1 = false;
- opcode = 0;
- }
- if (options.fin) this._firstFragment = true;
- // 需要壓縮
- if (perMessageDeflate) {
- const opts = {
- fin: options.fin,
- rsv1,
- opcode,
- mask: options.mask,
- readOnly: toBuffer.readOnly
- };
- // 正在壓縮,則排隊(duì)等待,否則執(zhí)行壓縮
- if (this._deflating) {
- this.enqueue([this.dispatch, buf, this._compress, opts, cb]);
- } else {
- this.dispatch(buf, this._compress, opts, cb);
- }
- } else {
- // 不需要壓縮,直接發(fā)送
- this.sendFrame(
- Sender.frame(buf, {
- fin: options.fin,
- rsv1: false,
- opcode,
- mask: options.mask,
- readOnly: toBuffer.readOnly
- }),
- cb
- );
- }
- }
send函數(shù)做了一些參數(shù)的處理后發(fā)送數(shù)據(jù),但是如果需要壓縮的話,要壓縮后才能發(fā)送。數(shù)據(jù)處理完成后調(diào)用真正的發(fā)送函數(shù)
- sendFrame(list, cb) {
- if (list.length === 2) {
- this._socket.cork();
- this._socket.write(list[0]);
- this._socket.write(list[1], cb);
- this._socket.uncork();
- } else {
- this._socket.write(list[0], cb);
- }
- }
了解了數(shù)據(jù)接收者和發(fā)送者的邏輯后,我們看一下websocket對(duì)象和setSocket函數(shù)做了什么事情,websocket對(duì)象本質(zhì)是對(duì)TCP socket的封裝。它接收來(lái)自底層的數(shù)據(jù),然后透?jìng)鹘o數(shù)據(jù)接收者,數(shù)據(jù)接收者處理完后,觸發(fā)websocket對(duì)應(yīng)的對(duì)應(yīng)的事件,比如message事件。發(fā)送數(shù)據(jù)的時(shí)候,websocket會(huì)調(diào)用數(shù)據(jù)發(fā)送者的接口,數(shù)據(jù)發(fā)送者組裝成websocket協(xié)議的數(shù)據(jù)包后再發(fā)送出去,架構(gòu)如下圖所示。
接下來(lái)我們看看setSocket的邏輯
- setSocket(socket, head, maxPayload) {
- // 數(shù)據(jù)接收者,負(fù)責(zé)處理tcp上收到的數(shù)據(jù)(socket是tcp層的socket)
- const receiver = new Receiver(...);
- // 數(shù)據(jù)發(fā)送者,負(fù)責(zé)發(fā)送數(shù)據(jù)給對(duì)端
- this._sender = new Sender(socket, this._extensions);
- // 數(shù)據(jù)接收者,負(fù)責(zé)解析數(shù)據(jù)
- this._receiver = receiver;
- // net模塊的tcp socket
- this._socket = socket;
- // 關(guān)聯(lián)起來(lái)
- receiver[kWebSocket] = this;
- socket[kWebSocket] = this;
- // 監(jiān)聽(tīng)接收者的事件,解析數(shù)據(jù)的時(shí)候會(huì)回調(diào)
- receiver.on('conclude', receiverOnConclude);
- // 下面兩個(gè)事件由Writable觸發(fā)
- receiver.on('drain', receiverOnDrain);
- receiver.on('error', receiverOnError);
- receiver.on('message', receiverOnMessage);
- receiver.on('ping', receiverOnPing);
- receiver.on('pong', receiverOnPong);
- // 清除定時(shí)器
- socket.setTimeout(0);
- // 關(guān)閉nagle算法
- socket.setNoDelay();
- // 升級(jí)請(qǐng)求中,攜帶的http body,通常是空
- if (head.length > 0) socket.unshift(head);
- // 監(jiān)聽(tīng)tcp底層的事件
- socket.on('close', socketOnClose);
- socket.on('data', socketOnData);
- socket.on('end', socketOnEnd);
- socket.on('error', socketOnError);
- this.readyState = WebSocket.OPEN;
- this.emit('open');
- }
我們看到里面監(jiān)聽(tīng)了各種事件,下面以data事件為例,看一下處理過(guò)程。當(dāng)tcp socket收到數(shù)據(jù)的時(shí)候會(huì)執(zhí)行socketOnData函數(shù)。
- function socketOnData(chunk) {
- // 會(huì)調(diào)用receiver里的_write函數(shù),其實(shí)就是換成到receiver對(duì)象上,如果數(shù)據(jù)解析出錯(cuò),會(huì)觸發(fā)socket error事件
- if (!this[kWebSocket]._receiver.write(chunk)) {
- this.pause();
- }
- }
socketOnData通過(guò)接收者的接口把數(shù)據(jù)傳給接收者,接收者會(huì)解析數(shù)據(jù),然后觸發(fā)對(duì)應(yīng)的事件,比如message。
- receiver.on('message', receiverOnMessage);
- function receiverOnMessage(data) {
- this[kWebSocket].emit('message', data);
- }
然后ws的socket對(duì)象繼續(xù)往上層觸發(fā)message事件。this[kWebSocket]的值是ws提供的socket對(duì)象本身。架構(gòu)圖如下。
這就是ws實(shí)現(xiàn)websocket協(xié)議的基本原理,具體細(xì)節(jié)可以參考源碼。