From 1d0f71d3ec2d0610a5f8afe9eff4b3e0e3ec176b Mon Sep 17 00:00:00 2001 From: Ashwin Venkatesh Date: Fri, 11 Aug 2023 16:24:19 -0400 Subject: [PATCH] refactor mapper methods --- internal/controller/api.go | 14 +++- internal/controller/api_test.go | 1 + internal/controller/controller.go | 91 +++++++++++------------ internal/controller/dependency_mappers.go | 2 + 4 files changed, 56 insertions(+), 52 deletions(-) diff --git a/internal/controller/api.go b/internal/controller/api.go index 6a91a048bb33..5c2fc2e78271 100644 --- a/internal/controller/api.go +++ b/internal/controller/api.go @@ -47,9 +47,8 @@ func (c Controller) WithWatch(watchedType *pbresource.Type, mapper DependencyMap return c } -// WithCustomWatch adds a custom watch on the given type/dependency to the controller. custom mapper -// will be called to determine which resources must be reconciled as a result of -// an event. +// WithCustomWatch adds a custom watch on the given dependency to the controller. Custom mapper +// will be called to map events produced by source to the controller's watched type. func (c Controller) WithCustomWatch(source *Source, mapper CustomDependencyMapper) Controller { if source == nil { panic("source must not be nil") @@ -146,7 +145,10 @@ func (s *Source) Watch(ctx context.Context, add func(e Event)) error { select { case <-ctx.Done(): return nil - case evt := <-s.Source: + case evt, ok := <-s.Source: + if !ok { + return nil + } add(evt) } } @@ -158,14 +160,18 @@ type Source struct { Source <-chan Event } +// Event captures an event in the system which the API can choose to respond to. type Event struct { Obj queue.ItemType } +// Key returns a string that will be used to de-duplicate items in the queue. func (e Event) Key() string { return e.Obj.Key() } +// customWatch represent a Watch on a custom Event source and a Mapper to map said +// Events into Requests that the controller can respond to. type customWatch struct { source *Source mapper CustomDependencyMapper diff --git a/internal/controller/api_test.go b/internal/controller/api_test.go index b27912f157eb..40d3ec99bebd 100644 --- a/internal/controller/api_test.go +++ b/internal/controller/api_test.go @@ -26,6 +26,7 @@ func TestController_API(t *testing.T) { client := svctest.RunResourceService(t, demo.RegisterTypes) concertsChan := make(chan controller.Event) + defer close(concertsChan) concertSource := &controller.Source{Source: concertsChan} concertMapper := func(ctx context.Context, rt controller.Runtime, event controller.Event) ([]controller.Request, error) { artistID := event.Obj.(*Concert).artistID diff --git a/internal/controller/controller.go b/internal/controller/controller.go index 916de1fdc277..ac901d355b6e 100644 --- a/internal/controller/controller.go +++ b/internal/controller/controller.go @@ -40,35 +40,39 @@ func (c *controllerRunner) run(ctx context.Context) error { }) }) - for _, watch := range c.ctrl.watches { + for _, w := range c.ctrl.watches { mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl) - + watcher := w // Watched Type Events → Mapper Queue group.Go(func() error { - return c.watch(groupCtx, watch.watchedType, func(res *pbresource.Resource) { + return c.watch(groupCtx, watcher.watchedType, func(res *pbresource.Resource) { mapQueue.Add(mapperRequest{res: res}) }) }) // Mapper Queue → Mapper → Reconciliation Queue group.Go(func() error { - return c.runMapper(groupCtx, watch, mapQueue, recQueue) + return c.runMapper(groupCtx, watcher, mapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) { + return watcher.mapper(ctx, runtime, itemType.(mapperRequest).res) + }) }) } - for _, customWatch := range c.ctrl.customWatches { + for _, cw := range c.ctrl.customWatches { customMapQueue := runQueue[Event](groupCtx, c.ctrl) - + watcher := cw // Custom Events → Mapper Queue group.Go(func() error { - return customWatch.source.Watch(groupCtx, func(e Event) { + return watcher.source.Watch(groupCtx, func(e Event) { customMapQueue.Add(e) }) }) // Mapper Queue → Mapper → Reconciliation Queue group.Go(func() error { - return c.runCustomMapper(groupCtx, customWatch, customMapQueue, recQueue) + return c.runCustomMapper(groupCtx, watcher, customMapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) { + return watcher.mapper(ctx, runtime, itemType.(Event)) + }) }) } @@ -86,7 +90,7 @@ func runQueue[T queue.ItemType](ctx context.Context, ctrl Controller) queue.Work } func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error { - watch, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{ + wl, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{ Type: typ, Tenancy: &pbresource.Tenancy{ Partition: storage.Wildcard, @@ -100,7 +104,7 @@ func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add } for { - event, err := watch.Recv() + event, err := wl.Recv() if err != nil { c.logger.Warn("error received from watch", "error", err) return err @@ -114,6 +118,7 @@ func (c *controllerRunner) runMapper( w watch, from queue.WorkQueue[mapperRequest], to queue.WorkQueue[Request], + mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), ) error { logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType)) @@ -123,29 +128,12 @@ func (c *controllerRunner) runMapper( return nil } - var reqs []Request - err := c.handlePanic(func() error { - var err error - reqs, err = w.mapper(ctx, c.runtime(), item.res) - return err - }) - if err != nil { + if err := c.doMap(ctx, mapper, to, item, logger); err != nil { from.AddRateLimited(item) from.Done(item) continue } - for _, r := range reqs { - if !resource.EqualType(r.ID.Type, c.ctrl.managedType) { - logger.Error("dependency mapper returned request for a resource of the wrong type", - "type_expected", resource.ToGVK(c.ctrl.managedType), - "type_got", resource.ToGVK(r.ID.Type), - ) - continue - } - to.Add(r) - } - from.Forget(item) from.Done(item) } @@ -153,11 +141,12 @@ func (c *controllerRunner) runMapper( func (c *controllerRunner) runCustomMapper( ctx context.Context, - w customWatch, + cw customWatch, from queue.WorkQueue[Event], to queue.WorkQueue[Request], + mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), ) error { - logger := c.logger.With("watched_event", w.source) + logger := c.logger.With("watched_event", cw.source) for { item, shutdown := from.Get() @@ -165,34 +154,40 @@ func (c *controllerRunner) runCustomMapper( return nil } - var reqs []Request - err := c.handlePanic(func() error { - var err error - reqs, err = w.mapper(ctx, c.runtime(), item) - return err - }) - if err != nil { + if err := c.doMap(ctx, mapper, to, item, logger); err != nil { from.AddRateLimited(item) from.Done(item) continue } - for _, r := range reqs { - if !resource.EqualType(r.ID.Type, c.ctrl.managedType) { - logger.Error("dependency mapper returned request for a resource of the wrong type", - "type_expected", resource.ToGVK(c.ctrl.managedType), - "type_got", resource.ToGVK(r.ID.Type), - ) - continue - } - to.Add(r) - } - from.Forget(item) from.Done(item) } } +func (c *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), to queue.WorkQueue[Request], item queue.ItemType, logger hclog.Logger) error { + var reqs []Request + if err := c.handlePanic(func() error { + var err error + reqs, err = mapper(ctx, c.runtime(), item) + return err + }); err != nil { + return err + } + + for _, r := range reqs { + if !resource.EqualType(r.ID.Type, c.ctrl.managedType) { + logger.Error("dependency mapper returned request for a resource of the wrong type", + "type_expected", resource.ToGVK(c.ctrl.managedType), + "type_got", resource.ToGVK(r.ID.Type), + ) + continue + } + to.Add(r) + } + return nil +} + func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error { for { req, shutdown := queue.Get() diff --git a/internal/controller/dependency_mappers.go b/internal/controller/dependency_mappers.go index cf439add5ba3..1ac331ffafdd 100644 --- a/internal/controller/dependency_mappers.go +++ b/internal/controller/dependency_mappers.go @@ -18,6 +18,8 @@ type DependencyMapper func( res *pbresource.Resource, ) ([]Request, error) +// CustomDependencyMapper is called when an Event occurs to determine which of the +// controller's managed resources need to be reconciled. type CustomDependencyMapper func( ctx context.Context, rt Runtime,