自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Websocket庫(kù)Ws原理分析

開(kāi)發(fā) 前端
我們看到ws監(jiān)聽(tīng)了upgrade事件,當(dāng)有websocket請(qǐng)求到來(lái)時(shí)就會(huì)執(zhí)行handleUpgrade處理升級(jí)請(qǐng)求,升級(jí)成功后觸發(fā)connection事件。我們先看handleUpgrade。handleUpgrade邏輯不多,主要是處理和校驗(yàn)升級(jí)請(qǐng)求的一些http頭。

 [[394780]]

前言:本文幾基于nodejs的ws模塊分析websocket的原理。

ws服務(wù)器邏輯由websocket-server.js的WebSocketServer類實(shí)現(xiàn)。該類初始化了一些參數(shù)后就執(zhí)行以下代碼

  1. if (this._server) { 
  2.       // 給server注冊(cè)下面事件,返回一個(gè)注銷(xiāo)函數(shù)(用于注銷(xiāo)下面注冊(cè)的事件) 
  3.       this._removeListeners = addListeners(this._server, { 
  4.         // listen成功的回調(diào) 
  5.         listening: this.emit.bind(this, 'listening'), 
  6.         error: this.emit.bind(this, 'error'), 
  7.         // 收到協(xié)議升級(jí)請(qǐng)求的回調(diào) 
  8.         upgrade: (req, socket, head) => { 
  9.           this.handleUpgrade(req, socket, head, (ws) => { 
  10.             // 處理成功,觸發(fā)鏈接成功事件 
  11.             this.emit('connection', ws, req); 
  12.           }); 
  13.         } 
  14.       }); 

我們看到ws監(jiān)聽(tīng)了upgrade事件,當(dāng)有websocket請(qǐng)求到來(lái)時(shí)就會(huì)執(zhí)行handleUpgrade處理升級(jí)請(qǐng)求,升級(jí)成功后觸發(fā)connection事件。我們先看handleUpgrade。handleUpgrade邏輯不多,主要是處理和校驗(yàn)升級(jí)請(qǐng)求的一些http頭。ws提供了一個(gè)校驗(yàn)的鉤子。處理完http頭后,會(huì)調(diào)verifyClient校驗(yàn)是否允許升級(jí)請(qǐng)求。如果成功則執(zhí)行completeUpgrade。顧名思義,completeUpgrade是完成升級(jí)請(qǐng)求的函數(shù),該函數(shù)返回同意協(xié)議升級(jí)并且設(shè)置一些http響應(yīng)頭。另外還有一些重要的邏輯處理。

  1. const ws = new WebSocket(null); 
  2. // 設(shè)置管理socket的數(shù)據(jù) 
  3. ws.setSocket(socket, head, this.options.maxPayload); 
  4. // cb就是this.emit('connection', ws, req); 
  5. cb(ws); 

我們看到這里新建了一個(gè)WebSocket對(duì)象并且調(diào)用了他的setSocket函數(shù)。我們來(lái)看看他做了什么。setSocket的邏輯非常多,我們慢慢分析。

數(shù)據(jù)接收者

  1. class Receiver extends Writable {} 

我們看到數(shù)據(jù)接收者是一個(gè)可寫(xiě)流。這就意味著我們可以往里面寫(xiě)數(shù)據(jù)。

  1. const receiver = new Receiver(); 
  2. receiver.write('hello'); 

我們看一下這時(shí)候Receiver的邏輯。

  1. _write(chunk, encoding, cb) { 
  2.     if (this._opcode === 0x08 && this._state == GET_INFO) return cb(); 
  3.     this._bufferedBytes += chunk.length; 
  4.     this._buffers.push(chunk); 
  5.     this.startLoop(cb); 
  6.   } 

首先記錄當(dāng)前數(shù)據(jù)的大小,然后把數(shù)據(jù)存起來(lái),最后執(zhí)行startLoop。

  1. startLoop(cb) { 
  2.     let err; 
  3.     this._loop = true
  4.  
  5.     do { 
  6.       switch (this._state) { 
  7.         // 忽略其他case 
  8.         case GET_DATA: 
  9.           err = this.getData(cb); 
  10.           break; 
  11.         default
  12.           // `INFLATING` 
  13.           this._loop = false
  14.           return
  15.       } 
  16.     } while (this._loop); 
  17.  
  18.     cb(err); 
  19.   } 

我們知道websocket是基于tcp上層的應(yīng)用層協(xié)議,所以我們收到數(shù)據(jù)時(shí),需要解析出一個(gè)個(gè)數(shù)據(jù)包(粘包問(wèn)題),所以Receiver其實(shí)就是一個(gè)狀態(tài)機(jī),每次收到數(shù)據(jù)的時(shí)候,都會(huì)根據(jù)當(dāng)前的狀態(tài)進(jìn)行狀態(tài)流轉(zhuǎn)。比如當(dāng)前處于GET_DATA狀態(tài),那么就會(huì)進(jìn)行數(shù)據(jù)的處理。我們接著看一下數(shù)據(jù)處理的邏輯。

  1. getData(cb) { 
  2.     let data = EMPTY_BUFFER; 
  3.     // 提取數(shù)據(jù)部分 
  4.     if (this._payloadLength) { 
  5.       data = this.consume(this._payloadLength); 
  6.       if (this._masked) unmask(data, this._mask); 
  7.     } 
  8.     // 是控制報(bào)文則執(zhí)行controlMessage 
  9.     if (this._opcode > 0x07) return this.controlMessage(data); 
  10.     // 做了壓縮,則先解壓 
  11.     if (this._compressed) { 
  12.       this._state = INFLATING; 
  13.       this.decompress(data, cb); 
  14.       return
  15.     } 
  16.     // 沒(méi)有壓縮則直接處理(先存到_fragments,然后執(zhí)行dataMessage) 
  17.     if (data.length) { 
  18.       this._messageLength = this._totalPayloadLength; 
  19.       this._fragments.push(data); 
  20.     } 
  21.  
  22.     return this.dataMessage(); 
  23.   } 

我們執(zhí)行websocket協(xié)議定義了報(bào)文的類型,比如控制報(bào)文,數(shù)據(jù)報(bào)文。我們分別看一下這兩個(gè)的邏輯。

  1. controlMessage(data) { 
  2.     // 連接關(guān)閉 
  3.     if (this._opcode === 0x08) { 
  4.       this._loop = false
  5.       if (data.length === 0) { 
  6.         this.emit('conclude', 1005, ''); 
  7.         this.end(); 
  8.       } 
  9.     } else if (this._opcode === 0x09) { 
  10.       this.emit('ping', data); 
  11.     } else { 
  12.       this.emit('pong', data); 
  13.     } 
  14.     this._state = GET_INFO; 
  15.   } 

我們看到控制報(bào)文包括三種(conclude、ping、pong)。而數(shù)據(jù)報(bào)文只有this.emit('message', data);一種。這個(gè)就是接收者的整體邏輯。

2 數(shù)據(jù)發(fā)送者

數(shù)據(jù)發(fā)送者是對(duì)websocket協(xié)議的封裝,當(dāng)用戶調(diào)研數(shù)據(jù)發(fā)送者的send接口發(fā)送數(shù)據(jù)時(shí),數(shù)據(jù)發(fā)送者會(huì)組裝成一個(gè)websocket協(xié)議的包再發(fā)送出去。

  1. send(data, options, cb) { 
  2.     const buf = toBuffer(data); 
  3.     const perMessageDeflate = this._extensions[PerMessageDeflate.extensionName]; 
  4.     let opcode = options.binary ? 2 : 1; 
  5.     let rsv1 = options.compress; 
  6.  
  7.     if (this._firstFragment) { 
  8.       this._firstFragment = false
  9.       if (rsv1 && perMessageDeflate) { 
  10.         rsv1 = buf.length >= perMessageDeflate._threshold; 
  11.       } 
  12.       this._compress = rsv1; 
  13.     } else { 
  14.       rsv1 = false
  15.       opcode = 0; 
  16.     } 
  17.  
  18.     if (options.fin) this._firstFragment = true
  19.     // 需要壓縮 
  20.     if (perMessageDeflate) { 
  21.       const opts = { 
  22.         fin: options.fin, 
  23.         rsv1, 
  24.         opcode, 
  25.         mask: options.mask, 
  26.         readOnly: toBuffer.readOnly 
  27.       }; 
  28.       // 正在壓縮,則排隊(duì)等待,否則執(zhí)行壓縮 
  29.       if (this._deflating) { 
  30.         this.enqueue([this.dispatch, buf, this._compress, opts, cb]); 
  31.       } else { 
  32.         this.dispatch(buf, this._compress, opts, cb); 
  33.       } 
  34.     } else { 
  35.       // 不需要壓縮,直接發(fā)送 
  36.       this.sendFrame( 
  37.         Sender.frame(buf, { 
  38.           fin: options.fin, 
  39.           rsv1: false
  40.           opcode, 
  41.           mask: options.mask, 
  42.           readOnly: toBuffer.readOnly 
  43.         }), 
  44.         cb 
  45.       ); 
  46.     } 
  47.   } 

send函數(shù)做了一些參數(shù)的處理后發(fā)送數(shù)據(jù),但是如果需要壓縮的話,要壓縮后才能發(fā)送。數(shù)據(jù)處理完成后調(diào)用真正的發(fā)送函數(shù)

  1. sendFrame(list, cb) { 
  2.     if (list.length === 2) { 
  3.       this._socket.cork(); 
  4.       this._socket.write(list[0]); 
  5.       this._socket.write(list[1], cb); 
  6.       this._socket.uncork(); 
  7.     } else { 
  8.       this._socket.write(list[0], cb); 
  9.     } 
  10.   } 

了解了數(shù)據(jù)接收者和發(fā)送者的邏輯后,我們看一下websocket對(duì)象和setSocket函數(shù)做了什么事情,websocket對(duì)象本質(zhì)是對(duì)TCP socket的封裝。它接收來(lái)自底層的數(shù)據(jù),然后透?jìng)鹘o數(shù)據(jù)接收者,數(shù)據(jù)接收者處理完后,觸發(fā)websocket對(duì)應(yīng)的對(duì)應(yīng)的事件,比如message事件。發(fā)送數(shù)據(jù)的時(shí)候,websocket會(huì)調(diào)用數(shù)據(jù)發(fā)送者的接口,數(shù)據(jù)發(fā)送者組裝成websocket協(xié)議的數(shù)據(jù)包后再發(fā)送出去,架構(gòu)如下圖所示。

接下來(lái)我們看看setSocket的邏輯

  1. setSocket(socket, head, maxPayload) { 
  2.     // 數(shù)據(jù)接收者,負(fù)責(zé)處理tcp上收到的數(shù)據(jù)(socket是tcp層的socket) 
  3.     const receiver = new Receiver(...); 
  4.     // 數(shù)據(jù)發(fā)送者,負(fù)責(zé)發(fā)送數(shù)據(jù)給對(duì)端 
  5.     this._sender = new Sender(socket, this._extensions); 
  6.     // 數(shù)據(jù)接收者,負(fù)責(zé)解析數(shù)據(jù) 
  7.     this._receiver = receiver; 
  8.     // net模塊的tcp socket 
  9.     this._socket = socket; 
  10.     // 關(guān)聯(lián)起來(lái) 
  11.     receiver[kWebSocket] = this; 
  12.     socket[kWebSocket] = this; 
  13.     // 監(jiān)聽(tīng)接收者的事件,解析數(shù)據(jù)的時(shí)候會(huì)回調(diào) 
  14.     receiver.on('conclude', receiverOnConclude); 
  15.     // 下面兩個(gè)事件由Writable觸發(fā) 
  16.     receiver.on('drain', receiverOnDrain); 
  17.     receiver.on('error', receiverOnError); 
  18.     receiver.on('message', receiverOnMessage); 
  19.     receiver.on('ping', receiverOnPing); 
  20.     receiver.on('pong', receiverOnPong); 
  21.     // 清除定時(shí)器 
  22.     socket.setTimeout(0); 
  23.     // 關(guān)閉nagle算法 
  24.     socket.setNoDelay(); 
  25.     // 升級(jí)請(qǐng)求中,攜帶的http body,通常是空 
  26.     if (head.length > 0) socket.unshift(head); 
  27.     // 監(jiān)聽(tīng)tcp底層的事件 
  28.     socket.on('close', socketOnClose); 
  29.     socket.on('data', socketOnData); 
  30.     socket.on('end', socketOnEnd); 
  31.     socket.on('error', socketOnError); 
  32.  
  33.     this.readyState = WebSocket.OPEN
  34.     this.emit('open'); 
  35.   } 

我們看到里面監(jiān)聽(tīng)了各種事件,下面以data事件為例,看一下處理過(guò)程。當(dāng)tcp socket收到數(shù)據(jù)的時(shí)候會(huì)執(zhí)行socketOnData函數(shù)。

  1. function socketOnData(chunk) { 
  2.   // 會(huì)調(diào)用receiver里的_write函數(shù),其實(shí)就是換成到receiver對(duì)象上,如果數(shù)據(jù)解析出錯(cuò),會(huì)觸發(fā)socket error事件 
  3.   if (!this[kWebSocket]._receiver.write(chunk)) { 
  4.     this.pause(); 
  5.   } 

socketOnData通過(guò)接收者的接口把數(shù)據(jù)傳給接收者,接收者會(huì)解析數(shù)據(jù),然后觸發(fā)對(duì)應(yīng)的事件,比如message。

  1. receiver.on('message', receiverOnMessage); 
  2. function receiverOnMessage(data) { 
  3.   this[kWebSocket].emit('message', data); 

然后ws的socket對(duì)象繼續(xù)往上層觸發(fā)message事件。this[kWebSocket]的值是ws提供的socket對(duì)象本身。架構(gòu)圖如下。

這就是ws實(shí)現(xiàn)websocket協(xié)議的基本原理,具體細(xì)節(jié)可以參考源碼。

責(zé)任編輯:武曉燕 來(lái)源: 編程雜技
相關(guān)推薦

2017-07-11 13:58:10

WebSocket

2021-04-21 07:52:39

核心SignalR應(yīng)用

2010-04-14 14:23:26

2024-01-11 08:53:58

2023-01-26 01:41:27

核心全局過(guò)濾器

2023-06-27 07:09:39

2017-08-17 17:48:06

2009-06-14 17:19:09

ibmdwWebSphere

2012-09-18 14:23:54

2021-04-27 18:12:22

WebSocket持久化連接HTTP

2022-04-13 08:23:31

Golang并發(fā)

2021-10-12 17:19:17

Random局限性變量

2012-09-29 13:18:23

分布式數(shù)據(jù)庫(kù)Google Span

2020-10-13 07:35:22

JUC - Count

2023-04-26 08:39:41

Bitmap元素存儲(chǔ)

2023-11-28 08:49:01

短輪詢WebSocket長(zhǎng)輪詢

2010-04-19 15:29:31

2012-12-03 16:57:37

HDFS

2013-01-11 11:20:47

Windows SerVMware vSph

2015-09-23 16:14:03

Ryu拓?fù)浣Y(jié)構(gòu)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)