Skip to content

Commit

Permalink
test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronc committed Jul 9, 2024
1 parent f670a70 commit 47a9751
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 23 deletions.
14 changes: 8 additions & 6 deletions schema/appdata/async.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,19 @@
package appdata

import "context"

// AsyncListenerMux returns a listener that forwards received events to all the provided listeners asynchronously
// with each listener processing in a separate go routine. All callbacks in the returned listener will return nil
// except for Commit which will return an error or nil once all listeners have processed the commit. The doneChan
// except for Commit which will return an error or nil once all listeners have processed the commit. The context
// is used to signal that the listeners should stop listening and return. bufferSize is the size of the buffer for the
// channels used to send events to the listeners.
func AsyncListenerMux(listeners []Listener, bufferSize int, doneChan <-chan struct{}) Listener {
func AsyncListenerMux(listeners []Listener, bufferSize int, ctx context.Context) Listener {
asyncListeners := make([]Listener, len(listeners))
commitChans := make([]chan error, len(listeners))
for i, l := range listeners {
commitChan := make(chan error)
commitChans[i] = commitChan
asyncListeners[i] = AsyncListener(l, bufferSize, commitChan, doneChan)
asyncListeners[i] = AsyncListener(l, bufferSize, commitChan, ctx)
}
mux := ListenerMux(asyncListeners...)
muxCommit := mux.Commit
Expand Down Expand Up @@ -39,11 +41,11 @@ func AsyncListenerMux(listeners []Listener, bufferSize int, doneChan <-chan stru
// in a separate go routine. The listener that is returned will return nil for all methods including Commit and
// an error or nil will only be returned in commitChan once the sender has sent commit and the receiving listener has
// processed it. Thus commitChan can be used as a synchronization and error checking mechanism. The go routine
// that is being used for listening will exit when doneChan is closed and no more events will be received by the listener.
// that is being used for listening will exit when context.Done() returns and no more events will be received by the listener.
// bufferSize is the size of the buffer for the channel that is used to send events to the listener.
// Instead of using AsyncListener directly, it is recommended to use AsyncListenerMux which does coordination directly
// via its Commit callback.
func AsyncListener(listener Listener, bufferSize int, commitChan chan<- error, doneChan <-chan struct{}) Listener {
func AsyncListener(listener Listener, bufferSize int, commitChan chan<- error, ctx context.Context) Listener {
packetChan := make(chan Packet, bufferSize)
res := Listener{}

Expand Down Expand Up @@ -71,7 +73,7 @@ func AsyncListener(listener Listener, bufferSize int, commitChan chan<- error, d
}
}

case <-doneChan:
case <-ctx.Done():
close(packetChan)
return
}
Expand Down
34 changes: 17 additions & 17 deletions schema/appdata/async_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package appdata

import (
"context"
"fmt"
"testing"
)

func TestAsyncListenerMux(t *testing.T) {
t.Run("empty", func(t *testing.T) {
listener := AsyncListenerMux([]Listener{{}, {}}, 16, make(chan struct{}))
listener := AsyncListenerMux([]Listener{{}, {}}, 16, context.Background())

if listener.InitializeModuleData != nil {
t.Error("expected nil")
Expand All @@ -31,16 +32,16 @@ func TestAsyncListenerMux(t *testing.T) {
// commit is not expected to be nil
})

t.Run("call done", func(t *testing.T) {
doneChan := make(chan struct{})
t.Run("call cancel", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
var calls1, calls2 []string
listener1 := callCollector(1, func(name string, _ int, _ Packet) {
calls1 = append(calls1, name)
})
listener2 := callCollector(2, func(name string, _ int, _ Packet) {
calls2 = append(calls2, name)
})
res := AsyncListenerMux([]Listener{listener1, listener2}, 16, doneChan)
res := AsyncListenerMux([]Listener{listener1, listener2}, 16, ctx)

callAllCallbacksOnces(t, res)

Expand All @@ -57,27 +58,27 @@ func TestAsyncListenerMux(t *testing.T) {
checkExpectedCallOrder(t, calls1, expectedCalls)
checkExpectedCallOrder(t, calls2, expectedCalls)

calls1 = nil
calls2 = nil

doneChan <- struct{}{}
cancel()

// expect a panic if we try to write to the now closed channels
defer func() {
if err := recover(); err == nil {
t.Fatalf("expected panic")
}
}()
callAllCallbacksOnces(t, res)
//
//checkExpectedCallOrder(t, calls1, nil)
//checkExpectedCallOrder(t, calls2, nil)
})
}

func TestAsyncListener(t *testing.T) {
t.Run("call done", func(t *testing.T) {
t.Run("call cancel", func(t *testing.T) {
commitChan := make(chan error)
doneChan := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
var calls []string
listener := callCollector(1, func(name string, _ int, _ Packet) {
calls = append(calls, name)
})
res := AsyncListener(listener, 16, commitChan, doneChan)
res := AsyncListener(listener, 16, commitChan, ctx)

callAllCallbacksOnces(t, res)

Expand All @@ -98,7 +99,7 @@ func TestAsyncListener(t *testing.T) {

calls = nil

doneChan <- struct{}{}
cancel()

callAllCallbacksOnces(t, res)

Expand All @@ -107,7 +108,6 @@ func TestAsyncListener(t *testing.T) {

t.Run("error", func(t *testing.T) {
commitChan := make(chan error)
doneChan := make(chan struct{})
var calls []string
listener := callCollector(1, func(name string, _ int, _ Packet) {
calls = append(calls, name)
Expand All @@ -117,7 +117,7 @@ func TestAsyncListener(t *testing.T) {
return fmt.Errorf("error")
}

res := AsyncListener(listener, 16, commitChan, doneChan)
res := AsyncListener(listener, 16, commitChan, context.Background())

callAllCallbacksOnces(t, res)

Expand Down

0 comments on commit 47a9751

Please sign in to comment.