自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Golang 優(yōu)雅關(guān)閉 gRPC 實(shí)踐

開發(fā) 后端
本文主要討論了在 Go 語言中實(shí)現(xiàn)gRPC服務(wù)優(yōu)雅關(guān)閉的技術(shù)和方法,從而確保所有連接都得到正確處理,防止數(shù)據(jù)丟失或損壞。

問題

我在上次做技術(shù)支持的時候,遇到了一個有趣的錯誤。我們的服務(wù)在 Kubernetes 上運(yùn)行,有一個容器在重啟時不斷出現(xiàn)以下錯誤信息--"Error bind: address already in use"。對于大多數(shù)程序員來說,這是一個非常熟悉的錯誤信息,表明一個進(jìn)程正試圖綁定到另一個進(jìn)程正在使用的端口上。

一、背景

我的團(tuán)隊(duì)維護(hù)一個 Go 服務(wù),啟動時會在各自的 goroutine 中生成大量不同的 gRPC 服務(wù)。

Goroutine[2] - Go 運(yùn)行時管理的輕量級線程,運(yùn)行時只需要幾 KB 內(nèi)存,是 Go 并發(fā)性的基礎(chǔ)。

以下是我們服務(wù)架構(gòu)的簡化版本,以及以前啟動和停止服務(wù)器時所執(zhí)行的任務(wù)。

package main

type GrpcServerInterface interface{
  Run(stopChan chan <-struct{})
}

type Server struct {
  ServerA GrpcServerIface
  ServerB GrpcServerIface
}

func NewServer() *Server {
  return &NewServer{
    ServerA: NewServerA,
    ServerB: NewServerB,
  }
}

// Start runs each of the grpc servers
func (s *Server) Start(stopChan <-chan struct{}){
  go ServerA.Run(stopChan)
  go ServerB.Run(stopChan)
  <- stopChan
}

func main() {
  stopChan := make(chan struct{})
  server := NewServer()
  server.Start(stopChan)
 
  // Wait for program to terminate and then signal servers to stop
  ch := make(chan os.Signal, 1)
  signal.Notify(c, os.Interrupt, syscall.SIGTERM)
  <-ch
  close(stopChan)
}
package internal

type ServerA struct {
  stopChan <-chan struct{}
}

// Start runs each of the grpc servers
func (s *ServerA) Run(stopChan <-chan struct{}){
  grpcServer := grpc.NewServer()
  
  var listener net.Listener
  ln, err := net.Listen("tcp", ":8080")
  if err != nil {
   // handle error
  }
  
  for {
   err := grpcServer.Serve(listener)
   if err != nil {
     return 
   }
  }
  
  <- stopChan
  grpcServer.Stop() // Gracefully terminate connections and close listener
}

我首先想到這可能是 Docker 或 Kubernetes 運(yùn)行時的某種偶發(fā)性錯誤。這個錯誤讓我覺得很奇怪,原因如下:1.)查看代碼,我們似乎確實(shí)在主程序退出時關(guān)閉了所有監(jiān)聽,端口怎么可能在重啟時仍在使用?2.)錯誤信息持續(xù)出現(xiàn)了幾個小時,以至于需要人工干預(yù)。我原以為在最壞情況下,操作系統(tǒng)會在嘗試重啟容器之前為我們清理資源?;蛟S是清理速度不夠快?

團(tuán)隊(duì)成員建議我們再深入調(diào)查一下。

二、解決方案

經(jīng)過仔細(xì)研究,發(fā)現(xiàn)我們的代碼實(shí)際上存在一些問題...

1. 通道(Channel)與上下文(Context)

通道用于在程序之間發(fā)送信號,通常以一對一的方式使用,當(dāng)一個值被發(fā)送到某個通道時,只能從該通道讀取一次。在我們的代碼中,使用的是一對多模式。我們將在 main 中創(chuàng)建的通道傳遞給多個不同的 goroutine,每個 goroutine 都在等待 main 關(guān)閉通道,以便知道何時運(yùn)行清理函數(shù)。

從 Go 1.7 開始,上下文被認(rèn)為是向多個 goroutine 廣播信號的標(biāo)準(zhǔn)方式。雖然這可能不是我們遇到問題的根本原因(我們是在等待通道關(guān)閉,而不是試圖讓每個 goroutine 從通道中讀取相同的值),但考慮到這是最佳實(shí)踐,還是希望采用這種模式。

以下是從通道切換到上下文后更新的代碼。

package internal

type ServerA struct {}

func (s *ServerA) Run(ctx context.Context){
  grpcServer := grpc.NewServer()
  var listener net.Listener
  ln, err := net.Listen("tcp", ":8080")
  if err != nil {
   log.Fatal("ServerA - Failed to create listener")
  }
  
  for {
   err := grpcServer.Serve(listener)
   if err != nil {
     log.Fatal("ServerA - Failed to start server") 
   }
  }
  
  <- ctx.Done()
  // Clean up logic 
  grpcServer.Stop() // Gracefully terminate connections and close listener
}
package main

type GrpcServerInterface interface{
 Run(stopChan chan <-struct{})
}

type Server struct {
 ServerA GrpcServerIface
 ServerB GrpcServerIface
 stopServer context.CancelFunc
 serverCtx context.Context
}

func NewServer() *Server {
 return &NewServer{
    ServerA: NewServerA,
    ServerB: NewServerB,
  }
}

// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
  // create new context from parent context
  s.serverCtx, stopServer := context.WithCancel(ctx) 
  go ServerA.Run(s.serverCtx)
  go ServerB.Run(s.serverCtx)
}

func (s *Server) Stop() {
  s.stopServer() // close server context to signal spawned goroutines to stop
}

func main() {
 ctx, cancel := context.withCancel()
 server := NewServer()
 server.Start(ctx)
 // Wait for program to terminate and then signal servers to stop
 ch := make(chan os.Signal, 1)
 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 
 <-ch
 cancel() // close main context on terminate signal
 server.Stop() // clean up server resources
}

2. 基于等待組(WaitGroup)的優(yōu)雅停機(jī)

雖然我們通過取消主上下文向 goroutine 發(fā)出了退出信號,但并沒有等待它們完成工作。當(dāng)主程序收到退出信號時,即使我們發(fā)送了取消信號,也不能保證它會等待生成的 goroutine 完成工作。因此我們必須明確等待每個 goroutine 完成工作,以避免任何泄漏,為此我們使用了 WaitGroup。

WaitGroup[3] 是一種計數(shù)器,用于阻止函數(shù)(或者說是 goroutine)的執(zhí)行,直到其內(nèi)部計數(shù)器變?yōu)?0。

package internal

type ServerA struct {}

func (s *ServerA) Run(ctx context.Context, wg *sync.WaitGroup){
  wg.Add(1) // Add the current function to the parent's wait group
  defer wg.Done() // Send "done" signal upon function exit
  
  grpcServer := grpc.NewServer()
  var listener net.Listener
  ln, err := net.Listen("tcp", ":8080")
  if err != nil {
   log.Fatal("ServerA - Failed to create listener")
  }
  
  for {
   err := grpcServer.Serve(listener)
   if err != nil {
     log.Fatal("ServerA - Failed to start server") 
   }
  }
  
  <- ctx.Done()
  // Clean up logic 
  grpcServer.Stop() // Gracefully terminate connections and close listener
  fmt.Println("ServerA has stopped")
}
package main

type GrpcServerInterface interface{
 Run(stopChan chan <-struct{})
}

type Server struct {
 ServerA GrpcServerIface
 ServerB GrpcServerIface
 wg sync.WaitGroup
 stopServer context.CancelFunc
 serverCtx context.Context
}

func NewServer() *Server {
 return &NewServer{
    ServerA: NewServerA,
    ServerB: NewServerB,
  }
}

// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
  s.serverCtx, stopServer := context.WithCancel(ctx)
  go ServerA.Run(s.serverCtx, &s.wg)
  go ServerB.Run(s.serverCtx, &s.wg)
}

func (s *Server) Stop() {
  s.stopServer() // close server context to signal spawned goroutines to stop
  s.wg.Wait()  // wait for all goroutines to exit before returning
  fmt.Println("Main Server has stopped")
}

func main() {
 ctx, cancel := context.withCancel()
 server := NewServer()
 server.Start(ctx)
 // Wait for program to terminate and then signal servers to stop
 ch := make(chan os.Signal, 1)
 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 
 <-ch
 cancel() // close main context on terminate signal
 server.Stop() // clean up server resources
}

3. 基于通道的啟動信號

在測試過程中,又發(fā)現(xiàn)了一個隱藏錯誤。我們未能在接受流量之前等待所有服務(wù)端啟動,而這在測試中造成了一些誤報,即流量被發(fā)送到服務(wù)端,但沒有實(shí)際工作。為了向主服務(wù)發(fā)送所有附屬服務(wù)都已準(zhǔn)備就緒的信號,我們使用了通道。

package internal

type ServerA struct {
  startChan  
}

func (s *ServerA) Run(ctx context.Context, wg *sync.WaitGroup){
  wg.Add(1) // Add the current function to the parent's wait group
  defer wg.Done() // Send "done" signal upon function exit
   
  go func(){
    grpcServer := grpc.NewServer()
    
    var listener net.Listener
    ln, err := net.Listen("tcp", ":8080")
    if err != nil {
     log.Fatal("ServerA - Failed to create listener")
    }
    
    for {
     err := grpcServer.Serve(listener)
     if err != nil {
       log.Fatal("ServerA - Failed to start server") 
     }
    }
    close(s.startChan) // Signal that we are done starting server to exit function
    // Wait in the background for mina program to exit
    <- ctx.Done()
    // Clean up logic 
    grpcServer.Stop() // Gracefully terminate connections and close listener
    fmt.Println("ServerA has stopped")
  }()
  <- s.StartChan // Wait for signal before exiting function
  fmt.Println("ServerA has started")
}
package main

type GrpcServerInterface interface{
 Run(stopChan chan <-struct{})
}

type Server struct {
 ServerA GrpcServerIface
 ServerB GrpcServerIface
 wg sync.WaitGroup
 stopServer context.CancelFunc
 serverCtx context.Context
 startChan chan <-struct{}
}

func NewServer() *Server {
 return &NewServer{
    ServerA: NewServerA,
    ServerB: NewServerB,
    startChan: make(chan <-struct{}),
  }
}

// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
  s.serverCtx, stopServer := context.WithCancel(ctx)
  ServerA.Run(s.serverCtx, &s.wg)
  ServerB.Run(s.serverCtx, &s.wg)
  close(s.startChan)
  <- s.startChan // wait for each server to Start before returning
  fmt.Println("Main Server has started")
}

func (s *Server) Stop() {
  s.stopServer() // close server context to signal spawned goroutines to stop
  s.wg.Wait()  // wait for all goroutines to exit before returning
  fmt.Println("Main Server has stopped")
}

func main() {
 ctx, cancel := context.withCancel()
 server := NewServer()
 server.Start(ctx)
 // Wait for program to terminate and then signal servers to stop
 ch := make(chan os.Signal, 1)
 signal.Notify(c, os.Interrupt, syscall.SIGTERM)
 
 <-ch
 cancel() // close main context on terminate signal
 server.Stop() // clean up server resources
}

三、結(jié)論

不瞞你說,剛開始學(xué)習(xí) Go 時,并發(fā)會讓你頭疼不已。調(diào)試這個問題讓我有機(jī)會看到這些概念的實(shí)際用途,并強(qiáng)化了之前不確定的主題,建議你自己嘗試簡單的示例!

參考資料:

  • [1]Go Concurrency — Graceful Shutdown: https:/medium.com/@goldengirlgeeks/go-graceful-shutdown-0c46e67ab9c9
  • [2]Goroutine: https://go.dev/tour/concurrency/1
  • [3]WaitGroup: https://www.geeksforgeeks.org/using-waitgroup-in-golang/amp
責(zé)任編輯:趙寧寧 來源: DeepNoMind
相關(guān)推薦

2021-09-13 05:02:49

GogRPC語言

2021-09-26 10:20:06

開發(fā)Golang代碼

2021-01-19 10:35:49

JVM場景函數(shù)

2021-04-20 08:00:31

Redisson關(guān)閉訂單支付系統(tǒng)

2021-09-01 23:29:37

Golang語言gRPC

2024-04-02 09:55:36

GolangColly開發(fā)者

2023-12-05 07:26:21

Golang項(xiàng)目結(jié)構(gòu)

2021-06-04 10:52:51

kubernetes場景容器

2017-12-19 10:03:44

JavaLinux代碼

2022-04-11 08:17:07

JVMJava進(jìn)程

2024-11-13 16:37:00

Java線程池

2022-02-20 23:15:46

gRPCGolang語言

2020-11-23 14:16:42

Golang

2024-04-28 18:24:05

2022-04-29 11:52:02

API代碼HTTP

2021-03-28 09:17:18

JVM場景鉤子函數(shù)

2024-10-21 15:39:24

2024-01-07 12:47:35

Golang流水線設(shè)計模式

2024-05-28 00:00:30

Golang數(shù)據(jù)庫

2018-12-17 16:44:49

Golang微服務(wù)
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號