Go并發(fā)編程中的經(jīng)驗(yàn)教訓(xùn)
通過(guò)學(xué)習(xí)如何定位并發(fā)處理的陷阱來(lái)避免未來(lái)處理這些問(wèn)題時(shí)的困境。
在復(fù)雜的分布式系統(tǒng)進(jìn)行任務(wù)處理時(shí),你通常會(huì)需要進(jìn)行并發(fā)的操作。在 Mode.net 公司,我們每天都要和實(shí)時(shí)、快速和靈活的軟件打交道。而沒(méi)有一個(gè)高度并發(fā)的系統(tǒng),就不可能構(gòu)建一個(gè)毫秒級(jí)的動(dòng)態(tài)地路由數(shù)據(jù)包的全球?qū)S镁W(wǎng)絡(luò)。這個(gè)動(dòng)態(tài)路由是基于網(wǎng)絡(luò)狀態(tài)的,盡管這個(gè)過(guò)程需要考慮眾多因素,但我們的重點(diǎn)是鏈路指標(biāo)。在我們的環(huán)境中,鏈路指標(biāo)可以是任何跟網(wǎng)絡(luò)鏈接的狀態(tài)和當(dāng)前屬性(如鏈接延遲)有關(guān)的任何內(nèi)容。
并發(fā)探測(cè)鏈接監(jiān)控
我們的動(dòng)態(tài)路由算法 H.A.L.O.(逐跳自適應(yīng)鏈路狀態(tài)最佳路由)部分依賴(lài)于鏈路指標(biāo)來(lái)計(jì)算路由表。這些指標(biāo)由位于每個(gè) PoP(存活節(jié)點(diǎn))上的獨(dú)立組件收集。PoP 是表示我們的網(wǎng)絡(luò)中單個(gè)路由實(shí)體的機(jī)器,通過(guò)鏈路連接并分布在我們的網(wǎng)絡(luò)拓?fù)渲械母鱾€(gè)位置。某個(gè)組件使用網(wǎng)絡(luò)數(shù)據(jù)包探測(cè)周?chē)臋C(jī)器,周?chē)臋C(jī)器回復(fù)數(shù)據(jù)包給前者。從接收到的探測(cè)包中可以獲得鏈路延遲。由于每個(gè) PoP 都有不止一個(gè)臨近節(jié)點(diǎn),所以這種探測(cè)任務(wù)實(shí)質(zhì)上是并發(fā)的:我們需要實(shí)時(shí)測(cè)量每個(gè)臨近連接點(diǎn)的延遲。我們不能串行地處理;為了計(jì)算這個(gè)指標(biāo),必須盡快處理每個(gè)探測(cè)。
latency computation graph
序列號(hào)和重置:一個(gè)重新排列場(chǎng)景
我們的探測(cè)組件互相發(fā)送和接收數(shù)據(jù)包,并依靠序列號(hào)進(jìn)行數(shù)據(jù)包處理。這旨在避免處理重復(fù)的包或順序被打亂的包。我們的第一個(gè)實(shí)現(xiàn)依靠特殊的序列號(hào) 0 來(lái)重置序列號(hào)。這個(gè)數(shù)字僅在組件初始化時(shí)使用。主要的問(wèn)題是我們考慮了遞增的序列號(hào)總是從 0 開(kāi)始。在該組件重啟后,包的順序可能會(huì)重新排列,某個(gè)包的序列號(hào)可能會(huì)輕易地被替換成重置之前使用過(guò)的值。這意味著,后繼的包都會(huì)被忽略掉,直到排到重置之前用到的序列值。
UDP 握手和有限狀態(tài)機(jī)
這里的問(wèn)題是該組件重啟前后的序列號(hào)是否一致。有幾種方法可以解決這個(gè)問(wèn)題,經(jīng)過(guò)討論,我們選擇了實(shí)現(xiàn)一個(gè)帶有清晰狀態(tài)定義的三步握手協(xié)議。這個(gè)握手過(guò)程在初始化時(shí)通過(guò)鏈接建立會(huì)話(huà)。這樣可以確保節(jié)點(diǎn)通過(guò)同一個(gè)會(huì)話(huà)進(jìn)行通信且使用了適當(dāng)?shù)男蛄刑?hào)。
為了正確實(shí)現(xiàn)這個(gè)過(guò)程,我們必須定義一個(gè)有清晰狀態(tài)和過(guò)渡的有限狀態(tài)機(jī)。這樣我們就可以正確管理握手過(guò)程中的所有極端情況。
finite state machine diagram
會(huì)話(huà) ID 由握手的初始化程序生成。一個(gè)完整的交換順序如下:
- 發(fā)送者發(fā)送一個(gè)
SYN(ID)
數(shù)據(jù)包。 - 接收者存儲(chǔ)接收到的
ID
并發(fā)送一個(gè)SYN-ACK(ID)
。 - 發(fā)送者接收到
SYN-ACK(ID)
并發(fā)送一個(gè)ACK(ID)
。它還發(fā)送一個(gè)從序列號(hào) 0 開(kāi)始的數(shù)據(jù)包。 - 接收者檢查最后接收到的
ID
,如果 ID 匹配,則接受ACK(ID)
。它還開(kāi)始接受序列號(hào)為 0 的數(shù)據(jù)包。
處理狀態(tài)超時(shí)
基本上,每種狀態(tài)下你都需要處理最多三種類(lèi)型的事件:鏈接事件、數(shù)據(jù)包事件和超時(shí)事件。這些事件會(huì)并發(fā)地出現(xiàn),因此你必須正確處理并發(fā)。
- 鏈接事件包括網(wǎng)絡(luò)連接或網(wǎng)絡(luò)斷開(kāi)的變化,相應(yīng)的初始化一個(gè)鏈接會(huì)話(huà)或斷開(kāi)一個(gè)已建立的會(huì)話(huà)。
- 數(shù)據(jù)包事件是控制數(shù)據(jù)包(
SYN
/SYN-ACK
/ACK
)或只是探測(cè)響應(yīng)。 - 超時(shí)事件在當(dāng)前會(huì)話(huà)狀態(tài)的預(yù)定超時(shí)時(shí)間到期后觸發(fā)。
這里面臨的最主要的問(wèn)題是如何處理并發(fā)的超時(shí)到期和其他事件。這里很容易陷入死鎖和資源競(jìng)爭(zhēng)的陷阱。
第一種方法
本項(xiàng)目使用的語(yǔ)言是 Golang。它確實(shí)提供了原生的同步機(jī)制,如自帶的通道和鎖,并且能夠使用輕量級(jí)線(xiàn)程來(lái)進(jìn)行并發(fā)處理。
gopher 們聚眾狂歡
首先,你可以設(shè)計(jì)兩個(gè)分別表示我們的會(huì)話(huà)和超時(shí)處理程序的結(jié)構(gòu)體。
type Session struct {
State SessionState
Id SessionId
RemoteIp string
}
type TimeoutHandler struct {
callback func(Session)
session Session
duration int
timer *timer.Timer
}
Session
標(biāo)識(shí)連接會(huì)話(huà),內(nèi)有表示會(huì)話(huà) ID、臨近的連接點(diǎn)的 IP 和當(dāng)前會(huì)話(huà)狀態(tài)的字段。
TimeoutHandler
包含回調(diào)函數(shù)、對(duì)應(yīng)的會(huì)話(huà)、持續(xù)時(shí)間和指向調(diào)度計(jì)時(shí)器的指針。
每一個(gè)臨近連接點(diǎn)的會(huì)話(huà)都包含一個(gè)保存調(diào)度 TimeoutHandler
的全局映射。
SessionTimeout map[Session]*TimeoutHandler
下面方法注冊(cè)和取消超時(shí):
// schedules the timeout callback function.
func (timeout* TimeoutHandler) Register() {
timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time.Second, func() {
timeout.callback(timeout.session)
})
}
func (timeout* TimeoutHandler) Cancel() {
if timeout.timer == nil {
return
}
timeout.timer.Stop()
}
你可以使用類(lèi)似下面的方法來(lái)創(chuàng)建和存儲(chǔ)超時(shí):
func CreateTimeoutHandler(callback func(Session), session Session, duration int) *TimeoutHandler {
if sessionTimeout[session] == nil {
sessionTimeout[session] := new(TimeoutHandler)
}
timeout = sessionTimeout[session]
timeout.session = session
timeout.callback = callback
timeout.duration = duration
return timeout
}
超時(shí)處理程序創(chuàng)建后,會(huì)在經(jīng)過(guò)了設(shè)置的 duration
時(shí)間(秒)后執(zhí)行回調(diào)函數(shù)。然而,有些事件會(huì)使你重新調(diào)度一個(gè)超時(shí)處理程序(與 SYN
狀態(tài)時(shí)的處理一樣,每 3 秒一次)。
為此,你可以讓回調(diào)函數(shù)重新調(diào)度一次超時(shí):
func synCallback(session Session) {
sendSynPacket(session)
// reschedules the same callback.
newTimeout := NewTimeoutHandler(synCallback, session, SYN_TIMEOUT_DURATION)
newTimeout.Register()
sessionTimeout[state] = newTimeout
}
這次回調(diào)在新的超時(shí)處理程序中重新調(diào)度自己,并更新全局映射 sessionTimeout
。
數(shù)據(jù)競(jìng)爭(zhēng)和引用
你的解決方案已經(jīng)有了??梢酝ㄟ^(guò)檢查計(jì)時(shí)器到期后超時(shí)回調(diào)是否執(zhí)行來(lái)進(jìn)行一個(gè)簡(jiǎn)單的測(cè)試。為此,注冊(cè)一個(gè)超時(shí),休眠 duration
秒,然后檢查是否執(zhí)行了回調(diào)的處理。執(zhí)行這個(gè)測(cè)試后,最好取消預(yù)定的超時(shí)時(shí)間(因?yàn)樗鼤?huì)重新調(diào)度),這樣才不會(huì)在下次測(cè)試時(shí)產(chǎn)生副作用。
令人驚訝的是,這個(gè)簡(jiǎn)單的測(cè)試發(fā)現(xiàn)了這個(gè)解決方案中的一個(gè)問(wèn)題。使用 cancel
方法來(lái)取消超時(shí)并沒(méi)有正確處理。以下順序的事件會(huì)導(dǎo)致數(shù)據(jù)資源競(jìng)爭(zhēng):
- 你有一個(gè)已調(diào)度的超時(shí)處理程序。
- 線(xiàn)程 1:
- 你接收到一個(gè)控制數(shù)據(jù)包,現(xiàn)在你要取消已注冊(cè)的超時(shí)并切換到下一個(gè)會(huì)話(huà)狀態(tài)(如發(fā)送
SYN
后接收到一個(gè)SYN-ACK
) - 你調(diào)用了
timeout.Cancel()
,這個(gè)函數(shù)調(diào)用了timer.Stop()
。(請(qǐng)注意,Golang 計(jì)時(shí)器的停止不會(huì)終止一個(gè)已過(guò)期的計(jì)時(shí)器。)
- 你接收到一個(gè)控制數(shù)據(jù)包,現(xiàn)在你要取消已注冊(cè)的超時(shí)并切換到下一個(gè)會(huì)話(huà)狀態(tài)(如發(fā)送
- 線(xiàn)程 2:
- 在取消調(diào)用之前,計(jì)時(shí)器已過(guò)期,回調(diào)即將執(zhí)行。
- 執(zhí)行回調(diào),它調(diào)度一次新的超時(shí)并更新全局映射。
- 線(xiàn)程 1:
- 切換到新的會(huì)話(huà)狀態(tài)并注冊(cè)新的超時(shí),更新全局映射。
兩個(gè)線(xiàn)程并發(fā)地更新超時(shí)映射。最終結(jié)果是你無(wú)法取消注冊(cè)的超時(shí),然后你也會(huì)丟失對(duì)線(xiàn)程 2 重新調(diào)度的超時(shí)的引用。這導(dǎo)致處理程序在一段時(shí)間內(nèi)持續(xù)執(zhí)行和重新調(diào)度,出現(xiàn)非預(yù)期行為。
鎖也解決不了問(wèn)題
使用鎖也不能完全解決問(wèn)題。如果你在處理所有事件和執(zhí)行回調(diào)之前加鎖,它仍然不能阻止一個(gè)過(guò)期的回調(diào)運(yùn)行:
func (timeout* TimeoutHandler) Register() {
timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time._Second_, func() {
stateLock.Lock()
defer stateLock.Unlock()
timeout.callback(timeout.session)
})
}
現(xiàn)在的區(qū)別就是全局映射的更新是同步的,但是這還是不能阻止在你調(diào)用 timeout.Cancel()
后回調(diào)的執(zhí)行 —— 這種情況出現(xiàn)在調(diào)度計(jì)時(shí)器過(guò)期了但是還沒(méi)有拿到鎖的時(shí)候。你還是會(huì)丟失一個(gè)已注冊(cè)的超時(shí)的引用。
使用取消通道
你可以使用取消通道,而不必依賴(lài)不能阻止到期的計(jì)時(shí)器執(zhí)行的 golang 函數(shù) timer.Stop()
。
這是一個(gè)略有不同的方法?,F(xiàn)在你可以不用再通過(guò)回調(diào)進(jìn)行遞歸地重新調(diào)度;而是注冊(cè)一個(gè)死循環(huán),這個(gè)循環(huán)接收到取消信號(hào)或超時(shí)事件時(shí)終止。
新的 Register()
產(chǎn)生一個(gè)新的 go 線(xiàn)程,這個(gè)線(xiàn)程在超時(shí)后執(zhí)行你的回調(diào),并在前一個(gè)超時(shí)執(zhí)行后調(diào)度新的超時(shí)。返回給調(diào)用方一個(gè)取消通道,用來(lái)控制循環(huán)的終止。
func (timeout *TimeoutHandler) Register() chan struct{} {
cancelChan := make(chan struct{})
go func () {
select {
case _ = <- cancelChan:
return
case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second):
func () {
stateLock.Lock()
defer stateLock.Unlock()
timeout.callback(timeout.session)
} ()
}
} ()
return cancelChan
}
func (timeout* TimeoutHandler) Cancel() {
if timeout.cancelChan == nil {
return
}
timeout.cancelChan <- struct{}{}
}
這個(gè)方法給你注冊(cè)的所有超時(shí)提供了取消通道。一個(gè)取消調(diào)用向通道發(fā)送一個(gè)空結(jié)構(gòu)體并觸發(fā)取消操作。然而,這并不能解決前面的問(wèn)題;可能在你通過(guò)通道取消之前以及超時(shí)線(xiàn)程拿到鎖之前,超時(shí)時(shí)間就已經(jīng)到了。
這里的解決方案是,在拿到鎖之后,檢查一下超時(shí)范圍內(nèi)的取消通道。
case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second):
func () {
stateLock.Lock()
defer stateLock.Unlock()
select {
case _ = <- handler.cancelChan:
return
default:
timeout.callback(timeout.session)
}
} ()
}
最終,這可以確保在拿到鎖之后執(zhí)行回調(diào),不會(huì)觸發(fā)取消操作。
小心死鎖
這個(gè)解決方案看起來(lái)有效;但是還是有個(gè)隱患:死鎖。
請(qǐng)閱讀上面的代碼,試著自己找到它??紤]下描述的所有函數(shù)的并發(fā)調(diào)用。
這里的問(wèn)題在取消通道本身。我們創(chuàng)建的是無(wú)緩沖通道,即發(fā)送的是阻塞調(diào)用。當(dāng)你在一個(gè)超時(shí)處理程序中調(diào)用取消函數(shù)時(shí),只有在該處理程序被取消后才能繼續(xù)處理。問(wèn)題出現(xiàn)在,當(dāng)你有多個(gè)調(diào)用請(qǐng)求到同一個(gè)取消通道時(shí),這時(shí)一個(gè)取消請(qǐng)求只被處理一次。當(dāng)多個(gè)事件同時(shí)取消同一個(gè)超時(shí)處理程序時(shí),如連接斷開(kāi)或控制包事件,很容易出現(xiàn)這種情況。這會(huì)導(dǎo)致死鎖,可能會(huì)使應(yīng)用程序停機(jī)。
有人在聽(tīng)嗎?
(已獲得 Trevor Forrey 授權(quán)。)
這里的解決方案是創(chuàng)建通道時(shí)指定緩存大小至少為 1,這樣向通道發(fā)送數(shù)據(jù)就不會(huì)阻塞,也顯式地使發(fā)送變成非阻塞的,避免了并發(fā)調(diào)用。這樣可以確保取消操作只發(fā)送一次,并且不會(huì)阻塞后續(xù)的取消調(diào)用。
func (timeout* TimeoutHandler) Cancel() {
if timeout.cancelChan == nil {
return
}
select {
case timeout.cancelChan <- struct{}{}:
default:
// can’t send on the channel, someone has already requested the cancellation.
}
}
總結(jié)
在實(shí)踐中你學(xué)到了并發(fā)操作時(shí)出現(xiàn)的常見(jiàn)錯(cuò)誤。由于其不確定性,即使進(jìn)行大量的測(cè)試,也不容易發(fā)現(xiàn)這些問(wèn)題。下面是我們?cè)谧畛醯膶?shí)現(xiàn)中遇到的三個(gè)主要問(wèn)題:
在非同步的情況下更新共享數(shù)據(jù)
這似乎是個(gè)很明顯的問(wèn)題,但如果并發(fā)更新發(fā)生在不同的位置,就很難發(fā)現(xiàn)。結(jié)果就是數(shù)據(jù)競(jìng)爭(zhēng),由于一個(gè)更新會(huì)覆蓋另一個(gè),因此對(duì)同一數(shù)據(jù)的多次更新中會(huì)有某些更新丟失。在我們的案例中,我們是在同時(shí)更新同一個(gè)共享映射里的調(diào)度超時(shí)引用。(有趣的是,如果 Go 檢測(cè)到在同一個(gè)映射對(duì)象上的并發(fā)讀寫(xiě),會(huì)拋出致命錯(cuò)誤 — 你可以嘗試下運(yùn)行 Go 的數(shù)據(jù)競(jìng)爭(zhēng)檢測(cè)器)。這最終會(huì)導(dǎo)致丟失超時(shí)引用,且無(wú)法取消給定的超時(shí)。當(dāng)有必要時(shí),永遠(yuǎn)不要忘記使用鎖。
不要忘記同步 gopher 們的工作
缺少條件檢查
在不能僅依賴(lài)鎖的獨(dú)占性的情況下,就需要進(jìn)行條件檢查。我們遇到的場(chǎng)景稍微有點(diǎn)不一樣,但是核心思想跟條件變量是一樣的。假設(shè)有個(gè)一個(gè)生產(chǎn)者和多個(gè)消費(fèi)者使用一個(gè)共享隊(duì)列的經(jīng)典場(chǎng)景,生產(chǎn)者可以將一個(gè)元素添加到隊(duì)列并喚醒所有消費(fèi)者。這個(gè)喚醒調(diào)用意味著隊(duì)列中的數(shù)據(jù)是可訪(fǎng)問(wèn)的,并且由于隊(duì)列是共享的,消費(fèi)者必須通過(guò)鎖來(lái)進(jìn)行同步訪(fǎng)問(wèn)。每個(gè)消費(fèi)者都可能拿到鎖;然而,你仍然需要檢查隊(duì)列中是否有元素。因?yàn)樵谀隳玫芥i的瞬間并不知道隊(duì)列的狀態(tài),所以還是需要進(jìn)行條件檢查。
在我們的例子中,超時(shí)處理程序收到了計(jì)時(shí)器到期時(shí)發(fā)出的“喚醒”調(diào)用,但是它仍需要檢查是否已向其發(fā)送了取消信號(hào),然后才能繼續(xù)執(zhí)行回調(diào)。
如果你要喚醒多個(gè) gopher,可能就需要進(jìn)行條件檢查
死鎖
當(dāng)一個(gè)線(xiàn)程被卡住,無(wú)限期地等待一個(gè)喚醒信號(hào),但是這個(gè)信號(hào)永遠(yuǎn)不會(huì)到達(dá)時(shí),就會(huì)發(fā)生這種情況。死鎖可以通過(guò)讓你的整個(gè)程序停機(jī)來(lái)徹底殺死你的應(yīng)用。
在我們的案例中,這種情況的發(fā)生是由于多次發(fā)送請(qǐng)求到一個(gè)非緩沖且阻塞的通道。這意味著向通道發(fā)送數(shù)據(jù)只有在從這個(gè)通道接收完數(shù)據(jù)后才能返回。我們的超時(shí)線(xiàn)程循環(huán)迅速?gòu)娜∠ǖ澜邮招盘?hào);然而,在接收到第一個(gè)信號(hào)后,它將跳出循環(huán),并且再也不會(huì)從這個(gè)通道讀取數(shù)據(jù)。其他的調(diào)用會(huì)一直被卡住。為避免這種情況,你需要仔細(xì)檢查代碼,謹(jǐn)慎處理阻塞調(diào)用,并確保不會(huì)發(fā)生線(xiàn)程饑餓。我們例子中的解決方法是使取消調(diào)用成為非阻塞調(diào)用 — 我們不需要阻塞調(diào)用。