Skip to content

Commit

Permalink
added abstract on handler creation for event bus to topic
Browse files Browse the repository at this point in the history
  • Loading branch information
stanipetrosyan committed Jul 21, 2023
1 parent ea6bfd7 commit 9b00a02
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 34 deletions.
36 changes: 16 additions & 20 deletions eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
type EventBus interface {
Subscribe(address string, consumer func(context DeliveryContext))
SubscribeOnce(address string, consumer func(context DeliveryContext))
AddInBoundInterceptor(address string, consumer func(context DeliveryContext))
Publish(address string, message Message)
Request(address string, message Message, consumer func(context DeliveryContext))
AddInBoundInterceptor(address string, consumer func(context DeliveryContext))
}

type DefaultEventBus struct {
Expand All @@ -19,11 +19,15 @@ type DefaultEventBus struct {
}

func (e *DefaultEventBus) Subscribe(address string, consumer func(context DeliveryContext)) {
e.subscribe(address, consumer, false)
e.subscribe(address, consumer, false, false)
}

func (e *DefaultEventBus) SubscribeOnce(address string, consumer func(context DeliveryContext)) {
e.subscribe(address, consumer, true)
e.subscribe(address, consumer, true, false)
}

func (e *DefaultEventBus) AddInBoundInterceptor(address string, consumer func(context DeliveryContext)) {
e.subscribe(address, consumer, false, true)
}

func (e *DefaultEventBus) Publish(address string, message Message) {
Expand All @@ -49,7 +53,7 @@ func (e *DefaultEventBus) Request(address string, message Message, consumer func
if !exists {
return
}
for _, item := range topic.Handlers {
for _, item := range topic.GetHandlers() {
go func(handler *Handler, data Message) {
if !handler.closed {
handler.Ch <- data
Expand All @@ -60,32 +64,24 @@ func (e *DefaultEventBus) Request(address string, message Message, consumer func

}

func (e *DefaultEventBus) AddInBoundInterceptor(address string, consumer func(context DeliveryContext)) {
func (e *DefaultEventBus) subscribe(address string, consumer func(context DeliveryContext), once bool, interceptor bool) {
_, exists := e.topics[address]
if !exists {
e.topics[address] = NewTopic(address)
}

ch := make(chan Message)
context := NewDeliveryContext(e.topics[address].GetChannels())
handler := Handler{Ch: ch, Consumer: consumer, Context: context, Address: address, closed: false}

e.rm.Lock()
e.topics[address].AddInterceptor(handler)
e.rm.Unlock()

go e.handle(&handler, false)
}
var channels []chan Message = []chan Message{ch}
var handlerType HandlerType = Consumer

func (e *DefaultEventBus) subscribe(address string, consumer func(context DeliveryContext), once bool) {
_, exists := e.topics[address]
if !exists {
e.topics[address] = NewTopic(address)
if interceptor {
channels = e.topics[address].GetChannels()
handlerType = Interceptor
}

ch := make(chan Message)
context := NewDeliveryContext([]chan Message{ch})
handler := Handler{Ch: ch, Consumer: consumer, Context: context, Address: address, closed: false}
context := NewDeliveryContext(channels)
handler := Handler{Ch: ch, Consumer: consumer, Context: context, Address: address, closed: false, Type: handlerType}

e.rm.Lock()
e.topics[address].AddHandler(handler)
Expand Down
7 changes: 6 additions & 1 deletion examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ func main() {
})

eventbus.AddInBoundInterceptor("topic1", func(context goeventbus.DeliveryContext) {
context.Next()
if context.Result().Data == "Hi topic 1" {
context.Next()
} else {
println("Message not passed")
}
})

eventbus.Subscribe("topic2", func(dc goeventbus.DeliveryContext) {
Expand All @@ -41,6 +45,7 @@ func main() {
})

go publishTo("topic1", "Hi topic 1")
go publishTo("topic1", "Message to block")
go publishTo("topic2", "Hi topic 2")
go publishTo("topic3", "Hi topic 3")
go publishTo("topic4", "Hi topic 4")
Expand Down
35 changes: 22 additions & 13 deletions topic.go
Original file line number Diff line number Diff line change
@@ -1,39 +1,48 @@
package goeventbus

type Topic struct {
Address string
Handlers []*Handler
Interceptor []*Handler
Address string
Consumers []*Handler
Interceptors []*Handler
}

func (t *Topic) AddInterceptor(interceptor Handler) *Handler {
t.Interceptor = append(t.Interceptor, &interceptor)
return &interceptor
func (t *Topic) AddHandler(handler Handler) {
switch handler.Type {
case Consumer:
t.addConsumer(handler)
case Interceptor:
t.addInterceptor(handler)
}
}

func (t *Topic) AddHandler(handler Handler) *Handler {
t.Handlers = append(t.Handlers, &handler)
func (t *Topic) addConsumer(handler Handler) *Handler {
t.Consumers = append(t.Consumers, &handler)
return &handler
}

func (t *Topic) addInterceptor(interceptor Handler) *Handler {
t.Interceptors = append(t.Interceptors, &interceptor)
return &interceptor
}

func (t *Topic) GetHandlers() []*Handler {
if len(t.Interceptor) > 0 {
return t.Interceptor
if len(t.Interceptors) > 0 {
return t.Interceptors
}

return t.Handlers
return t.Consumers
}

func (t *Topic) GetChannels() []chan Message {
chs := []chan Message{}

for _, item := range t.Handlers {
for _, item := range t.Consumers {
chs = append(chs, item.Ch)
}

return chs
}

func NewTopic(address string) *Topic {
return &Topic{Address: address, Handlers: []*Handler{}, Interceptor: []*Handler{}}
return &Topic{Address: address, Consumers: []*Handler{}, Interceptors: []*Handler{}}
}

0 comments on commit 9b00a02

Please sign in to comment.