Skip to content

Commit

Permalink
Merge pull request #1 from stanipetrosyan/refactor-topic
Browse files Browse the repository at this point in the history
Refactor topic
  • Loading branch information
stanipetrosyan authored Sep 11, 2023
2 parents 36dadff + 0c9c10b commit d9c6c93
Show file tree
Hide file tree
Showing 9 changed files with 225 additions and 160 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ address := "topic"
options := goeventbus.NewMessageOptions().AddHeader("header", "value")
message := goeventbus.CreateMessage().SetBody("Hi Topic").SetOptions(options)

eventbus.Subscribe(address, func(dc goeventbus.DeliveryContext) {
eventbus.Subscribe(address, func(dc goeventbus.ConsumerContext) {
fmt.Printf("Message %s\n", dc.Result().Data)
})

Expand All @@ -53,12 +53,12 @@ var eventbus = goeventbus.NewEventBus()

address := "topic"

eventbus.Subscribe(address, func(dc goeventbus.DeliveryContext) {
eventbus.Subscribe(address, func(dc goeventbus.ConsumerContext) {
fmt.Printf("Message %s\n", dc.Result().Data)
dc.Reply("Hi from topic")
})

eventbus.Request(address, "Hi Topic", func(dc goeventbus.DeliveryContext) {
eventbus.Request(address, "Hi Topic", func(dc goeventbus.ConsumerContext) {
dc.Handle(func(message Message) {
fmt.Printf("Message %s\n", message.Data)
})
Expand Down Expand Up @@ -88,7 +88,7 @@ eventBus.Publish("address", message)

```go

eventbus.AddInBoundInterceptor("topic1", func(context goeventbus.DeliveryContext) {
eventbus.AddInBoundInterceptor("topic1", func(context goeventbus.InterceptonContext) {
if (some logic)
context.Next()
})
Expand Down
59 changes: 59 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package goeventbus

type Context interface {
Result() Message
}

type ConsumerContext struct {
Ch chan Message
message Message
}

func (d *ConsumerContext) Reply(data any) {
d.Ch <- Message{Data: data}
}

func (d *ConsumerContext) Result() Message {
return d.message
}

func (d *ConsumerContext) Handle(consume func(message Message)) {
go func(ch chan Message) {
for data := range ch {
consume(data)
}
}(d.Ch)
}

func (d ConsumerContext) SetData(msg Message) ConsumerContext {
d.message = msg
return d
}

func NewConsumerContext(ch chan Message) ConsumerContext {
return ConsumerContext{Ch: ch}
}

type InterceptorContext struct {
message Message
topic *Topic
}

func (d *InterceptorContext) Next() {
for _, item := range d.topic.GetChannels() {
item <- d.message
}
}

func (d *InterceptorContext) Result() Message {
return d.message
}

func (d InterceptorContext) SetData(msg Message) InterceptorContext {
d.message = msg
return d
}

func NewInterceptorContext(topic *Topic) InterceptorContext {
return InterceptorContext{topic: topic}
}
51 changes: 0 additions & 51 deletions delivery_context.go

This file was deleted.

40 changes: 17 additions & 23 deletions eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
)

type EventBus interface {
Subscribe(address string, callback func(context DeliveryContext))
AddInBoundInterceptor(address string, callback func(context DeliveryContext))
Subscribe(address string, callback func(context ConsumerContext))
AddInBoundInterceptor(address string, callback func(context InterceptorContext))
Publish(address string, message Message)
Request(address string, message Message, callback func(context DeliveryContext))
Request(address string, message Message, callback func(context ConsumerContext))
}

type DefaultEventBus struct {
Expand All @@ -17,38 +17,31 @@ type DefaultEventBus struct {
wg sync.WaitGroup
}

func (e *DefaultEventBus) Subscribe(address string, callback func(context DeliveryContext)) {
func (e *DefaultEventBus) Subscribe(address string, callback func(context ConsumerContext)) {
_, exists := e.topics[address]
if !exists {
e.topics[address] = NewTopic(address)
}

handler := NewConsumer(address, callback)
channels := []chan Message{handler.Ch}
handler = handler.SetContext(NewDeliveryContext(channels))

e.rm.Lock()
e.topics[address].AddHandler(handler)
handler := e.topics[address].AddConsumer(callback)
e.rm.Unlock()
go e.handle(handler)
go e.handle(Handler(handler))
}

func (e *DefaultEventBus) AddInBoundInterceptor(address string, callback func(context DeliveryContext)) {
func (e *DefaultEventBus) AddInBoundInterceptor(address string, callback func(context InterceptorContext)) {
_, exists := e.topics[address]
if !exists {
e.topics[address] = NewTopic(address)
}

channels := e.topics[address].GetChannels()
handler := NewInterceptor(address, callback).SetContext(NewDeliveryContext(channels))

e.rm.Lock()
e.topics[address].AddHandler(handler)
handler := e.topics[address].AddInterceptor(callback)
e.rm.Unlock()
go e.handle(handler)
}

func (e *DefaultEventBus) handle(handler *Handler) {
func (e *DefaultEventBus) handle(handler Handler) {
e.wg.Add(1)
go handler.Handle(&e.wg)
e.wg.Wait()
Expand All @@ -62,14 +55,15 @@ func (e *DefaultEventBus) Publish(address string, message Message) {
if !exists {
return
}

for _, handler := range topic.GetHandlers() {
if !handler.Closed {
handler.Ch <- message
if !handler.Closed() {
handler.Chain() <- message
}
}
}

func (e *DefaultEventBus) Request(address string, message Message, callback func(context DeliveryContext)) {
func (e *DefaultEventBus) Request(address string, message Message, callback func(context ConsumerContext)) {
e.rm.Lock()
defer e.rm.Unlock()

Expand All @@ -78,10 +72,10 @@ func (e *DefaultEventBus) Request(address string, message Message, callback func
return
}
for _, item := range topic.GetHandlers() {
go func(handler *Handler, data Message) {
if !handler.Closed {
handler.Ch <- data
callback(handler.Context)
go func(handler Handler, data Message) {
if !handler.Closed() {
handler.Chain() <- data
callback(NewConsumerContext(handler.Chain()))
}
}(item, message)
}
Expand Down
60 changes: 51 additions & 9 deletions eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestSubscribeHandler(t *testing.T) {
t.Run("should handle a message", func(t *testing.T) {
wg.Add(1)

eventBus.Subscribe("address", func(context DeliveryContext) {
eventBus.Subscribe("address", func(context ConsumerContext) {
assert.Equal(t, "Hi There", context.Result().Data)
wg.Done()
})
Expand All @@ -28,12 +28,12 @@ func TestSubscribeHandler(t *testing.T) {
t.Run("should handle message for more of one handlers", func(t *testing.T) {
wg.Add(2)

eventBus.Subscribe("newaddress", func(context DeliveryContext) {
eventBus.Subscribe("newaddress", func(context ConsumerContext) {
assert.Equal(t, "Hi There", context.Result().Data)
wg.Done()
})

eventBus.Subscribe("newaddress", func(context DeliveryContext) {
eventBus.Subscribe("newaddress", func(context ConsumerContext) {
assert.Equal(t, "Hi There", context.Result().Data)
wg.Done()
})
Expand All @@ -42,7 +42,6 @@ func TestSubscribeHandler(t *testing.T) {
eventBus.Publish("newaddress", message)
wg.Wait()
})

}

func TestRequestReplyHandler(t *testing.T) {
Expand All @@ -51,12 +50,12 @@ func TestRequestReplyHandler(t *testing.T) {
t.Run("should send a request and reply", func(t *testing.T) {
wg.Add(1)

eventBus.Subscribe("address", func(context DeliveryContext) {
eventBus.Subscribe("address", func(context ConsumerContext) {
context.Reply("Hello")
})

message := CreateMessage().SetBody("Hi There")
eventBus.Request("address", message, func(context DeliveryContext) {
eventBus.Request("address", message, func(context ConsumerContext) {
context.Handle(func(message Message) {
assert.Equal(t, "Hello", message.Data)
wg.Done()
Expand All @@ -72,12 +71,12 @@ func TestInBoundInterceptorHandler(t *testing.T) {
t.Run("should pass interceptor handler", func(t *testing.T) {
wg.Add(2)

eventBus.Subscribe("address", func(context DeliveryContext) {
eventBus.Subscribe("address", func(context ConsumerContext) {
assert.Equal(t, "Hi There", context.Result().Data)
wg.Done()
})

eventBus.AddInBoundInterceptor("address", func(context DeliveryContext) {
eventBus.AddInBoundInterceptor("address", func(context InterceptorContext) {
assert.Equal(t, "Hi There", context.Result().Data)
wg.Done()
context.Next()
Expand All @@ -87,13 +86,56 @@ func TestInBoundInterceptorHandler(t *testing.T) {
eventBus.Publish("address", message)
wg.Wait()
})

t.Run("should pass message to handler created afted interceptor", func(t *testing.T) {
wg.Add(2)

eventBus.AddInBoundInterceptor("newAddress", func(context InterceptorContext) {
assert.Equal(t, "Hi There", context.Result().Data)
wg.Done()
context.Next()
})

eventBus.Subscribe("newAddress", func(context ConsumerContext) {
assert.Equal(t, "Hi There", context.Result().Data)
wg.Done()
})

message := CreateMessage().SetBody("Hi There")
eventBus.Publish("newAddress", message)
wg.Wait()
})

t.Run("should handle more interceptor", func(t *testing.T) {
wg.Add(3)

eventBus.Subscribe("anotherAdress", func(context ConsumerContext) {
assert.Equal(t, "Hi There", context.Result().Data)
wg.Done()
})

eventBus.AddInBoundInterceptor("anotherAdress", func(context InterceptorContext) {
assert.Equal(t, "Hi There", context.Result().Data)
wg.Done()
context.Next()
})

eventBus.AddInBoundInterceptor("anotherAdress", func(context InterceptorContext) {
assert.Equal(t, "Hi There", context.Result().Data)
wg.Done()
})

message := CreateMessage().SetBody("Hi There")
eventBus.Publish("anotherAdress", message)
wg.Wait()
})
}

func TestMessageOptions(t *testing.T) {
var eventBus = NewEventBus()

wg.Add(1)
eventBus.Subscribe("address", func(context DeliveryContext) {
eventBus.Subscribe("address", func(context ConsumerContext) {
assert.Equal(t, "value", context.Result().Options.Header("key"))
wg.Done()
})
Expand Down
Loading

0 comments on commit d9c6c93

Please sign in to comment.