The traditional approach to implementing an event bus involves using callbacks. Subscribers usually implement an interface and then the event bus propagates data via the interface.
type DataEvent struct {
Data interface{}
Topic string
}
// DataChannel is a channel which can accept an DataEvent
type DataChannel chan DataEvent
// DataChannelSlice is a slice of DataChannels
type DataChannelSlice [] DataChannel
关键的数据结构
// EventBus stores the information about subscribers interested for // a particular topic
type EventBus struct {
subscribers map[string]DataChannelSlice
rm sync.RWMutex
}
publisher
func (eb *EventBus)Publish(topic string, data interface{}) {
eb.rm.RLock()
if chans, found := eb.subscribers[topic]; found {
go func(data DataEvent, dataChannelSlices DataChannelSlice) {
for _, ch := range dataChannelSlices {
ch <- data
}
}(DataEvent{Data: data, Topic:topic}, chans)
}
eb.rm.RUnlock()
}
代替callback?
func main() {
ch1 := make(chan DataEvent)
ch2 := make(chan DataEvent)
ch3 := make(chan DataEvent)
eb.Subscribe("topic1", ch1)
eb.Subscribe("topic2", ch2)
eb.Subscribe("topic2", ch3)
go publisTo("topic1", "Hi topic 1")
go publisTo("topic2", "Welcome to topic 2")
for {
select {
case d := <-ch1:
go printDataEvent("ch1", d)
case d := <-ch2:
go printDataEvent("ch2", d)
case d := <-ch3:
go printDataEvent("ch3", d)
}
}
}
从代码看来,subscribe部分,也只是接受数据,后面可以统一定义操作?再做一层
I’ve used a slice to store all the subscribers for a topic. This is used to simplifying the article. This need to be replaced with a SET so no duplicate entries present in the list