From 76a7954dd40a18d55d0f5695eb532d74b5b44f7f Mon Sep 17 00:00:00 2001 From: Ashwin Venkatesh Date: Wed, 9 Aug 2023 14:42:40 -0400 Subject: [PATCH 1/2] Support custom watches on controller --- .changelog/18439.txt | 3 ++ internal/controller/api.go | 64 ++++++++++++++++++++--- internal/controller/api_test.go | 45 ++++++++++++++++ internal/controller/controller.go | 59 ++++++++++++++++++++- internal/controller/dependency_mappers.go | 6 +++ 5 files changed, 169 insertions(+), 8 deletions(-) create mode 100644 .changelog/18439.txt diff --git a/.changelog/18439.txt b/.changelog/18439.txt new file mode 100644 index 000000000000..dd12738d5c38 --- /dev/null +++ b/.changelog/18439.txt @@ -0,0 +1,3 @@ +```release-note:feature +Support custom watches on the Consul Controller framework. +``` diff --git a/internal/controller/api.go b/internal/controller/api.go index 17dfa8ef0ae7..6a91a048bb33 100644 --- a/internal/controller/api.go +++ b/internal/controller/api.go @@ -11,6 +11,7 @@ import ( "github.com/hashicorp/go-hclog" + "github.com/hashicorp/consul/agent/consul/controller/queue" "github.com/hashicorp/consul/internal/resource" "github.com/hashicorp/consul/proto-public/pbresource" ) @@ -46,6 +47,22 @@ func (c Controller) WithWatch(watchedType *pbresource.Type, mapper DependencyMap return c } +// WithCustomWatch adds a custom watch on the given type/dependency to the controller. custom mapper +// will be called to determine which resources must be reconciled as a result of +// an event. +func (c Controller) WithCustomWatch(source *Source, mapper CustomDependencyMapper) Controller { + if source == nil { + panic("source must not be nil") + } + + if mapper == nil { + panic("mapper must not be nil") + } + + c.customWatches = append(c.customWatches, customWatch{source, mapper}) + return c +} + // WithLogger changes the controller's logger. func (c Controller) WithLogger(logger hclog.Logger) Controller { if logger == nil { @@ -107,13 +124,14 @@ func (c Controller) backoff() (time.Duration, time.Duration) { // Use the builder methods in this package (starting with ForType) to construct // a controller, and then pass it to a Manager to be executed. type Controller struct { - managedType *pbresource.Type - reconciler Reconciler - logger hclog.Logger - watches []watch - baseBackoff time.Duration - maxBackoff time.Duration - placement Placement + managedType *pbresource.Type + reconciler Reconciler + logger hclog.Logger + watches []watch + customWatches []customWatch + baseBackoff time.Duration + maxBackoff time.Duration + placement Placement } type watch struct { @@ -121,6 +139,38 @@ type watch struct { mapper DependencyMapper } +// Watch is responsible for watching for custom events from source and adding them to +// the event queue. +func (s *Source) Watch(ctx context.Context, add func(e Event)) error { + for { + select { + case <-ctx.Done(): + return nil + case evt := <-s.Source: + add(evt) + } + } +} + +// Source is used as a generic source of events. This can be used when events aren't coming from resources +// stored by the resource API. +type Source struct { + Source <-chan Event +} + +type Event struct { + Obj queue.ItemType +} + +func (e Event) Key() string { + return e.Obj.Key() +} + +type customWatch struct { + source *Source + mapper CustomDependencyMapper +} + // Request represents a request to reconcile the resource with the given ID. type Request struct { // ID of the resource that needs to be reconciled. diff --git a/internal/controller/api_test.go b/internal/controller/api_test.go index 215063d87c77..b27912f157eb 100644 --- a/internal/controller/api_test.go +++ b/internal/controller/api_test.go @@ -25,9 +25,19 @@ func TestController_API(t *testing.T) { rec := newTestReconciler() client := svctest.RunResourceService(t, demo.RegisterTypes) + concertsChan := make(chan controller.Event) + concertSource := &controller.Source{Source: concertsChan} + concertMapper := func(ctx context.Context, rt controller.Runtime, event controller.Event) ([]controller.Request, error) { + artistID := event.Obj.(*Concert).artistID + var requests []controller.Request + requests = append(requests, controller.Request{ID: artistID}) + return requests, nil + } + ctrl := controller. ForType(demo.TypeV2Artist). WithWatch(demo.TypeV2Album, controller.MapOwner). + WithCustomWatch(concertSource, concertMapper). WithBackoff(10*time.Millisecond, 100*time.Millisecond). WithReconciler(rec) @@ -69,6 +79,32 @@ func TestController_API(t *testing.T) { prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID) }) + t.Run("custom watched resource type", func(t *testing.T) { + res, err := demo.GenerateV2Artist() + require.NoError(t, err) + + rsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res}) + require.NoError(t, err) + + req := rec.wait(t) + prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID) + + rec.expectNoRequest(t, 500*time.Millisecond) + + concertsChan <- controller.Event{Obj: &Concert{name: "test-concert", artistID: rsp.Resource.Id}} + + watchedReq := rec.wait(t) + prototest.AssertDeepEqual(t, req.ID, watchedReq.ID) + + otherArtist, err := demo.GenerateV2Artist() + require.NoError(t, err) + + concertsChan <- controller.Event{Obj: &Concert{name: "test-concert", artistID: otherArtist.Id}} + + watchedReq = rec.wait(t) + prototest.AssertDeepEqual(t, otherArtist.Id, watchedReq.ID) + }) + t.Run("error retries", func(t *testing.T) { rec.failNext(errors.New("KABOOM")) @@ -266,3 +302,12 @@ func testContext(t *testing.T) context.Context { return ctx } + +type Concert struct { + name string + artistID *pbresource.ID +} + +func (c Concert) Key() string { + return c.name +} diff --git a/internal/controller/controller.go b/internal/controller/controller.go index f6a064853c73..916de1fdc277 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -41,7 +41,6 @@ func (c *controllerRunner) run(ctx context.Context) error { }) for _, watch := range c.ctrl.watches { - watch := watch mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl) // Watched Type Events → Mapper Queue @@ -57,6 +56,22 @@ func (c *controllerRunner) run(ctx context.Context) error { }) } + for _, customWatch := range c.ctrl.customWatches { + customMapQueue := runQueue[Event](groupCtx, c.ctrl) + + // Custom Events → Mapper Queue + group.Go(func() error { + return customWatch.source.Watch(groupCtx, func(e Event) { + customMapQueue.Add(e) + }) + }) + + // Mapper Queue → Mapper → Reconciliation Queue + group.Go(func() error { + return c.runCustomMapper(groupCtx, customWatch, customMapQueue, recQueue) + }) + } + // Reconciliation Queue → Reconciler group.Go(func() error { return c.runReconciler(groupCtx, recQueue) @@ -136,6 +151,48 @@ func (c *controllerRunner) runMapper( } } +func (c *controllerRunner) runCustomMapper( + ctx context.Context, + w customWatch, + from queue.WorkQueue[Event], + to queue.WorkQueue[Request], +) error { + logger := c.logger.With("watched_event", w.source) + + for { + item, shutdown := from.Get() + if shutdown { + return nil + } + + var reqs []Request + err := c.handlePanic(func() error { + var err error + reqs, err = w.mapper(ctx, c.runtime(), item) + return err + }) + if err != nil { + from.AddRateLimited(item) + from.Done(item) + continue + } + + for _, r := range reqs { + if !resource.EqualType(r.ID.Type, c.ctrl.managedType) { + logger.Error("dependency mapper returned request for a resource of the wrong type", + "type_expected", resource.ToGVK(c.ctrl.managedType), + "type_got", resource.ToGVK(r.ID.Type), + ) + continue + } + to.Add(r) + } + + from.Forget(item) + from.Done(item) + } +} + func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error { for { req, shutdown := queue.Get() diff --git a/internal/controller/dependency_mappers.go b/internal/controller/dependency_mappers.go index e66e4b50e5d3..cf439add5ba3 100644 --- a/internal/controller/dependency_mappers.go +++ b/internal/controller/dependency_mappers.go @@ -18,6 +18,12 @@ type DependencyMapper func( res *pbresource.Resource, ) ([]Request, error) +type CustomDependencyMapper func( + ctx context.Context, + rt Runtime, + event Event, +) ([]Request, error) + // MapOwner implements a DependencyMapper that returns the updated resource's owner. func MapOwner(_ context.Context, _ Runtime, res *pbresource.Resource) ([]Request, error) { var reqs []Request From 1d0f71d3ec2d0610a5f8afe9eff4b3e0e3ec176b Mon Sep 17 00:00:00 2001 From: Ashwin Venkatesh Date: Fri, 11 Aug 2023 16:24:19 -0400 Subject: [PATCH 2/2] refactor mapper methods --- internal/controller/api.go | 14 +++- internal/controller/api_test.go | 1 + internal/controller/controller.go | 91 +++++++++++------------ internal/controller/dependency_mappers.go | 2 + 4 files changed, 56 insertions(+), 52 deletions(-) diff --git a/internal/controller/api.go b/internal/controller/api.go index 6a91a048bb33..5c2fc2e78271 100644 --- a/internal/controller/api.go +++ b/internal/controller/api.go @@ -47,9 +47,8 @@ func (c Controller) WithWatch(watchedType *pbresource.Type, mapper DependencyMap return c } -// WithCustomWatch adds a custom watch on the given type/dependency to the controller. custom mapper -// will be called to determine which resources must be reconciled as a result of -// an event. +// WithCustomWatch adds a custom watch on the given dependency to the controller. Custom mapper +// will be called to map events produced by source to the controller's watched type. func (c Controller) WithCustomWatch(source *Source, mapper CustomDependencyMapper) Controller { if source == nil { panic("source must not be nil") @@ -146,7 +145,10 @@ func (s *Source) Watch(ctx context.Context, add func(e Event)) error { select { case <-ctx.Done(): return nil - case evt := <-s.Source: + case evt, ok := <-s.Source: + if !ok { + return nil + } add(evt) } } @@ -158,14 +160,18 @@ type Source struct { Source <-chan Event } +// Event captures an event in the system which the API can choose to respond to. type Event struct { Obj queue.ItemType } +// Key returns a string that will be used to de-duplicate items in the queue. func (e Event) Key() string { return e.Obj.Key() } +// customWatch represent a Watch on a custom Event source and a Mapper to map said +// Events into Requests that the controller can respond to. type customWatch struct { source *Source mapper CustomDependencyMapper diff --git a/internal/controller/api_test.go b/internal/controller/api_test.go index b27912f157eb..40d3ec99bebd 100644 --- a/internal/controller/api_test.go +++ b/internal/controller/api_test.go @@ -26,6 +26,7 @@ func TestController_API(t *testing.T) { client := svctest.RunResourceService(t, demo.RegisterTypes) concertsChan := make(chan controller.Event) + defer close(concertsChan) concertSource := &controller.Source{Source: concertsChan} concertMapper := func(ctx context.Context, rt controller.Runtime, event controller.Event) ([]controller.Request, error) { artistID := event.Obj.(*Concert).artistID diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 916de1fdc277..ac901d355b6e 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -40,35 +40,39 @@ func (c *controllerRunner) run(ctx context.Context) error { }) }) - for _, watch := range c.ctrl.watches { + for _, w := range c.ctrl.watches { mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl) - + watcher := w // Watched Type Events → Mapper Queue group.Go(func() error { - return c.watch(groupCtx, watch.watchedType, func(res *pbresource.Resource) { + return c.watch(groupCtx, watcher.watchedType, func(res *pbresource.Resource) { mapQueue.Add(mapperRequest{res: res}) }) }) // Mapper Queue → Mapper → Reconciliation Queue group.Go(func() error { - return c.runMapper(groupCtx, watch, mapQueue, recQueue) + return c.runMapper(groupCtx, watcher, mapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) { + return watcher.mapper(ctx, runtime, itemType.(mapperRequest).res) + }) }) } - for _, customWatch := range c.ctrl.customWatches { + for _, cw := range c.ctrl.customWatches { customMapQueue := runQueue[Event](groupCtx, c.ctrl) - + watcher := cw // Custom Events → Mapper Queue group.Go(func() error { - return customWatch.source.Watch(groupCtx, func(e Event) { + return watcher.source.Watch(groupCtx, func(e Event) { customMapQueue.Add(e) }) }) // Mapper Queue → Mapper → Reconciliation Queue group.Go(func() error { - return c.runCustomMapper(groupCtx, customWatch, customMapQueue, recQueue) + return c.runCustomMapper(groupCtx, watcher, customMapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) { + return watcher.mapper(ctx, runtime, itemType.(Event)) + }) }) } @@ -86,7 +90,7 @@ func runQueue[T queue.ItemType](ctx context.Context, ctrl Controller) queue.Work } func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error { - watch, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{ + wl, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{ Type: typ, Tenancy: &pbresource.Tenancy{ Partition: storage.Wildcard, @@ -100,7 +104,7 @@ func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add } for { - event, err := watch.Recv() + event, err := wl.Recv() if err != nil { c.logger.Warn("error received from watch", "error", err) return err @@ -114,6 +118,7 @@ func (c *controllerRunner) runMapper( w watch, from queue.WorkQueue[mapperRequest], to queue.WorkQueue[Request], + mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), ) error { logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType)) @@ -123,29 +128,12 @@ func (c *controllerRunner) runMapper( return nil } - var reqs []Request - err := c.handlePanic(func() error { - var err error - reqs, err = w.mapper(ctx, c.runtime(), item.res) - return err - }) - if err != nil { + if err := c.doMap(ctx, mapper, to, item, logger); err != nil { from.AddRateLimited(item) from.Done(item) continue } - for _, r := range reqs { - if !resource.EqualType(r.ID.Type, c.ctrl.managedType) { - logger.Error("dependency mapper returned request for a resource of the wrong type", - "type_expected", resource.ToGVK(c.ctrl.managedType), - "type_got", resource.ToGVK(r.ID.Type), - ) - continue - } - to.Add(r) - } - from.Forget(item) from.Done(item) } @@ -153,11 +141,12 @@ func (c *controllerRunner) runMapper( func (c *controllerRunner) runCustomMapper( ctx context.Context, - w customWatch, + cw customWatch, from queue.WorkQueue[Event], to queue.WorkQueue[Request], + mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), ) error { - logger := c.logger.With("watched_event", w.source) + logger := c.logger.With("watched_event", cw.source) for { item, shutdown := from.Get() @@ -165,34 +154,40 @@ func (c *controllerRunner) runCustomMapper( return nil } - var reqs []Request - err := c.handlePanic(func() error { - var err error - reqs, err = w.mapper(ctx, c.runtime(), item) - return err - }) - if err != nil { + if err := c.doMap(ctx, mapper, to, item, logger); err != nil { from.AddRateLimited(item) from.Done(item) continue } - for _, r := range reqs { - if !resource.EqualType(r.ID.Type, c.ctrl.managedType) { - logger.Error("dependency mapper returned request for a resource of the wrong type", - "type_expected", resource.ToGVK(c.ctrl.managedType), - "type_got", resource.ToGVK(r.ID.Type), - ) - continue - } - to.Add(r) - } - from.Forget(item) from.Done(item) } } +func (c *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), to queue.WorkQueue[Request], item queue.ItemType, logger hclog.Logger) error { + var reqs []Request + if err := c.handlePanic(func() error { + var err error + reqs, err = mapper(ctx, c.runtime(), item) + return err + }); err != nil { + return err + } + + for _, r := range reqs { + if !resource.EqualType(r.ID.Type, c.ctrl.managedType) { + logger.Error("dependency mapper returned request for a resource of the wrong type", + "type_expected", resource.ToGVK(c.ctrl.managedType), + "type_got", resource.ToGVK(r.ID.Type), + ) + continue + } + to.Add(r) + } + return nil +} + func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error { for { req, shutdown := queue.Get() diff --git a/internal/controller/dependency_mappers.go b/internal/controller/dependency_mappers.go index cf439add5ba3..1ac331ffafdd 100644 --- a/internal/controller/dependency_mappers.go +++ b/internal/controller/dependency_mappers.go @@ -18,6 +18,8 @@ type DependencyMapper func( res *pbresource.Resource, ) ([]Request, error) +// CustomDependencyMapper is called when an Event occurs to determine which of the +// controller's managed resources need to be reconciled. type CustomDependencyMapper func( ctx context.Context, rt Runtime,