自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

微服務(wù)架構(gòu)下的熔斷框架:Hystrix-Go

開發(fā) 架構(gòu)
伴隨著微服務(wù)架構(gòu)被宣傳得如火如茶,一些概念也被推到了我們的面前。一提到微服務(wù),就離不開這幾個字:高內(nèi)聚低耦合;微服務(wù)的架構(gòu)設(shè)計最終目的也就是實現(xiàn)這幾個字。

[[421890]]

本文轉(zhuǎn)載自微信公眾號「Golang夢工廠」,作者AsongGo 。轉(zhuǎn)載本文請聯(lián)系Golang夢工廠公眾號。

背景

伴隨著微服務(wù)架構(gòu)被宣傳得如火如茶,一些概念也被推到了我們的面前。一提到微服務(wù),就離不開這幾個字:高內(nèi)聚低耦合;微服務(wù)的架構(gòu)設(shè)計最終目的也就是實現(xiàn)這幾個字。在微服務(wù)架構(gòu)中,微服務(wù)就是完成一個單一的業(yè)務(wù)功能,每個微服務(wù)可以獨立演進,一個應(yīng)用可能會有多個微服務(wù)組成,微服務(wù)之間的數(shù)據(jù)交可以通過遠程調(diào)用來完成,這樣在一個微服務(wù)架構(gòu)下就會形成這樣的依賴關(guān)系:

微服務(wù)A調(diào)用微服務(wù)C、D,微服務(wù)B又依賴微服務(wù)B、E,微服務(wù)D依賴于服務(wù)F,這只是一個簡單的小例子,實際業(yè)務(wù)中服務(wù)之間的依賴關(guān)系比這還復雜,這樣在調(diào)用鏈路上如果某個微服務(wù)的調(diào)用響應(yīng)時間過長或者不可用,那么對上游服務(wù)(按調(diào)用關(guān)系命名)的調(diào)用就會占用越來越多的系統(tǒng)資源,進而引起系統(tǒng)崩潰,這就是微服務(wù)的雪蹦效應(yīng)。

為了解決微服務(wù)的雪蹦效應(yīng),提出來使用熔斷機制為微服務(wù)鏈路提供保護機制。熔斷機制大家應(yīng)該都不陌生,電路的中保險絲就是一種熔斷機制,在微服務(wù)中的熔斷機制是什么樣的呢?

當鏈路中的某個微服務(wù)不可用或者響應(yīng)的時間太長時,會進行服務(wù)的降級,進而熔斷該節(jié)點微服務(wù)的調(diào)用,快速返回錯誤的響應(yīng)信息,當檢測到該節(jié)點微服務(wù)調(diào)用響應(yīng)正常后,恢復調(diào)用鏈路。

本文我們就介紹一個開源熔斷框架:hystrix-go。

熔斷框架(hystrix-go)

Hystrix是一個延遲和容錯庫,旨在隔離對遠程系統(tǒng)、服務(wù)和第三方服務(wù)的訪問點,停止級聯(lián)故障并在故障不可避免的復雜分布式系統(tǒng)中實現(xiàn)彈性。hystrix-go 旨在允許 Go 程序員輕松構(gòu)建具有與基于 Java 的 Hystrix 庫類似的執(zhí)行語義的應(yīng)用程序。所以本文就從使用開始到源碼分析一下hystrix-go。

快速安裝

  1. go get -u github.com/afex/hystrix-go/hystrix 

快速使用

hystrix-go真的是開箱即用,使用還是比較簡單的,主要分為兩個步驟:

  • 配置熔斷規(guī)則,否則將使用默認配置??梢哉{(diào)用的方法
  1. func Configure(cmds map[string]CommandConfig)  
  2. func ConfigureCommand(name string, config CommandConfig) 

Configure方法內(nèi)部也是調(diào)用的ConfigureCommand方法,就是傳參數(shù)不一樣,根據(jù)自己的代碼風格選擇。

  • 定義依賴于外部系統(tǒng)的應(yīng)用程序邏輯 - runFunc 和服務(wù)中斷期間執(zhí)行的邏輯代碼 - fallbackFunc,可以調(diào)用的方法:
  1. func Go(name string, run runFunc, fallback fallbackFunc) // 內(nèi)部調(diào)用Goc方法 
  2. func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC)  
  3. func Do(name string, run runFunc, fallback fallbackFunc) // 內(nèi)部調(diào)用的是Doc方法 
  4. func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) // 內(nèi)部調(diào)用Goc方法,處理了異步過程 

Go和Do的區(qū)別在于異步還是同步,Do方法在調(diào)用Doc方法內(nèi)處理了異步過程,他們最終都是調(diào)用的Goc方法。后面我們進行分析。

舉一個例子:我們在Gin框架上加一個接口級的熔斷中間件

  1. // 代碼已上傳github: 文末查看地址 
  2. var CircuitBreakerName = "api_%s_circuit_breaker" 
  3. func CircuitBreakerWrapper(ctx *gin.Context){ 
  4.  name := fmt.Sprintf(CircuitBreakerName,ctx.Request.URL) 
  5.  hystrix.Do(name, func() error { 
  6.   ctx.Next() 
  7.   code := ctx.Writer.Status() 
  8.   if code != http.StatusOK{ 
  9.    return errors.New(fmt.Sprintf("status code %d", code)) 
  10.   } 
  11.   return nil 
  12.  
  13.  }, func(err error) error { 
  14.   if err != nil{ 
  15.    // 監(jiān)控上報(未實現(xiàn)) 
  16.    _, _ = io.WriteString(f, fmt.Sprintf("circuitBreaker and err is %s\n",err.Error())) //寫入文件(字符串) 
  17.    fmt.Printf("circuitBreaker and err is %s\n",err.Error()) 
  18.    // 返回熔斷錯誤 
  19.    ctx.JSON(http.StatusServiceUnavailable,gin.H{ 
  20.     "msg": err.Error(), 
  21.    }) 
  22.   } 
  23.   return nil 
  24.  }) 
  25.  
  26. func init()  { 
  27.  hystrix.ConfigureCommand(CircuitBreakerName,hystrix.CommandConfig{ 
  28.   Timeout:                int(3*time.Second), // 執(zhí)行command的超時時間為3s 
  29.   MaxConcurrentRequests:  10, // command的最大并發(fā)量 
  30.   RequestVolumeThreshold: 100, // 統(tǒng)計窗口10s內(nèi)的請求數(shù)量,達到這個請求數(shù)量后才去判斷是否要開啟熔斷 
  31.   SleepWindow:            int(2 * time.Second), // 當熔斷器被打開后,SleepWindow的時間就是控制過多久后去嘗試服務(wù)是否可用了 
  32.   ErrorPercentThreshold:  20, // 錯誤百分比,請求數(shù)量大于等于RequestVolumeThreshold并且錯誤率到達這個百分比后就會啟動熔斷 
  33.  }) 
  34.  if checkFileIsExist(filename) { //如果文件存在 
  35.   f, errfile = os.OpenFile(filename, os.O_APPEND, 0666) //打開文件 
  36.  } else { 
  37.   f, errfile = os.Create(filename) //創(chuàng)建文件 
  38.  } 
  39.  
  40.  
  41. func main()  { 
  42.  defer f.Close() 
  43.  hystrixStreamHandler := hystrix.NewStreamHandler() 
  44.  hystrixStreamHandler.Start() 
  45.  go http.ListenAndServe(net.JoinHostPort("""81"), hystrixStreamHandler) 
  46.  r := gin.Default() 
  47.  r.GET("/api/ping/baidu", func(c *gin.Context) { 
  48.   _, err := http.Get("https://www.baidu.com"
  49.   if err != nil { 
  50.    c.JSON(http.StatusInternalServerError, gin.H{"msg": err.Error()}) 
  51.    return 
  52.   } 
  53.   c.JSON(http.StatusOK, gin.H{"msg""success"}) 
  54.  }, CircuitBreakerWrapper) 
  55.  r.Run()  // listen and serve on 0.0.0.0:8080 (for windows "localhost:8080"
  56.  
  57. func checkFileIsExist(filename string) bool { 
  58.  if _, err := os.Stat(filename); os.IsNotExist(err) { 
  59.   return false 
  60.  } 
  61.  return true 

指令:wrk -t100 -c100 -d1s http://127.0.0.1:8080/api/ping/baidu

運行結(jié)果:

  1. circuitBreaker and err is status code 500 
  2. circuitBreaker and err is status code 500 
  3. .....  
  4. circuitBreaker and err is hystrix: max concurrency 
  5. circuitBreaker and err is hystrix: max concurrency 
  6. ..... 
  7. circuitBreaker and err is hystrix: circuit open 
  8. circuitBreaker and err is hystrix: circuit open 
  9. ..... 

對錯誤進行分析:

  • circuitBreaker and err is status code 500:因為我們關(guān)閉了網(wǎng)絡(luò),所以請求是沒有響應(yīng)的
  • circuitBreaker and err is hystrix: max concurrency:我們設(shè)置的最大并發(fā)量MaxConcurrentRequests是10,我們的壓測工具使用的是100并發(fā),所有會觸發(fā)這個熔斷
  • circuitBreaker and err is hystrix: circuit open:我們設(shè)置熔斷開啟的請求數(shù)量RequestVolumeThreshold是100,所以當10s內(nèi)的請求數(shù)量大于100時就會觸發(fā)熔斷。

簡單對上面的例子做一個解析:

  • 添加接口級的熔斷中間件
  • 初始化熔斷相關(guān)配置
  • 開啟dashboard 可視化hystrix的上報信息,瀏覽器打開http://localhost:81,可以看到如下結(jié)果:

hystrix-go流程分析

本來想對源碼進行分析,代碼量有點大,所以就針對流程來分析,順便看一些核心代碼。

配置熔斷規(guī)則

既然是熔斷,就要有熔斷規(guī)則,我們可以調(diào)用兩個方法配置熔斷規(guī)則,不會最終調(diào)用的都是ConfigureCommand,這里沒有特別的邏輯,如果我們沒有配置,系統(tǒng)將使用默認熔斷規(guī)則:

  1. var ( 
  2.  // DefaultTimeout is how long to wait for command to complete, in milliseconds 
  3.  DefaultTimeout = 1000 
  4.  // DefaultMaxConcurrent is how many commands of the same type can run at the same time 
  5.  DefaultMaxConcurrent = 10 
  6.  // DefaultVolumeThreshold is the minimum number of requests needed before a circuit can be tripped due to health 
  7.  DefaultVolumeThreshold = 20 
  8.  // DefaultSleepWindow is how long, in milliseconds, to wait after a circuit opens before testing for recovery 
  9.  DefaultSleepWindow = 5000 
  10.  // DefaultErrorPercentThreshold causes circuits to open once the rolling measure of errors exceeds this percent of requests 
  11.  DefaultErrorPercentThreshold = 50 
  12.  // DefaultLogger is the default logger that will be used in the Hystrix package. By default prints nothing. 
  13.  DefaultLogger = NoopLogger{} 

配置規(guī)則如下:

  • Timeout:定義執(zhí)行command的超時時間,時間單位是ms,默認時間是1000ms;
  • MaxConcurrnetRequests:定義command的最大并發(fā)量,默認值是10并發(fā)量;
  • SleepWindow:熔斷器被打開后使用,在熔斷器被打開后,根據(jù)SleepWindow設(shè)置的時間控制多久后嘗試服務(wù)是否可用,默認時間為5000ms;
  • RequestVolumeThreshold:判斷熔斷開關(guān)的條件之一,統(tǒng)計10s(代碼中寫死了)內(nèi)請求數(shù)量,達到這個請求數(shù)量后再根據(jù)錯誤率判斷是否要開啟熔斷;
  • ErrorPercentThreshold:判斷熔斷開關(guān)的條件之一,統(tǒng)計錯誤百分比,請求數(shù)量大于等于RequestVolumeThreshold并且錯誤率到達這個百分比后就會啟動熔斷 默認值是50;

這些規(guī)則根據(jù)command的name進行區(qū)分存放到一個map中。

執(zhí)行command

執(zhí)行command主要可以調(diào)用四個方法,分別是:

  1. func Go(name string, run runFunc, fallback fallbackFunc) 
  2. func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC)  
  3. func Do(name string, run runFunc, fallback fallbackFunc) 
  4. func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) 

Do內(nèi)部調(diào)用的Doc方法,Go內(nèi)部調(diào)用的是Goc方法,在Doc方法內(nèi)部最終調(diào)用的還是Goc方法,只是在Doc方法內(nèi)做了同步邏輯:

  1. func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error { 
  2.   ..... 省略部分封裝代碼 
  3.   var errChan chan error 
  4.  if fallback == nil { 
  5.   errChan = GoC(ctx, name, r, nil) 
  6.  } else { 
  7.   errChan = GoC(ctx, name, r, f) 
  8.  } 
  9.  
  10.  select { 
  11.  case <-done: 
  12.   return nil 
  13.  case err := <-errChan: 
  14.   return err 
  15.  } 

因為他們最終都是調(diào)用的Goc方法,所以我們執(zhí)行分析Goc方法的內(nèi)部邏輯;代碼有點長,我們分邏輯來分析:

創(chuàng)建command對象

  1. cmd := &command{ 
  2.  run:      run, 
  3.  fallback: fallback, 
  4.  start:    time.Now(), 
  5.  errChan:  make(chan error, 1), 
  6.  finished: make(chan bool, 1), 
  7. // 獲取熔斷器 
  8. circuit, _, err := GetCircuit(name
  9. if err != nil { 
  10.  cmd.errChan <- err 
  11.  return cmd.errChan 

介紹一下command的數(shù)據(jù)結(jié)構(gòu):

  1. type command struct { 
  2.  sync.Mutex 
  3.  
  4.  ticket      *struct{} 
  5.  start       time.Time 
  6.  errChan     chan error 
  7.  finished    chan bool 
  8.  circuit     *CircuitBreaker 
  9.  run         runFuncC 
  10.  fallback    fallbackFuncC 
  11.  runDuration time.Duration 
  12.  events      []string 

字段介紹:

  • ticket:用來做最大并發(fā)量控制,這個就是一個令牌
  • start:記錄command執(zhí)行的開始時間
  • errChan:記錄command執(zhí)行錯誤
  • finished:標志command執(zhí)行結(jié)束,用來做協(xié)程同步
  • circuit:存儲熔斷器相關(guān)信息
  • run:應(yīng)用程序
  • fallback:應(yīng)用程序執(zhí)行失敗后要執(zhí)行的函數(shù)
  • runDuration:記錄command執(zhí)行消耗時間
  • events:events主要是存儲事件類型信息,比如執(zhí)行成功的success,或者失敗的timeout、context_canceled等

上段代碼重點是GetCircuit方法,這一步的目的就是獲取熔斷器,使用動態(tài)加載的方式,如果沒有就創(chuàng)建一個熔斷器,熔斷器結(jié)構(gòu)如下:

  1. type CircuitBreaker struct { 
  2.  Name                   string 
  3.  open                   bool 
  4.  forceOpen              bool 
  5.  mutex                  *sync.RWMutex 
  6.  openedOrLastTestedTime int64 
  7.  
  8.  executorPool *executorPool 
  9.  metrics      *metricExchange 

解釋一下這幾個字段:

  • name:熔斷器的名字,其實就是創(chuàng)建的command名字
  • open:判斷熔斷器是否打開的標志
  • forceopen:手動觸發(fā)熔斷器的開關(guān),單元測試使用
  • mutex:使用讀寫鎖保證并發(fā)安全
  • openedOrLastTestedTime:記錄上一次打開熔斷器的時間,因為要根據(jù)這個時間和SleepWindow時間來做恢復嘗試
  • executorPool:用來做流量控制,因為我們有一個最大并發(fā)量控制,就是根據(jù)這個來做的流量控制,每次請求都要獲取令牌

metrics:用來上報執(zhí)行狀態(tài)的事件,通過它把執(zhí)行狀態(tài)信息存儲到實際熔斷器執(zhí)行各個維度狀態(tài) (成功次數(shù),失敗次數(shù),超時……) 的數(shù)據(jù)集合中。

后面會單獨分析executorPool、metrics的實現(xiàn)邏輯。

定義令牌相關(guān)的方法和變量

因為我們有一個條件是最大并發(fā)控制,采用的是令牌的方式進行流量控制,每一個請求都要獲取一個令牌,使用完畢要把令牌還回去,先看一下這段代碼:

  1. ticketCond := sync.NewCond(cmd) 
  2. ticketChecked := false 
  3. // When the caller extracts error from returned errChan, it's assumed that 
  4. // the ticket's been returned to executorPool. Therefore, returnTicket() can 
  5. // not run after cmd.errorWithFallback(). 
  6. returnTicket := func() { 
  7.  cmd.Lock() 
  8.  // Avoid releasing before a ticket is acquired. 
  9.  for !ticketChecked { 
  10.   ticketCond.Wait() 
  11.  } 
  12.  cmd.circuit.executorPool.Return(cmd.ticket) 
  13.  cmd.Unlock() 

使用sync.NewCond創(chuàng)建一個條件變量,用來協(xié)調(diào)通知你可以歸還令牌了。

然后定義一個返回令牌的方法,調(diào)用Return方法歸還令牌。

定義上報執(zhí)行事件的方法

前面我們也提到了,我們的熔斷器會上報執(zhí)行狀態(tài)的事件,通過它把執(zhí)行狀態(tài)信息存儲到實際熔斷器執(zhí)行各個維度狀態(tài) (成功次數(shù),失敗次數(shù),超時……) 的數(shù)據(jù)集合中。所以要定義一個上報的方法:

  1. reportAllEvent := func() { 
  2.  err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration) 
  3.  if err != nil { 
  4.   log.Printf(err.Error()) 
  5.  } 

開啟協(xié)程一:執(zhí)行應(yīng)用程序邏輯 - runFunc

協(xié)程一的主要目的就是執(zhí)行應(yīng)用程序邏輯:

  1. go func() { 
  2.   defer func() { cmd.finished <- true }() // 標志協(xié)程一的command執(zhí)行結(jié)束,同步到協(xié)程二 
  3.  
  4.   // 當最近執(zhí)行的并發(fā)數(shù)量超過閾值并且錯誤率很高時,就會打開熔斷器。  
  5.    // 如果熔斷器打開,直接拒絕拒絕請求并返回令牌,當感覺健康狀態(tài)恢復時,熔斷器將允許新的流量。 
  6.   if !cmd.circuit.AllowRequest() { 
  7.    cmd.Lock() 
  8.    // It's safe for another goroutine to go ahead releasing a nil ticket. 
  9.    ticketChecked = true 
  10.    ticketCond.Signal() // 通知釋放ticket信號 
  11.    cmd.Unlock() 
  12.       // 使用sync.Onece保證只執(zhí)行一次。 
  13.    returnOnce.Do(func() { 
  14.         // 返還令牌 
  15.     returnTicket() 
  16.         // 執(zhí)行fallback邏輯 
  17.     cmd.errorWithFallback(ctx, ErrCircuitOpen) 
  18.         // 上報狀態(tài)事件 
  19.     reportAllEvent() 
  20.    }) 
  21.    return 
  22.   } 
  23.    // 控制并發(fā) 
  24.   cmd.Lock() 
  25.   select { 
  26.     // 獲取到令牌 
  27.   case cmd.ticket = <-circuit.executorPool.Tickets: 
  28.       // 發(fā)送釋放令牌信號 
  29.    ticketChecked = true 
  30.    ticketCond.Signal() 
  31.    cmd.Unlock() 
  32.   default
  33.       // 沒有令牌可用了, 也就是達到最大并發(fā)數(shù)量則直接處理fallback邏輯 
  34.    ticketChecked = true 
  35.    ticketCond.Signal() 
  36.    cmd.Unlock() 
  37.    returnOnce.Do(func() { 
  38.     returnTicket() 
  39.     cmd.errorWithFallback(ctx, ErrMaxConcurrency) 
  40.     reportAllEvent() 
  41.    }) 
  42.    return 
  43.   } 
  44.   // 執(zhí)行應(yīng)用程序邏輯 
  45.   runStart := time.Now() 
  46.   runErr := run(ctx) 
  47.   returnOnce.Do(func() { 
  48.    defer reportAllEvent() // 狀態(tài)事件上報 
  49.       // 統(tǒng)計應(yīng)用程序執(zhí)行時長 
  50.    cmd.runDuration = time.Since(runStart) 
  51.       // 返還令牌 
  52.    returnTicket() 
  53.       // 如果應(yīng)用程序執(zhí)行失敗執(zhí)行fallback函數(shù) 
  54.    if runErr != nil { 
  55.     cmd.errorWithFallback(ctx, runErr) 
  56.     return 
  57.    } 
  58.    cmd.reportEvent("success"
  59.   }) 
  60.  }() 

總結(jié)一下這個協(xié)程:

  • 判斷熔斷器是否打開,如果打開了熔斷器直接進行熔斷,不在進行后面的請求
  • 運行應(yīng)用程序邏輯

開啟協(xié)程二:同步協(xié)程一并監(jiān)聽錯誤

先看代碼:

  1. go func() { 
  2.     //  使用定時器來做超時控制,這個超時時間就是我們配置的,默認1000ms 
  3.   timer := time.NewTimer(getSettings(name).Timeout) 
  4.   defer timer.Stop() 
  5.  
  6.   select { 
  7.       // 同步協(xié)程一 
  8.   case <-cmd.finished: 
  9.    // returnOnce has been executed in another goroutine 
  10.        
  11.     // 是否收到context取消信號 
  12.   case <-ctx.Done(): 
  13.    returnOnce.Do(func() { 
  14.     returnTicket() 
  15.     cmd.errorWithFallback(ctx, ctx.Err()) 
  16.     reportAllEvent() 
  17.    }) 
  18.    return 
  19.     // command執(zhí)行超時了 
  20.   case <-timer.C: 
  21.    returnOnce.Do(func() { 
  22.     returnTicket() 
  23.     cmd.errorWithFallback(ctx, ErrTimeout) 
  24.     reportAllEvent() 
  25.    }) 
  26.    return 
  27.   } 
  28.  }() 

這個協(xié)程的邏輯比較清晰明了,目的就是監(jiān)聽業(yè)務(wù)執(zhí)行被取消以及超時。

畫圖總結(jié)command執(zhí)行流程

上面我們都是通過代碼來進行分析的,看起來還是有點亂,最后畫個圖總結(jié)一下:

上面我們分析了整個具體流程,接下來我們針對一些核心點就行分析

上報狀態(tài)事件

hystrix-go為每一個Command設(shè)置了一個默認統(tǒng)計控制器,用來保存熔斷器的所有狀態(tài),包括調(diào)用次數(shù)、失敗次數(shù)、被拒絕次數(shù)等,存儲指標結(jié)構(gòu)如下:

  1. type DefaultMetricCollector struct { 
  2.  mutex *sync.RWMutex 
  3.  
  4.  numRequests *rolling.Number 
  5.  errors      *rolling.Number 
  6.  
  7.  successes               *rolling.Number 
  8.  failures                *rolling.Number 
  9.  rejects                 *rolling.Number 
  10.  shortCircuits           *rolling.Number 
  11.  timeouts                *rolling.Number 
  12.  contextCanceled         *rolling.Number 
  13.  contextDeadlineExceeded *rolling.Number 
  14.  
  15.  fallbackSuccesses *rolling.Number 
  16.  fallbackFailures  *rolling.Number 
  17.  totalDuration     *rolling.Timing 
  18.  runDuration       *rolling.Timing 

使用rolling.Number結(jié)構(gòu)保存狀態(tài)指標,使用rolling.Timing保存時間指標。

最終監(jiān)控上報都依靠metricExchange來實現(xiàn),數(shù)據(jù)結(jié)構(gòu)如下:

  1. type metricExchange struct { 
  2.  Name    string 
  3.  Updates chan *commandExecution 
  4.  Mutex   *sync.RWMutex 
  5.  
  6.  metricCollectors []metricCollector.MetricCollector 

上報command的信息結(jié)構(gòu):

  1. type commandExecution struct { 
  2.  Types            []string      `json:"types"` // 區(qū)分事件類型,比如success、failure.... 
  3.  Start            time.Time     `json:"start_time"` // command開始時間 
  4.  RunDuration      time.Duration `json:"run_duration"` // command結(jié)束時間 
  5.  ConcurrencyInUse float64       `json:"concurrency_inuse"` // command 線程池使用率 

說了這么多,大家還是有點懵,其實用一個類圖就能表明他們之間的關(guān)系:

我們可以看到類mertricExchange提供了一個Monitor方法,這個方法主要邏輯就是監(jiān)聽狀態(tài)事件,然后寫入指標,所以整個上報流程就是這個樣子:

流量控制

hystrix-go對流量控制采用的是令牌算法,能得到令牌的就可以執(zhí)行后繼的工作,執(zhí)行完后要返還令牌。結(jié)構(gòu)體executorPool就是hystrix-go 流量控制的具體實現(xiàn)。字段Max就是每秒最大的并發(fā)值。

  1. type executorPool struct { 
  2.  Name    string 
  3.  Metrics *poolMetrics // 上報執(zhí)行數(shù)量指標 
  4.  Max     int // 最大并發(fā)數(shù)量 
  5.  Tickets chan *struct{} // 代表令牌 

這里還有一個上報指標,這個又單獨實現(xiàn)一套方法用來統(tǒng)計執(zhí)行數(shù)量,比如執(zhí)行的總數(shù)量、最大并發(fā)數(shù)等,我們依賴畫一個類圖來表示:

上報執(zhí)行數(shù)量邏輯與上報狀態(tài)事件的邏輯是一樣的,使用channel進行數(shù)據(jù)通信的,上報與返還令牌都在Return方法中:

  1. func (p *executorPool) Return(ticket *struct{}) { 
  2.  if ticket == nil { 
  3.   return 
  4.  } 
  5.  
  6.  p.Metrics.Updates <- poolMetricsUpdate{ 
  7.   activeCount: p.ActiveCount(), 
  8.  } 
  9.  p.Tickets <- ticket 

主要邏輯兩步:

  • 上報當前可用的令牌數(shù)
  • 返回令牌

熔斷器

我們最后來分析熔斷器中一個比較重要的方法:AllowRequest,我們在執(zhí)行Command是會根據(jù)這個方法來判斷是否可以執(zhí)行command,接下來我們就來看一下這個判斷的主要邏輯:

  1. func (circuit *CircuitBreaker) AllowRequest() bool { 
  2.  return !circuit.IsOpen() || circuit.allowSingleTest() 

內(nèi)部就是調(diào)用IsOpen()、allowSingleTest這兩個方法:

  • IsOpen()
  1. func (circuit *CircuitBreaker) IsOpen() bool { 
  2.  circuit.mutex.RLock() 
  3.  o := circuit.forceOpen || circuit.open 
  4.  circuit.mutex.RUnlock() 
  5.  // 熔斷已經(jīng)開啟 
  6.  if o { 
  7.   return true 
  8.  } 
  9.  // 判斷10s內(nèi)的并發(fā)數(shù)是否超過設(shè)置的最大并發(fā)數(shù),沒有超過時,不需要開啟熔斷器 
  10.  if uint64(circuit.metrics.Requests().Sum(time.Now())) < getSettings(circuit.Name).RequestVolumeThreshold { 
  11.   return false 
  12.  } 
  13.  // 此時10s內(nèi)的并發(fā)數(shù)已經(jīng)超過設(shè)置的最大并發(fā)數(shù)了,如果此時系統(tǒng)錯誤率超過了預設(shè)值,那就開啟熔斷器 
  14.  if !circuit.metrics.IsHealthy(time.Now()) { 
  15.   //  
  16.   circuit.setOpen() 
  17.   return true 
  18.  } 
  19.  
  20.  return false 
  • allowSingleTest()

先解釋一下為什么要有這個方法,還記得我們之前設(shè)置了一個熔斷規(guī)則中的SleepWindow嗎,如果在開啟熔斷的情況下,在SleepWindow時間后進行嘗試,這個方法的目的就是干這個的:

  1. func (circuit *CircuitBreaker) allowSingleTest() bool { 
  2.  circuit.mutex.RLock() 
  3.  defer circuit.mutex.RUnlock() 
  4.   
  5.   // 獲取當前時間戳 
  6.  now := time.Now().UnixNano() 
  7.  openedOrLastTestedTime := atomic.LoadInt64(&circuit.openedOrLastTestedTime) 
  8.   // 當前熔斷器是開啟狀態(tài),當前的時間已經(jīng)大于 (上次開啟熔斷器的時間 +SleepWindow 的時間) 
  9.  if circuit.open && now > openedOrLastTestedTime+getSettings(circuit.Name).SleepWindow.Nanoseconds() { 
  10.     // 替換openedOrLastTestedTime 
  11.   swapped := atomic.CompareAndSwapInt64(&circuit.openedOrLastTestedTime, openedOrLastTestedTime, now) 
  12.   if swapped { 
  13.    log.Printf("hystrix-go: allowing single test to possibly close circuit %v", circuit.Name
  14.   } 
  15.   return swapped 
  16.  } 

這里只看到了熔斷器被開啟的設(shè)置了,但是沒有關(guān)閉熔斷器的邏輯,因為關(guān)閉熔斷器的邏輯是在上報狀態(tài)指標的方法ReportEvent內(nèi)實現(xiàn),我們最后再看一下ReportEvent的實現(xiàn):

  1. func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error { 
  2.  if len(eventTypes) == 0 { 
  3.   return fmt.Errorf("no event types sent for metrics"
  4.  } 
  5.   
  6.  circuit.mutex.RLock() 
  7.  o := circuit.open 
  8.  circuit.mutex.RUnlock() 
  9.   // 上報的狀態(tài)事件是success 并且當前熔斷器是開啟狀態(tài),則說明下游服務(wù)正常了,可以關(guān)閉熔斷器了 
  10.  if eventTypes[0] == "success" && o { 
  11.   circuit.setClose() 
  12.  } 
  13.  
  14.  var concurrencyInUse float64 
  15.  if circuit.executorPool.Max > 0 { 
  16.   concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max
  17.  } 
  18.  
  19.  select { 
  20.     // 上報狀態(tài)指標,與上文的monitor呼應(yīng) 
  21.  case circuit.metrics.Updates <- &commandExecution{ 
  22.   Types:            eventTypes, 
  23.   Start:            start, 
  24.   RunDuration:      runDuration, 
  25.   ConcurrencyInUse: concurrencyInUse, 
  26.  }: 
  27.  default
  28.   return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)} 
  29.  } 
  30.  
  31.  return nil 

可視化hystrix的上報信息

通過上面的分析我們知道hystrix-go上報了狀態(tài)事件、執(zhí)行數(shù)量事件,那么這些指標我們可以怎么查看呢?

設(shè)計者早就想到了這個問題,所以他們做了一個dashborad,可以查看hystrix的上報信息,使用方法只需在服務(wù)啟動時添加如下代碼:

  1. hystrixStreamHandler := hystrix.NewStreamHandler() 
  2. hystrixStreamHandler.Start() 
  3. go http.ListenAndServe(net.JoinHostPort("""81"), hystrixStreamHandler) 

然后打開瀏覽器:http://127.0.0.1:81/hystrix-dashboard,進行觀測吧。

總結(jié)

故事終于接近尾聲了,一個熔斷機制的實現(xiàn)確實不簡單,要考慮的因素也是方方面面,尤其在微服務(wù)架構(gòu)下,熔斷機制是必不可少的,不僅要在框架層面實現(xiàn)熔斷機制,還要根據(jù)具體業(yè)務(wù)場景使用熔斷機制,這些都是值得我們深思熟慮的。本文介紹的熔斷框架實現(xiàn)的還是比較完美的,這種優(yōu)秀的設(shè)計思路值得我們學習。

 

文中代碼已上傳github:https://github.com/asong2020/Golang_Dream/tree/master/code_demo/hystrix_demo,歡迎star。

 

責任編輯:武曉燕 來源: Golang夢工廠
相關(guān)推薦

2020-09-26 10:56:33

服務(wù)器熔斷服務(wù)隔離

2025-03-13 00:55:00

微服務(wù)架構(gòu)系統(tǒng)

2022-01-17 10:55:50

微服務(wù)API網(wǎng)關(guān)

2020-07-28 08:32:57

微服務(wù)API網(wǎng)關(guān)熔斷

2017-07-03 09:50:07

Spring Clou微服務(wù)架構(gòu)

2018-12-06 14:56:46

微服務(wù)隔離熔斷

2017-07-04 17:35:46

微服務(wù)架構(gòu)Spring Clou

2024-06-05 06:43:20

2020-11-27 10:50:06

微服務(wù)架構(gòu)框架

2021-03-05 11:09:46

Go框架微服務(wù)

2025-01-20 00:10:00

Go語言Kratos

2023-12-13 07:19:01

微服務(wù)架構(gòu)Golang

2024-06-27 10:50:01

2024-04-09 07:27:06

微服務(wù)架構(gòu)YAML

2024-12-23 00:22:55

2025-01-13 00:00:07

Go語言微服務(wù)

2017-07-17 15:50:17

微服務(wù)Docker架構(gòu)

2021-06-22 18:00:09

微服務(wù)架構(gòu)系統(tǒng)

2022-05-13 09:05:49

Hystrix熔斷器

2024-12-30 00:38:23

Go語言微服務(wù)
點贊
收藏

51CTO技術(shù)棧公眾號