From 5cf3f58b0a6a74aa592435dcac0e954bfe60b8b0 Mon Sep 17 00:00:00 2001 From: Saman Hosseini Date: Mon, 6 Nov 2023 22:46:02 +0100 Subject: [PATCH] feat(event/eventbus): add artificial delay option to simulate network latency --- event/eventbus/bus.go | 67 ++++++++++++++++++++++++------- go.work.sum | 4 ++ projection/lookup/lookup_test.go | 51 +++++++++++++++++++++++ projection/schedule/continuous.go | 2 +- 4 files changed, 108 insertions(+), 16 deletions(-) diff --git a/event/eventbus/bus.go b/event/eventbus/bus.go index 8107c684..2b6aa559 100644 --- a/event/eventbus/bus.go +++ b/event/eventbus/bus.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "time" "github.com/modernice/goes/event" ) @@ -11,6 +12,8 @@ import ( type chanbus struct { sync.RWMutex + artificialDelay time.Duration + events map[string]*eventSubscription queue chan event.Event done chan struct{} @@ -38,22 +41,45 @@ type subscribeJob struct { done chan struct{} } -// New creates and returns a new event.Bus backed by a channel-based -// implementation. The returned event.Bus allows subscribing to events, -// publishing events, and managing event subscriptions. -func New() event.Bus { +// Option is a type of function that modifies the properties of a [*chanbus] +// during its initialization. It enables the configuration of various settings +// of the event bus, such as the artificial delay, before it starts operation. +// Options are applied in the order they are provided when creating a new event +// bus through the New function. +type Option func(*chanbus) + +// WithArtificialDelay sets an artificial delay for the event bus. This delay is +// applied after each event is published to the bus, effectively slowing down +// the rate of event publishing. The delay duration is specified by the provided +// time.Duration value. The function returns an Option that can be used to +// configure a chanbus instance. +func WithArtificialDelay(delay time.Duration) func(*chanbus) { + return func(c *chanbus) { + c.artificialDelay = delay + } +} + +// New creates a new instance of an event bus with the provided options. The +// returned event bus is safe for concurrent use and starts processing events +// immediately. The artificial delay parameter can be set to simulate network +// latency or other delays. +func New(opts ...Option) event.Bus { bus := &chanbus{ - events: make(map[string]*eventSubscription), - queue: make(chan event.Event), - done: make(chan struct{}), + artificialDelay: time.Millisecond, + events: make(map[string]*eventSubscription), + queue: make(chan event.Event), + done: make(chan struct{}), + } + for _, opt := range opts { + opt(bus) } go bus.work() return bus } -// Close terminates the channel-based event bus. If the bus is already closed, -// it does nothing. Otherwise, it signals to all operations that they should -// stop and cleans up any resources associated with the bus. +// Close signals the termination of the Chanbus. After Close is called, no more +// events can be processed by the Chanbus. Calling Close multiple times has no +// additional effect. func (bus *chanbus) Close() { select { case <-bus.done: @@ -62,9 +88,14 @@ func (bus *chanbus) Close() { } } -// Subscribe creates a subscription for the specified events and returns -// channels for receiving the events and errors. The subscription is -// automatically unsubscribed when the provided context is canceled. +// Subscribe is used to register interest in a set of events. It takes in a +// context and a variadic parameter of event names. For each event name +// provided, it creates a subscription that listens for these events. The +// function returns two channels: one for receiving the subscribed events and +// another for receiving any errors that may occur during the subscription +// process. If an error occurs while setting up any of the subscriptions, the +// function cancels all other subscriptions, closes the channels, and returns an +// error. func (bus *chanbus) Subscribe(ctx context.Context, events ...string) (<-chan event.Event, <-chan error, error) { ctx, unsubscribeAll := context.WithCancel(ctx) go func() { @@ -86,8 +117,11 @@ func (bus *chanbus) Subscribe(ctx context.Context, events ...string) (<-chan eve return fanInEvents(ctx, rcpts), fanInErrors(ctx, rcpts), nil } -// Publish sends the provided events to the event bus. It returns an error if -// the context is canceled before all events are published. +// Publish publishes the provided events to the event bus. It ensures each event +// is dispatched to all subscribed recipients. If an artificial delay is set, it +// pauses for that duration before dispatching the next event. The function +// returns an error if the context gets cancelled before all events are +// published. func (bus *chanbus) Publish(ctx context.Context, events ...event.Event) error { done := make(chan struct{}) go func() { @@ -97,6 +131,9 @@ func (bus *chanbus) Publish(ctx context.Context, events ...event.Event) error { case <-ctx.Done(): return case bus.queue <- evt: + if bus.artificialDelay > 0 { + time.Sleep(bus.artificialDelay) + } } } }() diff --git a/go.work.sum b/go.work.sum index bc9d7f8c..64d7c18d 100644 --- a/go.work.sum +++ b/go.work.sum @@ -109,6 +109,7 @@ cloud.google.com/go/compute v1.19.1/go.mod h1:6ylj3a05WF8leseCdIf77NK0g1ey+nj5IK cloud.google.com/go/compute v1.19.3/go.mod h1:qxvISKp/gYnXkSAD1ppcSOveRAmzxicEv/JlizULFrI= cloud.google.com/go/compute v1.20.1/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM= cloud.google.com/go/compute v1.21.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM= +cloud.google.com/go/compute v1.23.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM= cloud.google.com/go/compute/metadata v0.2.0/go.mod h1:zFmK7XCadkQkj6TtorcaGlCW1hT1fIilQDwofLpJ20k= cloud.google.com/go/compute/metadata v0.2.1/go.mod h1:jgHgmJd2RKBGzXqF5LR2EZMGxBkeanZ9wwa75XHJgOM= cloud.google.com/go/compute/metadata v0.2.3 h1:mg4jlk7mCAj6xXp9UJ4fjI9VUI5rubuGBW5aJ7UnBMY= @@ -794,6 +795,7 @@ golang.org/x/oauth2 v0.7.0 h1:qe6s0zUXlPX80/dITx3440hWZ7GwMwgDDyrSGTPJG/g= golang.org/x/oauth2 v0.7.0/go.mod h1:hPLQkd9LyjfXTiRohC/41GhcFqxisoUQ99sCUOHO9x4= golang.org/x/oauth2 v0.8.0/go.mod h1:yr7u4HXZRm1R1kBWqr/xKNqewf0plRYoB7sla+BCIXE= golang.org/x/oauth2 v0.10.0/go.mod h1:kTpgurOux7LqtuxjuyZa4Gj2gdezIt/jQtGnNFfypQI= +golang.org/x/oauth2 v0.11.0/go.mod h1:LdF7O/8bLR/qWK9DrpXmbHLTouvRHK0SgJl0GmDBchk= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -992,11 +994,13 @@ google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98/go.mod h1:S7mY02Oq google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5 h1:L6iMMGrtzgHsWofoFcihmDEMYeDR9KN/ThbPWGrh++g= google.golang.org/genproto v0.0.0-20230803162519-f966b187b2e5/go.mod h1:oH/ZOT02u4kWEp7oYBGYFFkCdKS/uYR9Z7+0/xuuFp8= google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d h1:VBu5YqKPv6XiJ199exd8Br+Aetz+o08F+PLMnwJQHAY= +google.golang.org/genproto v0.0.0-20230822172742-b8732ec3820d/go.mod h1:yZTlhN0tQnXo3h00fuXNCxJdLdIdnVFVBaRJ5LWBbw4= google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 h1:m8v1xLLLzMe1m5P+gCTF8nJB9epwZQUBERm20Oy1poQ= google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= google.golang.org/genproto/googleapis/api v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:vHYtlOoi6TsQ3Uk2yxR7NI5z8uoV+3pZtR4jmHIkRig= google.golang.org/genproto/googleapis/api v0.0.0-20230706204954-ccb25ca9f130/go.mod h1:mPBs5jNgx2GuQGvFwUvVKqtn6HsUw9nP64BedgvqEsQ= google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ= +google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d/go.mod h1:KjSP20unUpOx5kyQUFa7k4OJg0qeJ7DEZflGDu2p6Bk= google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= google.golang.org/genproto/googleapis/rpc v0.0.0-20230530153820-e85fd2cbaebc/go.mod h1:66JfowdXAEgad5O9NnYcsNPLCPZJD++2L9X0PCMODrA= google.golang.org/genproto/googleapis/rpc v0.0.0-20230706204954-ccb25ca9f130/go.mod h1:8mL13HKkDa+IuJ8yruA3ci0q+0vsUz4m//+ottjwS5o= diff --git a/projection/lookup/lookup_test.go b/projection/lookup/lookup_test.go index ba3e7a33..05848737 100644 --- a/projection/lookup/lookup_test.go +++ b/projection/lookup/lookup_test.go @@ -119,6 +119,44 @@ func TestLookup_Reverse(t *testing.T) { } } +func TestLookup_removed(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + bus := eventbus.New() + store := eventstore.New() + + l := lookup.New(store, bus, []string{"foo", "bar"}) + errs, err := l.Run(ctx) + if err != nil { + t.Fatalf("Run() failed with %q", err) + } + go func() { + for range errs { + } + }() + + aggregateID := uuid.New() + + events := []event.Event{ + event.New("foo", LookupEvent{Foo: "foo"}, event.Aggregate(aggregateID, "foo", 1)).Any(), + event.New("bar", LookupRemoveEvent{}, event.Aggregate(aggregateID, "foo", 2)).Any(), + } + + if err := bus.Publish(ctx, events...); err != nil { + t.Fatalf("publish events: %v", err) + } + + value, ok := l.Lookup(ctx, "foo", "foo", aggregateID) + if ok { + t.Fatalf("Lookup should return false; got true") + } + + if value != nil { + t.Fatalf("Lookup should return nil; got %v", value) + } +} + // LookupEvent is a type used in testing the lookup package. It provides a Foo // field and implements the ProvideLookup method of the lookup.Provider // interface. @@ -131,3 +169,16 @@ type LookupEvent struct { func (e LookupEvent) ProvideLookup(p lookup.Provider) { p.Provide("foo", e.Foo) } + +// LookupRemoveEvent is a type that, when used in the context of the lookup +// package, signals the removal of a lookup value from the provider it's +// attached to. It has no fields and its sole purpose is to implement the +// ProvideLookup method of the lookup.Provider interface in a way that instructs +// it to remove a specific value. +type LookupRemoveEvent struct{} + +// ProvideLookup removes the provided lookup value from the provider for the +// LookupRemoveEvent instance. +func (LookupRemoveEvent) ProvideLookup(p lookup.Provider) { + p.Remove("foo") +} diff --git a/projection/schedule/continuous.go b/projection/schedule/continuous.go index ccb30476..b1855c06 100644 --- a/projection/schedule/continuous.go +++ b/projection/schedule/continuous.go @@ -94,7 +94,7 @@ func DebounceCap(cap time.Duration) ContinuousOption { // subscribes to events with the given eventNames to create projection Jobs // for those events. // -// Debounce events +// # Debounce events // // It may be desirable to debounce the creation of projection Jobs to avoid // creating a Job on every event if Events are published within a short