自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

fasthttp是如何做到比net/http快十倍的

開發(fā) 前端
fasthttp和net/http在實現(xiàn)上還是有較大區(qū)別,通過對實現(xiàn)原理的分析,知道了fasthttp速度快是利用了大量sync.Pool對象復(fù)用 、[]byte 和 string利用萬能指針unsafe.Pointer進行轉(zhuǎn)換等優(yōu)化技巧。

哇,性能太強了吧,話不多說,本期小許和大家一起看看fasthttp Server端的底層實現(xiàn),來看看到底是如何做到性能如此之快的,有哪些優(yōu)秀的特性值得我們學習和借鑒的!

Server端處理流程對比

在進行了解fasthttp底層代碼實現(xiàn)之前,我們先對兩者處理請求的方式進行一個回顧和對比,了解完兩者的基本的情況之后,再對fasthttp的實現(xiàn)最進一步分析。

net/http處理流程

在小許文章《圖文講透Golang標準庫 net/http實現(xiàn)原理 -- 服務(wù)端》中講的比較詳細了,這里再把大致流程整理以下,整體流程如下:

圖片圖片

  1. 1. 將路由和對應(yīng)的handler注冊到一個 map 中,用做后續(xù)鍵值路由匹配
  2. 2. 注冊完之后就是開啟循環(huán)監(jiān)聽連接,每獲取到一個連接就會創(chuàng)建一個 Goroutine進行處理
  3. 3. 在創(chuàng)建好的 Goroutine 里面會循環(huán)的等待接收請求數(shù)據(jù),然后根據(jù)請求的地址去鍵值路由map中匹配對應(yīng)的handler
  4. 4. 執(zhí)行匹配到的處理器handler

net/http 的實現(xiàn)是一個連接新建一個 goroutine,如果在連接數(shù)非常多的時候,,每個連接都會創(chuàng)建一個 Goroutine 就會給系統(tǒng)帶來一定的壓力。這也就造成了 net/http在處理高并發(fā)時的瓶頸。

每次來了一個連接,都要實例化一個連接對象,這誰受得了,哈哈

fasthttp處理流程

再看看fasthttp處理請求的流程:

圖片圖片

  1. 1. 啟動監(jiān)聽
  2. 2. 循環(huán)監(jiān)聽端口獲取連接,建立workerPool
  3. 3. 循環(huán)嘗試獲取連接 net.Conn,先會去 ready 隊列里獲取 workerChan,獲取不到就會去對象池獲取
  4. 4. 將獲取到的的連接net.Conn 發(fā)送到 workerChan 的 channel 中
  5. 5. 開啟一個 Goroutine 一直循環(huán)獲取 workerChan 這個 channel 中的數(shù)據(jù)
  6. 6. 獲取到channel中的net.Conn之后就會對請求進行處理

workerChan 其實就是一個連接處理對象,這個對象里面有一個 channel 用來傳遞連接;每個 workerChan 在后臺都會有一個 Goroutine 循環(huán)獲取 channel 中的連接,然后進行處理。

fasthttp為什么快

fasthttp的優(yōu)化主要有以下幾個點:

  • ? 連接復(fù)用,如slice中有可復(fù)用的workerChan就從ready這個slice中獲取,沒有可復(fù)用的就在workerChanPool創(chuàng)建一個,萬一池子滿了(默認是 256 * 1024個)就報錯。
  • ? 對于內(nèi)存復(fù)用,就是大量使用了sync.Pool(你知道的,sync.Pool復(fù)用對象有啥好處),有人統(tǒng)計過,用了整整30個sync.Pool,context、request對象、header、response對象都用了sync.Pool ....
  • ? 利用unsafe.Pointer指針進行[]byte 和 string 轉(zhuǎn)換,避免[]byte到string轉(zhuǎn)換時帶來的內(nèi)存分配和拷貝帶來的消耗 。

知道了fasthttp為什么快,接下來我們看下它是如何處理監(jiān)聽處理請求的,在哪些地方用到了這些特性。

底層實現(xiàn)

簡單案例

import (
    "github.com/buaazp/fasthttprouter"
    "github.com/valyala/fasthttp"
    "log"
)

func main() {
    //創(chuàng)建路由
    r := fasthttprouter.New()
    r.GET("/", Index)
    if err := fasthttp.ListenAndServe(":8083", r.Handler); err != nil {
        log.Fatalf("ListenAndServe fatal: %s", err)
    }

}
func Index(ctx *fasthttp.RequestCtx) {
    ctx.WriteString("hello xiaou code!")
}

這個案例同樣是幾樣代碼就啟動了一個服務(wù)。

創(chuàng)建路由、為不同的路由執(zhí)行關(guān)聯(lián)不同的處理函數(shù)handler,接著跟net/http一樣調(diào)用 ListenAndServe 函數(shù)進行啟動服務(wù)監(jiān)聽,等待請求進行處理。

workerPool結(jié)構(gòu)

workerpool 對象表示 連接處理 工作池,這樣可以控制連接建立后的處理方式,而不是像標準庫 net/http 一樣,對每個請求連接都啟動一個 goroutine 處理, 內(nèi)部的 ready 字段存儲空閑的 workerChan 對象,workerChanPool 字段表示管理 workerChan 的對象池。

workerPool結(jié)構(gòu)體如下:

type workerPool struct {
    //匹配請求對應(yīng)的handler
    WorkerFunc ServeHandler
    //最大同時處理的請求數(shù)
    MaxWorkersCount int
    
    LogAllErrors bool
    //最大空閑工作時間
    MaxIdleWorkerDuration time.Duration

    Logger Logger
    //互斥鎖
    lock         sync.Mutex
    //work數(shù)量
    workersCount int
    mustStop     bool
    // 空閑的 workerChan
    ready []*workerChan
    //是否關(guān)閉workerPool
    stopCh chan struct{}
    //sync.Pool  workerChan 的對象池
    workerChanPool sync.Pool

    connState func(net.Conn, ConnState)
}

WorkerFunc :這個屬性挺重要的,因為給它賦值的是Server.serveConn

ready:存儲了空閑的workerChan

workerChanPool:是workerChan 的對象池,在sync.Pool中存取臨時對象,可減少內(nèi)存分配

啟動服務(wù)

ListenAndServe是啟動服務(wù)監(jiān)聽的入口,內(nèi)部的調(diào)用過程如下:

圖片圖片

Server.Serve

Serve方法為來自給監(jiān)聽到的連接提供處理服務(wù),直到超過了最大限制(256 * 1024)才會報錯。

func (s *Server) Serve(ln net.Listener) error {
    //最大連接處理數(shù)
    maxWorkersCount := s.getConcurrency()

    s.mu.Lock()
    s.ln = append(s.ln, ln)
    if s.done == nil {
        s.done = make(chan struct{})
    }
    if s.concurrencyCh == nil {
        s.concurrencyCh = make(chan struct{}, maxWorkersCount)
    }
    s.mu.Unlock()
    //workerPool進行初始化
    wp := &workerPool{
        WorkerFunc:            s.serveConn,
        MaxWorkersCount:       maxWorkersCount,
        LogAllErrors:          s.LogAllErrors,
        MaxIdleWorkerDuration: s.MaxIdleWorkerDuration,
        Logger:                s.logger(),
        connState:             s.setState,
    }
    //開啟協(xié)程,處理協(xié)程池的清理工作
    wp.Start()
    atomic.AddInt32(&s.open, 1)
    defer atomic.AddInt32(&s.open, -1)

    for {
        // 阻塞等待,獲取連接net.Conn
        if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {
            ...
            return err
        }
        s.setState(c, StateNew)
        atomic.AddInt32(&s.open, 1)
        //處理獲取到的連接net.Conn
        if !wp.Serve(c) {
            //未能處理,說明已達到最大worker限制
            ...
        }
        c = nil
    }
}

從上面的注釋中我們可以看出 Server 方法主要做了以下幾件事:

  1. 1. 初始化 worker Pool,并啟動
  2. 2. net.Listener循環(huán)接收請求
  3. 3. 將接收到的請求交給workerChan 處理

注意:這里如果超過了設(shè)定的最大連接數(shù)(默認是 256 * 1024個)就直接報錯了

Start開啟協(xié)程池

workerPool進行初始化之后接著就調(diào)用Start開啟,這里主要是指定sync.Pool變量workerChanPool的創(chuàng)建函數(shù)。

接著開啟一個協(xié)程,該Goroutine的目的是進行定時清理 workerPool 中的 ready 中保存的空閑 workerChan,清理頻率為每 10s 啟動一次。

??清理規(guī)則是使用二進制搜索算法找出最近可以清理的工作者的索引

func (wp *workerPool) Start() {
    //wp的關(guān)閉channel是否為空
    if wp.stopCh != nil {
        return
    }
    wp.stopCh = make(chan struct{})
    stopCh := wp.stopCh
    //指定workerChanPool的創(chuàng)建函數(shù)
    wp.workerChanPool.New = func() interface{} {
        return &workerChan{
            ch: make(chan net.Conn, workerChanCap),
        }
    }
    //開啟協(xié)程
    go func() {
        var scratch []*workerChan
        for {
            //清理空閑超時的 workerChan
            wp.clean(&scratch)
            select {
            case <-stopCh:
                return
            default:
                // 間隔10 s
                time.Sleep(wp.getMaxIdleWorkerDuration())
            }
        }
    }()
}

開啟一個清理Goroutine的目的是為了避免在流量高峰創(chuàng)建了大量協(xié)程,之后不再使用,造成協(xié)程浪費。

清理流程是在wp.clean()方法中實現(xiàn)的。

接收連接

acceptConn函數(shù)通過調(diào)用net.Listener的accept方法去接受連接,這里獲取連接的方式跟net/http調(diào)用的其實都是一樣的。

func acceptConn(s *Server, ln net.Listener, lastPerIPErrorTime *time.Time) (net.Conn, error) {
    for {
        c, err := ln.Accept()
        if err != nil {
            //err判斷
            ...
        }
        //校驗是否net.TCPConn連接
       // 校驗每個ip對應(yīng)的連接數(shù)
        if s.MaxConnsPerIP > 0 {
            pic := wrapPerIPConn(s, c)
            if pic == nil {
                ...
                continue
            }
            c = pic
        }
        return c, nil
    }
}

獲取 workerChan

func (wp *workerPool) Serve(c net.Conn) bool {
    //獲取 workerChan 
    ch := wp.getCh()
    if ch == nil {
        return false
    }
    //將連接放到channel中
    ch.ch <- c
    //返回true
    return true
}

這里調(diào)用的getCh()函數(shù)實現(xiàn)了獲取workerChan,獲取到之后將之前接受的連接net.Conn放到workerChan結(jié)構(gòu)體的channel通道中。

我們看下workerChan這個結(jié)構(gòu)體

type workerChan struct {
    lastUseTime time.Time
    ch          chan net.Conn
}

lastUseTime:最后一次被使用的時間,這個值在進行清理workerChan的時候是會用到的

ch:用來傳遞獲取到的連接net.Conn,獲取到連接時接收,處理請求時獲取

getCh方法:

func (wp *workerPool) getCh() *workerChan {
    var ch *workerChan
    createWorker := false

    wp.lock.Lock()
    //從ready隊列中拿workerChan
    ready := wp.ready
    n := len(ready) - 1
    if n < 0 {
        if wp.workersCount < wp.MaxWorkersCount {
            createWorker = true
            wp.workersCount++
        }
    } else {
        //ready隊列不為空,從隊尾拿workerChan
        ch = ready[n]
        //隊尾置為nil
        ready[n] = nil
        //重新將ready賦值給wp.ready
        wp.ready = ready[:n]
    }
    wp.lock.Unlock()
    //ready中獲取不到workerChan,則從對象池中新建一個
    if ch == nil {
        if !createWorker {
            return nil
        }
        vch := wp.workerChanPool.Get()
        ch = vch.(*workerChan)
        //開啟一個goroutine執(zhí)行
        go func() {
            //處理ch中channel中的數(shù)據(jù)
            wp.workerFunc(ch)
            //處理完后將workerChan放回對象池
            wp.workerChanPool.Put(vch)
        }()
    }
    return ch
}

圖片圖片

getCh()方法的目的就是獲取workerChan,流程如下:

? 先會去 ready 空閑隊列中獲取 workerChan

? ready 獲取不到則從對象池中創(chuàng)建一個新的 workerChan

? 并啟動 Goroutine 用來處理 channel 中的數(shù)據(jù)

workPool中的ready是一個FILO的棧,每次從隊尾取出workChan

處理連接

func (wp *workerPool) workerFunc(ch *workerChan) {
    var c net.Conn

    var err error
    for c = range ch.ch {
        //channel的值是nil,退出
        if c == nil {
            break
        }
        //執(zhí)行請求,并處理
        if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
            ...
        }
        ...
        //將當前workerChan放入ready隊列
        if !wp.release(ch) {
            break
        }
    }

    wp.lock.Lock()
    wp.workersCount--
    wp.lock.Unlock()
}

執(zhí)行流程

? 先遍歷workerChan的channel,看是否有連接net.Conn

? 獲取到連接之后就執(zhí)行WorkerFunc 函數(shù)處理請求

? 請求處理完之后將當前workerChan放入ready隊列

?? WorkerFunc 函數(shù)實際上是 Server 的 serveConn 方法

一開始開代碼的時候我還沒發(fā)現(xiàn)呢,細看了之后在Server.Serve()啟動服務(wù)時將Server.serveConn()方法賦值給了workerPool的WorkerFunc()。

要想了解實現(xiàn)的朋友可以搜下這方面的代碼

func (s *Server) ServeConn(c net.Conn) error {
    ...
    err := s.serveConn(c)
    ...
}

里面的代碼會比較多,不過里面的流程就是是獲取到請求的參數(shù),找到對應(yīng)的 handler 進行請求處理,然后返回 響應(yīng)給客戶端。

這里的實現(xiàn)代碼可以看到context、request對象的sync.Pool實現(xiàn),這里就不一一貼出來了。

總結(jié)

fasthttp和net/http在實現(xiàn)上還是有較大區(qū)別,通過對實現(xiàn)原理的分析,知道了fasthttp速度快是利用了大量sync.Pool對象復(fù)用 、[]byte 和 string利用萬能指針unsafe.Pointer進行轉(zhuǎn)換等優(yōu)化技巧。

如果你的業(yè)務(wù)需要支撐較高的 QPS 并且保持一致的低延遲時間,那么采用 fasthttp 是一個較好的選擇。不過net/http兼容性更高,在多數(shù)情況下反而是更好的選擇!

責任編輯:武曉燕 來源: 小許code
相關(guān)推薦

2023-04-07 08:17:39

fasthttp場景設(shè)計HTTP

2022-10-27 07:09:34

DjangoAPIRedis

2023-03-07 08:34:01

2025-04-24 09:31:09

Visio畫圖工具

2018-01-29 05:38:20

5G4G運營商

2021-01-20 15:41:05

人工智能自然語言技術(shù)

2020-02-09 16:18:45

Redis快 5 倍中間件

2016-08-09 21:18:31

5G4G5G網(wǎng)絡(luò)

2024-04-19 14:50:00

find文件查找

2023-11-30 10:13:17

TensorRT架構(gòu)

2022-04-13 09:44:16

5G萬兆網(wǎng)絡(luò)數(shù)字化

2025-03-03 13:12:33

C#代碼Python

2022-10-27 08:31:31

架構(gòu)

2024-03-26 10:13:54

日志引擎SigLens

2023-09-07 11:29:36

API開發(fā)

2017-11-14 08:25:36

數(shù)據(jù)庫MySQL安全登陸

2025-03-13 11:59:00

2018-07-11 15:08:35

2016-07-07 15:38:07

京東

2018-06-06 10:15:21

區(qū)塊鏈互聯(lián)網(wǎng)CCTV
點贊
收藏

51CTO技術(shù)棧公眾號