gqlxj1987's Blog

Event bus in Go


Original article link

The traditional approach to implementing an event bus involves using callbacks. Subscribers usually implement an interface and then the event bus propagates data via the interface.
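For contrast with the channel-based version in these notes, here is a minimal sketch of what that callback style might look like. The names `Subscriber`, `OnEvent`, `CallbackBus`, and `recorder` are hypothetical and do not come from the original article; they are only there to illustrate the idea.

```go
package main

import (
	"fmt"
	"sync"
)

// Subscriber is a hypothetical callback interface (an assumption made
// for this sketch, not part of the original article).
type Subscriber interface {
	OnEvent(topic string, data interface{})
}

// CallbackBus delivers events by calling each subscriber directly.
type CallbackBus struct {
	mu          sync.RWMutex
	subscribers map[string][]Subscriber
}

// NewCallbackBus returns a bus with an initialized subscriber map.
func NewCallbackBus() *CallbackBus {
	return &CallbackBus{subscribers: make(map[string][]Subscriber)}
}

// Subscribe registers s for topic.
func (b *CallbackBus) Subscribe(topic string, s Subscriber) {
	b.mu.Lock()
	defer b.mu.Unlock()
	b.subscribers[topic] = append(b.subscribers[topic], s)
}

// Publish calls every subscriber of topic synchronously, in
// subscription order. The list is copied first so that callbacks run
// without the lock held.
func (b *CallbackBus) Publish(topic string, data interface{}) {
	b.mu.RLock()
	subs := append([]Subscriber(nil), b.subscribers[topic]...)
	b.mu.RUnlock()
	for _, s := range subs {
		s.OnEvent(topic, data)
	}
}

// recorder is a toy subscriber that stores what it receives.
type recorder struct {
	got []string
}

func (r *recorder) OnEvent(topic string, data interface{}) {
	r.got = append(r.got, fmt.Sprintf("%s: %v", topic, data))
}

func main() {
	bus := NewCallbackBus()
	r := &recorder{}
	bus.Subscribe("topic1", r)
	bus.Publish("topic1", "Hi topic 1")
	bus.Publish("topic2", "nobody is subscribed") // no subscribers, so no callback
	fmt.Println(r.got)                            // prints [topic1: Hi topic 1]
}
```

In this style the publisher runs the subscriber's code on its own goroutine, which is the coupling that the channel-based design tries to avoid.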

type DataEvent struct {
   Data interface{}
   Topic string
}

// DataChannel is a channel which can accept a DataEvent
type DataChannel chan DataEvent

// DataChannelSlice is a slice of DataChannels
type DataChannelSlice []DataChannel

The key data structure:

// EventBus stores the subscribers interested in a particular topic.
// rm guards subscribers; sync.RWMutex requires importing "sync".
type EventBus struct {
   subscribers map[string]DataChannelSlice
   rm          sync.RWMutex
}

Publisher

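func (eb *EventBus) Publish(topic string, data interface{}) {
   eb.rm.RLock()
   if chans, found := eb.subscribers[topic]; found {
      // Copy the slice while the lock is held. The goroutine below outlives
      // the lock, and chans shares its backing array with the map entry, so
      // later in-place changes to the subscriber list could otherwise race
      // with delivery.
      channels := append(DataChannelSlice{}, chans...)
      go func(data DataEvent, dataChannelSlices DataChannelSlice) {
         // Each send blocks until that subscriber receives.
         for _, ch := range dataChannelSlices {
            ch <- data
         }
      }(DataEvent{Data: data, Topic: topic}, channels)
   }
   eb.rm.RUnlock()
}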
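The `main` function further down calls `eb.Subscribe`, which these notes do not show. A minimal sketch of how it might look, assuming it simply appends the channel to the topic's slice under the write lock; `NewEventBus` is a hypothetical constructor added only so the example runs on its own.

```go
package main

import (
	"fmt"
	"sync"
)

type DataEvent struct {
	Data  interface{}
	Topic string
}

type DataChannel chan DataEvent
type DataChannelSlice []DataChannel

type EventBus struct {
	subscribers map[string]DataChannelSlice
	rm          sync.RWMutex
}

// NewEventBus is a hypothetical constructor (not in the notes) that
// initializes the subscribers map.
func NewEventBus() *EventBus {
	return &EventBus{subscribers: make(map[string]DataChannelSlice)}
}

// Subscribe registers ch for topic. Assumed behavior: take the write
// lock and append; appending to a missing (nil) slice creates the entry.
func (eb *EventBus) Subscribe(topic string, ch DataChannel) {
	eb.rm.Lock()
	defer eb.rm.Unlock()
	eb.subscribers[topic] = append(eb.subscribers[topic], ch)
}

func main() {
	eb := NewEventBus()
	eb.Subscribe("topic1", make(chan DataEvent))
	eb.Subscribe("topic2", make(chan DataEvent))
	eb.Subscribe("topic2", make(chan DataEvent))
	// One subscriber on topic1, two on topic2.
	fmt.Println(len(eb.subscribers["topic1"]), len(eb.subscribers["topic2"])) // prints 1 2
}
```

Subscribe takes the write lock because it mutates the map, while Publish only needs the read lock.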

A replacement for callbacks?

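// eb is a package-level *EventBus; publisTo and printDataEvent are helper
// functions that are not shown in these notes.
func main() {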
	ch1 := make(chan DataEvent)
	ch2 := make(chan DataEvent)
	ch3 := make(chan DataEvent)

	eb.Subscribe("topic1", ch1)
	eb.Subscribe("topic2", ch2)
	eb.Subscribe("topic2", ch3)

	go publisTo("topic1", "Hi topic 1")
	go publisTo("topic2", "Welcome to topic 2")

	for {
		select {
		case d := <-ch1:
			go printDataEvent("ch1", d)
		case d := <-ch2:
			go printDataEvent("ch2", d)
		case d := <-ch3:
			go printDataEvent("ch3", d)
		}
	}
}

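Judging from the code, the subscriber side only receives data. Could the handling be defined uniformly afterwards, by adding another layer on top?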
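One way such a layer might look is a small helper that drains a subscription channel and hands every event to a handler function. This is only a sketch under my own assumptions: `Handler` and `Listen` are hypothetical names, and the example feeds the channel directly rather than going through the bus to stay short.

```go
package main

import "fmt"

type DataEvent struct {
	Data  interface{}
	Topic string
}

// Handler is a hypothetical callback type for processing one event.
type Handler func(DataEvent)

// Listen drains ch in its own goroutine and passes each event to h.
// The returned channel is closed once ch has been closed and drained.
func Listen(ch <-chan DataEvent, h Handler) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for ev := range ch {
			h(ev)
		}
	}()
	return done
}

func main() {
	ch := make(chan DataEvent)
	var seen []string
	done := Listen(ch, func(ev DataEvent) {
		seen = append(seen, fmt.Sprintf("%s: %v", ev.Topic, ev.Data))
	})

	ch <- DataEvent{Topic: "topic1", Data: "Hi topic 1"}
	close(ch)
	<-done // wait until the handler has processed everything

	fmt.Println(seen) // prints [topic1: Hi topic 1]
}
```

With a helper like this, each subscriber would register a channel on the bus and a handler with `Listen`, and the big `select` loop in `main` would no longer be needed.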

I’ve used a slice to store all the subscribers for a topic. This keeps the article simple. It should be replaced with a set so that no duplicate entries are present in the list.
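A rough sketch of that set-based variant, assuming a map keyed by channel serves as the set (Go channels are comparable, so they can be map keys). `DataChannelSet` and `NewEventBus` are hypothetical names introduced for this example.

```go
package main

import (
	"fmt"
	"sync"
)

type DataEvent struct {
	Data  interface{}
	Topic string
}

type DataChannel chan DataEvent

// DataChannelSet is a hypothetical replacement for DataChannelSlice.
// The empty struct value takes no space; only the keys matter.
type DataChannelSet map[DataChannel]struct{}

type EventBus struct {
	subscribers map[string]DataChannelSet
	rm          sync.RWMutex
}

// NewEventBus is a hypothetical constructor for this sketch.
func NewEventBus() *EventBus {
	return &EventBus{subscribers: make(map[string]DataChannelSet)}
}

// Subscribe adds ch to the topic's set. Subscribing the same channel
// twice leaves a single entry.
func (eb *EventBus) Subscribe(topic string, ch DataChannel) {
	eb.rm.Lock()
	defer eb.rm.Unlock()
	if eb.subscribers[topic] == nil {
		eb.subscribers[topic] = make(DataChannelSet)
	}
	eb.subscribers[topic][ch] = struct{}{}
}

func main() {
	eb := NewEventBus()
	ch := make(DataChannel)
	eb.Subscribe("topic1", ch)
	eb.Subscribe("topic1", ch)                 // duplicate, no new entry
	fmt.Println(len(eb.subscribers["topic1"])) // prints 1
}
```

One trade-off to keep in mind: map iteration order in Go is unspecified, so a Publish that ranges over this set would deliver to subscribers in no particular order.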

