自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Go:有了 Sync 為什么還有 Atomic?

開發(fā) 后端
Go 是一種擅長并發(fā)的語言,啟動新的 goroutine 就像輸入 “go” 一樣簡單。隨著你發(fā)現(xiàn)自己構(gòu)建的系統(tǒng)越來越復(fù)雜,正確保護對共享資源的訪問以防止競爭條件變得極其重要。此類資源可能包括可即時更新的配置(例如功能標志)、內(nèi)部狀態(tài)(例如斷路器狀態(tài))等。

[[438792]]

Go 是一種擅長并發(fā)的語言,啟動新的 goroutine 就像輸入 “go” 一樣簡單。隨著你發(fā)現(xiàn)自己構(gòu)建的系統(tǒng)越來越復(fù)雜,正確保護對共享資源的訪問以防止競爭條件變得極其重要。此類資源可能包括可即時更新的配置(例如功能標志)、內(nèi)部狀態(tài)(例如斷路器狀態(tài))等。

01 什么是競態(tài)條件?

對于大多數(shù)讀者來說,這可能是基礎(chǔ)知識,但由于本文的其余部分取決于對競態(tài)條件的理解,因此有必要進行簡短的復(fù)習。競態(tài)條件是一種情況,在這種情況下,程序的行為取決于其他不可控事件的順序或時間。在大多數(shù)情況下,這種情況是一個錯誤,因為可能會發(fā)生不希望的結(jié)果。

舉個具體的例子或許更容易理解:

  1. // race_condition_test.go 
  2. package main 
  3.  
  4. import ( 
  5.  "fmt" 
  6.  "sort" 
  7.  "sync" 
  8.  "testing" 
  9.  
  10. func Test_RaceCondition(t *testing.T) { 
  11.  var s = make([]int, 0) 
  12.  
  13.  wg := sync.WaitGroup{} 
  14.  
  15.  // spawn 10 goroutines to modify the slice in parallel 
  16.  for i := 0; i < 10; i++ { 
  17.   wg.Add(1) 
  18.   go func(i int) { 
  19.    defer wg.Done() 
  20.    s = append(s, i) //add a new item to the slice 
  21.   }(i) 
  22.  } 
  23.  
  24.  wg.Wait() 
  25.   
  26.  sort.Ints(s) //sort the response to have comparable results 
  27.  fmt.Println(s) 

執(zhí)行一:

  1. $ go test -v race_condition_test.go 
  2. === RUN   Test_RaceCondition 
  3. [0 1 2 3 4 5 6 7 8 9] 
  4. --- PASS: Test_RaceCondition (0.00s) 

這里看起來一切都很好。這是我們預(yù)期的輸出。該程序迭代了 10 次,并在每次迭代時將索引添加到切片中。

執(zhí)行二:

  1. === RUN Test_RaceCondition 
  2.  
  3. [0 3] 
  4.  
  5. --- PASS: Test_RaceCondition (0.00s) 

等等,這里發(fā)生了什么?這次我們的響應(yīng)切片中只有兩個元素。這是因為切片的內(nèi)容 s 在加載和修改之間發(fā)生了變化,導(dǎo)致程序覆蓋了一些結(jié)果。這種特殊的競態(tài)條件是由數(shù)據(jù)競爭引起的,在這種情況下,多個 goroutine 嘗試同時訪問特定的共享變量,并且這些 goroutine 中的至少一個嘗試修改它。(注意,以上結(jié)果并非一定如此,每次運行結(jié)果可能都不相同)

如果你使用 -race 標志執(zhí)行測試,go 甚至會告訴你存在數(shù)據(jù)競爭并幫助你準確定位:

  1. $ go test race_condition_test.go -race 
  2.  
  3. ================== 
  4. WARNING: DATA RACE 
  5. Read at 0x00c000132048 by goroutine 9: 
  6.   command-line-arguments.Test_RaceCondition.func1() 
  7.       /home/sfinlay/go/src/benchmarks/race_condition_test.go:20 +0xb4 
  8.   command-line-arguments.Test_RaceCondition·dwrap·1() 
  9.       /home/sfinlay/go/src/benchmarks/race_condition_test.go:21 +0x47 
  10.  
  11. Previous write at 0x00c000132048 by goroutine 8: 
  12.   command-line-arguments.Test_RaceCondition.func1() 
  13.       /home/sfinlay/go/src/benchmarks/race_condition_test.go:20 +0x136 
  14.   command-line-arguments.Test_RaceCondition·dwrap·1() 
  15.       /home/sfinlay/go/src/benchmarks/race_condition_test.go:21 +0x47 
  16.  
  17. Goroutine 9 (running) created at
  18.   command-line-arguments.Test_RaceCondition() 
  19.       /home/sfinlay/go/src/benchmarks/race_condition_test.go:18 +0xc5 
  20.   testing.tRunner() 
  21.       /usr/local/go/src/testing/testing.go:1259 +0x22f 
  22.   testing.(*T).Run·dwrap·21() 
  23.       /usr/local/go/src/testing/testing.go:1306 +0x47 
  24.  
  25. Goroutine 8 (finished) created at
  26.   command-line-arguments.Test_RaceCondition() 
  27.       /home/sfinlay/go/src/benchmarks/race_condition_test.go:18 +0xc5 
  28.   testing.tRunner() 
  29.       /usr/local/go/src/testing/testing.go:1259 +0x22f 
  30.   testing.(*T).Run·dwrap·21() 
  31.       /usr/local/go/src/testing/testing.go:1306 +0x47 
  32. ================== 

02 并發(fā)控制

保護對這些共享資源的訪問通常涉及常見的內(nèi)存同步機制,例如通道或互斥鎖。

這是將競態(tài)條件調(diào)整為使用互斥鎖的相同測試用例:

  1. func Test_NoRaceCondition(t *testing.T) { 
  2.  var s = make([]int, 0) 
  3.  
  4.  m := sync.Mutex{} 
  5.  wg := sync.WaitGroup{} 
  6.  
  7.  // spawn 10 goroutines to modify the slice in parallel 
  8.  for i := 0; i < 10; i++ { 
  9.   wg.Add(1) 
  10.   go func(i int) { 
  11.    m.Lock() 
  12.    defer wg.Done() 
  13.    defer m.Unlock() 
  14.    s = append(s, i) 
  15.   }(i) 
  16.  } 
  17.  
  18.  wg.Wait() 
  19.  
  20.  sort.Ints(s) //sort the response to have comparable results 
  21.  fmt.Println(s) 

這次它始終返回所有 10 個整數(shù),因為它確保每個 goroutine 僅在沒有其他人執(zhí)行時才讀寫切片。如果第二個 goroutine 同時嘗試獲取鎖,它必須等到前一個 goroutine 完成(即直到它解鎖)。

然而,對于高吞吐量系統(tǒng),性能變得非常重要,因此減少鎖爭用(即一個進程或線程試圖獲取另一個進程或線程持有的鎖的情況)變得更加重要。執(zhí)行此操作的最基本方法之一是使用讀寫鎖 ( sync.RWMutex) 而不是標準 sync.Mutex,但是 Go 還提供了一些原子內(nèi)存原語即 atomic 包。

03 原子

Go 的 atomic 包提供了用于實現(xiàn)同步算法的低級原子內(nèi)存原語。這聽起來像是我們需要的東西,所以讓我們嘗試用 atomic 重寫該測試:

  1. import "sync/atomic" 
  2.  
  3. func Test_RaceCondition_Atomic(t *testing.T) { 
  4.  var s = atomic.Value{} 
  5.  s.Store([]int{}) // store empty slice as the base 
  6.  
  7.  wg := sync.WaitGroup{} 
  8.  
  9.  // spawn 10 goroutines to modify the slice in parallel 
  10.  for i := 0; i < 10; i++ { 
  11.   wg.Add(1) 
  12.   go func(i int) { 
  13.    defer wg.Done() 
  14.    s1 := s.Load().([]int
  15.    s.Store(append(s1, i)) //replace the slice with a new one containing the new item 
  16.   }(i) 
  17.  } 
  18.  
  19.  wg.Wait() 
  20.  
  21.  s1 := s.Load().([]int
  22.  sort.Ints(s1) //sort the response to have comparable results 
  23.  fmt.Println(s1) 

執(zhí)行結(jié)果:

  1. === RUN Test_RaceCondition_Atomic 
  2.  
  3. [1 3] 
  4.  
  5. --- PASS: Test_RaceCondition_Atomic (0.00s) 

什么?這和我們之前遇到的問題完全一樣,那么這個包有什么好處呢?

04 讀取-復(fù)制-更新

atomic 不是靈丹妙藥,它顯然不能替代互斥鎖,但是當涉及到可以使用讀取-復(fù)制-更新[1]模式管理的共享資源時,它非常出色。在這種技術(shù)中,我們通過引用獲取當前值,當我們想要更新它時,我們不修改原始值,而是替換指針(因此沒有人訪問另一個線程可能訪問的相同資源)。前面的示例無法使用此模式實現(xiàn),因為它應(yīng)該隨著時間的推移擴展現(xiàn)有資源而不是完全替換其內(nèi)容,但在許多情況下,讀取-復(fù)制-更新是完美的。

這是一個基本示例,我們可以在其中獲取和存儲布爾值(例如,對于功能標志很有用)。在這個例子中,我們正在執(zhí)行一個并行基準測試,比較原子和讀寫互斥:

  1. package main 
  2.  
  3. import ( 
  4.  "sync" 
  5.  "sync/atomic" 
  6.  "testing" 
  7.  
  8. type AtomicValue struct{ 
  9.  value atomic.Value 
  10.  
  11. func (b *AtomicValue) Get() bool { 
  12.  return b.value.Load().(bool) 
  13.  
  14. func (b *AtomicValue) Set(value bool) { 
  15.  b.value.Store(value) 
  16.  
  17. func BenchmarkAtomicValue_Get(b *testing.B) { 
  18.  atomB := AtomicValue{} 
  19.  atomB.value.Store(false
  20.  
  21.  b.RunParallel(func(pb *testing.PB) { 
  22.   for pb.Next() { 
  23.    atomB.Get() 
  24.   } 
  25.  }) 
  26.  
  27. /************/ 
  28.  
  29. type MutexBool struct { 
  30.  mutex sync.RWMutex 
  31.  flag  bool 
  32.  
  33. func (mb *MutexBool) Get() bool { 
  34.  mb.mutex.RLock() 
  35.  defer mb.mutex.RUnlock() 
  36.  return mb.flag 
  37.  
  38. func BenchmarkMutexBool_Get(b *testing.B) { 
  39.  mb := MutexBool{flag: true
  40.  
  41.  b.RunParallel(func(pb *testing.PB) { 
  42.   for pb.Next() { 
  43.    mb.Get() 
  44.   } 
  45.  }) 

結(jié)果:

  1. cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz 
  2. BenchmarkAtomicValue_Get 
  3. BenchmarkAtomicValue_Get-8    1000000000          0.5472 ns/op 
  4. BenchmarkMutexBool_Get 
  5. BenchmarkMutexBool_Get-8      24966127            48.80 ns/op 

結(jié)果很清楚。atomic 的速度提高了 89 倍以上。并且可以通過使用更原始的類型來進一步改進:

  1. type AtomicBool struct{ flag int32 } 
  2.  
  3. func (b *AtomicBool) Get() bool { 
  4.  return atomic.LoadInt32(&(b.flag)) != 0 
  5.  
  6. func (b *AtomicBool) Set(value bool) { 
  7.  var i int32 = 0 
  8.  if value { 
  9.   i = 1 
  10.  } 
  11.  atomic.StoreInt32(&(b.flag), int32(i)) 
  12.  
  13. func BenchmarkAtomicBool_Get(b *testing.B) { 
  14.  atomB := AtomicBool{flag: 1} 
  15.  
  16.  b.RunParallel(func(pb *testing.PB) { 
  17.   for pb.Next() { 
  18.    atomB.Get() 
  19.   } 
  20.  }) 
  21. cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz 
  22. BenchmarkAtomicBool_Get 
  23. BenchmarkAtomicBool_Get-8     1000000000          0.3161 ns/op 

此版本比互斥鎖版本快 154 倍以上。

寫操作也顯示出明顯的差異(盡管規(guī)模并不那么令人印象深刻):

  1. func BenchmarkAtomicBool_Set(b *testing.B) { 
  2.  atomB := AtomicBool{flag: 1} 
  3.  
  4.  b.RunParallel(func(pb *testing.PB) { 
  5.   for pb.Next() { 
  6.    atomB.Set(true
  7.   } 
  8.  }) 
  9.  
  10. /************/ 
  11.  
  12. func BenchmarkAtomicValue_Set(b *testing.B) { 
  13.  atomB := AtomicValue{} 
  14.  atomB.value.Store(false
  15.  
  16.  b.RunParallel(func(pb *testing.PB) { 
  17.   for pb.Next() { 
  18.    atomB.Set(true
  19.   } 
  20.  }) 
  21.  
  22. /************/ 
  23.  
  24. func BenchmarkMutexBool_Set(b *testing.B) { 
  25.  mb := MutexBool{flag: true
  26.  
  27.  b.RunParallel(func(pb *testing.PB) { 
  28.   for pb.Next() { 
  29.    mb.Set(true
  30.   } 
  31.  }) 

結(jié)果:

  1. cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz 
  2. BenchmarkAtomicBool_Set 
  3. BenchmarkAtomicBool_Set-8     64624705         16.79 ns/op 
  4. BenchmarkAtomicValue_Set 
  5. BenchmarkAtomicValue_Set-8    47654121         26.43 ns/op 
  6. BenchmarkMutexBool_Set 
  7. BenchmarkMutexBool_Set-8      20124637         66.50 ns/op 

在這里我們可以看到 atomic 在寫入時比在讀取時慢得多,但仍然比互斥鎖快得多。有趣的是,我們可以看到互斥鎖讀取和寫入之間的差異不是很明顯(慢 30%)。盡管如此, atomic 仍然表現(xiàn)得更好(比互斥鎖快 2-4 倍)。

05 為什么 atomic 這么快?

簡而言之,原子操作很快,因為它們依賴于原子 CPU 指令而不是依賴外部鎖。使用互斥鎖時,每次獲得鎖時,goroutine 都會短暫暫?;蛑袛?,這種阻塞占使用互斥鎖所花費時間的很大一部分。原子操作可以在沒有任何中斷的情況下執(zhí)行。

06 atomic 總是答案嗎?

正如我們在一個早期示例中已經(jīng)證明的那樣,atomic 無法解決所有問題,某些操作只能使用互斥鎖來解決。

考慮以下示例,該示例演示了我們使用 map 作為內(nèi)存緩存的常見模式:

  1. package main 
  2.  
  3. import ( 
  4.  "sync" 
  5.  "sync/atomic" 
  6.  "testing" 
  7.  
  8. //Don't use this implementation! 
  9. type AtomicCacheMap struct { 
  10.  value atomic.Value //map[int]int 
  11.  
  12. func (b *AtomicCacheMap) Get(key intint { 
  13.  return b.value.Load().(map[int]int)[key
  14.  
  15. func (b *AtomicCacheMap) Set(key, value int) { 
  16.  oldMap := b.value.Load().(map[int]int
  17.  newMap := make(map[int]int, len(oldMap)+1) 
  18.  for k, v := range oldMap { 
  19.   newMap[k] = v 
  20.  } 
  21.  newMap[key] = value 
  22.  b.value.Store(newMap) 
  23.  
  24. func BenchmarkAtomicCacheMap_Get(b *testing.B) { 
  25.  atomM := AtomicCacheMap{} 
  26.  atomM.value.Store(testMap) 
  27.  
  28.  b.RunParallel(func(pb *testing.PB) { 
  29.   for pb.Next() { 
  30.    atomM.Get(0) 
  31.   } 
  32.  }) 
  33.  
  34. func BenchmarkAtomicCacheMap_Set(b *testing.B) { 
  35.  atomM := AtomicCacheMap{} 
  36.  atomM.value.Store(testMap) 
  37.  
  38.  var i = 0 
  39.  b.RunParallel(func(pb *testing.PB) { 
  40.   for pb.Next() { 
  41.    atomM.Set(i, i) 
  42.    i++ 
  43.   } 
  44.  }) 
  45.  
  46. /************/ 
  47.  
  48. type MutexCacheMap struct { 
  49.  mutex sync.RWMutex 
  50.  value map[int]int 
  51.  
  52. func (mm *MutexCacheMap) Get(key intint { 
  53.  mm.mutex.RLock() 
  54.  defer mm.mutex.RUnlock() 
  55.  return mm.value[key
  56.  
  57. func (mm *MutexCacheMap) Set(key, value int) { 
  58.  mm.mutex.Lock() 
  59.  defer mm.mutex.Unlock() 
  60.  mm.value[key] = value 
  61.  
  62. func BenchmarkMutexCacheMap_Get(b *testing.B) { 
  63.  mb := MutexCacheMap{value: testMap} 
  64.  
  65.  b.RunParallel(func(pb *testing.PB) { 
  66.   for pb.Next() { 
  67.    mb.Get(0) 
  68.   } 
  69.  }) 
  70.  
  71. func BenchmarkMutexCacheMap_Set(b *testing.B) { 
  72.  mb := MutexCacheMap{value: testMap} 
  73.  
  74.  var i = 0 
  75.  b.RunParallel(func(pb *testing.PB) { 
  76.   for pb.Next() { 
  77.    mb.Set(i, i) 
  78.    i++ 
  79.   } 
  80.  }) 

結(jié)果:

  1. cpu: Intel(R) Core(TM) i7-8650U CPU @ 1.90GHz 
  2. BenchmarkAtomicCacheMap_Get 
  3. BenchmarkAtomicCacheMap_Get-8    301664540           4.194 ns/op 
  4. BenchmarkAtomicCacheMap_Set 
  5. BenchmarkAtomicCacheMap_Set-8       87637            95889 ns/op 
  6. BenchmarkMutexCacheMap_Get 
  7. BenchmarkMutexCacheMap_Get-8     20000959            54.63 ns/op 
  8. BenchmarkMutexCacheMap_Set 
  9. BenchmarkMutexCacheMap_Set-8      5012434            267.2 ns/op 

哎呀,這種表現(xiàn)是痛苦的。這意味著,當必須復(fù)制大型結(jié)構(gòu)時,atomic 的性能非常差。不僅如此,此代碼還包含競態(tài)條件。就像本文開頭的切片案例一樣,原子緩存示例具有競態(tài)條件,其中可能會在復(fù)制 map 和存儲 map 的時間之間添加新的緩存條目,在這種情況下,新條目將丟失。在這種情況下,該 -race 標志不會檢測到任何數(shù)據(jù)競爭,因為沒有對同一 map 的并發(fā)訪問。

07 注意事項

Go 的文檔[2]警告了 atomic 包的潛在誤用:

這些函數(shù)需要非常小心才能正確使用。除了特殊的低級應(yīng)用程序,同步最好使用通道或 sync 包的工具來完成。通過通信共享內(nèi)存;不要通過共享內(nèi)存進行通信。

開始使用 atomic 包時,你可能會遇到的第一個問題是:

  1. panic: sync/atomic: store of inconsistently typed value into Value 

使用 atomic.Store,確保每次調(diào)用方法時都存儲完全相同的類型很重要。這聽起來很容易,但通常并不像聽起來那么簡單:

  1. package main 
  2.  
  3. import ( 
  4.  "fmt" 
  5.  "sync/atomic" 
  6.  
  7. //Our own custom error type which implements the error interface 
  8. type CustomError struct { 
  9.  Code    int 
  10.  Message string 
  11.  
  12. func (e CustomError) Error() string { 
  13.  return fmt.Sprintf("%d: %s", e.Code, e.Message) 
  14.  
  15. func InternalServerError(msg string) error { 
  16.  return CustomError{Code: 500, Message: msg} 
  17.  
  18. func main() { 
  19.  var ( 
  20.   err1 error = fmt.Errorf("error happened"
  21.   err2 error = InternalServerError("another error happened"
  22.  ) 
  23.  
  24.  errVal := atomic.Value{} 
  25.  errVal.Store(err1) 
  26.  errVal.Store(err2) //panics here 

兩個值都是 error 類型是不夠的,因為它們只是實現(xiàn)了錯誤接口。它們的具體類型仍然不同,因此 atomic 不喜歡它。

08 總結(jié)

競態(tài)條件很糟糕,應(yīng)該保護對共享資源的訪問?;コ怏w很酷,但由于鎖爭用而趨于緩慢。對于某些讀取-復(fù)制-更新模式有意義的情況(這往往是動態(tài)配置之類的東西,例如特性標志、日志級別或 map 或結(jié)構(gòu)體,一次填充例如通過 JSON 解析等),尤其是當讀取次數(shù)比寫入次數(shù)多時。atomic 通常不應(yīng)用于其他用例(例如,隨時間增長的變量,如緩存),并且該特性的使用需要非常小心。

可能最重要的方法是將鎖保持在最低限度,如果你在在考慮原子等替代方案,請務(wù)必在投入生產(chǎn)之前對其進行廣泛的測試和試驗。

原文鏈接:https://www.sixt.tech/golangs-atomic

參考資料

[1]讀取-復(fù)制-更新: https://en.wikipedia.org/wiki/Read-copy-update

[2]文檔: https://pkg.go.dev/sync/atomic

本文轉(zhuǎn)載自微信公眾號「幽鬼」,可以通過以下二維碼關(guān)注。轉(zhuǎn)載本文請聯(lián)系幽鬼公眾號。

 

責任編輯:武曉燕 來源: 幽鬼
相關(guān)推薦

2016-09-27 21:25:08

Go語言Ken Thompso

2021-04-09 09:55:55

DockerGoLinux

2022-06-07 08:39:35

RPCHTTP

2020-11-25 09:36:17

HTTPRPC遠程

2019-08-05 14:23:43

DockerKubernetes容器

2024-07-11 10:41:07

HTTPSHTTP文本傳輸協(xié)議

2021-11-12 07:21:51

Go線程安全

2023-09-12 14:02:30

數(shù)組vector

2023-12-11 12:03:14

Python工具元組

2023-01-12 09:01:01

MongoDBMySQL

2017-01-18 09:42:11

Go

2020-04-07 16:12:56

Go編程語言開發(fā)

2020-02-27 21:03:30

調(diào)度器架構(gòu)效率

2023-10-24 15:15:26

HTTPWebSocket

2024-04-16 08:26:18

IP地址MAC地址

2021-12-20 10:30:33

forforEach前端

2022-09-13 08:44:02

IP網(wǎng)絡(luò)MAC地址

2021-10-12 18:48:07

HTTP 協(xié)議Websocket網(wǎng)絡(luò)通信

2022-01-12 20:04:09

網(wǎng)絡(luò)故障斷網(wǎng)事件網(wǎng)絡(luò)安全

2022-01-12 09:31:18

Go 變量方式
點贊
收藏

51CTO技術(shù)棧公眾號