Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(schema/appdata): async listener mux'ing #20879

Merged
merged 17 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 131 additions & 0 deletions schema/appdata/async.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package appdata

// AsyncListenerMux returns a listener that forwards received events to all the provided listeners asynchronously
// with each listener processing in a separate go routine. All callbacks in the returned listener will return nil
// except for Commit which will return an error or nil once all listeners have processed the commit. The doneChan
// is used to signal that the listeners should stop listening and return. bufferSize is the size of the buffer for the
// channels used to send events to the listeners.
func AsyncListenerMux(listeners []Listener, bufferSize int, doneChan <-chan struct{}) Listener {
asyncListeners := make([]Listener, len(listeners))
commitChans := make([]chan error, len(listeners))
for i, l := range listeners {
commitChan := make(chan error)
commitChans[i] = commitChan
asyncListeners[i] = AsyncListener(l, bufferSize, commitChan, doneChan)
}
mux := ListenerMux(asyncListeners...)
muxCommit := mux.Commit
mux.Commit = func(data CommitData) error {
if muxCommit != nil {
err := muxCommit(data)
if err != nil {
return err
}
}

for _, commitChan := range commitChans {
err := <-commitChan
if err != nil {
return err
}
}
return nil
}

return mux
}

// AsyncListener returns a listener that forwards received events to the provided listener listening in asynchronously
// in a separate go routine. The listener that is returned will return nil for all methods including Commit and
// an error or nil will only be returned in commitChan once the sender has sent commit and the receiving listener has
// processed it. Thus commitChan can be used as a synchronization and error checking mechanism. The go routine
// that is being used for listening will exit when doneChan is closed and no more events will be received by the listener.
// bufferSize is the size of the buffer for the channel that is used to send events to the listener.
// Instead of using AsyncListener directly, it is recommended to use AsyncListenerMux which does coordination directly
// via its Commit callback.
func AsyncListener(listener Listener, bufferSize int, commitChan chan<- error, doneChan <-chan struct{}) Listener {
packetChan := make(chan Packet, bufferSize)
res := Listener{}

go func() {
var err error
for {
select {
case packet := <-packetChan:
if err != nil {
// if we have an error, don't process any more packets
// and return the error and finish when it's time to commit
if _, ok := packet.(CommitData); ok {
commitChan <- err
return
}
} else {
// process the packet
err = listener.SendPacket(packet)
// if it's a commit
if _, ok := packet.(CommitData); ok {
commitChan <- err
if err != nil {
return
}
}
}

case <-doneChan:
close(packetChan)
return
}
}
}()
Fixed Show fixed Hide fixed

if listener.InitializeModuleData != nil {
res.InitializeModuleData = func(data ModuleInitializationData) error {
packetChan <- data
return nil
}
}

if listener.StartBlock != nil {
res.StartBlock = func(data StartBlockData) error {
packetChan <- data
return nil
}
}

if listener.OnTx != nil {
res.OnTx = func(data TxData) error {
packetChan <- data
return nil
}
}

if listener.OnEvent != nil {
res.OnEvent = func(data EventData) error {
packetChan <- data
return nil
}
}

if listener.OnKVPair != nil {
res.OnKVPair = func(data KVPairData) error {
packetChan <- data
return nil
}
}

if listener.OnObjectUpdate != nil {
res.OnObjectUpdate = func(data ObjectUpdateData) error {
packetChan <- data
return nil
}
}

if listener.Commit != nil {
res.Commit = func(data CommitData) error {
packetChan <- data
return nil
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

res.callbacks := make(func(data Packet) error, 0)

How about having a general callback slice instead of all individual callbacks?

Copy link
Member Author

@aaronc aaronc Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I follow how this would work. Could you explain more?

Copy link
Contributor

@cool-develope cool-develope Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, we can keep only one callback function within Listener, since all callbacks StartBlock, OnTx ... are in the same format as func(packet Packet) error. And then within the callback implementation, we can deal with different packet type like ModuleInitializationData, StartBlockData, and so on using a switch statement. It will reduce the duplicated imp here, and in mux.go

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already defined the different packet data structures, so we don't need to distinguish the callback functions.

Copy link
Member Author

@aaronc aaronc Jul 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The purpose of distinguishing the different callback functions is to signal what each listener is subscribing to. If listeners don't listen to events at all, then OnEvent should be nil and then the producers won't send any events. If there's only one callback function then a producer has to always send all packets even if the consumers are only interested in certain packet types. In the async case it simplifies what is sent over each packet channel to only the subscription callbacks that are non nil.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that makes sense!


return res
}
131 changes: 131 additions & 0 deletions schema/appdata/async_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package appdata

import (
"fmt"
"testing"
)

func TestAsyncListenerMux(t *testing.T) {
t.Run("empty", func(t *testing.T) {
listener := AsyncListenerMux([]Listener{{}, {}}, 16, make(chan struct{}))

if listener.InitializeModuleData != nil {
t.Error("expected nil")
}
if listener.StartBlock != nil {
t.Error("expected nil")
}
if listener.OnTx != nil {
t.Error("expected nil")
}
if listener.OnEvent != nil {
t.Error("expected nil")
}
if listener.OnKVPair != nil {
t.Error("expected nil")
}
if listener.OnObjectUpdate != nil {
t.Error("expected nil")
}

// commit is not expected to be nil
})

t.Run("call done", func(t *testing.T) {
doneChan := make(chan struct{})
var calls1, calls2 []string
listener1 := callCollector(1, func(name string, _ int, _ Packet) {
calls1 = append(calls1, name)
})
listener2 := callCollector(2, func(name string, _ int, _ Packet) {
calls2 = append(calls2, name)
})
res := AsyncListenerMux([]Listener{listener1, listener2}, 16, doneChan)

callAllCallbacksOnces(t, res)

expectedCalls := []string{
"InitializeModuleData",
"StartBlock",
"OnTx",
"OnEvent",
"OnKVPair",
"OnObjectUpdate",
"Commit",
}

checkExpectedCallOrder(t, calls1, expectedCalls)
checkExpectedCallOrder(t, calls2, expectedCalls)

calls1 = nil
calls2 = nil

doneChan <- struct{}{}

callAllCallbacksOnces(t, res)
//
//checkExpectedCallOrder(t, calls1, nil)
//checkExpectedCallOrder(t, calls2, nil)
})
}

func TestAsyncListener(t *testing.T) {
t.Run("call done", func(t *testing.T) {
commitChan := make(chan error)
doneChan := make(chan struct{})
var calls []string
listener := callCollector(1, func(name string, _ int, _ Packet) {
calls = append(calls, name)
})
res := AsyncListener(listener, 16, commitChan, doneChan)

callAllCallbacksOnces(t, res)

err := <-commitChan
if err != nil {
t.Fatalf("expected nil, got %v", err)
}

checkExpectedCallOrder(t, calls, []string{
"InitializeModuleData",
"StartBlock",
"OnTx",
"OnEvent",
"OnKVPair",
"OnObjectUpdate",
"Commit",
})

calls = nil

doneChan <- struct{}{}

callAllCallbacksOnces(t, res)

checkExpectedCallOrder(t, calls, nil)
})

t.Run("error", func(t *testing.T) {
commitChan := make(chan error)
doneChan := make(chan struct{})
var calls []string
listener := callCollector(1, func(name string, _ int, _ Packet) {
calls = append(calls, name)
})

listener.OnKVPair = func(updates KVPairData) error {
return fmt.Errorf("error")
}

res := AsyncListener(listener, 16, commitChan, doneChan)

callAllCallbacksOnces(t, res)

err := <-commitChan
if err == nil || err.Error() != "error" {
t.Fatalf("expected error, got %v", err)
}

checkExpectedCallOrder(t, calls, []string{"InitializeModuleData", "StartBlock", "OnTx", "OnEvent"})
})
}
42 changes: 42 additions & 0 deletions schema/appdata/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package appdata

// ModuleFilter returns an updated listener that filters state updates based on the module name.
func ModuleFilter(listener Listener, filter func(moduleName string) bool) Listener {
if initModData := listener.InitializeModuleData; initModData != nil {
listener.InitializeModuleData = func(data ModuleInitializationData) error {
if !filter(data.ModuleName) {
return nil
}

return initModData(data)
}
}

if onKVPair := listener.OnKVPair; onKVPair != nil {
listener.OnKVPair = func(data KVPairData) error {
for _, update := range data.Updates {
if !filter(update.ModuleName) {
continue
}

if err := onKVPair(KVPairData{Updates: []ModuleKVPairUpdate{update}}); err != nil {
return err
}
}

return nil
}
}

if onObjectUpdate := listener.OnObjectUpdate; onObjectUpdate != nil {
listener.OnObjectUpdate = func(data ObjectUpdateData) error {
if !filter(data.ModuleName) {
return nil
}

return onObjectUpdate(data)
}
}

return listener
}
Loading
Loading