一篇關(guān)于Semaphore 的原理與實現(xiàn)
本文轉(zhuǎn)載自微信公眾號「碼農(nóng)桃花源」,作者曹春暉。轉(zhuǎn)載本文請聯(lián)系碼農(nóng)桃花源公眾號。
Semaphore
數(shù)據(jù)結(jié)構(gòu)
- // Go 語言中暴露的 semaphore 實現(xiàn)
- // 具體的用法是提供 sleep 和 wakeup 原語
- // 以使其能夠在其它同步原語中的競爭情況下使用
- // 因此這里的 semaphore 和 Linux 中的 futex 目標(biāo)是一致的
- // 只不過語義上更簡單一些
- //
- // 也就是說,不要認(rèn)為這些是信號量
- // 把這里的東西看作 sleep 和 wakeup 實現(xiàn)的一種方式
- // 每一個 sleep 都會和一個 wakeup 配對
- // 即使在發(fā)生 race 時,wakeup 在 sleep 之前時也是如此
- //
- // See Mullender and Cox, ``Semaphores in Plan 9,''
- // http://swtch.com/semaphore.pdf
- // 為 sync.Mutex 準(zhǔn)備的異步信號量
- // semaRoot 持有一棵 地址各不相同的 sudog(s.elem) 的平衡樹
- // 每一個 sudog 都反過來指向(通過 s.waitlink)一個在同一個地址上等待的其它 sudog 們
- // 同一地址的 sudog 的內(nèi)部列表上的操作時間復(fù)雜度都是 O(1)。頂層 semaRoot 列表的掃描
- // 的時間復(fù)雜度是 O(log n),n 是被哈希到同一個 semaRoot 的不同地址的總數(shù),每一個地址上都會有一些 goroutine 被阻塞。
- // 訪問 golang.org/issue/17953 來查看一個在引入二級列表之前性能較差的程序樣例,test/locklinear.go
- // 中有一個復(fù)現(xiàn)這個樣例的測試
- type semaRoot struct {
- lock mutex
- treap *sudog // root of balanced tree of unique waiters.
- nwait uint32 // Number of waiters. Read w/o the lock.
- }
- // Prime to not correlate with any user patterns.
- const semTabSize = 251
- var semtable [semTabSize]struct {
- root semaRoot
- pad [sys.CacheLineSize - unsafe.Sizeof(semaRoot{})]byte
- }
- func semroot(addr *uint32) *semaRoot {
- return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
- }
- ┌─────┬─────┬─────┬─────┬─────┬────────────────────────┬─────┐
- │ 0 │ 1 │ 2 │ 3 │ 4 │ ..... │ 250 │
- └─────┴─────┴─────┴─────┴─────┴────────────────────────┴─────┘
- │ │
- │ │
- └──┐ └─┐
- │ │
- │ │
- ▼ ▼
- ┌─────────┐ ┌─────────┐
- │ struct │ │ struct │
- ├─────────┴─────────┐ ├─────────┴─────────┐
- │ root semaRoot │──┐ │ root semaRoot │──┐
- ├───────────────────┤ │ ├───────────────────┤ │
- │ pad │ │ │ pad │ │
- └───────────────────┘ │ └───────────────────┘ │
- │ │
- ┌────────────────┘ ┌────────────────┘
- │ │
- │ │
- ▼ ▼
- ┌──────────┐ ┌──────────┐
- │ semaRoot │ │ semaRoot │
- ├──────────┴────────┐ ├──────────┴────────┐
- │ lock mutex │ │ lock mutex │
- ├───────────────────┤ ├───────────────────┤
- │ treap *sudog │ │ treap *sudog │
- ├───────────────────┤ ├───────────────────┤
- │ nwait uint32 │ │ nwait uint32 │
- └───────────────────┘ └───────────────────┘
treap 結(jié)構(gòu):
- ┌──────────┐
- ┌─┬─▶│ sudog │
- │ │ ├──────────┴────────────┐
- ┌─────────────────────┼─┼──│ prev *sudog │
- │ │ │ ├───────────────────────┤
- │ │ │ │ next *sudog │────┐
- │ │ │ ├───────────────────────┤ │
- │ │ │ │ parent *sudog │ │
- │ │ │ ├───────────────────────┤ │
- │ │ │ │ elem unsafe.Pointer │ │
- │ │ │ ├───────────────────────┤ │
- │ │ │ │ ticket uint32 │ │
- │ │ │ └───────────────────────┘ │
- │ │ │ │
- │ │ │ │
- │ │ │ │
- │ │ │ │
- │ │ │ │
- │ │ │ │
- ▼ │ │ ▼
- ┌──────────┐ │ │ ┌──────────┐
- │ sudog │ │ │ │ sudog │
- ├──────────┴────────────┐ │ │ ├──────────┴────────────┐
- │ prev *sudog │ │ │ │ prev *sudog │
- ├───────────────────────┤ │ │ ├───────────────────────┤
- │ next *sudog │ │ │ │ next *sudog │
- ├───────────────────────┤ │ │ ├───────────────────────┤
- │ parent *sudog │───┘ └─────────────────────────│ parent *sudog │
- ├───────────────────────┤ ├───────────────────────┤
- │ elem unsafe.Pointer │ │ elem unsafe.Pointer │
- ├───────────────────────┤ ├───────────────────────┤
- │ ticket uint32 │ │ ticket uint32 │
- └───────────────────────┘ └───────────────────────┘
在這個 treap 結(jié)構(gòu)里,從 elem 的視角(其實就是 lock 的 addr)來看,這個結(jié)構(gòu)是個二叉搜索樹。從 ticket 的角度來看,整個結(jié)構(gòu)就是一個小頂堆。
所以才叫樹堆(treap)。
相同 addr,即對同一個 mutex 上鎖的 g,會阻塞在同一個地址上。這些阻塞在同一個地址上的 goroutine 會被打包成 sudog,組成一個鏈表。用 sudog 的 waitlink 相連:
- ┌──────────┐ ┌──────────┐ ┌──────────┐
- │ sudog │ ┌─────▶│ sudog │ ┌─────▶│ sudog │
- ├──────────┴────────────┐ │ ├──────────┴────────────┐ │ ├──────────┴────────────┐
- │ waitlink *sudog │─────┘ │ waitlink *sudog │──────┘ │ waitlink *sudog │
- ├───────────────────────┤ ├───────────────────────┤ ├───────────────────────┤
- │ waittail *sudog │ │ waittail *sudog │ │ waittail *sudog │
- └───────────────────────┘ └───────────────────────┘ └───────────────────────┘
中間的元素的 waittail 都會指向最后一個元素:
- ┌──────────┐
- │ sudog │
- ├──────────┴────────────┐
- │ waitlink *sudog │
- ├───────────────────────┤
- │ waittail *sudog │───────────────────────────────────────────────────────────┐
- └───────────────────────┘ │
- ┌──────────┐ │
- │ sudog │ │
- ├──────────┴────────────┐ │
- │ waitlink *sudog │ │
- ├───────────────────────┤ │
- │ waittail *sudog │─────────────────────────┤
- └───────────────────────┘ ▼
- ┌──────────┐
- │ sudog │
- ├──────────┴────────────┐
- │ waitlink *sudog │
- ├───────────────────────┤
- │ waittail *sudog │
- └───────────────────────┘
對外封裝
在 sema.go 里實現(xiàn)的內(nèi)容,用 go:linkname 導(dǎo)出給 sync、poll 庫來使用,也是在鏈接期做了些手腳:
- //go:linkname sync_runtime_Semacquire sync.runtime_Semacquire
- func sync_runtime_Semacquire(addr *uint32) {
- semacquire1(addr, false, semaBlockProfile)
- }
- //go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire
- func poll_runtime_Semacquire(addr *uint32) {
- semacquire1(addr, false, semaBlockProfile)
- }
- //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease
- func sync_runtime_Semrelease(addr *uint32, handoff bool) {
- semrelease1(addr, handoff)
- }
- //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
- func sync_runtime_SemacquireMutex(addr *uint32, lifo bool) {
- semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile)
- }
- //go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease
- func poll_runtime_Semrelease(addr *uint32) {
- semrelease(addr)
- }
實現(xiàn)
sem 本身支持 acquire 和 release,其實就是 OS 里常說的 P 操作和 V 操作。
公共部分
- func cansemacquire(addr *uint32) bool {
- for {
- v := atomic.Load(addr)
- if v == 0 {
- return false
- }
- if atomic.Cas(addr, v, v-1) {
- return true
- }
- }
- }
acquire 過程
- type semaProfileFlags int
- const (
- semaBlockProfile semaProfileFlags = 1 << iota
- semaMutexProfile
- )
- // Called from runtime.
- func semacquire(addr *uint32) {
- semacquire1(addr, false, 0)
- }
- func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags) {
- gp := getg()
- if gp != gp.m.curg {
- throw("semacquire not on the G stack")
- }
- // 低成本的情況
- if cansemacquire(addr) {
- return
- }
- // 高成本的情況:
- // 增加 waiter count 的值
- // 再嘗試調(diào)用一次 cansemacquire,成本了就直接返回
- // 沒成功就把自己作為一個 waiter 入隊
- // sleep
- // (之后 waiter 的 descriptor 被 signaler 用 dequeue 踢出)
- s := acquireSudog()
- root := semroot(addr)
- t0 := int64(0)
- s.releasetime = 0
- s.acquiretime = 0
- s.ticket = 0
- for {
- lock(&root.lock)
- // 給 nwait 加一,這樣后來的就不會在 semrelease 中進(jìn)低成本的路徑了
- atomic.Xadd(&root.nwait, 1)
- // 檢查 cansemacquire 避免錯過了喚醒
- if cansemacquire(addr) {
- atomic.Xadd(&root.nwait, -1)
- unlock(&root.lock)
- break
- }
- // 在 cansemacquire 之后的 semrelease 都可以知道我們正在等待
- // (上面設(shè)置了 nwait),所以會直接進(jìn)入 sleep
- // 注: 這里說的 sleep 其實就是 goparkunlock
- root.queue(addr, s, lifo)
- goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4)
- if s.ticket != 0 || cansemacquire(addr) {
- break
- }
- }
- if s.releasetime > 0 {
- blockevent(s.releasetime-t0, 3)
- }
- releaseSudog(s)
- }
release 過程
- func semrelease(addr *uint32) {
- semrelease1(addr, false)
- }
- func semrelease1(addr *uint32, handoff bool) {
- root := semroot(addr)
- atomic.Xadd(addr, 1)
- // 低成本情況: 沒有 waiter?
- // 這個 atomic 的檢查必須發(fā)生在 xadd 之前,以避免錯誤喚醒
- // (具體參見 semacquire 中的循環(huán))
- if atomic.Load(&root.nwait) == 0 {
- return
- }
- // 高成本情況: 搜索 waiter 并喚醒它
- lock(&root.lock)
- if atomic.Load(&root.nwait) == 0 {
- // count 值已經(jīng)被另一個 goroutine 消費了
- // 所以我們不需要喚醒其它 goroutine 了
- unlock(&root.lock)
- return
- }
- s, t0 := root.dequeue(addr)
- if s != nil {
- atomic.Xadd(&root.nwait, -1)
- }
- unlock(&root.lock)
- if s != nil { // 可能會很慢,所以先解鎖
- acquiretime := s.acquiretime
- if acquiretime != 0 {
- mutexevent(t0-acquiretime, 3)
- }
- if s.ticket != 0 {
- throw("corrupted semaphore ticket")
- }
- if handoff && cansemacquire(addr) {
- s.ticket = 1
- }
- readyWithTime(s, 5)
- }
- }
- func readyWithTime(s *sudog, traceskip int) {
- if s.releasetime != 0 {
- s.releasetime = cputicks()
- }
- goready(s.g, traceskip)
- }
treap 結(jié)構(gòu)
sudog 按照地址 hash 到 251 個 bucket 中的其中一個,每一個 bucket 都是一棵 treap。而相同 addr 上的 sudog 會形成一個鏈表。
為啥同一個地址的 sudog 不需要展開放在 treap 中呢?顯然,sudog 喚醒的時候,block 在同一個 addr 上的 goroutine,說明都是加的同一把鎖,這些 goroutine 被喚醒肯定是一起被喚醒的,相同地址的 g 并不需要查找才能找到,只要決定是先進(jìn)隊列的被喚醒(fifo)還是后進(jìn)隊列的被喚醒(lifo)就可以了。
- // queue 函數(shù)會把 s 添加到 semaRoot 上阻塞的 goroutine 們中
- // 實際上就是把 s 添加到其地址對應(yīng)的 treap 上
- func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) {
- s.g = getg()
- s.elem = unsafe.Pointer(addr)
- s.next = nil
- s.prev = nil
- var last *sudog
- pt := &root.treap
- for t := *pt; t != nil; t = *pt {
- if t.elem == unsafe.Pointer(addr) {
- // Already have addr in list.
- if lifo {
- // treap 中在 t 的位置用 s 覆蓋掉 t
- *pt = s
- s.ticket = t.ticket
- s.acquiretime = t.acquiretime
- s.parent = t.parent
- s.prev = t.prev
- s.next = t.next
- if s.prev != nil {
- s.prev.parent = s
- }
- if s.next != nil {
- s.next.parent = s
- }
- // 把 t 放在 s 的 wait list 的第一個位置
- s.waitlink = t
- s.waittail = t.waittail
- if s.waittail == nil {
- s.waittail = t
- }
- t.parent = nil
- t.prev = nil
- t.next = nil
- t.waittail = nil
- } else {
- // 把 s 添加到 t 的等待列表的末尾
- if t.waittail == nil {
- t.waitlink = s
- } else {
- t.waittail.waitlink = s
- }
- t.waittail = s
- s.waitlink = nil
- }
- return
- }
- last = t
- if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) {
- pt = &t.prev
- } else {
- pt = &t.next
- }
- }
- // 把 s 作為樹的新的葉子插入進(jìn)去
- // 平衡樹使用 ticket 作為堆的權(quán)重值,這個 ticket 是隨機(jī)生成的
- // 也就是說,這個結(jié)構(gòu)以元素地址來看的話,是一個二叉搜索樹
- // 同時用 ticket 值使其同時又是一個小頂堆,滿足
- // s.ticket <= both s.prev.ticket and s.next.ticket.
- // https://en.wikipedia.org/wiki/Treap
- // http://faculty.washington.edu/aragon/pubs/rst89.pdf
- //
- // s.ticket 會在一些地方和 0 相比,因此只設(shè)置最低位的 bit
- // 這樣不會明顯地影響 treap 的質(zhì)量?
- s.ticket = fastrand() | 1
- s.parent = last
- *pt = s
- // 按照 ticket 來進(jìn)行旋轉(zhuǎn),以滿足 treap 的性質(zhì)
- for s.parent != nil && s.parent.ticket > s.ticket {
- if s.parent.prev == s {
- root.rotateRight(s.parent)
- } else {
- if s.parent.next != s {
- panic("semaRoot queue")
- }
- root.rotateLeft(s.parent)
- }
- }
- }
- // dequeue 會搜索到阻塞在 addr 地址的 semaRoot 中的第一個 goroutine
- // 如果這個 sudog 需要進(jìn)行 profile,dequeue 會返回它被喚醒的時間(now),否則的話 now 為 0
- func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) {
- ps := &root.treap
- s := *ps
- for ; s != nil; s = *ps {
- if s.elem == unsafe.Pointer(addr) {
- goto Found
- }
- if uintptr(unsafe.Pointer(addr)) < uintptr(s.elem) {
- ps = &s.prev
- } else {
- ps = &s.next
- }
- }
- return nil, 0
- Found:
- now = int64(0)
- if s.acquiretime != 0 {
- now = cputicks()
- }
- if t := s.waitlink; t != nil {
- // 替換掉同樣在 addr 上等待的 t。
- *ps = t
- t.ticket = s.ticket
- t.parent = s.parent
- t.prev = s.prev
- if t.prev != nil {
- t.prev.parent = t
- }
- t.next = s.next
- if t.next != nil {
- t.next.parent = t
- }
- if t.waitlink != nil {
- t.waittail = s.waittail
- } else {
- t.waittail = nil
- }
- t.acquiretime = now
- s.waitlink = nil
- s.waittail = nil
- } else {
- // 向下旋轉(zhuǎn) s 到葉節(jié)點,以進(jìn)行刪除,同時要考慮優(yōu)先級
- for s.next != nil || s.prev != nil {
- if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket {
- root.rotateRight(s)
- } else {
- root.rotateLeft(s)
- }
- }
- // Remove s, now a leaf.
- // 刪除 s,現(xiàn)在是葉子節(jié)點了
- if s.parent != nil {
- if s.parent.prev == s {
- s.parent.prev = nil
- } else {
- s.parent.next = nil
- }
- } else {
- root.treap = nil
- }
- }
- s.parent = nil
- s.elem = nil
- s.next = nil
- s.prev = nil
- s.ticket = 0
- return s, now
- }