Skip to content

Commit

Permalink
Support custom watches on controller (#18439)
Browse files Browse the repository at this point in the history
* Support custom watches on controller
* refactor mapper methods
  • Loading branch information
Ashwin Venkatesh authored Aug 17, 2023
1 parent 92cfb4a commit 97b41d9
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 30 deletions.
3 changes: 3 additions & 0 deletions .changelog/18439.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
Support custom watches on the Consul Controller framework.
```
70 changes: 63 additions & 7 deletions internal/controller/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul/agent/consul/controller/queue"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/proto-public/pbresource"
)
Expand Down Expand Up @@ -46,6 +47,21 @@ func (c Controller) WithWatch(watchedType *pbresource.Type, mapper DependencyMap
return c
}

// WithCustomWatch adds a custom watch on the given dependency to the controller. Custom mapper
// will be called to map events produced by source to the controller's watched type.
func (c Controller) WithCustomWatch(source *Source, mapper CustomDependencyMapper) Controller {
if source == nil {
panic("source must not be nil")
}

if mapper == nil {
panic("mapper must not be nil")
}

c.customWatches = append(c.customWatches, customWatch{source, mapper})
return c
}

// WithLogger changes the controller's logger.
func (c Controller) WithLogger(logger hclog.Logger) Controller {
if logger == nil {
Expand Down Expand Up @@ -107,20 +123,60 @@ func (c Controller) backoff() (time.Duration, time.Duration) {
// Use the builder methods in this package (starting with ForType) to construct
// a controller, and then pass it to a Manager to be executed.
type Controller struct {
managedType *pbresource.Type
reconciler Reconciler
logger hclog.Logger
watches []watch
baseBackoff time.Duration
maxBackoff time.Duration
placement Placement
managedType *pbresource.Type
reconciler Reconciler
logger hclog.Logger
watches []watch
customWatches []customWatch
baseBackoff time.Duration
maxBackoff time.Duration
placement Placement
}

type watch struct {
watchedType *pbresource.Type
mapper DependencyMapper
}

// Watch is responsible for watching for custom events from source and adding them to
// the event queue.
func (s *Source) Watch(ctx context.Context, add func(e Event)) error {
for {
select {
case <-ctx.Done():
return nil
case evt, ok := <-s.Source:
if !ok {
return nil
}
add(evt)
}
}
}

// Source is used as a generic source of events. This can be used when events aren't coming from resources
// stored by the resource API.
type Source struct {
Source <-chan Event
}

// Event captures an event in the system which the API can choose to respond to.
type Event struct {
Obj queue.ItemType
}

// Key returns a string that will be used to de-duplicate items in the queue.
func (e Event) Key() string {
return e.Obj.Key()
}

// customWatch represent a Watch on a custom Event source and a Mapper to map said
// Events into Requests that the controller can respond to.
type customWatch struct {
source *Source
mapper CustomDependencyMapper
}

// Request represents a request to reconcile the resource with the given ID.
type Request struct {
// ID of the resource that needs to be reconciled.
Expand Down
46 changes: 46 additions & 0 deletions internal/controller/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,20 @@ func TestController_API(t *testing.T) {
rec := newTestReconciler()
client := svctest.RunResourceService(t, demo.RegisterTypes)

concertsChan := make(chan controller.Event)
defer close(concertsChan)
concertSource := &controller.Source{Source: concertsChan}
concertMapper := func(ctx context.Context, rt controller.Runtime, event controller.Event) ([]controller.Request, error) {
artistID := event.Obj.(*Concert).artistID
var requests []controller.Request
requests = append(requests, controller.Request{ID: artistID})
return requests, nil
}

ctrl := controller.
ForType(demo.TypeV2Artist).
WithWatch(demo.TypeV2Album, controller.MapOwner).
WithCustomWatch(concertSource, concertMapper).
WithBackoff(10*time.Millisecond, 100*time.Millisecond).
WithReconciler(rec)

Expand Down Expand Up @@ -69,6 +80,32 @@ func TestController_API(t *testing.T) {
prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID)
})

t.Run("custom watched resource type", func(t *testing.T) {
res, err := demo.GenerateV2Artist()
require.NoError(t, err)

rsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
require.NoError(t, err)

req := rec.wait(t)
prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID)

rec.expectNoRequest(t, 500*time.Millisecond)

concertsChan <- controller.Event{Obj: &Concert{name: "test-concert", artistID: rsp.Resource.Id}}

watchedReq := rec.wait(t)
prototest.AssertDeepEqual(t, req.ID, watchedReq.ID)

otherArtist, err := demo.GenerateV2Artist()
require.NoError(t, err)

concertsChan <- controller.Event{Obj: &Concert{name: "test-concert", artistID: otherArtist.Id}}

watchedReq = rec.wait(t)
prototest.AssertDeepEqual(t, otherArtist.Id, watchedReq.ID)
})

t.Run("error retries", func(t *testing.T) {
rec.failNext(errors.New("KABOOM"))

Expand Down Expand Up @@ -266,3 +303,12 @@ func testContext(t *testing.T) context.Context {

return ctx
}

type Concert struct {
name string
artistID *pbresource.ID
}

func (c Concert) Key() string {
return c.name
}
98 changes: 75 additions & 23 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,39 @@ func (c *controllerRunner) run(ctx context.Context) error {
})
})

for _, watch := range c.ctrl.watches {
watch := watch
for _, w := range c.ctrl.watches {
mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl)

watcher := w
// Watched Type Events → Mapper Queue
group.Go(func() error {
return c.watch(groupCtx, watch.watchedType, func(res *pbresource.Resource) {
return c.watch(groupCtx, watcher.watchedType, func(res *pbresource.Resource) {
mapQueue.Add(mapperRequest{res: res})
})
})

// Mapper Queue → Mapper → Reconciliation Queue
group.Go(func() error {
return c.runMapper(groupCtx, watch, mapQueue, recQueue)
return c.runMapper(groupCtx, watcher, mapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
return watcher.mapper(ctx, runtime, itemType.(mapperRequest).res)
})
})
}

for _, cw := range c.ctrl.customWatches {
customMapQueue := runQueue[Event](groupCtx, c.ctrl)
watcher := cw
// Custom Events → Mapper Queue
group.Go(func() error {
return watcher.source.Watch(groupCtx, func(e Event) {
customMapQueue.Add(e)
})
})

// Mapper Queue → Mapper → Reconciliation Queue
group.Go(func() error {
return c.runCustomMapper(groupCtx, watcher, customMapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
return watcher.mapper(ctx, runtime, itemType.(Event))
})
})
}

Expand All @@ -71,7 +90,7 @@ func runQueue[T queue.ItemType](ctx context.Context, ctrl Controller) queue.Work
}

func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error {
watch, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{
wl, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{
Type: typ,
Tenancy: &pbresource.Tenancy{
Partition: storage.Wildcard,
Expand All @@ -85,7 +104,7 @@ func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add
}

for {
event, err := watch.Recv()
event, err := wl.Recv()
if err != nil {
c.logger.Warn("error received from watch", "error", err)
return err
Expand All @@ -99,6 +118,7 @@ func (c *controllerRunner) runMapper(
w watch,
from queue.WorkQueue[mapperRequest],
to queue.WorkQueue[Request],
mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
) error {
logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType))

Expand All @@ -108,34 +128,66 @@ func (c *controllerRunner) runMapper(
return nil
}

var reqs []Request
err := c.handlePanic(func() error {
var err error
reqs, err = w.mapper(ctx, c.runtime(), item.res)
return err
})
if err != nil {
if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
from.AddRateLimited(item)
from.Done(item)
continue
}

for _, r := range reqs {
if !resource.EqualType(r.ID.Type, c.ctrl.managedType) {
logger.Error("dependency mapper returned request for a resource of the wrong type",
"type_expected", resource.ToGVK(c.ctrl.managedType),
"type_got", resource.ToGVK(r.ID.Type),
)
continue
}
to.Add(r)
from.Forget(item)
from.Done(item)
}
}

func (c *controllerRunner) runCustomMapper(
ctx context.Context,
cw customWatch,
from queue.WorkQueue[Event],
to queue.WorkQueue[Request],
mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
) error {
logger := c.logger.With("watched_event", cw.source)

for {
item, shutdown := from.Get()
if shutdown {
return nil
}

if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
from.AddRateLimited(item)
from.Done(item)
continue
}

from.Forget(item)
from.Done(item)
}
}

func (c *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), to queue.WorkQueue[Request], item queue.ItemType, logger hclog.Logger) error {
var reqs []Request
if err := c.handlePanic(func() error {
var err error
reqs, err = mapper(ctx, c.runtime(), item)
return err
}); err != nil {
return err
}

for _, r := range reqs {
if !resource.EqualType(r.ID.Type, c.ctrl.managedType) {
logger.Error("dependency mapper returned request for a resource of the wrong type",
"type_expected", resource.ToGVK(c.ctrl.managedType),
"type_got", resource.ToGVK(r.ID.Type),
)
continue
}
to.Add(r)
}
return nil
}

func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error {
for {
req, shutdown := queue.Get()
Expand Down
8 changes: 8 additions & 0 deletions internal/controller/dependency_mappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ type DependencyMapper func(
res *pbresource.Resource,
) ([]Request, error)

// CustomDependencyMapper is called when an Event occurs to determine which of the
// controller's managed resources need to be reconciled.
type CustomDependencyMapper func(
ctx context.Context,
rt Runtime,
event Event,
) ([]Request, error)

// MapOwner implements a DependencyMapper that returns the updated resource's owner.
func MapOwner(_ context.Context, _ Runtime, res *pbresource.Resource) ([]Request, error) {
var reqs []Request
Expand Down

0 comments on commit 97b41d9

Please sign in to comment.