自拍偷在线精品自拍偷,亚洲欧美中文日韩v在线观看不卡

基于Dispatcher模式的事件與數(shù)據(jù)分發(fā)處理器的Go語言實(shí)現(xiàn)

開發(fā) 前端
本文以Eosc(一個(gè)高性能中間件開發(fā)框架)中的代碼為例子,看看如何在我們的實(shí)際項(xiàng)目中,實(shí)現(xiàn)這樣的功能。

背景

在實(shí)際項(xiàng)目中,我們經(jīng)常需要異步處理事件與數(shù)據(jù)。比如MVC模型中處理請求的Filter鏈,又如在nginx中或是linux的iptables中,都會有一個(gè)處理鏈條,來一步步的順序處理一個(gè)請求。此外基于集中存儲與分發(fā)的模式,實(shí)現(xiàn)事件與數(shù)據(jù)的異步處理,對于提升系統(tǒng)響應(yīng)程度,實(shí)現(xiàn)業(yè)務(wù)處理的解耦至關(guān)重要。本文以eosc(一個(gè)高性能中間件開發(fā)框架)中的代碼為例子,看看如何在我們的實(shí)際項(xiàng)目中,實(shí)現(xiàn)這樣的功能

代碼

eosc提供了關(guān)于dispatcher的關(guān)鍵實(shí)現(xiàn)的兩個(gè)文件,分別是dispatch.go和data-dispatch.go,具體的代碼地址是https://github.com/eolinker/eosc/tree/main/common/dispatcher。

這兩個(gè)文件中實(shí)現(xiàn)的結(jié)構(gòu)體與接口的關(guān)系如圖所示:

dispatcher關(guān)鍵接口與結(jié)構(gòu)體的關(guān)系

dispatch.go文件

在dispatch.go文件中,esco提供了IEvent、CallBackHandler、IListener三個(gè)重要的接口。

同時(shí)通過CallBackFunc來實(shí)現(xiàn)接口CallBackHandler, tListener來實(shí)現(xiàn)IListener。

//2個(gè)接口
type CallBackHandler interface {
DataEvent(e IEvent) error
}

type IListener interface {
Leave()
Event() <-chan IEvent
}
/*CallBackFunc實(shí)現(xiàn)了CallBackHandler,同時(shí)CallBackFunc又是一個(gè)接受IEvent為參數(shù),
返回error的函數(shù)*/
type CallBackFunc func(e IEvent) error

func (f CallBackFunc) DataEvent(e IEvent) error {
return f(e)
}
//實(shí)現(xiàn)了IListener接口
func (t *tListener) Leave() {
t.Once.Do(func() {
atomic.StoreUint32(&t.closed, 1)
close(t.c)
})
}

func (t *tListener) Event() <-chan IEvent {
return t.c
}

注意:tListener還提供了一個(gè)Handler方法,這個(gè)方法的參數(shù)與返回結(jié)果與CallBackFunc一樣,也就是說它實(shí)現(xiàn)的Handler方法是一種CallBackFunc,這個(gè)在后面的分發(fā)處理邏輯的注冊中會用到。

func (t *tListener) Handler(e IEvent) error {
if atomic.LoadUint32(&t.closed) == 0 {
t.c <- e
return nil
}
return ErrorIsClosed
}

data-dispatch.go文件

該文件提供了兩種dispatcher創(chuàng)建方法,分別是NewDataDispatchCenter、NewEventDispatchCenter。這兩個(gè)方法都是創(chuàng)建了DataDispatchCenter結(jié)構(gòu)體(這個(gè)結(jié)構(gòu)體后面會講到),但是啟動的處理協(xié)程不同,NewDataDispatchCenter啟動的是doDataLoop,NewEventDispatchCenter啟動的是doEventLoop。

//兩種DispatchCenter創(chuàng)建方法
func NewDataDispatchCenter() IDispatchCenter {
ctx, cancelFunc := context.WithCancel(context.Background())
center := &DataDispatchCenter{
ctx: ctx,
cancelFunc: cancelFunc,
addChannel: make(chan *_CallbackBox, 10),
eventChannel: make(chan IEvent),
}
go center.doDataLoop()
return center
}

func NewEventDispatchCenter() IDispatchCenter {
ctx, cancelFunc := context.WithCancel(context.Background())
center := &DataDispatchCenter{
ctx: ctx,
cancelFunc: cancelFunc,
addChannel: make(chan *_CallbackBox, 10),
eventChannel: make(chan IEvent),
}
go center.doEventLoop()
return center
}

//DataDispatchCenter 數(shù)據(jù)廣播中心
type DataDispatchCenter struct {
addChannel chan *_CallbackBox
eventChannel chan IEvent

ctx context.Context
cancelFunc context.CancelFunc
}

DataDispatchCenter這個(gè)結(jié)構(gòu)體中有兩個(gè)chan,一個(gè)是addChannel,一個(gè)是eventChannel。

addChannel

接受_CallbackBox,這個(gè)_CallbackBox提供了邏輯處理Handler

eventChannel

接受IEvent,觸發(fā)

doEventLoop邏輯:

NewEventDispatchCenter方法中啟動的doEventLoop,邏輯相對簡單,創(chuàng)建的channels用于存儲addChannel發(fā)送過來的_CallbackBox,即事件處理Handler.當(dāng)eventChannel收到事件后,遍歷channels中的每一個(gè)_CallbackBox,并調(diào)用相應(yīng)的Handler處理。

doEventLoop狀態(tài)圖

具體代碼可以查看https://github.com/eolinker/eosc/blob/main/common/dispatcher/data-dispatch.go#L48。

doDataLoop邏輯:

NewDataDispatchCenter方法中啟動的doDataLoop,這個(gè)邏輯稍微復(fù)雜點(diǎn)。其實(shí)它的大致流程和doEventLoop,不同的是每個(gè)新增加的_CallbackBox,需要對當(dāng)前接收并緩存的所有Event鍵值對進(jìn)行處理。而doEventLoop是不會的,新增加的_CallbackBox,只會對在它之后接收的Event生效。下面的代碼InitEvent(data.GET())很有意思。

  1. 首先InitEvent實(shí)現(xiàn)了IEvent接口,是一種IEvent。
  2. type InitEvent map[string]map[string][]byte (代碼鏈接:https://github.com/eolinker/eosc/blob/main/common/dispatcher/data.go#L88)InitEvent是一個(gè)map,可以通過InitEvent(data.GET())初始化。
func (d *DataDispatchCenter) doDataLoop() {
data := NewMyData(nil)
channels := make([]*_CallbackBox, 0, 10)
isInit := false
for {
select {
case event, ok := <-d.eventChannel:
if ok {
isInit = true
data.DoEvent(event)
next := channels[:0]
for _, c := range channels {
if err := c.handler(event); err != nil {
close(c.closeChan)
continue
}
next = append(next, c)
}
channels = next
}
case hbox, ok := <-d.addChannel:
{
if ok {
if !isInit {
channels = append(channels, hbox)
} else {
if err := hbox.handler(InitEvent(data.GET())); err == nil {
channels = append(channels, hbox)
}
}
}
}
}

}
}

應(yīng)用

創(chuàng)建EventServer。

type EventServer struct {
IDispatchCenter
}
func NewEventServer() *EventServer {
es := &EventServer{
IDispatchCenter: NewDataDispatchCenter(),
}
return es
}

定義事件。

type MyEvent struct {
namespace string
key string
event string
data []byte
}

func (m *MyEvent) Namespace() string {
return m.namespace
}

func (m *MyEvent) Event() string {
return m.event
}

func (m *MyEvent) Key() string {
return m.key
}

func (m *MyEvent) Data() []byte {
return m.data
}

定義Handler并注冊。

func Handler(e IEvent) error {
//根據(jù)自己的業(yè)務(wù)要求
}
es.Register(Handler)

發(fā)送事件。

es.Send(&MyEvent{
namespace: "a",
key: "b",
event: "set",
data: []byte(fmt.Sprint(index)),
})
責(zé)任編輯:姜華 來源: 今日頭條
相關(guān)推薦

2023-04-03 08:39:33

中間件go語言

2024-06-06 09:47:56

2023-03-27 00:20:48

2024-06-11 00:05:00

CasaOS云存儲管理

2022-05-19 14:14:26

go語言限流算法

2024-08-09 10:59:01

KubernetesSidecar模式

2024-03-19 10:45:31

HTTPSGo語言

2020-08-12 08:56:30

代碼凱撒密碼函數(shù)

2022-11-01 18:29:25

Go語言排序算法

2023-05-08 07:55:05

快速排序Go 語言

2024-08-29 13:23:04

WindowsGo語言

2021-07-26 09:47:38

Go語言C++

2024-07-30 08:12:04

Java消息go

2012-03-13 10:40:58

Google Go

2021-03-01 18:35:18

Go語言虛擬機(jī)

2021-03-01 21:59:25

編程語言GoCX

2022-07-19 12:25:29

Go

2014-12-26 09:52:08

Go

2024-08-26 14:32:43

2024-05-08 09:40:43

Go語言分布式存儲
點(diǎn)贊
收藏

51CTO技術(shù)棧公眾號