Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support custom watches on controller #18439

Merged
merged 2 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/18439.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:feature
Support custom watches on the Consul Controller framework.
```
70 changes: 63 additions & 7 deletions internal/controller/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul/agent/consul/controller/queue"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/proto-public/pbresource"
)
Expand Down Expand Up @@ -46,6 +47,21 @@ func (c Controller) WithWatch(watchedType *pbresource.Type, mapper DependencyMap
return c
}

// WithCustomWatch adds a custom watch on the given dependency to the controller. Custom mapper
// will be called to map events produced by source to the controller's watched type.
func (c Controller) WithCustomWatch(source *Source, mapper CustomDependencyMapper) Controller {
if source == nil {
panic("source must not be nil")
}

if mapper == nil {
panic("mapper must not be nil")
}

c.customWatches = append(c.customWatches, customWatch{source, mapper})
return c
}

// WithLogger changes the controller's logger.
func (c Controller) WithLogger(logger hclog.Logger) Controller {
if logger == nil {
Expand Down Expand Up @@ -107,20 +123,60 @@ func (c Controller) backoff() (time.Duration, time.Duration) {
// Use the builder methods in this package (starting with ForType) to construct
// a controller, and then pass it to a Manager to be executed.
type Controller struct {
managedType *pbresource.Type
reconciler Reconciler
logger hclog.Logger
watches []watch
baseBackoff time.Duration
maxBackoff time.Duration
placement Placement
managedType *pbresource.Type
reconciler Reconciler
logger hclog.Logger
watches []watch
customWatches []customWatch
baseBackoff time.Duration
maxBackoff time.Duration
placement Placement
}

type watch struct {
watchedType *pbresource.Type
mapper DependencyMapper
}

// Watch is responsible for watching for custom events from source and adding them to
// the event queue.
func (s *Source) Watch(ctx context.Context, add func(e Event)) error {
thisisnotashwin marked this conversation as resolved.
Show resolved Hide resolved
for {
select {
case <-ctx.Done():
return nil
case evt, ok := <-s.Source:
if !ok {
return nil
}
add(evt)
}
}
}

// Source is used as a generic source of events. This can be used when events aren't coming from resources
// stored by the resource API.
type Source struct {
Source <-chan Event
}

// Event captures an event in the system which the API can choose to respond to.
type Event struct {
Obj queue.ItemType
}

// Key returns a string that will be used to de-duplicate items in the queue.
func (e Event) Key() string {
return e.Obj.Key()
}

// customWatch represent a Watch on a custom Event source and a Mapper to map said
// Events into Requests that the controller can respond to.
type customWatch struct {
source *Source
mapper CustomDependencyMapper
}

// Request represents a request to reconcile the resource with the given ID.
type Request struct {
// ID of the resource that needs to be reconciled.
Expand Down
46 changes: 46 additions & 0 deletions internal/controller/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,20 @@ func TestController_API(t *testing.T) {
rec := newTestReconciler()
client := svctest.RunResourceService(t, demo.RegisterTypes)

concertsChan := make(chan controller.Event)
defer close(concertsChan)
concertSource := &controller.Source{Source: concertsChan}
concertMapper := func(ctx context.Context, rt controller.Runtime, event controller.Event) ([]controller.Request, error) {
thisisnotashwin marked this conversation as resolved.
Show resolved Hide resolved
artistID := event.Obj.(*Concert).artistID
var requests []controller.Request
requests = append(requests, controller.Request{ID: artistID})
return requests, nil
}

ctrl := controller.
ForType(demo.TypeV2Artist).
WithWatch(demo.TypeV2Album, controller.MapOwner).
WithCustomWatch(concertSource, concertMapper).
WithBackoff(10*time.Millisecond, 100*time.Millisecond).
WithReconciler(rec)

Expand Down Expand Up @@ -69,6 +80,32 @@ func TestController_API(t *testing.T) {
prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID)
})

t.Run("custom watched resource type", func(t *testing.T) {
res, err := demo.GenerateV2Artist()
require.NoError(t, err)

rsp, err := client.Write(testContext(t), &pbresource.WriteRequest{Resource: res})
require.NoError(t, err)

req := rec.wait(t)
prototest.AssertDeepEqual(t, rsp.Resource.Id, req.ID)

rec.expectNoRequest(t, 500*time.Millisecond)

concertsChan <- controller.Event{Obj: &Concert{name: "test-concert", artistID: rsp.Resource.Id}}

watchedReq := rec.wait(t)
prototest.AssertDeepEqual(t, req.ID, watchedReq.ID)

otherArtist, err := demo.GenerateV2Artist()
require.NoError(t, err)

concertsChan <- controller.Event{Obj: &Concert{name: "test-concert", artistID: otherArtist.Id}}
thisisnotashwin marked this conversation as resolved.
Show resolved Hide resolved

watchedReq = rec.wait(t)
prototest.AssertDeepEqual(t, otherArtist.Id, watchedReq.ID)
})

t.Run("error retries", func(t *testing.T) {
rec.failNext(errors.New("KABOOM"))

Expand Down Expand Up @@ -266,3 +303,12 @@ func testContext(t *testing.T) context.Context {

return ctx
}

type Concert struct {
name string
artistID *pbresource.ID
}

func (c Concert) Key() string {
return c.name
}
98 changes: 75 additions & 23 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,39 @@ func (c *controllerRunner) run(ctx context.Context) error {
})
})

for _, watch := range c.ctrl.watches {
watch := watch
for _, w := range c.ctrl.watches {
mapQueue := runQueue[mapperRequest](groupCtx, c.ctrl)

watcher := w
// Watched Type Events → Mapper Queue
group.Go(func() error {
return c.watch(groupCtx, watch.watchedType, func(res *pbresource.Resource) {
return c.watch(groupCtx, watcher.watchedType, func(res *pbresource.Resource) {
mapQueue.Add(mapperRequest{res: res})
})
})

// Mapper Queue → Mapper → Reconciliation Queue
group.Go(func() error {
return c.runMapper(groupCtx, watch, mapQueue, recQueue)
return c.runMapper(groupCtx, watcher, mapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
return watcher.mapper(ctx, runtime, itemType.(mapperRequest).res)
})
})
}

for _, cw := range c.ctrl.customWatches {
customMapQueue := runQueue[Event](groupCtx, c.ctrl)
watcher := cw
// Custom Events → Mapper Queue
group.Go(func() error {
return watcher.source.Watch(groupCtx, func(e Event) {
customMapQueue.Add(e)
})
})

// Mapper Queue → Mapper → Reconciliation Queue
group.Go(func() error {
return c.runCustomMapper(groupCtx, watcher, customMapQueue, recQueue, func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error) {
return watcher.mapper(ctx, runtime, itemType.(Event))
})
})
}

Expand All @@ -71,7 +90,7 @@ func runQueue[T queue.ItemType](ctx context.Context, ctrl Controller) queue.Work
}

func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add func(*pbresource.Resource)) error {
watch, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{
wl, err := c.client.WatchList(ctx, &pbresource.WatchListRequest{
Type: typ,
Tenancy: &pbresource.Tenancy{
Partition: storage.Wildcard,
Expand All @@ -85,7 +104,7 @@ func (c *controllerRunner) watch(ctx context.Context, typ *pbresource.Type, add
}

for {
event, err := watch.Recv()
event, err := wl.Recv()
if err != nil {
c.logger.Warn("error received from watch", "error", err)
return err
Expand All @@ -99,6 +118,7 @@ func (c *controllerRunner) runMapper(
w watch,
from queue.WorkQueue[mapperRequest],
to queue.WorkQueue[Request],
mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
) error {
logger := c.logger.With("watched_resource_type", resource.ToGVK(w.watchedType))

Expand All @@ -108,34 +128,66 @@ func (c *controllerRunner) runMapper(
return nil
}

var reqs []Request
err := c.handlePanic(func() error {
var err error
reqs, err = w.mapper(ctx, c.runtime(), item.res)
return err
})
if err != nil {
if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
from.AddRateLimited(item)
from.Done(item)
continue
}

for _, r := range reqs {
if !resource.EqualType(r.ID.Type, c.ctrl.managedType) {
logger.Error("dependency mapper returned request for a resource of the wrong type",
"type_expected", resource.ToGVK(c.ctrl.managedType),
"type_got", resource.ToGVK(r.ID.Type),
)
continue
}
to.Add(r)
from.Forget(item)
from.Done(item)
}
}

func (c *controllerRunner) runCustomMapper(
thisisnotashwin marked this conversation as resolved.
Show resolved Hide resolved
ctx context.Context,
cw customWatch,
from queue.WorkQueue[Event],
to queue.WorkQueue[Request],
mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error),
) error {
logger := c.logger.With("watched_event", cw.source)

for {
item, shutdown := from.Get()
if shutdown {
return nil
}

if err := c.doMap(ctx, mapper, to, item, logger); err != nil {
from.AddRateLimited(item)
from.Done(item)
continue
}

from.Forget(item)
from.Done(item)
}
}

func (c *controllerRunner) doMap(ctx context.Context, mapper func(ctx context.Context, runtime Runtime, itemType queue.ItemType) ([]Request, error), to queue.WorkQueue[Request], item queue.ItemType, logger hclog.Logger) error {
var reqs []Request
if err := c.handlePanic(func() error {
var err error
reqs, err = mapper(ctx, c.runtime(), item)
return err
}); err != nil {
return err
}

for _, r := range reqs {
if !resource.EqualType(r.ID.Type, c.ctrl.managedType) {
logger.Error("dependency mapper returned request for a resource of the wrong type",
"type_expected", resource.ToGVK(c.ctrl.managedType),
"type_got", resource.ToGVK(r.ID.Type),
)
continue
}
to.Add(r)
}
return nil
}

func (c *controllerRunner) runReconciler(ctx context.Context, queue queue.WorkQueue[Request]) error {
for {
req, shutdown := queue.Get()
Expand Down
8 changes: 8 additions & 0 deletions internal/controller/dependency_mappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@ type DependencyMapper func(
res *pbresource.Resource,
) ([]Request, error)

// CustomDependencyMapper is called when an Event occurs to determine which of the
// controller's managed resources need to be reconciled.
type CustomDependencyMapper func(
thisisnotashwin marked this conversation as resolved.
Show resolved Hide resolved
ctx context.Context,
rt Runtime,
event Event,
) ([]Request, error)

// MapOwner implements a DependencyMapper that returns the updated resource's owner.
func MapOwner(_ context.Context, _ Runtime, res *pbresource.Resource) ([]Request, error) {
var reqs []Request
Expand Down