慢聊Golang的Websocket使用和實(shí)現(xiàn)代碼分析
本期將會繼續(xù)上次話題,上篇主要是理論還是停留在文字層面,今天帶來的是websocket實(shí)操,分享它使用和底層實(shí)現(xiàn)!
相信很多使用Golang的小伙伴都知道Gorilla這個工具包,長久以來gorilla/websocket 都是比官方包更好的websocket包。
題外話 gorilla:大猩猩(不過這個猩猩還挺可愛的)
圖片
gorilla/websocket 框架開源地址為: https://github.com/gorilla/websocket
今天小許就用【gorilla/websocket】框架來展開本期文章內(nèi)容,文章會涉及到核心代碼的走讀,會涉及到不少代碼,需要小伙伴們保持耐心往下看,然后結(jié)合之前分享的websocket基礎(chǔ),徹底學(xué)個明白!
圖片
簡單使用
安裝Gorilla Websocket Go軟件包,您只需要使用即可go get
go get github.com/gorilla/websocket
在正式使用之前我們先簡單了解下兩個數(shù)據(jù)結(jié)構(gòu) Upgrader 和 Conn
Upgrader
Upgrader指定用于將 HTTP 連接升級到 WebSocket 連接
type Upgrader struct {
HandshakeTimeout time.Duration
ReadBufferSize, WriteBufferSize int
WriteBufferPool BufferPool
Subprotocols []string
Error func(w http.ResponseWriter, r *http.Request, status int, reason error)
CheckOrigin func(r *http.Request) bool
EnableCompression bool
}
- ? HandshakeTimeout:握手完成的持續(xù)時(shí)間
- ? ReadBufferSize和WriteBufferSize:以字節(jié)為單位指定I/O緩沖區(qū)大小。如果緩沖區(qū)大小為零,則使用HTTP服務(wù)器分配的緩沖區(qū)
- ? CheckOrigin :函數(shù)應(yīng)仔細(xì)驗(yàn)證請求來源 防止跨站點(diǎn)請求偽造
這里一般會設(shè)置下CheckOrigin來解決跨域問題
Conn
Conn類型表示W(wǎng)ebSocket連接,這個結(jié)構(gòu)體的組成包括兩部分,寫入字段(Write fields)和 讀取字段(Read fields)
type Conn struct {
conn net.Conn
isServer bool
...
// Write fields
writeBuf []byte
writePool BufferPool
writeBufSize int
writer io.WriteCloser
isWriting bool
...
// Read fields
readRemaining int64
readFinal bool
readLength int64
messageReader *messageReader
...
}
isServer :字段來區(qū)分我們是否用Conn作為客戶端還是服務(wù)端,也就是說說gorilla/websocket中同時(shí)編寫客戶端程序和服務(wù)器程序,但是一般是Web應(yīng)用程序使用單獨(dú)的前端作為客戶端程序。
部分字段說明如下圖:
圖片
服務(wù)端示例
出于說明的目的,我們將在Go中同時(shí)編寫客戶端程序和服務(wù)端程序(其實(shí)小許是前端小趴菜?? ??)。
當(dāng)然我們在開發(fā)程序的時(shí)候基本都是單獨(dú)的前端,通常使用(Javascript,vue等)實(shí)現(xiàn)websocket客戶端,這里為了讓大家有比較直觀的感受,用【gorilla/websocket】分別寫了服務(wù)端和客戶端示例。
var upGrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
return true
},
}
func main() {
http.HandleFunc("/ws", wsUpGrader)
err := http.ListenAndServe("localhost:8080", nil)
if err != nil {
log.Println("server start err", err)
}
}
func wsUpGrader(w http.ResponseWriter, r *http.Request) {
//轉(zhuǎn)換為升級為websocket
conn, err := upGrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
//釋放連接
defer conn.Close()
for {
//接收消息
messageType, message, err := conn.ReadMessage()
if err != nil {
log.Println(err)
return
}
log.Println("server receive messageType", messageType, "message", string(message))
//發(fā)送消息
err = conn.WriteMessage(messageType, []byte("pong"))
if err != nil {
log.Println(err)
return
}
}
}
我們知道websocket協(xié)議是基于http協(xié)議進(jìn)行upgrade升級的, 這里使用 net/http提供原始的http連接。
http.HandleFunc接受兩個參數(shù):第一個參數(shù)是字符串表示的 url 路徑,第二個參數(shù)是該 url 實(shí)際的處理對象
http.ListenAndServe 監(jiān)聽在某個端口,啟動服務(wù),準(zhǔn)備接受客戶端的請求
HandleFunc的作用:通過類型轉(zhuǎn)換讓我們可以將普通的函數(shù)作為HTTP處理器使用
圖片
服務(wù)端代碼流程:
? Gorilla在使用websocket之前是先將初始化的upGrader結(jié)構(gòu)體變量調(diào)用Upgrade方法進(jìn)行請求協(xié)議升級
? 升級后返回 *Conn(此時(shí)isServer = true),后續(xù)使用它來處理websocket連接
? 服務(wù)端消息讀寫分別用 ReadMessage()、WriteMessage()
客戶端示例
import (
"fmt"
"github.com/gorilla/websocket"
"log"
"time"
)
func main() {
//服務(wù)器地址 websocket 統(tǒng)一使用 ws://
url := "ws://localhost:8080/ws"
//使用默認(rèn)撥號器,向服務(wù)器發(fā)送連接請求
ws, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
log.Fatal(err)
}
//關(guān)閉連接
defer conn.Close()
//發(fā)送消息
go func() {
for {
err := ws.WriteMessage(websocket.BinaryMessage, []byte("ping"))
if err != nil {
log.Fatal(err)
}
//休眠兩秒
time.Sleep(time.Second * 2)
}
}()
//接收消息
for {
_, data, err := ws.ReadMessage()
if err != nil {
log.Fatal(err)
}
fmt.Println("client receive message: ", string(data))
}
}
客戶端的實(shí)現(xiàn)看起來也是簡單,先使用默認(rèn)撥號器,向服務(wù)器地址發(fā)送連接請求,撥號成功時(shí)也返回一個*Conn,開啟一個協(xié)程每隔兩秒向服務(wù)端發(fā)送消息,同樣都是使用ReadMessage和W riteMessage讀寫消息。
示例代碼運(yùn)行結(jié)果如下:
圖片
源碼走讀
看完上面基本的客戶端和服務(wù)端案例之后,我們對整個消息發(fā)送和接收的使用已經(jīng)熟悉了,實(shí)際開發(fā)中要做的就是如何結(jié)合業(yè)務(wù)去定義消息類型和發(fā)送場景了,我們接著走讀下底層的實(shí)現(xiàn)邏輯!
代碼走讀我們分了四部分,主要了解協(xié)議是如何升級、已經(jīng)消息如何讀寫、解析數(shù)據(jù)幀【 ?? ??核心】!
Upgrade 協(xié)議升級
Upgrade顧名思義【升級】,在進(jìn)行協(xié)議升級之前是需要對協(xié)議進(jìn)行校驗(yàn)的,之前我們知道待升級的http請求是有固定請求頭的,這里列舉幾個:
圖片
?? Upgrade進(jìn)行校驗(yàn)的目的是看該請求是否符合協(xié)議升級的規(guī)定
Upgrade的部分校驗(yàn)代碼如下,return處進(jìn)行了省略
func (u *Upgrader) Upgrade(w http.ResponseWriter, r *http.Request, responseHeader http.Header) (*Conn, error) {
if !tokenListContainsValue(r.Header, "Connection", "upgrade") {
return ...
}
if !tokenListContainsValue(r.Header, "Upgrade", "websocket") {
return ...
}
//必須是get請求方法
if r.Method != http.MethodGet {
return ...
}
if !tokenListContainsValue(r.Header, "Sec-Websocket-Version", "13") {
return ...
}
if _, ok := responseHeader["Sec-Websocket-Extensions"]; ok {
return ...
}
...
c := newConn(netConn, true, u.ReadBufferSize, u.WriteBufferSize, u.WriteBufferPool, br, writeBuf)
...
}
tokenListContainsValue的目的是校驗(yàn)請求的Header中是否有upgrade需要的特定參數(shù),比如我們上圖列舉的一些。
newConn就是初始化部分Conn結(jié)構(gòu)體的,方法中的第二個參數(shù)為true代表這是服務(wù)端
圖片
computeAcceptKey 計(jì)算接受密鑰:
這個函數(shù)重點(diǎn)說下,在上一期中在websocket【連接確認(rèn)】這一章節(jié)中知道,websocket協(xié)議升級時(shí),需要滿足如下條件:
??只有當(dāng)請求頭參數(shù)Sec-WebSocket-Key字段的值經(jīng)過固定算法加密后的數(shù)據(jù)和響應(yīng)頭里的Sec-WebSocket-Accept的值保持一致,該連接才會被認(rèn)可建立。
圖片
var keyGUID = []byte("258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
func computeAcceptKey(challengeKey string) string {
h := sha1.New()
h.Write([]byte(challengeKey))
h.Write(keyGUID)
return base64.StdEncoding.EncodeToString(h.Sum(nil))
}
上面 computeAcceptKey 函數(shù)的實(shí)現(xiàn),驗(yàn)證了之前說的關(guān)于 Sec-WebSocket-Accept的生成
服務(wù)端需將Sec-WebSocket-Key和固定的 GUID 字符串( 258EAFA5-E914-47DA-95CA-C5AB0DC85B11) 拼接后使用 SHA-1 進(jìn)行哈希,并采用 base64 編碼后返回
ReadMessage 讀消息
ReadMessage方法內(nèi)部使用NextReader獲取讀取器并從該讀取器讀取到緩沖區(qū),如果是一條消息由多個數(shù)據(jù)幀,則會拼接成完整的消息,返回給業(yè)務(wù)層。
func (c *Conn) ReadMessage() (messageType int, p []byte, err error) {
var r io.Reader
messageType, r, err = c.NextReader()
if err != nil {
return messageType, nil, err
}
//ReadAll從r讀取,直到出現(xiàn)錯誤或EOF,并返回讀取的數(shù)據(jù)
p, err = io.ReadAll(r)
return messageType, p, err
}
該方法,返回三個參數(shù),分別是消息類型、內(nèi)容、error
messageType是int型,值可能是 BinaryMessage(二進(jìn)制消息) 或 TextMessage(文本消息)
NextReader: 該方法得到一個消息類型 messageType,io.Reader,err
func (c *Conn) NextReader() (messageType int, r io.Reader, err error) {
...
for c.readErr == nil {
//解析數(shù)據(jù)幀方法advanceFrame
// frameType : 幀類型
frameType, err := c.advanceFrame()
if err != nil {
c.readErr = hideTempErr(err)
break
}
//數(shù)據(jù)類型是 文本或二進(jìn)制類型
if frameType == TextMessage || frameType == BinaryMessage {
c.messageReader = &messageReader{c}
c.reader = c.messageReader
if c.readDecompress {
c.reader = c.newDecompressionReader(c.reader)
}
return frameType, c.reader, nil
}
}
...
}
c.advanceFrame() 是核心代碼,主要是實(shí)現(xiàn)解析這條消息,這里在最后章節(jié)會講。
這里有個 c.messageReader (當(dāng)前的低級讀取器),賦值給c.reader,為什么要這樣呢?
c.messageReader 是更低級讀取器,而 c.reader 的作用是當(dāng)前讀取器返回到應(yīng)用程序。簡單就是messageReader 是實(shí)現(xiàn)了 c.reader 接口的結(jié)構(gòu)體, 從而也實(shí)現(xiàn)了 io.Reader接口
圖片
圖上加一個 bufio.Read方法:Read讀取數(shù)據(jù)寫入p。本方法返回寫入p的字節(jié)數(shù)。本方法一次調(diào)用最多會調(diào)用下層Reader接口一次Read方法,因此返回值n可能小于len(p)。讀取到達(dá)結(jié)尾時(shí),返回值n將為0而err將為io.EOF
messageReader的 Read方法: 我們看下Read的具體實(shí)現(xiàn),Read方法主要是讀取數(shù)據(jù)幀內(nèi)容,直到出現(xiàn)并返回io.EOF或者其他錯誤為止,而實(shí)際調(diào)用它的正是 io.ReadAll。
func (r *messageReader) Read(b []byte) (int, error) {
...
for c.readErr == nil {
//當(dāng)前幀中剩余的字節(jié)
if c.readRemaining > 0 {
if int64(len(b)) > c.readRemaining {
b = b[:c.readRemaining]
}
//讀取到切片b中
n, err := c.br.Read(b)
c.readErr = hideTempErr(err)
//當(dāng)Conn是服務(wù)端
if c.isServer {
c.readMaskPos = maskBytes(c.readMaskKey, c.readMaskPos, b[:n])
}
//readRemaining字節(jié)數(shù)轉(zhuǎn)int64
rem := c.readRemaining
rem -= int64(n)
//跟蹤連接上剩余的字節(jié)數(shù)
if err := c.setReadRemaining(rem); err != nil {
return 0, err
}
if c.readRemaining > 0 && c.readErr == io.EOF {
c.readErr = errUnexpectedEOF
}
//返回讀后字節(jié)數(shù)
return n, c.readErr
}
//標(biāo)記是否最后一個數(shù)據(jù)幀
if c.readFinal {
// messageRader 置為nil
c.messageReader = nil
return 0, io.EOF
}
//獲取數(shù)據(jù)幀類型
frameType, err := c.advanceFrame()
switch {
case err != nil:
c.readErr = hideTempErr(err)
case frameType == TextMessage || frameType == BinaryMessage:
c.readErr = errors.New("websocket: internal error, unexpected text or binary in Reader")
}
}
err := c.readErr
if err == io.EOF && c.messageReader == r {
err = errUnexpectedEOF
}
return 0, err
}
io.ReadAll : ReadAll從r讀取,這里是實(shí)現(xiàn)如果一條消息由多個數(shù)據(jù)幀,會一直讀直到最后一幀的關(guān)鍵。
func ReadAll(r Reader) ([]byte, error) {
b := make([]byte, 0, 512)
for {
if len(b) == cap(b) {
// 給[]byte添加更多容量
b = append(b, 0)[:len(b)]
}
n, err := r.Read(b[len(b):cap(b)])
b = b[:len(b)+n]
if err != nil {
if err == EOF {
err = nil
}
return b, err
}
}
}
可以看出在for 循環(huán)中一直讀取,直至讀取到最后一幀,直到返回io.EOF或網(wǎng)絡(luò)原因錯誤為止,否則一直進(jìn)行阻塞讀,這些 error 可以從上面講到的messageReader的 Read方法可以看出來。
總結(jié)下,整個流程如下:
圖片
整個讀消息的流程就結(jié)束了,我們繼續(xù)看如何寫消息!
WriteMessage 寫消息
既然讀消息是對數(shù)據(jù)幀進(jìn)行解析,那么寫消息就自然會聯(lián)想到將數(shù)據(jù)按照數(shù)據(jù)幀的規(guī)范組裝寫入到一個writebuf中,然后寫入到網(wǎng)絡(luò)中。
圖片
我們繼續(xù)看WriteMessage是如何實(shí)現(xiàn)的
func (c *Conn) WriteMessage(messageType int, data []byte) error {
...
//w 是一個io.WriteCloser
w, err := c.NextWriter(messageType)
if err != nil {
return err
}
//將data寫入writeBuf中
if _, err = w.Write(data); err != nil {
return err
}
return w.Close()
}
WriteMessage方法接收一個消息類型和數(shù)據(jù),主要邏輯是先調(diào)用Conn的NextWriter方法得到一個io.WriteCloser,然后寫消息到這個Conn的writeBuf,寫完消息后close它。
NextWriter實(shí)現(xiàn)如下:
func (c *Conn) NextWriter(messageType int) (io.WriteCloser, error) {
var mw messageWriter
if err := c.beginMessage(&mw, messageType); err != nil {
return nil, err
}
c.writer = &mw
...
return c.writer, nil
}
注意看這里有個messageWriter賦值給了Conn的writer,也就是說messageWriter實(shí)現(xiàn)了io.WriterCloser接口。
這里的實(shí)現(xiàn)跟讀消息中的NextReader方法中的messageReader很像,也是通過實(shí)現(xiàn)io.Reader接口,然后賦值給了Conn的Reader,這里可以做個小聯(lián)動,找到讀寫消息實(shí)際的實(shí)現(xiàn)者 messageReader、messageWriter。
messageWriter的Write實(shí)現(xiàn):
前置知識:如果沒有設(shè)置Conn中writeBufferSize, 默認(rèn)情況下會設(shè)置為 4096個字節(jié),另外加上14字節(jié)的數(shù)據(jù)幀頭部大小【這些在newConn中初始化的時(shí)候有代碼說明】
func (w *messageWriter) Write(p []byte) (int, error) {
...
//如果字節(jié)長度大于初始化的writeBuf空間大小
if len(p) > 2*len(w.c.writeBuf) && w.c.isServer {
//寫入方法
err := w.flushFrame(false, p)
...
}
//字節(jié)長度不大于初始化的writeBuf空間大小
nn := len(p)
for len(p) > 0 {
//內(nèi)部也是調(diào)用的flushFrame
n, err := w.ncopy(len(p))
...
}
return nn, nil
}
messageWriter中的Write方法主要的目的是將數(shù)據(jù)寫入到writeBuf中,它主要存儲結(jié)構(gòu)化的數(shù)據(jù)幀內(nèi)容,所謂結(jié)構(gòu)化就是按照數(shù)據(jù)幀的格式,用Go實(shí)現(xiàn)寫入的。
總結(jié)下,整個流程如下:
圖片
而flushFrame方法將緩沖數(shù)據(jù)和額外數(shù)據(jù)作為幀寫入網(wǎng)絡(luò),這個final參數(shù)表示這是消息中的最后一幀。
至于flushFrame內(nèi)部是如何實(shí)現(xiàn)寫入網(wǎng)絡(luò)中的,你可以看看 net.Conn 是怎么Write的,因?yàn)樽罱K就是調(diào)這個寫入網(wǎng)絡(luò)的,這里就不再深究了,有興趣的同學(xué)可以自己挖一挖!
advanceFrame 解析數(shù)據(jù)幀
解析數(shù)據(jù)幀放在最后,前面的代碼走讀主要是為了方便大家能把整體流程搞清楚,而數(shù)據(jù)幀的解析,是更加需要對websocket基礎(chǔ)有了解,特別是數(shù)據(jù)幀的組成,因?yàn)榻馕鼍褪前凑諈f(xié)定用Go代碼實(shí)現(xiàn)的一種方式而已!
強(qiáng)烈推薦大家看完# 為什么有了http,還需要websocket,懂了!]
圖片
根據(jù)上圖【來自網(wǎng)絡(luò)】回顧下數(shù)據(jù)幀各部分代表的意思:
FIN : 1個bit位,用來標(biāo)記當(dāng)前數(shù)據(jù)幀是不是最后一個數(shù)據(jù)幀
RSV1, RSV2, RSV3 :這三個各占用一個bit位用做擴(kuò)展用途,沒有這個需求的話設(shè)置為0
Opcode : 該值定義的是數(shù)據(jù)幀的數(shù)據(jù)類型 1 表示文本 2 表示二進(jìn)制
MASK:表示數(shù)據(jù)有沒有使用掩碼
Payload length :數(shù)據(jù)的長度,Payload data的長度,占7bits,7+16bits,7+64bits
Masking-key :數(shù)據(jù)掩碼 (設(shè)置為0,則該部分可以省略,如果設(shè)置為1,則用來解碼客戶端發(fā)送給服務(wù)端的數(shù)據(jù)幀)
Payload data : 幀真正要發(fā)送的數(shù)據(jù),可以是任意長度
advanceFrame 解析方法
實(shí)現(xiàn)代碼會比較長,如果直接貼代碼,會看不下去,該方法返回?cái)?shù)據(jù)類型和error, 這里我們只會截取其中一部分
func (c *Conn) advanceFrame() (int, error) {
...
//讀取前兩個字節(jié)
p, err := c.read(2)
if err != nil {
return noFrame, err
}
//數(shù)據(jù)幀類型
frameType := int(p[0] & 0xf)
// FIN 標(biāo)記位
final := p[0]&finalBit != 0
//三個擴(kuò)展用
rsv1 := p[0]&rsv1Bit != 0
rsv2 := p[0]&rsv2Bit != 0
rsv3 := p[0]&rsv3Bit != 0
//mask :是否使用掩碼
mask := p[1]&maskBit != 0
...
switch c.readRemaining {
case 126:
p, err := c.read(2)
if err != nil {
return noFrame, err
}
if err := c.setReadRemaining(int64(binary.BigEndian.Uint16(p))); err != nil {
return noFrame, err
}
case 127:
p, err := c.read(8)
if err != nil {
return noFrame, err
}
if err := c.setReadRemaining(int64(binary.BigEndian.Uint64(p))); err != nil {
return noFrame, err
}
}
..
}
整個流程分為了 7 個部分:
1. 跳過前一幀的剩余部分,畢竟這是之前幀的數(shù)據(jù)
2. 讀取并解析幀頭的前兩個字節(jié)(從上面圖中可以看出只讀取到 Payload len)
3. 根據(jù)讀取和解析幀長度(根據(jù) Payload length的值來獲取Payload data的長度)
4. 處理數(shù)據(jù)幀的mask掩碼
5. 如果是文本和二進(jìn)制消息,強(qiáng)制執(zhí)行讀取限制并返回 (結(jié)束)
6. 讀取控制幀有效載荷 即 play data,設(shè)置setReadRemaining以安全地更新此值并防止溢出
7. 過程控制幀有效載荷,如果是ping/pong/close消息類型,返回 -1 (noFrame) (結(jié)束)
advanceFrame方法的主要目的就是解析數(shù)據(jù)幀,獲取數(shù)據(jù)幀的消息類型,而對于數(shù)據(jù)幀的解析都是按照上圖幀格式來的!
heartbeat 心跳
WebSocket 為了確??蛻舳?、服務(wù)端之間的 TCP 通道連接沒有斷開,使用心跳機(jī)制來判斷連接狀態(tài)。如果超時(shí)時(shí)間內(nèi)沒有收到應(yīng)答則認(rèn)為連接斷開,關(guān)閉連接,釋放資源。流程如下
? 發(fā)送方 -> 接收方:ping
? 接收方 -> 發(fā)送方:pong
ping、pong 消息:它們對應(yīng)的是 WebSocket 的兩個控制幀,opcode分別是0x9、0xA,對應(yīng)的消息類型分別是PingMessage, PongMessage,前提是應(yīng)用程序需要先讀取連接中的消息才能處理從對等方發(fā)送的 close、ping 和 pong 消息。
?? 當(dāng)然關(guān)于源碼的部分我只是拿了其中一部分比如:控制類消息、并發(fā)、緩沖等,大家要知道有這些功能,有興趣的可以去看看
總結(jié)
本期主要和大家一起了解 gorilla/websocket 框架的使用和部分底層實(shí)現(xiàn)原理代碼走讀,通篇讀下來想必大家對websocket用程序語言實(shí)現(xiàn)有了更深刻的認(rèn)識吧!
不過流行的開源 Go 語言 Web 工具包 Gorilla 宣布已正式歸檔,目前已進(jìn)入只讀模式?!八l(fā)出的信號是,這些庫在未來將不會有任何發(fā)展。
也就是說 gorilla/websocket 這個被廣泛使用的 websocket 庫也會停止更新了,真是個令人悲傷的消息!
正如作者所說的那樣:“沒有一個項(xiàng)目需要永遠(yuǎn)存在。這可能不會讓每個人都開心,但生活就是這樣?!?/p>
好了,通過兩期對websocket的講解,相信大家心里已經(jīng)對它有了比較深刻的印象,還是那句話知道的越多,不知道的也越多,一起前行讓自己知道的更多一點(diǎn)!