Skip to content

Commit

Permalink
feat: expand the API of SyncDispatcher (#191)
Browse files Browse the repository at this point in the history
* feat: add more methods

* feat: expand the api of SyncDispatcher

* feat: expand the api of SyncDispatcher

* docs: comment
  • Loading branch information
Reasno authored Sep 10, 2021
1 parent ecc44a5 commit 73bd5aa
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 7 deletions.
117 changes: 114 additions & 3 deletions events/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ package events

import (
"context"
"errors"
"sync"

"github.com/DoNewsCode/core/contract"
)

// ErrNotSubscribed is returned when trying to unsubscribe a listener that not subscribing.
var ErrNotSubscribed = errors.New("not subscribed")

var _ contract.Dispatcher = (*SyncDispatcher)(nil)

// SyncDispatcher is a contract.Dispatcher implementation that dispatches events synchronously.
// SyncDispatcher is safe for concurrent use.
type SyncDispatcher struct {
Expand All @@ -16,7 +22,7 @@ type SyncDispatcher struct {

// Dispatch dispatches events synchronously. If any listener returns an error,
// abort the process immediately and return that error to caller.
func (d *SyncDispatcher) Dispatch(ctx context.Context, topic interface{}, event interface{}) error {
func (d *SyncDispatcher) Dispatch(ctx context.Context, topic interface{}, payload interface{}) error {
d.rwLock.RLock()
listeners, ok := d.registry[topic]
d.rwLock.RUnlock()
Expand All @@ -25,14 +31,16 @@ func (d *SyncDispatcher) Dispatch(ctx context.Context, topic interface{}, event
return nil
}
for _, listener := range listeners {
if err := listener.Process(ctx, event); err != nil {
if err := listener.Process(ctx, payload); err != nil {
return err
}
}
return nil
}

// Subscribe subscribes the listener to the dispatcher.
// Subscribe subscribes the listener to the dispatcher. The listeners will not be
// deduplicated. If subscribed more than once, the event will be added and
// processed more than once.
func (d *SyncDispatcher) Subscribe(listener contract.Listener) {
d.rwLock.Lock()
defer d.rwLock.Unlock()
Expand All @@ -42,3 +50,106 @@ func (d *SyncDispatcher) Subscribe(listener contract.Listener) {
}
d.registry[listener.Listen()] = append(d.registry[listener.Listen()], listener)
}

// SubscribeOnce subscribes the listener to the dispatcher and unsubscribe the
// listener once after the event is processed by the listener.
func (d *SyncDispatcher) SubscribeOnce(listener contract.Listener) {
d.rwLock.Lock()
defer d.rwLock.Unlock()

if d.registry == nil {
d.registry = make(map[interface{}][]contract.Listener)
}
ol := &onceListener{Listener: listener}
ol.unsub = func() {
d.Unsubscribe(ol)
}
d.registry[listener.Listen()] = append(d.registry[listener.Listen()], ol)
}

// Prepend adds the listener to the beginning of the listeners queue for the
// topic it listens to. The listeners will not be deduplicated. If subscribed
// more than once, the event will be added and processed more than once.
func (d *SyncDispatcher) Prepend(listener contract.Listener) {
d.rwLock.Lock()
defer d.rwLock.Unlock()

if d.registry == nil {
d.registry = make(map[interface{}][]contract.Listener)
}
d.registry[listener.Listen()] = append([]contract.Listener{listener}, d.registry[listener.Listen()]...)
}

// PrependOnce adds a one-time listener function for the event it listens to, at
// the top of the listener queue waiting for the same event. The listener will be
// unsubscribed once after the event is processed by the listener .
func (d *SyncDispatcher) PrependOnce(listener contract.Listener) {
d.rwLock.Lock()
defer d.rwLock.Unlock()

if d.registry == nil {
d.registry = make(map[interface{}][]contract.Listener)
}
ol := &onceListener{Listener: listener}
ol.unsub = func() {
d.Unsubscribe(ol)
}
d.registry[listener.Listen()] = append([]contract.Listener{ol}, d.registry[listener.Listen()]...)
}

// Unsubscribe unsubscribes the listener from the dispatcher. If the listener doesn't exist, ErrNotSubscribed will be returned.
// If there are multiple instance of the listener provided subscribed, only one of the will be unsubscribed.
func (d *SyncDispatcher) Unsubscribe(listener contract.Listener) error {
d.rwLock.Lock()
defer d.rwLock.Unlock()

if d.registry == nil {
d.registry = make(map[interface{}][]contract.Listener)
}

event := listener.Listen()
lns := d.registry[event]

for i := range lns {
if ol, ok := lns[i].(interface {
Equals(anotherListener contract.Listener) bool
}); ok && ol.Equals(listener) {
removeListener(&lns, i)
d.registry[event] = lns
return nil
}
if lns[i] == listener {
removeListener(&lns, i)
d.registry[event] = lns
return nil
}
}
return ErrNotSubscribed
}

// RemoveAllListeners removes all listeners for a given event.
func (d *SyncDispatcher) RemoveAllListeners(topic interface{}) {
d.rwLock.Lock()
defer d.rwLock.Unlock()

delete(d.registry, topic)
}

// ListenerCount returns the number of listeners for a given event.
func (d *SyncDispatcher) ListenerCount(topic interface{}) int {
d.rwLock.RLock()
defer d.rwLock.RUnlock()

if d.registry == nil {
d.registry = make(map[interface{}][]contract.Listener)
}
return len(d.registry[topic])
}

func removeListener(lns *[]contract.Listener, i int) {
if i+1 < len(*lns) {
*lns = append((*lns)[0:i], (*lns)[i+1:]...)
return
}
*lns = (*lns)[0:i]
}
136 changes: 136 additions & 0 deletions events/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"testing"

"github.com/DoNewsCode/core/contract"
"github.com/stretchr/testify/assert"
)

Expand Down Expand Up @@ -133,3 +134,138 @@ func TestDispatcher(t *testing.T) {
})
}
}

func TestUnsubscribeDuringDispatching(t *testing.T) {
var (
l1Called bool
l2Called bool
)
dispatcher := SyncDispatcher{}

l1 := Listen("foo", func(ctx context.Context, event interface{}) error {
l1Called = true
return nil
})
l2 := Listen("foo", func(ctx context.Context, event interface{}) error {
l2Called = true
// If a user unsubscribe during proccessing a dispatched events, no dead lock should occur.
dispatcher.Unsubscribe(l1)
return nil
})
dispatcher.Subscribe(l2)
dispatcher.Subscribe(l1)
dispatcher.Dispatch(context.Background(), "foo", nil)
count := dispatcher.ListenerCount("foo")
assert.Equal(t, 1, count)
assert.True(t, l1Called)
assert.True(t, l2Called)
}

func TestSyncDispatcher_SubscribeAndUnsubscribe(t *testing.T) {
t.Parallel()
cases := []struct {
name string
process func(dispatcher *SyncDispatcher, listener contract.Listener)
count int
}{{
"subscribe once",
func(dispatcher *SyncDispatcher, listener contract.Listener) {
dispatcher.SubscribeOnce(listener)
},
1,
}, {
"subscribe once but unsubscribed before execute",
func(dispatcher *SyncDispatcher, listener contract.Listener) {
dispatcher.SubscribeOnce(listener)
dispatcher.Unsubscribe(listener)
},
0,
}, {
"subscribed multiple times",
func(dispatcher *SyncDispatcher, listener contract.Listener) {
dispatcher.Subscribe(listener)
dispatcher.Subscribe(listener)
},
4,
}, {
"subscribed 2 times but unsubscribed once",
func(dispatcher *SyncDispatcher, listener contract.Listener) {
dispatcher.Subscribe(listener)
dispatcher.Subscribe(listener)
dispatcher.Unsubscribe(listener)
},
2,
}, {
"subscribed 2 times but unsubscribed all",
func(dispatcher *SyncDispatcher, listener contract.Listener) {
dispatcher.Subscribe(listener)
dispatcher.Subscribe(listener)
dispatcher.RemoveAllListeners(listener.Listen())
},
0,
}}

for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
var count int
dispatcher := SyncDispatcher{}
l := Listen("foo", func(ctx context.Context, event interface{}) error {
count++
return nil
})
c.process(&dispatcher, l)
dispatcher.Dispatch(context.Background(), "foo", nil)
dispatcher.Dispatch(context.Background(), "foo", nil)
assert.Equal(t, c.count, count)
})
}
}

func TestSyncDispatcher_Prepend(t *testing.T) {
t.Parallel()
cases := []struct {
name string
process func(dispatcher *SyncDispatcher, listener contract.Listener)
order []int
}{{
"prepend",
func(dispatcher *SyncDispatcher, listener contract.Listener) {
dispatcher.Prepend(listener)
},
[]int{2, 1, 2, 1},
}, {
"prepend once",
func(dispatcher *SyncDispatcher, listener contract.Listener) {
dispatcher.PrependOnce(listener)
},
[]int{2, 1},
}, {
"subscribe",
func(dispatcher *SyncDispatcher, listener contract.Listener) {
dispatcher.Subscribe(listener)
},
[]int{1, 2, 1, 2},
}}

for _, c := range cases {
c := c
t.Run(c.name, func(t *testing.T) {
var order []int
dispatcher := SyncDispatcher{}
l1 := Listen("foo", func(ctx context.Context, event interface{}) error {
order = append(order, 1)
return nil
})
l2 := Listen("foo", func(ctx context.Context, event interface{}) error {
order = append(order, 2)
return nil
})
c.process(&dispatcher, l1)
c.process(&dispatcher, l2)
dispatcher.Dispatch(context.Background(), "foo", nil)
dispatcher.Dispatch(context.Background(), "foo", nil)
assert.Equal(t, c.order, order)
})
}
}
28 changes: 24 additions & 4 deletions events/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
var _ contract.Listener = (*ListenerFunc)(nil)

// Listen creates a functional listener in one line.
func Listen(topic interface{}, callback func(ctx context.Context, event interface{}) error) *ListenerFunc {
func Listen(topic interface{}, callback func(ctx context.Context, payload interface{}) error) *ListenerFunc {
return &ListenerFunc{
topic: topic,
callback: callback,
Expand All @@ -20,7 +20,7 @@ func Listen(topic interface{}, callback func(ctx context.Context, event interfac
// It listens to the given topic and then execute the callback.
type ListenerFunc struct {
topic interface{}
callback func(ctx context.Context, event interface{}) error
callback func(ctx context.Context, payload interface{}) error
}

// Listen implements contract.Listener
Expand All @@ -29,6 +29,26 @@ func (f *ListenerFunc) Listen() interface{} {
}

// Process implements contract.Listener
func (f *ListenerFunc) Process(ctx context.Context, event interface{}) error {
return f.callback(ctx, event)
func (f *ListenerFunc) Process(ctx context.Context, payload interface{}) error {
return f.callback(ctx, payload)
}

type onceListener struct {
unsub func()
contract.Listener
}

func (o *onceListener) Process(ctx context.Context, payload interface{}) error {
// Dispatcher is synchronous, so we don't need to lock.
defer o.unsub()
return o.Listener.Process(ctx, payload)
}

func (o *onceListener) Equals(listener contract.Listener) bool {
if l, ok := o.Listener.(interface {
Equals(anotherListener contract.Listener) bool
}); ok {
return l.Equals(listener)
}
return o.Listener == listener
}
45 changes: 45 additions & 0 deletions events/listener_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package events

import (
"context"
"testing"

"github.com/DoNewsCode/core/contract"
"github.com/stretchr/testify/assert"
)

type mockListener1 struct{}

func (m mockListener1) Listen() (topic interface{}) {
panic("implement me")
}

func (m mockListener1) Process(ctx context.Context, payload interface{}) error {
panic("implement me")
}

func (m mockListener1) Equals(listener contract.Listener) bool {
return true
}

type mockListener2 struct{}

func (m mockListener2) Listen() (topic interface{}) {
panic("implement me")
}

func (m mockListener2) Process(ctx context.Context, payload interface{}) error {
panic("implement me")
}

func (m mockListener2) Equals(listener contract.Listener) bool {
return false
}

func TestOnceListener(t *testing.T) {
once := onceListener{nil, mockListener1{}}
assert.True(t, once.Equals(mockListener1{}))

once = onceListener{nil, mockListener2{}}
assert.False(t, once.Equals(mockListener2{}))
}

0 comments on commit 73bd5aa

Please sign in to comment.