From 5b8eb525e04f6e633d986a349a3a7c39726d13de Mon Sep 17 00:00:00 2001 From: Saman Hosseini Date: Wed, 20 Sep 2023 21:50:19 +0200 Subject: [PATCH] feat(event): add `handler.StartupQuery()` option (closes #124) feat(handler.go): add DefaultStartupQuery function to construct default query for event handler startup feat(handler.go): add startupQuery field to Handler struct to allow customization of startup query feat(handler.go): modify Startup function to accept query options and merge with default query feat(handler.go): add StartupQuery function to configure Handler's startup query feat(handler.go): modify New function to set default startupQuery if not provided feat(handler.go): modify startup function to use startupQuery when querying events from startupStore test(handler_test.go): add tests for new Startup and StartupQuery functions and their effects on event handling --- event/handler/handler.go | 56 +++++++++-- event/handler/handler_test.go | 175 +++++++++++++++++++++++++++++++++- 2 files changed, 220 insertions(+), 11 deletions(-) diff --git a/event/handler/handler.go b/event/handler/handler.go index ad6c27ff..495728b2 100644 --- a/event/handler/handler.go +++ b/event/handler/handler.go @@ -18,6 +18,15 @@ import ( // handler. var ErrRunning = errors.New("event handler is already running") +// DefaultStartupQuery constructs a default query for the startup of an event +// handler. It uses the provided slice of event names to create a query that +// sorts events by their timestamps. This function is typically used to +// determine which events should be processed during the startup of an +// event handler. +func DefaultStartupQuery(events []string) query.Query { + return query.New(query.Name(events...), query.SortByTime()) +} + // Handler is a type that processes events from an event bus. It associates // event names with specific functions, which are called whenever their // respective event occurs. Handler uses multiple workers to process events @@ -30,6 +39,7 @@ var ErrRunning = errors.New("event handler is already running") type Handler struct { bus event.Bus startupStore event.Store + startupQuery func(event.Query) event.Query workers int mux sync.RWMutex @@ -44,13 +54,37 @@ type Handler struct { // constructing a new [Handler] using the New function. type Option func(*Handler) -// Startup sets the startup event store for a [Handler]. This store is used to -// handle events when the [Handler] starts up. The Startup option is typically -// used to initialize the system with initial event handling on startup or -// implement a "catch-up" mechanism for their event handlers. -func Startup(store event.Store) Option { +// Startup configures a [Handler] with a specified event store and options for +// querying events. It is used to setup the event store that the [Handler] will +// use to fetch events during startup. This can be used to initialize the system +// with initial event handling on startup or implement a "catch-up" mechanism +// for their event handlers. The query options allow customization of how the +// events are fetched from the store. The returned [Option] can be used when +// creating a new [Handler]. +// +// If [query.Option]s are provided, they will be merged with the default query +// using [query.Merge]. If you want to _replace_ the default query, use the +// [StartupQuery] option instead of providing [query.Option]s to [Startup]. +func Startup(store event.Store, opts ...query.Option) Option { return func(h *Handler) { h.startupStore = store + if len(opts) > 0 { + StartupQuery(func(q event.Query) event.Query { + return query.Merge(q, query.New(opts...)) + })(h) + } + } +} + +// StartupQuery is a function that configures a [Handler]'s startup query. It +// accepts a function that takes and returns an event.Query as its argument. The +// provided function will be used by the [Handler] to modify the default query +// used when fetching events from the event store during startup. The resulting +// [Option] can be used when constructing a new [Handler], allowing +// customization of the startup behavior of the [Handler]. +func StartupQuery(fn func(event.Query) event.Query) Option { + return func(h *Handler) { + h.startupQuery = fn } } @@ -89,6 +123,11 @@ func New(bus event.Bus, opts ...Option) *Handler { if h.workers < 1 { h.workers = 1 } + + if h.startupQuery == nil && h.startupStore != nil { + h.startupQuery = func(q event.Query) event.Query { return q } + } + return h } @@ -205,10 +244,9 @@ func (h *Handler) handleEvents(ctx context.Context, events <-chan event.Event) < } func (h *Handler) startup(ctx context.Context, eventNames []string) error { - str, errs, err := h.startupStore.Query(ctx, query.New( - query.Name(eventNames...), - query.SortByTime(), - )) + q := h.startupQuery(DefaultStartupQuery(eventNames)) + + str, errs, err := h.startupStore.Query(ctx, q) if err != nil { return fmt.Errorf("query events %v: %w", eventNames, err) } diff --git a/event/handler/handler_test.go b/event/handler/handler_test.go index abdabd09..5d65f89d 100644 --- a/event/handler/handler_test.go +++ b/event/handler/handler_test.go @@ -5,10 +5,12 @@ import ( "testing" "time" + "github.com/google/uuid" "github.com/modernice/goes/event" "github.com/modernice/goes/event/eventbus" "github.com/modernice/goes/event/eventstore" "github.com/modernice/goes/event/handler" + "github.com/modernice/goes/event/query" "github.com/modernice/goes/event/test" ) @@ -57,13 +59,13 @@ func TestHandler(t *testing.T) { } } -func TestWithStore(t *testing.T) { +func TestStartup(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() bus := eventbus.New() store := eventstore.New() - h := handler.New(bus, handler.WithStore(store)) + h := handler.New(bus, handler.Startup(store)) fooHandled := make(chan event.Of[test.FooEventData]) barHandled := make(chan event.Of[test.BarEventData]) @@ -102,3 +104,172 @@ func TestWithStore(t *testing.T) { case <-barHandled: } } + +func TestStartupQuery(t *testing.T) { + bus := eventbus.New() + store := eventstore.New() + + h := handler.New(bus, handler.Startup(store), handler.StartupQuery(func(event.Query) event.Query { + return query.New(query.Name("bar")) + })) + + fooHandled := make(chan event.Of[test.FooEventData]) + barHandled := make(chan event.Of[test.BarEventData]) + + h.RegisterEventHandler("foo", func(evt event.Event) { fooHandled <- event.Cast[test.FooEventData](evt) }) + h.RegisterEventHandler("bar", func(evt event.Event) { barHandled <- event.Cast[test.BarEventData](evt) }) + + if err := store.Insert( + context.Background(), + event.New("foo", test.FooEventData{}).Any(), + event.New("bar", test.BarEventData{}).Any(), + ); err != nil { + t.Fatalf("Insert() failed with %q", err) + } + + errs, err := h.Run(context.Background()) + if err != nil { + t.Fatalf("Run() failed with %q", err) + } + + go func() { + for err := range errs { + panic(err) + } + }() + + select { + case <-time.After(time.Second): + t.Fatalf("bar event was not handled") + case <-barHandled: + } + + select { + case <-time.After(50 * time.Millisecond): + case <-fooHandled: + t.Fatalf("foo event was handled") + } +} + +func TestStartup_withQuery_merges_names(t *testing.T) { + bus := eventbus.New() + store := eventstore.New() + + testID := uuid.New() + + h := handler.New(bus, handler.Startup(store, query.Name("bar"))) + + fooHandled := make(chan event.Of[test.FooEventData]) + barHandled := make(chan event.Of[test.BarEventData]) + + h.RegisterEventHandler("foo", func(evt event.Event) { + t.Log("Handling foo event") + fooHandled <- event.Cast[test.FooEventData](evt) + }) + h.RegisterEventHandler("bar", func(evt event.Event) { + t.Log("Handling bar event") + barHandled <- event.Cast[test.BarEventData](evt) + }) + + t1 := time.Now().Add(time.Minute) + t2 := t1.Add(time.Second) + + if err := store.Insert( + context.Background(), + event.New("foo", test.FooEventData{}).Any(), + event.New("bar", test.BarEventData{}, event.Time(t2)).Any(), + event.New("bar", test.BarEventData{}, event.Time(t1), event.ID(testID)).Any(), + ); err != nil { + t.Fatalf("Insert() failed with %q", err) + } + + errs, err := h.Run(context.Background()) + if err != nil { + t.Fatalf("Run() failed with %q", err) + } + + go func() { + for err := range errs { + panic(err) + } + }() + + select { + case <-time.After(time.Second): + t.Fatalf("foo event was not handled") + case <-fooHandled: + } + + select { + case <-time.After(time.Second): + t.Fatalf("bar event was not handled #1") + case evt := <-barHandled: + if evt.ID() != testID { + t.Fatalf("expected event ID %q; got %q", testID, evt.ID()) + } + } + + select { + case <-time.After(time.Second): + t.Fatalf("bar event was not handled #2") + case evt := <-barHandled: + if evt.ID() == testID { + t.Fatalf("expected event ID not to be %q; got %q", testID, evt.ID()) + } + } +} + +func TestStartup_withQuery_merges_ids(t *testing.T) { + bus := eventbus.New() + store := eventstore.New() + + testID := uuid.New() + + h := handler.New(bus, handler.Startup(store, query.ID(testID))) + + fooHandled := make(chan event.Of[test.FooEventData]) + barHandled := make(chan event.Of[test.BarEventData]) + + h.RegisterEventHandler("foo", func(evt event.Event) { + t.Log("Handling foo event") + fooHandled <- event.Cast[test.FooEventData](evt) + }) + h.RegisterEventHandler("bar", func(evt event.Event) { + t.Log("Handling bar event") + barHandled <- event.Cast[test.BarEventData](evt) + }) + + if err := store.Insert( + context.Background(), + event.New("foo", test.FooEventData{}).Any(), + event.New("bar", test.BarEventData{}, event.ID(testID)).Any(), + ); err != nil { + t.Fatalf("Insert() failed with %q", err) + } + + errs, err := h.Run(context.Background()) + if err != nil { + t.Fatalf("Run() failed with %q", err) + } + + go func() { + for err := range errs { + panic(err) + } + }() + + select { + case <-time.After(50 * time.Millisecond): + case <-fooHandled: + t.Fatalf("foo event was handled") + } + + select { + case <-time.After(time.Second): + t.Fatalf("bar event was not handled") + case evt := <-barHandled: + if evt.ID() != testID { + t.Fatalf("expected event ID %q; got %q", testID, evt.ID()) + } + } +}