本文以Eosc(一個(gè)高性能中間件開發(fā)框架)中的代碼為例子,看看如何在我們的實(shí)際項(xiàng)目中,實(shí)現(xiàn)這樣的功能。

背景
在實(shí)際項(xiàng)目中,我們經(jīng)常需要異步處理事件與數(shù)據(jù)。比如MVC模型中處理請求的Filter鏈,又如在nginx中或是linux的iptables中,都會有一個(gè)處理鏈條,來一步步的順序處理一個(gè)請求。此外基于集中存儲與分發(fā)的模式,實(shí)現(xiàn)事件與數(shù)據(jù)的異步處理,對于提升系統(tǒng)響應(yīng)程度,實(shí)現(xiàn)業(yè)務(wù)處理的解耦至關(guān)重要。本文以eosc(一個(gè)高性能中間件開發(fā)框架)中的代碼為例子,看看如何在我們的實(shí)際項(xiàng)目中,實(shí)現(xiàn)這樣的功能
代碼
eosc提供了關(guān)于dispatcher的關(guān)鍵實(shí)現(xiàn)的兩個(gè)文件,分別是dispatch.go和data-dispatch.go,具體的代碼地址是https://github.com/eolinker/eosc/tree/main/common/dispatcher。
這兩個(gè)文件中實(shí)現(xiàn)的結(jié)構(gòu)體與接口的關(guān)系如圖所示:

dispatcher關(guān)鍵接口與結(jié)構(gòu)體的關(guān)系
dispatch.go文件
在dispatch.go文件中,esco提供了IEvent、CallBackHandler、IListener三個(gè)重要的接口。
同時(shí)通過CallBackFunc來實(shí)現(xiàn)接口CallBackHandler, tListener來實(shí)現(xiàn)IListener。
//2個(gè)接口
type CallBackHandler interface {
DataEvent(e IEvent) error
}
type IListener interface {
Leave()
Event() <-chan IEvent
}
/*CallBackFunc實(shí)現(xiàn)了CallBackHandler,同時(shí)CallBackFunc又是一個(gè)接受IEvent為參數(shù),
返回error的函數(shù)*/
type CallBackFunc func(e IEvent) error
func (f CallBackFunc) DataEvent(e IEvent) error {
return f(e)
}
//實(shí)現(xiàn)了IListener接口
func (t *tListener) Leave() {
t.Once.Do(func() {
atomic.StoreUint32(&t.closed, 1)
close(t.c)
})
}
func (t *tListener) Event() <-chan IEvent {
return t.c
}
注意:tListener還提供了一個(gè)Handler方法,這個(gè)方法的參數(shù)與返回結(jié)果與CallBackFunc一樣,也就是說它實(shí)現(xiàn)的Handler方法是一種CallBackFunc,這個(gè)在后面的分發(fā)處理邏輯的注冊中會用到。
func (t *tListener) Handler(e IEvent) error {
if atomic.LoadUint32(&t.closed) == 0 {
t.c <- e
return nil
}
return ErrorIsClosed
}
data-dispatch.go文件
該文件提供了兩種dispatcher創(chuàng)建方法,分別是NewDataDispatchCenter、NewEventDispatchCenter。這兩個(gè)方法都是創(chuàng)建了DataDispatchCenter結(jié)構(gòu)體(這個(gè)結(jié)構(gòu)體后面會講到),但是啟動的處理協(xié)程不同,NewDataDispatchCenter啟動的是doDataLoop,NewEventDispatchCenter啟動的是doEventLoop。
//兩種DispatchCenter創(chuàng)建方法
func NewDataDispatchCenter() IDispatchCenter {
ctx, cancelFunc := context.WithCancel(context.Background())
center := &DataDispatchCenter{
ctx: ctx,
cancelFunc: cancelFunc,
addChannel: make(chan *_CallbackBox, 10),
eventChannel: make(chan IEvent),
}
go center.doDataLoop()
return center
}
func NewEventDispatchCenter() IDispatchCenter {
ctx, cancelFunc := context.WithCancel(context.Background())
center := &DataDispatchCenter{
ctx: ctx,
cancelFunc: cancelFunc,
addChannel: make(chan *_CallbackBox, 10),
eventChannel: make(chan IEvent),
}
go center.doEventLoop()
return center
}
//DataDispatchCenter 數(shù)據(jù)廣播中心
type DataDispatchCenter struct {
addChannel chan *_CallbackBox
eventChannel chan IEvent
ctx context.Context
cancelFunc context.CancelFunc
}
DataDispatchCenter這個(gè)結(jié)構(gòu)體中有兩個(gè)chan,一個(gè)是addChannel,一個(gè)是eventChannel。
addChannel | 接受_CallbackBox,這個(gè)_CallbackBox提供了邏輯處理Handler |
eventChannel | 接受IEvent,觸發(fā) |
doEventLoop邏輯:
NewEventDispatchCenter方法中啟動的doEventLoop,邏輯相對簡單,創(chuàng)建的channels用于存儲addChannel發(fā)送過來的_CallbackBox,即事件處理Handler.當(dāng)eventChannel收到事件后,遍歷channels中的每一個(gè)_CallbackBox,并調(diào)用相應(yīng)的Handler處理。

doEventLoop狀態(tài)圖
具體代碼可以查看https://github.com/eolinker/eosc/blob/main/common/dispatcher/data-dispatch.go#L48。
doDataLoop邏輯:
NewDataDispatchCenter方法中啟動的doDataLoop,這個(gè)邏輯稍微復(fù)雜點(diǎn)。其實(shí)它的大致流程和doEventLoop,不同的是每個(gè)新增加的_CallbackBox,需要對當(dāng)前接收并緩存的所有Event鍵值對進(jìn)行處理。而doEventLoop是不會的,新增加的_CallbackBox,只會對在它之后接收的Event生效。下面的代碼InitEvent(data.GET())很有意思。
- 首先InitEvent實(shí)現(xiàn)了IEvent接口,是一種IEvent。
- type InitEvent map[string]map[string][]byte
(代碼鏈接:https://github.com/eolinker/eosc/blob/main/common/dispatcher/data.go#L88)InitEvent是一個(gè)map,可以通過InitEvent(data.GET())初始化。
func (d *DataDispatchCenter) doDataLoop() {
data := NewMyData(nil)
channels := make([]*_CallbackBox, 0, 10)
isInit := false
for {
select {
case event, ok := <-d.eventChannel:
if ok {
isInit = true
data.DoEvent(event)
next := channels[:0]
for _, c := range channels {
if err := c.handler(event); err != nil {
close(c.closeChan)
continue
}
next = append(next, c)
}
channels = next
}
case hbox, ok := <-d.addChannel:
{
if ok {
if !isInit {
channels = append(channels, hbox)
} else {
if err := hbox.handler(InitEvent(data.GET())); err == nil {
channels = append(channels, hbox)
}
}
}
}
}
}
}
應(yīng)用
創(chuàng)建EventServer。
type EventServer struct {
IDispatchCenter
}
func NewEventServer() *EventServer {
es := &EventServer{
IDispatchCenter: NewDataDispatchCenter(),
}
return es
}
定義事件。
type MyEvent struct {
namespace string
key string
event string
data []byte
}
func (m *MyEvent) Namespace() string {
return m.namespace
}
func (m *MyEvent) Event() string {
return m.event
}
func (m *MyEvent) Key() string {
return m.key
}
func (m *MyEvent) Data() []byte {
return m.data
}
定義Handler并注冊。
func Handler(e IEvent) error {
//根據(jù)自己的業(yè)務(wù)要求
}
es.Register(Handler)
發(fā)送事件。
es.Send(&MyEvent{
namespace: "a",
key: "b",
event: "set",
data: []byte(fmt.Sprint(index)),
})