優(yōu)雅的讀取http請求或響應(yīng)的數(shù)據(jù)
從 http.Request.Body 或 http.Response.Body 中讀取數(shù)據(jù)方法或許很多,標(biāo)準(zhǔn)庫中大多數(shù)使用 ioutil.ReadAll 方法一次讀取所有數(shù)據(jù),如果是 json 格式的數(shù)據(jù)還可以使用 json.NewDecoder 從 io.Reader 創(chuàng)建一個解析器,假使使用 pprof 來分析程序總是會發(fā)現(xiàn) bytes.makeSlice 分配了大量內(nèi)存,且總是排行***,今天就這個問題來說一下如何高效優(yōu)雅的讀取 http 中的數(shù)據(jù)。
背景介紹
我們有許多 api 服務(wù),全部采用 json 數(shù)據(jù)格式,請求體就是整個 json 字符串,當(dāng)一個請求到服務(wù)端會經(jīng)過一些業(yè)務(wù)處理,然后再請求后面更多的服務(wù),所有的服務(wù)之間都用 http 協(xié)議來通信(啊, 為啥不用 RPC,因為所有的服務(wù)都會對第三方開放,http + json 更好對接),大多數(shù)請求數(shù)據(jù)大小在 1K~4K,響應(yīng)的數(shù)據(jù)在 1K~8K,早期所有的服務(wù)都使用 ioutil.ReadAll 來讀取數(shù)據(jù),隨著流量增加使用 pprof 來分析發(fā)現(xiàn) bytes.makeSlice 總是排在***,并且占用了整個程序 1/10 的內(nèi)存分配,我決定針對這個問題進行優(yōu)化,下面是整個優(yōu)化過程的記錄。
pprof 分析
這里使用 https://github.com/thinkeridea/go-extend/blob/master/exnet/exhttp/expprof/pprof.go 中的 API 來實現(xiàn)生產(chǎn)環(huán)境的 /debug/pprof 監(jiān)測接口,沒有使用標(biāo)準(zhǔn)庫的 net/http/pprof 包因為會自動注冊路由,且長期開放 API,這個包可以設(shè)定 API 是否開放,并在規(guī)定時間后自動關(guān)閉接口,避免存在工具嗅探。
服務(wù)部署上線穩(wěn)定后(大約過了一天半),通過 curl 下載 allocs 數(shù)據(jù),然后使用下面的命令查看分析。
- $ go tool pprof allocs
- File: xxx
- Type: alloc_space
- Time: Jan 25, 2019 at 3:02pm (CST)
- Entering interactive mode (type "help" for commands, "o" for options)
- (pprof) top
- Showing nodes accounting for 604.62GB, 44.50% of 1358.61GB total
- Dropped 776 nodes (cum <= 6.79GB)
- Showing top 10 nodes out of 155
- flat flat% sum% cum cum%
- 111.40GB 8.20% 8.20% 111.40GB 8.20% bytes.makeSlice
- 107.72GB 7.93% 16.13% 107.72GB 7.93% github.com/sirupsen/logrus.(*Entry).WithFields
- 65.94GB 4.85% 20.98% 65.94GB 4.85% strings.Replace
- 54.10GB 3.98% 24.96% 56.03GB 4.12% github.com/json-iterator/go.(*frozenConfig).Marshal
- 47.54GB 3.50% 28.46% 47.54GB 3.50% net/url.unescape
- 47.11GB 3.47% 31.93% 48.16GB 3.55% github.com/json-iterator/go.(*Iterator).readStringSlowPath
- 46.63GB 3.43% 35.36% 103.04GB 7.58% handlers.(*AdserviceHandler).returnAd
- 42.43GB 3.12% 38.49% 84.62GB 6.23% models.LogItemsToBytes
- 42.22GB 3.11% 41.59% 42.22GB 3.11% strings.Join
- 39.52GB 2.91% 44.50% 87.06GB 6.41% net/url.parseQuery
從結(jié)果中可以看出采集期間一共分配了 1358.61GB top 10 占用了 44.50% 其中 bytes.makeSlice 占了接近 1/10,那么看看都是誰在調(diào)用 bytes.makeSlice 吧。
- (pprof) web bytes.makeSlice
從上圖可以看出調(diào)用 bytes.makeSlice 的最終方法是 ioutil.ReadAll, (受篇幅影響就沒有截取 ioutil.ReadAll 上面的方法了),而 90% 都是 ioutil.ReadAll 讀取 http 數(shù)據(jù)調(diào)用,找到地方先別急想優(yōu)化方案,先看看為啥 ioutil.ReadAll 會導(dǎo)致這么多內(nèi)存分配。
- func readAll(r io.Reader, capacity int64) (b []byte, err error) {
- var buf bytes.Buffer
- // If the buffer overflows, we will get bytes.ErrTooLarge.
- // Return that as an error. Any other panic remains.
- defer func() {
- e := recover()
- if e == nil {
- return
- }
- if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge {
- err = panicErr
- } else {
- panic(e)
- }
- }()
- if int64(int(capacity)) == capacity {
- buf.Grow(int(capacity))
- }
- _, err = buf.ReadFrom(r)
- return buf.Bytes(), err
- }
- func ReadAll(r io.Reader) ([]byte, error) {
- return readAll(r, bytes.MinRead)
- }
以上是標(biāo)準(zhǔn)庫 ioutil.ReadAll 的代碼,每次會創(chuàng)建一個 var buf bytes.Buffer 并且初始化 buf.Grow(int(capacity)) 的大小為 bytes.MinRead, 這個值呢就是 512,按這個 buffer 的大小讀取一次數(shù)據(jù)需要分配 2~16 次內(nèi)存,天啊簡直不能忍,我自己創(chuàng)建一個 buffer 好不好。
看一下火焰圖🔥吧,其中紅框標(biāo)記的就是 ioutil.ReadAll 的部分,顏色比較鮮艷。
優(yōu)化讀取方法
自己創(chuàng)建足夠大的 buffer 減少因為容量不夠?qū)е碌亩啻螖U容問題。
- buffer := bytes.NewBuffer(make([]byte, 4096))
- _, err := io.Copy(buffer, request.Body)
- if err !=nil{
- return nil, err
- }
恩恩這樣應(yīng)該差不多了,為啥是初始化 4096 的大小,這是個均值,即使比 4096 大基本也就多分配一次內(nèi)存即可,而且大多數(shù)數(shù)據(jù)都是比 4096 小的。
但是這樣真的就算好了嗎,當(dāng)然不能這樣,這個 buffer 個每請求都要創(chuàng)建一次,是不是應(yīng)該考慮一下復(fù)用呢,使用 sync.Pool 建立一個緩沖池效果就更好了。
以下是優(yōu)化讀取請求的簡化代碼:
- package adapter
- import (
- "bytes"
- "io"
- "net/http"
- "sync"
- "github.com/json-iterator/go"
- "github.com/sirupsen/logrus"
- "github.com/thinkeridea/go-extend/exbytes"
- )
- type Adapter struct {
- pool sync.Pool
- }
- func New() *Adapter {
- return &Adapter{
- pool: sync.Pool{
- New: func() interface{} {
- return bytes.NewBuffer(make([]byte, 4096))
- },
- },
- }
- }
- func (api *Adapter) GetRequest(r *http.Request) (*Request, error) {
- buffer := api.pool.Get().(*bytes.Buffer)
- buffer.Reset()
- defer func() {
- if buffer != nil {
- api.pool.Put(buffer)
- buffer = nil
- }
- }()
- _, err := io.Copy(buffer, r.Body)
- if err != nil {
- return nil, err
- }
- request := &Request{}
- if err = jsoniter.Unmarshal(buffer.Bytes(), request); err != nil {
- logrus.WithFields(logrus.Fields{
- "json": exbytes.ToString(buffer.Bytes()),
- }).Errorf("jsoniter.UnmarshalJSON fail. error:%v", err)
- return nil, err
- }
- api.pool.Put(buffer)
- buffer = nil
- // ....
- return request, nil
- }
使用 sync.Pool 的方式是不是有點怪,主要是 defer 和 api.pool.Put(buffer);buffer = nil 這里解釋一下,為了提高 buufer 的復(fù)用率會在不使用時盡快把 buffer 放回到緩沖池中,defer 之所以會判斷 buffer != nil 主要是在業(yè)務(wù)邏輯出現(xiàn)錯誤時,但是 buffer 還沒有放回緩沖池時把 buffer 放回到緩沖池,因為在每個錯誤處理之后都寫 api.pool.Put(buffer) 不是一個好的方法,而且容易忘記,但是如果在確定不再使用時 api.pool.Put(buffer);buffer = nil 就可以盡早把 buffer 放回到緩沖池中,提高復(fù)用率,減少新建 buffer。
這樣就好了嗎,別急,之前說服務(wù)里面還會構(gòu)建請求,看看構(gòu)建請求如何優(yōu)化吧。
- package adapter
- import (
- "bytes"
- "fmt"
- "io"
- "io/ioutil"
- "net/http"
- "sync"
- "github.com/json-iterator/go"
- "github.com/sirupsen/logrus"
- "github.com/thinkeridea/go-extend/exbytes"
- )
- type Adapter struct {
- pool sync.Pool
- }
- func New() *Adapter {
- return &Adapter{
- pool: sync.Pool{
- New: func() interface{} {
- return bytes.NewBuffer(make([]byte, 4096))
- },
- },
- }
- }
- func (api *Adapter) Request(r *Request) (*Response, error) {
- var err error
- buffer := api.pool.Get().(*bytes.Buffer)
- buffer.Reset()
- defer func() {
- if buffer != nil {
- api.pool.Put(buffer)
- buffer = nil
- }
- }()
- e := jsoniter.NewEncoder(buffer)
- err = e.Encode(r)
- if err != nil {
- logrus.WithFields(logrus.Fields{
- "request": r,
- }).Errorf("jsoniter.Marshal failure: %v", err)
- return nil, fmt.Errorf("jsoniter.Marshal failure: %v", err)
- }
- data := buffer.Bytes()
- req, err := http.NewRequest("POST", "http://xxx.com", buffer)
- if err != nil {
- logrus.WithFields(logrus.Fields{
- "data": exbytes.ToString(data),
- }).Errorf("http.NewRequest failed: %v", err)
- return nil, fmt.Errorf("http.NewRequest failed: %v", err)
- }
- req.Header.Set("User-Agent", "xxx")
- httpResponse, err := http.DefaultClient.Do(req)
- if httpResponse != nil {
- defer func() {
- io.Copy(ioutil.Discard, httpResponse.Body)
- httpResponse.Body.Close()
- }()
- }
- if err != nil {
- logrus.WithFields(logrus.Fields{
- "url": "http://xxx.com",
- }).Errorf("query service failed %v", err)
- return nil, fmt.Errorf("query service failed %v", err)
- }
- if httpResponse.StatusCode != 200 {
- logrus.WithFields(logrus.Fields{
- "url": "http://xxx.com",
- "status": httpResponse.Status,
- "status_code": httpResponse.StatusCode,
- }).Errorf("invalid http status code")
- return nil, fmt.Errorf("invalid http status code")
- }
- buffer.Reset()
- _, err = io.Copy(buffer, httpResponse.Body)
- if err != nil {
- return nil, fmt.Errorf("adapter io.copy failure error:%v", err)
- }
- respData := buffer.Bytes()
- logrus.WithFields(logrus.Fields{
- "response_json": exbytes.ToString(respData),
- }).Debug("response json")
- res := &Response{}
- err = jsoniter.Unmarshal(respData, res)
- if err != nil {
- logrus.WithFields(logrus.Fields{
- "data": exbytes.ToString(respData),
- "url": "http://xxx.com",
- }).Errorf("adapter jsoniter.Unmarshal failed, error:%v", err)
- return nil, fmt.Errorf("adapter jsoniter.Unmarshal failed, error:%v", err)
- }
- api.pool.Put(buffer)
- buffer = nil
- // ...
- return res, nil
- }
這個示例和之前差不多,只是不僅用來讀取 http.Response.Body 還用來創(chuàng)建一個 jsoniter.NewEncoder 用來把請求壓縮成 json 字符串,并且作為 http.NewRequest 的 body 參數(shù), 如果直接用 jsoniter.Marshal 同樣會創(chuàng)建很多次內(nèi)存,jsoniter 也使用 buffer 做為緩沖區(qū),并且默認大小為 512, 代碼如下:
- func (cfg Config) Froze() API {
- api := &frozenConfig{
- sortMapKeys: cfg.SortMapKeys,
- indentionStep: cfg.IndentionStep,
- objectFieldMustBeSimpleString: cfg.ObjectFieldMustBeSimpleString,
- onlyTaggedField: cfg.OnlyTaggedField,
- disallowUnknownFields: cfg.DisallowUnknownFields,
- }
- api.streamPool = &sync.Pool{
- New: func() interface{} {
- return NewStream(api, nil, 512)
- },
- }
- // .....
- return api
- }
而且序列化之后會進行一次數(shù)據(jù)拷貝:
- func (cfg *frozenConfig) Marshal(v interface{}) ([]byte, error) {
- stream := cfg.BorrowStream(nil)
- defer cfg.ReturnStream(stream)
- stream.WriteVal(v)
- if stream.Error != nil {
- return nil, stream.Error
- }
- result := stream.Buffer()
- copied := make([]byte, len(result))
- copy(copied, result)
- return copied, nil
- }
既然要用 buffer 那就一起吧^_^,這樣可以減少多次內(nèi)存分配,下讀取 http.Response.Body 之前一定要記得 buffer.Reset(), 這樣基本就已經(jīng)完成了 http.Request.Body 和 http.Response.Body 的數(shù)據(jù)讀取優(yōu)化了,具體效果等上線跑一段時間穩(wěn)定之后來查看吧。
效果分析
上線跑了一天,來看看效果吧。
- $ go tool pprof allocs2
- File: connect_server
- Type: alloc_space
- Time: Jan 26, 2019 at 10:27am (CST)
- Entering interactive mode (type "help" for commands, "o" for options)
- (pprof) top
- Showing nodes accounting for 295.40GB, 40.62% of 727.32GB total
- Dropped 738 nodes (cum <= 3.64GB)
- Showing top 10 nodes out of 174
- flat flat% sum% cum cum%
- 73.52GB 10.11% 10.11% 73.52GB 10.11% git.tvblack.com/tvblack/connect_server/vendor/github.com/sirupsen/logrus.(*Entry).WithFields
- 31.70GB 4.36% 14.47% 31.70GB 4.36% net/url.unescape
- 27.49GB 3.78% 18.25% 54.87GB 7.54% git.tvblack.com/tvblack/connect_server/models.LogItemsToBytes
- 27.41GB 3.77% 22.01% 27.41GB 3.77% strings.Join
- 25.04GB 3.44% 25.46% 25.04GB 3.44% bufio.NewWriterSize
- 24.81GB 3.41% 28.87% 24.81GB 3.41% bufio.NewReaderSize
- 23.91GB 3.29% 32.15% 23.91GB 3.29% regexp.(*bitState).reset
- 23.06GB 3.17% 35.32% 23.06GB 3.17% math/big.nat.make
- 19.90GB 2.74% 38.06% 20.35GB 2.80% git.tvblack.com/tvblack/connect_server/vendor/github.com/json-iterator/go.(*Iterator).readStringSlowPath
- 18.58GB 2.56% 40.62% 19.12GB 2.63% net/textproto.(*Reader).ReadMIMEHeader
哇塞 bytes.makeSlice 終于從前十中消失了,真的太棒了,還是看看 bytes.makeSlice 的其它調(diào)用情況吧。
- (pprof) web bytes.makeSlice
從圖中可以發(fā)現(xiàn) bytes.makeSlice 的分配已經(jīng)很小了, 且大多數(shù)是 http.Request.ParseForm 讀取 http.Request.Body 使用 ioutil.ReadAll 原因,這次優(yōu)化的效果非常的好。
看一下更直觀的火焰圖吧,和優(yōu)化前對比一下很明顯 ioutil.ReadAll 看不到了。
優(yōu)化期間遇到的問題
比較慚愧在優(yōu)化的過程出現(xiàn)了一個過失,導(dǎo)致生產(chǎn)環(huán)境2分鐘故障,通過自動部署立即回滾才得以快速恢復(fù),之后分析代碼解決之后上線才***優(yōu)化,下面總結(jié)一下出現(xiàn)的問題吧。
在構(gòu)建 http 請求時我分了兩個部分優(yōu)化,序列化 json 和讀取 http.Response.Body 數(shù)據(jù),保持一個觀點就是盡早把 buffer 放回到緩沖池,因為 http.DefaultClient.Do(req) 是網(wǎng)絡(luò)請求會相對耗時,在這個之前我把 buffer 放回到緩沖池中,之后讀取 http.Response.Body 時在重新獲取一個 buffer,大概代碼如下:
- package adapter
- import (
- "bytes"
- "fmt"
- "io"
- "io/ioutil"
- "net/http"
- "sync"
- "github.com/json-iterator/go"
- "github.com/sirupsen/logrus"
- "github.com/thinkeridea/go-extend/exbytes"
- )
- type Adapter struct {
- pool sync.Pool
- }
- func New() *Adapter {
- return &Adapter{
- pool: sync.Pool{
- New: func() interface{} {
- return bytes.NewBuffer(make([]byte, 4096))
- },
- },
- }
- }
- func (api *Adapter) Request(r *Request) (*Response, error) {
- var err error
- buffer := api.pool.Get().(*bytes.Buffer)
- buffer.Reset()
- defer func() {
- if buffer != nil {
- api.pool.Put(buffer)
- buffer = nil
- }
- }()
- e := jsoniter.NewEncoder(buffer)
- err = e.Encode(r)
- if err != nil {
- return nil, fmt.Errorf("jsoniter.Marshal failure: %v", err)
- }
- data := buffer.Bytes()
- req, err := http.NewRequest("POST", "http://xxx.com", buffer)
- if err != nil {
- return nil, fmt.Errorf("http.NewRequest failed: %v", err)
- }
- req.Header.Set("User-Agent", "xxx")
- api.pool.Put(buffer)
- buffer = nil
- httpResponse, err := http.DefaultClient.Do(req)
- // ....
- buffer = api.pool.Get().(*bytes.Buffer)
- buffer.Reset()
- defer func() {
- if buffer != nil {
- api.pool.Put(buffer)
- buffer = nil
- }
- }()
- _, err = io.Copy(buffer, httpResponse.Body)
- if err != nil {
- return nil, fmt.Errorf("adapter io.copy failure error:%v", err)
- }
- // ....
- api.pool.Put(buffer)
- buffer = nil
- // ...
- return res, nil
- }
上線之后馬上發(fā)生了錯誤 http: ContentLength=2090 with Body length 0 發(fā)送請求的時候從 buffer 讀取數(shù)據(jù)發(fā)現(xiàn)數(shù)據(jù)不見了或者數(shù)據(jù)不夠了,我去這是什么鬼,馬上回滾恢復(fù)業(yè)務(wù),然后分析 http.DefaultClient.Do(req) 和 http.NewRequest,在調(diào)用 http.NewRequest 是并沒有從 buffer 讀取數(shù)據(jù),而只是創(chuàng)建了一個 req.GetBody 之后在 http.DefaultClient.Do 是才讀取數(shù)據(jù),因為在 http.DefaultClient.Do 之前把 buffer 放回到緩沖池中,其它 goroutine 獲取到 buffer 并進行 Reset 就發(fā)生了數(shù)據(jù)爭用,當(dāng)然會導(dǎo)致數(shù)據(jù)讀取不完整了,真實汗顏,對 http.Client 了解太少,爭取有空擼一遍源碼。
總結(jié)
使用合適大小的 buffer 來減少內(nèi)存分配,sync.Pool 可以幫助復(fù)用 buffer, 一定要自己寫這些邏輯,避免使用三方包,三方包即使使用同樣的技巧為了避免數(shù)據(jù)爭用,在返回數(shù)據(jù)時候必然會拷貝一個新的數(shù)據(jù)返回,就像 jsoniter 雖然使用了 sync.Pool 和 buffer 但是返回數(shù)據(jù)時還需要拷貝,另外這種通用包并不能給一個非常貼合業(yè)務(wù)的初始 buffer 大小,過小會導(dǎo)致數(shù)據(jù)發(fā)生拷貝,過大會太過浪費內(nèi)存。
程序中善用 buffer 和 sync.Pool 可以大大的改善程序的性能,并且這兩個組合在一起使用非常的簡單,并不會使代碼變的復(fù)雜。