Skip to content

Commit

Permalink
added processor feature
Browse files Browse the repository at this point in the history
  • Loading branch information
stanipetrosyan committed Nov 20, 2023
1 parent 42691a9 commit b0bd6ab
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 6 deletions.
24 changes: 18 additions & 6 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@ package goeventbus
type Channel interface {
Publisher() Publisher
Subscriber() Subscriber
Processor(predicate func(message Message) bool) Channel
}

type defaultChannel struct {
address string
ch chan Message
chs []chan Message
address string
ch chan Message
chs []chan Message
predicate func(message Message) bool
}

func (c *defaultChannel) Listen() {
Expand All @@ -18,9 +20,12 @@ func (c *defaultChannel) Listen() {
return
}

for _, item := range c.chs {
item <- data
if c.predicate(data) {
for _, item := range c.chs {
item <- data
}
}

}
}

Expand All @@ -35,9 +40,16 @@ func (c *defaultChannel) Subscriber() Subscriber {
return NewSubscriber(ch)
}

func (c *defaultChannel) Processor(predicate func(message Message) bool) Channel {
c.predicate = predicate

return c
}

func NewChannel(address string) Channel {
ch := make(chan Message)
channel := defaultChannel{address: address, ch: ch, chs: []chan Message{}}
predicate := func(message Message) bool { return true }
channel := defaultChannel{address: address, ch: ch, chs: []chan Message{}, predicate: predicate}
go channel.Listen()

return &channel
Expand Down
18 changes: 18 additions & 0 deletions eventbus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,24 @@ func TestInBoundInterceptorHandler(t *testing.T) {
wg.Wait()
})

t.Run("should pass interceptor handler2", func(t *testing.T) {
wg.Add(2)

eventBus.Channel("my-channel").Subscriber().Listen(func(context Context) {
assert.Equal(t, "Hi There", context.Result().Data)
wg.Done()
})

eventBus.Channel("my-channel").Processor(func(message Message) bool {
wg.Done()
return message.Data == "Hi There"
})

message := CreateMessage().SetBody("Hi There")
eventBus.Channel("my-channel").Publisher().Publish(message)
wg.Wait()
})

t.Run("should pass message to handler created afted interceptor", func(t *testing.T) {
wg.Add(2)

Expand Down

0 comments on commit b0bd6ab

Please sign in to comment.