Go 事件驅(qū)動編程:實(shí)現(xiàn)一個簡單的事件總線
前言
在當(dāng)今微服務(wù)和分布式系統(tǒng)盛行的背景下,事件驅(qū)動架構(gòu)(Event-Driven Architecture,EDA)扮演著一個至關(guān)重要的角色,此架構(gòu)的設(shè)計使得服務(wù)間可以通過事件進(jìn)行同步或異步通信,替代了傳統(tǒng)的直接接口調(diào)用?;谑录慕换シ绞?,促進(jìn)了服務(wù)之間的松耦合,提高系統(tǒng)的可擴(kuò)展性。
發(fā)布-訂閱模式是實(shí)現(xiàn)事件驅(qū)動架構(gòu)的模式之一,它允許系統(tǒng)的不同組件或服務(wù)發(fā)布事件,而其他組件或服務(wù)可以訂閱這些事件并根據(jù)事件內(nèi)容進(jìn)行響應(yīng)。相信大部分開發(fā)者都接觸過這一模式,常見的技術(shù)實(shí)現(xiàn)有消息隊列(MQ)和 Redis 發(fā)布/訂閱(PUB/SUB)功能等。
在 Go 語言中,我們可以利用其強(qiáng)大的 channel 和并發(fā)機(jī)制來實(shí)現(xiàn)發(fā)布-訂閱模式。本文將深入探討如何在 Go 中實(shí)現(xiàn)一個簡單的事件總線,這是發(fā)布-訂閱模式的具體實(shí)現(xiàn)。
準(zhǔn)備好了嗎?準(zhǔn)備一杯你最喜歡的咖啡或茶,隨著本文一探究竟吧。
事件總線
事件總線是發(fā)布-訂閱模式的具體實(shí)現(xiàn),它作為發(fā)布者和訂閱者的中間件,管理著事件傳遞與分發(fā),確保事件從發(fā)布者順利地傳達(dá)到訂閱者。
圖片
事件總線的優(yōu)勢主要包括:
- 解耦:服務(wù)間不需要直接通信,而是通過時間進(jìn)行交互,減少服務(wù)間的依賴。
- 異步處理:事件可以被異步處理,提高系統(tǒng)的響應(yīng)性和性能。
- 可擴(kuò)展性:新的訂閱者可以輕松訂閱事件,不需要修改現(xiàn)有的發(fā)布者代碼。
- 錯誤隔離:事件處理的失敗不會直接影響其他服務(wù)的正常運(yùn)行。
事件總線的代碼實(shí)現(xiàn)
接下來將介紹如何在 Go 語言中實(shí)現(xiàn)一個簡單的事件總線,它包含以下關(guān)鍵功能:
- 發(fā)布:允許系統(tǒng)的各個服務(wù)發(fā)送事件。
- 訂閱:允許感興趣的服務(wù)訂閱接收特定類型的事件。
- 取消訂閱:允許各個服務(wù)將本身已訂閱的事件刪除。
項(xiàng)目源碼地址:https://github.com/chenmingyong0423/go-eventbus
事件數(shù)據(jù)結(jié)構(gòu)定義
type Event struct {
Payload any
}
Event 是一個封裝事件的結(jié)構(gòu)體,其中 Payload 為事件的上下文信息,類型是 any。
事件總線定義
type (
EventChan chan Event
)
type EventBus struct {
mu sync.RWMutex
subscribers map[string][]EventChan
}
func NewEventBus() *EventBus {
return &EventBus{
subscribers: make(map[string][]EventChan),
}
}
EventChan 是一個類型別名,定義為傳遞 Event 結(jié)構(gòu)體的通道 chan Event。
EventBus 為事件總線的定義,它包含兩個屬性:
- mu:一個讀寫互斥鎖(sync.RWMutex),用于保證下面 subscribers 的并發(fā)讀寫安全。
- subscribers:一個映射,鍵為字符串類型,表示訂閱的主題;值為 EventChan 切片類型。該屬性用于存儲各個主體的所有訂閱者,每個訂閱者通過 EventChan 接收事件。
NewEventBus 函數(shù)用于創(chuàng)建一個新的 EventBus 事件總線。
事件總線的方法實(shí)現(xiàn)
事件總線實(shí)現(xiàn)了三個方法,分別為發(fā)布事件(Publish)和訂閱事件(Subscribe)以及取消訂閱事件(Unsubscribe)。
Publish 發(fā)布事件
func (eb *EventBus) Publish(topic string, event Event) {
eb.mu.RLock()
defer eb.mu.RUnlock()
// 復(fù)制一個新的訂閱者列表,避免在發(fā)布事件時修改訂閱者列表
subscribers := append([]EventChan{}, eb.subscribers[topic]...)
gofunc() {
for _, subscriber := range subscribers {
subscriber <- event
}
}()
}
Publish 方法用于發(fā)布事件。該方法接收兩個參數(shù):topic(主題)和 event (封裝事件的對象)。
在 Publish 方法的實(shí)現(xiàn)中,首先通過 mu 屬性獲取讀鎖,以確保接下來的 subscribers 寫操作是協(xié)程安全的。然后復(fù)制一份當(dāng)前主題的訂閱者列表 subscribers。接下來開啟一個新 goroutine,在這個 goroutine 中遍歷復(fù)制的訂閱者列表,將事件通過通道發(fā)送給所有訂閱者。完成這些操作后,釋放讀鎖。
為什么會復(fù)制一個新的訂閱者列表?
答:復(fù)制訂閱者列表是為了在發(fā)送事件時保持?jǐn)?shù)據(jù)的一致性和穩(wěn)定性。由于向通道發(fā)送數(shù)據(jù)的操作是在一個新的 goroutine 中進(jìn)行的,在發(fā)送數(shù)據(jù)時,讀鎖已經(jīng)被釋放,原來的訂閱者列表可能會由于添加或刪除訂閱者而發(fā)生變化。如果直接使用原來的訂閱者列表,可能會發(fā)生預(yù)料之外的錯誤(如向一個已經(jīng)關(guān)閉的通道發(fā)送數(shù)據(jù)會產(chǎn)生 panic)。
Subscribe 訂閱事件
func (eb *EventBus) Subscribe(topic string) EventChan {
eb.mu.Lock()
defer eb.mu.Unlock()
ch := make(EventChan)
eb.subscribers[topic] = append(eb.subscribers[topic], ch)
return ch
}
Subscribe 方法用于訂閱特定主題的事件。該方法有接收一個 topic 參數(shù),表示希望訂閱的主題。通過此方法,可以獲得一個 EventChan 通道,用于接收該主題的事件。
在 Subscribe 方法的實(shí)現(xiàn)中,首先通過 mu 屬性獲取寫鎖,以保證接下來的 subscribers 讀寫操作是協(xié)程安全的;接著創(chuàng)建一個新的 EventChan 通道 ch,將其添加到相應(yīng)主題的訂閱者切片中。完成這些操作后,釋放寫鎖。
Unsubscribe 取消訂閱事件
func (eb *EventBus) Unsubscribe(topic string, ch EventChan) {
eb.mu.Lock()
defer eb.mu.Unlock()
if subscribers, ok := eb.subscribers[topic]; ok {
for i, subscriber := range subscribers {
if ch == subscriber {
eb.subscribers[topic] = append(subscribers[:i], subscribers[i+1:]...)
close(ch)
// 清空通道
forrange ch {
}
return
}
}
}
}
Unsubscribe 方法用于取消訂閱事件。該方法接收兩個參數(shù):topic(已訂閱的主題)和 ch(被頒發(fā)的通道)。
在 Unsubscribe 方法里,首先通過 mu 屬性獲取寫鎖,以保證接下來的 subscribers 讀寫操作是協(xié)程安全的;然后檢查 topic 主題是否存在對應(yīng)的訂閱者。如果存在,遍歷該主題的訂閱者切片,找到與 ch 相匹配的通道,將其從訂閱者切片里移除并關(guān)閉該通道。然后清空通道。完成這些操作后,釋放寫鎖。
使用示例
// https://github.com/chenmingyong0423/blog/blob/master/tutorial-code/go/eventbus/main.go
package main
import (
"fmt"
"time"
"github.com/chenmingyong0423/go-eventbus"
)
func main() {
eventBus := eventbus.NewEventBus()
// 訂閱 post 主題事件
subscribe := eventBus.Subscribe("post")
gofunc() {
for event := range subscribe {
fmt.Println(event.Payload)
}
}()
eventBus.Publish("post", eventbus.Event{Payload: map[string]any{
"postId": 1,
"title": "Go 事件驅(qū)動編程:實(shí)現(xiàn)一個簡單的事件總線",
"author": "陳明勇",
}})
// 不存在訂閱者的 topic
eventBus.Publish("pay", eventbus.Event{Payload: "pay"})
time.Sleep(time.Second * 2)
// 取消訂閱 post 主題事件
eventBus.Unsubscribe("post", subscribe)
}
擴(kuò)展建議
本文實(shí)現(xiàn)的事件總線較為簡單,如果要增強(qiáng)時間總線的靈活性,可靠性和易用性等方面,我們可以考慮擴(kuò)展它,以下是一些建議:
- 事件持久化:實(shí)現(xiàn)時間的持久化存儲功能,確保系統(tǒng)崩潰后可以恢復(fù)未處理的事件。
- 通配符和模式匹配訂閱:允許使用通配符或正則表達(dá)式來訂閱一組相關(guān)主題,而不是單個具體的主題。
- 負(fù)載均衡和消息分發(fā)策略:在多個訂閱者之間分配事件,實(shí)現(xiàn)負(fù)載均衡。
- 插件支持:支持通過插件來擴(kuò)展功能,如日志記錄、消息過濾、轉(zhuǎn)換等。
小結(jié)
本文深入探討了在 Go 語言中實(shí)現(xiàn)簡單事件總線的過程。通過利用 Go 語言的強(qiáng)大特性,如 channel 和并發(fā)機(jī)制,我們可以輕松地實(shí)現(xiàn)發(fā)布-訂閱模式。
文章從事件總線的優(yōu)勢開始,介紹了其解耦、異步處理、可擴(kuò)展性和錯誤隔離等特點(diǎn)。然后詳細(xì)解釋了如何定義事件數(shù)據(jù)結(jié)構(gòu)和事件總線結(jié)構(gòu),并實(shí)現(xiàn)了發(fā)布、訂閱和取消訂閱事件的方法。最后,提出了一些可能的擴(kuò)展方向,如事件持久化、通配符訂閱、負(fù)載均衡和插件支持,以增強(qiáng)事件總線的靈活性和功能性。
通過閱讀本文,你可以學(xué)會在 Go 語言中實(shí)現(xiàn)一個簡單但功能強(qiáng)大的事件總線,并根據(jù)可能的需求進(jìn)行擴(kuò)展。
★項(xiàng)目源碼地址:https://github.com/chenmingyong0423/go-eventbus