自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

在沒有鎖,條件變量,回調(diào)情況下,如何編寫高效異步并發(fā)的Go程序

開發(fā) 前端
下面我們針對他給出的Case做一些說明與總結(jié),同時對Go語言編程的模式技巧進行總結(jié),換句話就是說想提煉出面向場景的Go語言編程的八股模式。

背景

不用鎖,條件變量,回調(diào)的話,還怎么寫并發(fā)程序啊,谷歌大佬Sameer給了大家一個思路。"Advanced Go Concurrency Patterns" by Sameer Ajmani: 這篇博客深入研究了 Golang 中的并發(fā)模式,并討論了如何使用它們來構(gòu)建高性能系統(tǒng)。它包括一些示例和實踐建議,幫助讀者更好地理解和實踐這些概念。下面我們針對他給出的case做一些說明與總結(jié),同時對go語言編程的模式技巧進行總結(jié),換句話就是說想提煉出面向場景的go語言編程的八股模式。

select-loop的編程關鍵要素

  1. 如何處理事件
  2. 如何處理元素
  3. 如何關閉退出

代碼示例:

核心結(jié)構(gòu)與接口

下面代碼給出了核心結(jié)構(gòu)sub,以及它實現(xiàn)了接口subscription的關鍵代碼。

  1. updates屬性是一個通道,用于用戶對元素進行處理。
  2. fetcher是用于獲取元素的客戶端,它可以是從數(shù)據(jù)庫讀取,也可以是從消息隊列讀取。
  3. closing用于關閉退出select-loop主體.

// sub implements the Subscription interface.
type sub struct {
fetcher Fetcher // fetches items
updates chan Item // sends items to the user
closing chan chan error // for Close
}

func (s *sub) Updates() <-chan Item {
return s.updates
}

func (s *sub) Close() error {
errc := make(chan error)
s.closing <- errc // 向closing通道中同步寫入errc
return <-errc // 等待主loop返回
}

// Subscribe returns a new Subscription that uses fetcher to fetch Items.
func Subscribe(fetcher Fetcher) Subscription {
s := &sub{
fetcher: fetcher,
updates: make(chan Item), // for Updates
closing: make(chan chan error), // for Close
}
go s.loop()
return s
}

sub的核心處理邏輯

// loop periodically fecthes Items, sends them on s.updates, and exits
// when Close is called. It extends dedupeLoop with logic to run
// Fetch asynchronously.
func (s *sub) loop() {
const maxPending = 10
type fetchResult struct {
fetched []Item
next time.Time
err error
}
var fetchDone chan fetchResult // if non-nil, Fetch is running
var pending []Item
var next time.Time
var err error
var seen = make(map[string]bool)
for {
var fetchDelay time.Duration
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
var startFetch <-chan time.Time
if fetchDone == nil && len(pending) < maxPending {
//等待隊列長度未超過最大設置且fetchDone是空,即元素已經(jīng)都入隊列了
// 設置fetchDelay時間后,startFetch通道有值
startFetch = time.After(fetchDelay)
}
var first Item
var updates chan Item
if len(pending) > 0 {
first = pending[0]
updates = s.updates // updates通道是為了用戶進一步消費的
}
select {
case <-startFetch:
fetchDone = make(chan fetchResult, 1)
go func() {
fetched, next, err := s.fetcher.Fetch()
fetchDone <- fetchResult{fetched, next, err}
}()
case result := <-fetchDone:
fetchDone = nil
// Use result.fetched, result.next, result.err
fetched := result.fetched
next, err = result.next, result.err
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
for _, item := range fetched {
if id := item.GUID; !seen[id] {
pending = append(pending, item)
seen[id] = true
}
}
case errc := <-s.closing:
errc <- err
close(s.updates)
return
case updates <- first:
pending = pending[1:]
}
}
}

那么上面的代碼是如何處理三個關鍵問題的呢?

  • 首先關于關閉并退出loop

上述代碼通過監(jiān)聽sub結(jié)構(gòu)的closing屬性,實現(xiàn)退出。

//Close asks loop to exit and waits for a response.
func (s *sub) Close() error {
errc := make(chan error)
s.closing <- errc
return <-errc
}

當調(diào)用sub的Close方法時,s.closing會接收一個errc的通道,loop主體向errc中寫入error信息并退出,調(diào)用sub的Close方法的客戶端從errc中也同步收到error信息。這是一個同步關閉的過程。loop主體可以在給客戶端發(fā)送error信息之前,可以完成一系列的關閉清理工作。

  • 關于事件處理與調(diào)度

程序中設置的下一次獲取元素的延遲調(diào)度的最小單位是10秒,從下面第22行可以看到,如果獲取元素很快,沒有耗費10秒,那么fetchDelay便有個時間gap,startFetch(第7行)這個時間通道便會通過time.After這個方法,在fetchDelay時間后,收到信號,完成18到25行的獲取元素工作。

var pending []Item // appended by fetch; consumed by send
var next time.Time // initially January 1, year 0
var err error
for {
var fetchDelay time.Duration // initially 0 (no delay)
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
startFetch := time.After(fetchDelay)

select {
case <-startFetch:
var fetched []Item
fetched, next, err = s.fetcher.Fetch()
if err != nil {
next = time.Now().Add(10 * time.Second)
break
}
pending = append(pending, fetched...)

}
}

問題:為了防止等待隊列過大,所以只有當長度不超過maxPending,并且獲取的數(shù)據(jù)已經(jīng)入隊了的時候,才會設置startFetch,否則就不觸發(fā)fetch。這塊可以結(jié)合上面整個代碼看看。

var fetchDelay time.Duration
if now := time.Now(); next.After(now) {
fetchDelay = next.Sub(now)
}
var startFetch <-chan time.Time
if fetchDone == nil && len(pending) < maxPending {
startFetch = time.After(fetchDelay) // enable fetch case
}

問題: Loop blocks on Fetch。

golang有個特性,就是Sends and receives on nil channels block.利用這個特性,當fetchDone是nil或者他里面沒有準備好結(jié)果的時候,相關的case都會阻塞,那么select也不會選擇它。同時為了防止fetch函數(shù)阻塞loop主函數(shù),通過啟動協(xié)程(下面9-12行),再次提升主loop的性能。

type fetchResult struct{ fetched []Item; next time.Time; err error }
var fetchDone chan fetchResult // if non-nil, Fetch is running
var startFetch <-chan time.Time
if fetchDone == nil && len(pending) < maxPending {
startFetch = time.After(fetchDelay) // enable fetch case
}
select {
case <-startFetch:
fetchDone = make(chan fetchResult, 1)
go func() {
fetched, next, err := s.fetcher.Fetch()
fetchDone <- fetchResult{fetched, next, err}
}()
case result := <-fetchDone:
fetchDone = nil
// Use result.fetched, result.next, result.err

總結(jié)

上面用到了3個技巧,如下所示:

  • for-select loop
  • service channel, reply channels (chan chan error)
  • nil channels in select cases

通過err,next,pending三個變量,就實現(xiàn)了在沒有鎖,條件變量,回調(diào)情況下,編寫高效并發(fā)go程序的需求。

參考文獻:

??https://go.dev/talks/2013/advconc.slide#43。??

責任編輯:姜華 來源: 今日頭條
相關推薦

2023-03-27 13:00:13

Javascript前端

2023-05-09 08:28:44

Go語言并發(fā)編程

2023-03-09 08:17:46

Google存儲設備

2021-10-26 15:59:18

WiFi 6WiFi 5通信網(wǎng)絡

2019-12-12 15:32:48

ITvCenterVMware

2019-02-27 12:00:09

開源Org模式Emacs

2023-09-14 09:27:19

Java系統(tǒng)

2025-02-13 09:45:31

2018-07-31 16:20:12

Windows 10Windows密碼

2022-07-31 23:55:23

區(qū)塊鏈加密貨幣代幣

2020-06-12 10:03:01

線程安全多線程

2020-04-02 11:16:28

Linux進程高并發(fā)

2022-02-12 00:05:53

開發(fā)軟件硬件

2022-11-15 11:02:21

2019-09-10 08:00:00

微軟Account網(wǎng)站瀏覽器

2022-04-18 07:56:43

分段鎖多線程嵌入式開發(fā)

2010-06-30 10:55:13

SQL Server日

2020-09-18 06:36:21

Linuxkernel高并發(fā)

2023-05-18 08:38:13

Java鎖機制

2016-11-23 16:08:24

Python處理器分布式系統(tǒng)
點贊
收藏

51CTO技術(shù)棧公眾號