自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

Go語言異步高并發(fā)編程的秘密:無鎖,無條件變量,無回調(diào)

開發(fā) 前端
下面我們針對(duì)他給出的case做一些說明與總結(jié),同時(shí)對(duì)Go語言并發(fā)編程的語言特性與技巧進(jìn)行總結(jié),換句話就是說想提煉出面向場(chǎng)景的go語言高并發(fā)編程的八股模式

背景

在并發(fā)處理中,資源爭(zhēng)用是一個(gè)常見的問題。為了避免資源爭(zhēng)用,需要進(jìn)行優(yōu)化。以下是一些可以優(yōu)化并發(fā)處理中的資源爭(zhēng)用問題的建議:

  1. 避免鎖競(jìng)爭(zhēng):鎖競(jìng)爭(zhēng)是一種常見的資源爭(zhēng)用問題??梢酝ㄟ^減少鎖的使用,使用更細(xì)粒度的鎖,以及避免不必要的鎖競(jìng)爭(zhēng)來減少鎖競(jìng)爭(zhēng)。
  2. 使用緩存:在一些情況下,可以使用緩存來減少資源爭(zhēng)用。例如,在處理一些計(jì)算密集型的任務(wù)時(shí),可以使用緩存來避免重復(fù)計(jì)算。
  3. 使用原子操作:原子操作可以在不使用鎖的情況下實(shí)現(xiàn)資源的同步訪問。Go 語言中提供了一些原子操作,例如 atomic.AddInt32 和 atomic.LoadInt32 等,可以用于實(shí)現(xiàn)線程安全的資源訪問。
  4. 使用互斥鎖:互斥鎖是一種用于避免并發(fā)資源爭(zhēng)用的機(jī)制。在需要對(duì)資源進(jìn)行訪問的時(shí)候,可以使用互斥鎖來保證資源的獨(dú)占訪問。
  5. 使用讀寫鎖:讀寫鎖是一種特殊的互斥鎖,可以允許多個(gè)讀操作同時(shí)進(jìn)行,但是只允許一個(gè)寫操作進(jìn)行。在讀操作頻繁的場(chǎng)景下,可以使用讀寫鎖來提高并發(fā)性能。
  6. 使用條件變量:條件變量是一種用于在不同線程之間進(jìn)行協(xié)調(diào)的機(jī)制??梢允褂脳l件變量來避免不必要的資源爭(zhēng)用。例如,在一個(gè)生產(chǎn)者-消費(fèi)者模式的程序中,可以使用條件變量來協(xié)調(diào)生產(chǎn)者和消費(fèi)者之間的交互,從而避免資源爭(zhēng)用。

但是如果讓你不用鎖,條件變量,回調(diào)的話,還怎么寫并發(fā)程序啊,谷歌大佬Sameer給了大家一個(gè)思路。"Advanced Go Concurrency Patterns" by Sameer Ajmani: 這篇博客深入研究了 Golang 中的并發(fā)模式,并討論了如何使用它們來構(gòu)建高性能系統(tǒng)。它僅僅使用了Go語言的goroutine和channel便實(shí)現(xiàn)高效異步并發(fā)編程,沒有用到諸如await,context等包括鎖,條件變量,和回調(diào)函數(shù)文章包括一些示例和實(shí)踐建議,幫助讀者更好地理解和實(shí)踐這些概念。下面我們針對(duì)他給出的case做一些說明與總結(jié),同時(shí)對(duì)go語言并發(fā)編程的語言特性與技巧進(jìn)行總結(jié),換句話就是說想提煉出面向場(chǎng)景的go語言高并發(fā)編程的八股模式。

select-loop的編程關(guān)鍵要素

1.如何處理事件

2.如何處理元素

3.如何關(guān)閉退出

代碼示例:


核心結(jié)構(gòu)與接口

下面代碼給出了核心結(jié)構(gòu)sub,以及它實(shí)現(xiàn)了接口subscription的關(guān)鍵代碼。

  1. updates屬性是一個(gè)通道,用于用戶對(duì)元素進(jìn)行處理。
  2. fetcher是用于獲取元素的客戶端,它可以是從數(shù)據(jù)庫讀取,也可以是從消息隊(duì)列讀取。
  3. closing用于關(guān)閉退出select-loop主體
// sub implements the Subscription interface.
type sub struct {
	fetcher Fetcher         // fetches items
	updates chan Item       // sends items to the user
	closing chan chan error // for Close
}

func (s *sub) Updates() <-chan Item {
	return s.updates
}

func (s *sub) Close() error {
	errc := make(chan error)
	s.closing <- errc // 向closing通道中同步寫入errc
	return <-errc     // 等待主loop返回
}

// Subscribe returns a new Subscription that uses fetcher to fetch Items.
func Subscribe(fetcher Fetcher) Subscription {
	s := &sub{
		fetcher: fetcher,
		updates: make(chan Item),       // for Updates
		closing: make(chan chan error), // for Close
	}
	go s.loop()
	return s
}

sub的核心處理邏輯

// loop periodically fecthes Items, sends them on s.updates, and exits
// when Close is called.  It extends dedupeLoop with logic to run
// Fetch asynchronously.
func (s *sub) loop() {
	const maxPending = 10
	type fetchResult struct {
		fetched []Item
		next    time.Time
		err     error
	}
	var fetchDone chan fetchResult // if non-nil, Fetch is running
	var pending []Item
	var next time.Time
	var err error
	var seen = make(map[string]bool)
	for {
		var fetchDelay time.Duration
		if now := time.Now(); next.After(now) {
			fetchDelay = next.Sub(now)
		}
		var startFetch <-chan time.Time
		if fetchDone == nil && len(pending) < maxPending { 
      //等待隊(duì)列長(zhǎng)度未超過最大設(shè)置且fetchDone是空,即元素已經(jīng)都入隊(duì)列了
      // 設(shè)置fetchDelay時(shí)間后,startFetch通道有值
			startFetch = time.After(fetchDelay) 
		}
		var first Item
		var updates chan Item
		if len(pending) > 0 {
			first = pending[0]
			updates = s.updates // updates通道是為了用戶進(jìn)一步消費(fèi)的
		}
		select {
		case <-startFetch:
			fetchDone = make(chan fetchResult, 1)
			go func() {
				fetched, next, err := s.fetcher.Fetch()
				fetchDone <- fetchResult{fetched, next, err}
			}()
		case result := <-fetchDone:
			fetchDone = nil
			// Use result.fetched, result.next, result.err
			fetched := result.fetched
			next, err = result.next, result.err
			if err != nil {
				next = time.Now().Add(10 * time.Second)
				break
			}
			for _, item := range fetched {
				if id := item.GUID; !seen[id] {
					pending = append(pending, item)
					seen[id] = true
				}
			}
		case errc := <-s.closing:
			errc <- err
			close(s.updates)
			return
		case updates <- first:
			pending = pending[1:]
		}
	}
}

那么上面的代碼是如何處理三個(gè)關(guān)鍵問題的呢?

  • 首先關(guān)于關(guān)閉并退出loop

上述代碼通過監(jiān)聽sub結(jié)構(gòu)的closing屬性,實(shí)現(xiàn)退出。

//Close asks loop to exit and waits for a response.
func (s *sub) Close() error {
    errc := make(chan error)
    s.closing <- errc
    return <-errc
}

當(dāng)調(diào)用sub的Close方法時(shí),s.closing會(huì)接收一個(gè)errc的通道,loop主體向errc中寫入error信息并退出,調(diào)用sub的Close方法的客戶端從errc中也同步收到error信息。這是一個(gè)同步關(guān)閉的過程。loop主體可以在給客戶端發(fā)送error信息之前,可以完成一系列的關(guān)閉清理工作。

  • 關(guān)于事件處理與調(diào)度

程序中設(shè)置的下一次獲取元素的延遲調(diào)度的最小單位是10秒,從下面第22行可以看到,如果獲取元素很快,沒有耗費(fèi)10秒,那么fetchDelay便有個(gè)時(shí)間gap,startFetch(第7行)這個(gè)時(shí)間通道便會(huì)通過time.After這個(gè)方法,在fetchDelay時(shí)間后,收到信號(hào),完成18到25行的獲取元素工作。

var pending []Item // appended by fetch; consumed by send
    var next time.Time // initially January 1, year 0
    var err error
    for {
        var fetchDelay time.Duration // initially 0 (no delay)
        if now := time.Now(); next.After(now) {
            fetchDelay = next.Sub(now)
        }
        startFetch := time.After(fetchDelay)

     select {
        case <-startFetch:
            var fetched []Item
            fetched, next, err = s.fetcher.Fetch()
            if err != nil {
                next = time.Now().Add(10 * time.Second)
                break
            }
            pending = append(pending, fetched...)
       
        }
    }

問題:為了防止等待隊(duì)列過大,所以只有當(dāng)長(zhǎng)度不超過maxPending,并且獲取的數(shù)據(jù)已經(jīng)入隊(duì)了的時(shí)候,才會(huì)設(shè)置startFetch,否則就不觸發(fā)fetch。這塊可以結(jié)合上面整個(gè)代碼看看

var fetchDelay time.Duration
        if now := time.Now(); next.After(now) {
            fetchDelay = next.Sub(now)
        }
        var startFetch <-chan time.Time
        if fetchDone == nil && len(pending) < maxPending {
            startFetch = time.After(fetchDelay) // enable fetch case
        }

問題: Loop blocks on Fetch。

golang有個(gè)特性,就是Sends and receives on nil channels block.利用這個(gè)特性,當(dāng)fetchDone是nil或者他里面沒有準(zhǔn)備好結(jié)果的時(shí)候,相關(guān)的case都會(huì)阻塞,那么select也不會(huì)選擇它。同時(shí)為了防止fetch函數(shù)阻塞loop主函數(shù),通過啟動(dòng)協(xié)程(下面9-12行),再次提升主loop的性能。

type fetchResult struct{ fetched []Item; next time.Time; err error }
var fetchDone chan fetchResult // if non-nil, Fetch is running
var startFetch <-chan time.Time
        if fetchDone == nil && len(pending) < maxPending {
            startFetch = time.After(fetchDelay) // enable fetch case
        }
select {
        case <-startFetch:
            fetchDone = make(chan fetchResult, 1)
            go func() {
                fetched, next, err := s.fetcher.Fetch()
                fetchDone <- fetchResult{fetched, next, err}
            }()
        case result := <-fetchDone:
            fetchDone = nil
            // Use result.fetched, result.next, result.err

總結(jié)

上面用到了3個(gè)技巧,如下所示:

  • for-select loop
  • service channel, reply channels (chan chan error)
  • nil channels in select cases

通過err,next,pending三個(gè)變量,就實(shí)現(xiàn)了在沒有鎖,條件變量,回調(diào)情況下,編寫高效并發(fā)go程序的需求。

責(zé)任編輯:姜華 來源: 今日頭條
相關(guān)推薦

2025-04-28 02:22:00

并發(fā)編程無鎖編程

2023-03-10 21:48:52

Go語言消息隊(duì)列

2019-08-14 15:08:51

緩存存儲(chǔ)數(shù)據(jù)

2019-11-11 15:33:34

高并發(fā)緩存數(shù)據(jù)

2022-02-08 08:12:51

無鎖編程設(shè)計(jì)

2021-07-26 07:47:37

無鎖編程CPU

2016-11-23 16:08:24

Python處理器分布式系統(tǒng)

2024-10-14 08:51:52

協(xié)程Go語言

2023-02-10 09:40:36

Go語言并發(fā)

2011-11-11 13:38:39

Jscex

2012-06-14 14:03:19

JavaScript

2021-09-13 07:23:53

KafkaGo語言

2020-04-22 10:02:48

編程高德納算法

2022-07-06 08:02:51

undo 日志數(shù)據(jù)庫

2020-12-21 09:57:33

無鎖緩存并發(fā)緩存

2013-06-06 13:10:44

HashMap無鎖

2021-09-30 09:21:28

Go語言并發(fā)編程

2025-03-24 00:25:00

Go語言并發(fā)編程

2013-12-18 15:27:21

編程無鎖

2023-08-25 09:36:43

Java編程
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號(hào)