Golang 優(yōu)雅關(guān)閉 gRPC 實(shí)踐
問題
我在上次做技術(shù)支持的時候,遇到了一個有趣的錯誤。我們的服務(wù)在 Kubernetes 上運(yùn)行,有一個容器在重啟時不斷出現(xiàn)以下錯誤信息--"Error bind: address already in use"。對于大多數(shù)程序員來說,這是一個非常熟悉的錯誤信息,表明一個進(jìn)程正試圖綁定到另一個進(jìn)程正在使用的端口上。
一、背景
我的團(tuán)隊(duì)維護(hù)一個 Go 服務(wù),啟動時會在各自的 goroutine 中生成大量不同的 gRPC 服務(wù)。
Goroutine[2] - Go 運(yùn)行時管理的輕量級線程,運(yùn)行時只需要幾 KB 內(nèi)存,是 Go 并發(fā)性的基礎(chǔ)。
以下是我們服務(wù)架構(gòu)的簡化版本,以及以前啟動和停止服務(wù)器時所執(zhí)行的任務(wù)。
package main
type GrpcServerInterface interface{
Run(stopChan chan <-struct{})
}
type Server struct {
ServerA GrpcServerIface
ServerB GrpcServerIface
}
func NewServer() *Server {
return &NewServer{
ServerA: NewServerA,
ServerB: NewServerB,
}
}
// Start runs each of the grpc servers
func (s *Server) Start(stopChan <-chan struct{}){
go ServerA.Run(stopChan)
go ServerB.Run(stopChan)
<- stopChan
}
func main() {
stopChan := make(chan struct{})
server := NewServer()
server.Start(stopChan)
// Wait for program to terminate and then signal servers to stop
ch := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-ch
close(stopChan)
}
package internal
type ServerA struct {
stopChan <-chan struct{}
}
// Start runs each of the grpc servers
func (s *ServerA) Run(stopChan <-chan struct{}){
grpcServer := grpc.NewServer()
var listener net.Listener
ln, err := net.Listen("tcp", ":8080")
if err != nil {
// handle error
}
for {
err := grpcServer.Serve(listener)
if err != nil {
return
}
}
<- stopChan
grpcServer.Stop() // Gracefully terminate connections and close listener
}
我首先想到這可能是 Docker 或 Kubernetes 運(yùn)行時的某種偶發(fā)性錯誤。這個錯誤讓我覺得很奇怪,原因如下:1.)查看代碼,我們似乎確實(shí)在主程序退出時關(guān)閉了所有監(jiān)聽,端口怎么可能在重啟時仍在使用?2.)錯誤信息持續(xù)出現(xiàn)了幾個小時,以至于需要人工干預(yù)。我原以為在最壞情況下,操作系統(tǒng)會在嘗試重啟容器之前為我們清理資源?;蛟S是清理速度不夠快?
團(tuán)隊(duì)成員建議我們再深入調(diào)查一下。
二、解決方案
經(jīng)過仔細(xì)研究,發(fā)現(xiàn)我們的代碼實(shí)際上存在一些問題...
1. 通道(Channel)與上下文(Context)
通道用于在程序之間發(fā)送信號,通常以一對一的方式使用,當(dāng)一個值被發(fā)送到某個通道時,只能從該通道讀取一次。在我們的代碼中,使用的是一對多模式。我們將在 main 中創(chuàng)建的通道傳遞給多個不同的 goroutine,每個 goroutine 都在等待 main 關(guān)閉通道,以便知道何時運(yùn)行清理函數(shù)。
從 Go 1.7 開始,上下文被認(rèn)為是向多個 goroutine 廣播信號的標(biāo)準(zhǔn)方式。雖然這可能不是我們遇到問題的根本原因(我們是在等待通道關(guān)閉,而不是試圖讓每個 goroutine 從通道中讀取相同的值),但考慮到這是最佳實(shí)踐,還是希望采用這種模式。
以下是從通道切換到上下文后更新的代碼。
package internal
type ServerA struct {}
func (s *ServerA) Run(ctx context.Context){
grpcServer := grpc.NewServer()
var listener net.Listener
ln, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal("ServerA - Failed to create listener")
}
for {
err := grpcServer.Serve(listener)
if err != nil {
log.Fatal("ServerA - Failed to start server")
}
}
<- ctx.Done()
// Clean up logic
grpcServer.Stop() // Gracefully terminate connections and close listener
}
package main
type GrpcServerInterface interface{
Run(stopChan chan <-struct{})
}
type Server struct {
ServerA GrpcServerIface
ServerB GrpcServerIface
stopServer context.CancelFunc
serverCtx context.Context
}
func NewServer() *Server {
return &NewServer{
ServerA: NewServerA,
ServerB: NewServerB,
}
}
// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
// create new context from parent context
s.serverCtx, stopServer := context.WithCancel(ctx)
go ServerA.Run(s.serverCtx)
go ServerB.Run(s.serverCtx)
}
func (s *Server) Stop() {
s.stopServer() // close server context to signal spawned goroutines to stop
}
func main() {
ctx, cancel := context.withCancel()
server := NewServer()
server.Start(ctx)
// Wait for program to terminate and then signal servers to stop
ch := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-ch
cancel() // close main context on terminate signal
server.Stop() // clean up server resources
}
2. 基于等待組(WaitGroup)的優(yōu)雅停機(jī)
雖然我們通過取消主上下文向 goroutine 發(fā)出了退出信號,但并沒有等待它們完成工作。當(dāng)主程序收到退出信號時,即使我們發(fā)送了取消信號,也不能保證它會等待生成的 goroutine 完成工作。因此我們必須明確等待每個 goroutine 完成工作,以避免任何泄漏,為此我們使用了 WaitGroup。
WaitGroup[3] 是一種計數(shù)器,用于阻止函數(shù)(或者說是 goroutine)的執(zhí)行,直到其內(nèi)部計數(shù)器變?yōu)?0。
package internal
type ServerA struct {}
func (s *ServerA) Run(ctx context.Context, wg *sync.WaitGroup){
wg.Add(1) // Add the current function to the parent's wait group
defer wg.Done() // Send "done" signal upon function exit
grpcServer := grpc.NewServer()
var listener net.Listener
ln, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal("ServerA - Failed to create listener")
}
for {
err := grpcServer.Serve(listener)
if err != nil {
log.Fatal("ServerA - Failed to start server")
}
}
<- ctx.Done()
// Clean up logic
grpcServer.Stop() // Gracefully terminate connections and close listener
fmt.Println("ServerA has stopped")
}
package main
type GrpcServerInterface interface{
Run(stopChan chan <-struct{})
}
type Server struct {
ServerA GrpcServerIface
ServerB GrpcServerIface
wg sync.WaitGroup
stopServer context.CancelFunc
serverCtx context.Context
}
func NewServer() *Server {
return &NewServer{
ServerA: NewServerA,
ServerB: NewServerB,
}
}
// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
s.serverCtx, stopServer := context.WithCancel(ctx)
go ServerA.Run(s.serverCtx, &s.wg)
go ServerB.Run(s.serverCtx, &s.wg)
}
func (s *Server) Stop() {
s.stopServer() // close server context to signal spawned goroutines to stop
s.wg.Wait() // wait for all goroutines to exit before returning
fmt.Println("Main Server has stopped")
}
func main() {
ctx, cancel := context.withCancel()
server := NewServer()
server.Start(ctx)
// Wait for program to terminate and then signal servers to stop
ch := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-ch
cancel() // close main context on terminate signal
server.Stop() // clean up server resources
}
3. 基于通道的啟動信號
在測試過程中,又發(fā)現(xiàn)了一個隱藏錯誤。我們未能在接受流量之前等待所有服務(wù)端啟動,而這在測試中造成了一些誤報,即流量被發(fā)送到服務(wù)端,但沒有實(shí)際工作。為了向主服務(wù)發(fā)送所有附屬服務(wù)都已準(zhǔn)備就緒的信號,我們使用了通道。
package internal
type ServerA struct {
startChan
}
func (s *ServerA) Run(ctx context.Context, wg *sync.WaitGroup){
wg.Add(1) // Add the current function to the parent's wait group
defer wg.Done() // Send "done" signal upon function exit
go func(){
grpcServer := grpc.NewServer()
var listener net.Listener
ln, err := net.Listen("tcp", ":8080")
if err != nil {
log.Fatal("ServerA - Failed to create listener")
}
for {
err := grpcServer.Serve(listener)
if err != nil {
log.Fatal("ServerA - Failed to start server")
}
}
close(s.startChan) // Signal that we are done starting server to exit function
// Wait in the background for mina program to exit
<- ctx.Done()
// Clean up logic
grpcServer.Stop() // Gracefully terminate connections and close listener
fmt.Println("ServerA has stopped")
}()
<- s.StartChan // Wait for signal before exiting function
fmt.Println("ServerA has started")
}
package main
type GrpcServerInterface interface{
Run(stopChan chan <-struct{})
}
type Server struct {
ServerA GrpcServerIface
ServerB GrpcServerIface
wg sync.WaitGroup
stopServer context.CancelFunc
serverCtx context.Context
startChan chan <-struct{}
}
func NewServer() *Server {
return &NewServer{
ServerA: NewServerA,
ServerB: NewServerB,
startChan: make(chan <-struct{}),
}
}
// Start runs each of the grpc servers
func (s *Server) Start(ctx context.Context){
s.serverCtx, stopServer := context.WithCancel(ctx)
ServerA.Run(s.serverCtx, &s.wg)
ServerB.Run(s.serverCtx, &s.wg)
close(s.startChan)
<- s.startChan // wait for each server to Start before returning
fmt.Println("Main Server has started")
}
func (s *Server) Stop() {
s.stopServer() // close server context to signal spawned goroutines to stop
s.wg.Wait() // wait for all goroutines to exit before returning
fmt.Println("Main Server has stopped")
}
func main() {
ctx, cancel := context.withCancel()
server := NewServer()
server.Start(ctx)
// Wait for program to terminate and then signal servers to stop
ch := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-ch
cancel() // close main context on terminate signal
server.Stop() // clean up server resources
}
三、結(jié)論
不瞞你說,剛開始學(xué)習(xí) Go 時,并發(fā)會讓你頭疼不已。調(diào)試這個問題讓我有機(jī)會看到這些概念的實(shí)際用途,并強(qiáng)化了之前不確定的主題,建議你自己嘗試簡單的示例!
參考資料:
- [1]Go Concurrency — Graceful Shutdown: https:/medium.com/@goldengirlgeeks/go-graceful-shutdown-0c46e67ab9c9
- [2]Goroutine: https://go.dev/tour/concurrency/1
- [3]WaitGroup: https://www.geeksforgeeks.org/using-waitgroup-in-golang/amp