Skip to content

Commit

Permalink
Remediate watch filter label removal (#1328)
Browse files Browse the repository at this point in the history
This change handles future cases where watch filtering by label
causes delete events when the required label is removed.
  • Loading branch information
karlkfi committed Jul 11, 2024
1 parent 49648d5 commit b478dcd
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 92 deletions.
68 changes: 46 additions & 22 deletions pkg/remediator/reconcile/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,29 @@ func (w *Worker) process(ctx context.Context, obj client.Object) error {
id := core.IDOf(obj)
var toRemediate client.Object
if queue.WasDeleted(ctx, obj) {
// Passing a nil Object to the reconciler signals that the accompanying ID
// is for an Object that was deleted.
toRemediate = nil
// Unwrap deleted object
prevObj := obj.(*queue.Deleted).Object
// Get object from cluster to confirm deletion.
// This is required because delete events always include the previous
// object state, even if the object wasn't actually deleted, which can
// happen when a selected label is removed.
latestObj, err := w.getObject(ctx, prevObj)
if err != nil {
// Failed to confirm object state. Enqueue for retry.
w.objectQueue.Retry(obj)
return fmt.Errorf("failed to remediate %q: %w", id, err)
}
if queue.WasDeleted(ctx, latestObj) {
// Confirmed not on the server.
// Passing a nil Object to the reconciler signals that the
// accompanying ID is for an Object that was deleted.
toRemediate = nil
} else {
// Not actually deleted, or if it was then it's since been recreated.
// Pass the latest Object state to the reconciler to handle as if
// in response to an update event.
toRemediate = latestObj
}
} else {
toRemediate = obj
}
Expand All @@ -128,25 +148,29 @@ func (w *Worker) process(ctx context.Context, obj client.Object) error {
}

// refresh updates the cached version of the object.
func (w *Worker) refresh(ctx context.Context, o client.Object) status.Error {
c := w.reconciler.GetClient()

// Try to get an updated version of the object from the cluster.
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(o.GetObjectKind().GroupVersionKind())
err := c.Get(ctx, client.ObjectKey{Name: o.GetName(), Namespace: o.GetNamespace()}, u)

switch {
case apierrors.IsNotFound(err):
// The object no longer exists on the cluster, so mark it deleted.
w.objectQueue.Add(queue.MarkDeleted(ctx, o))
case err != nil:
// We encountered some other error that we don't know how to solve, so
// surface it.
return status.APIServerError(err, "failed to get updated object for worker cache", o)
default:
// Update the cached version of the resource.
w.objectQueue.Add(u)
func (w *Worker) refresh(ctx context.Context, obj client.Object) status.Error {
obj, err := w.getObject(ctx, obj)
if err != nil {
return err
}
// Enqueue object for remediation
w.objectQueue.Add(obj)
return nil
}

// getObject fetches the latest state of the object from the server.
//
// The supplied object is not modified; it is only used for its
// GroupVersionKind and its namespace/name key. The result is one of:
//   - the object returned by the server (as a new unstructured object) and a
//     nil error, when the Get succeeds;
//   - the supplied object wrapped with queue.MarkDeleted and a nil error,
//     when the server reports NotFound;
//   - a nil object and an error built with status.APIServerError, for any
//     other Get failure.
func (w *Worker) getObject(ctx context.Context, obj client.Object) (client.Object, status.Error) {
	uObj := &unstructured.Unstructured{}
	// The unstructured object needs its GVK set so the client knows which
	// resource to query.
	uObj.SetGroupVersionKind(obj.GetObjectKind().GroupVersionKind())
	err := w.reconciler.GetClient().Get(ctx, client.ObjectKeyFromObject(obj), uObj)
	switch {
	case err == nil:
		return uObj, nil
	case apierrors.IsNotFound(err):
		// Not on the server: wrap for processing as a deleted object.
		return queue.MarkDeleted(ctx, obj), nil
	default:
		// Surface any other error. The object is meaningless when the Get
		// failed, so don't hand a partially populated one back to the caller.
		return nil, status.APIServerError(err, "failed to get updated object for worker cache", obj)
	}
}
200 changes: 130 additions & 70 deletions pkg/remediator/reconcile/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,84 +44,144 @@ import (
// TestWorker_Run_Remediates verifies that worker.Run remediates declared
// objects added to the queue.
func TestWorker_Run_Remediates(t *testing.T) {
ctx := context.Background()

existingObjs := []client.Object{
fake.ClusterRoleBindingObject(syncertest.ManagementEnabled),
fake.ClusterRoleObject(syncertest.ManagementEnabled),
}
declaredObjs := []client.Object{
fake.ClusterRoleBindingObject(syncertest.ManagementEnabled),
fake.ClusterRoleObject(syncertest.ManagementEnabled),
}
changedObjs := []client.Object{
queue.MarkDeleted(ctx, fake.ClusterRoleBindingObject()),
fake.ClusterRoleObject(syncertest.ManagementEnabled,
core.Label("new", "label")),
}
expectedObjs := []client.Object{
// CRB delete should be reverted
// TODO: Upgrade FakeClient to increment UID after deletion
fake.ClusterRoleBindingObject(syncertest.ManagementEnabled,
core.UID("1"), core.ResourceVersion("1"), core.Generation(1),
),
// Role change should be reverted
fake.ClusterRoleObject(syncertest.ManagementEnabled,
core.UID("1"), core.ResourceVersion("3"), core.Generation(1),
),
testCases := []struct {
name string
existingObjs []client.Object
declaredObjs []client.Object
changedObjs []client.Object
eventObjs []client.Object
expectedObjs []client.Object
}{
{
name: "revert delete",
existingObjs: []client.Object{
fake.ClusterRoleBindingObject(syncertest.ManagementEnabled),
},
declaredObjs: []client.Object{
fake.ClusterRoleBindingObject(syncertest.ManagementEnabled),
},
changedObjs: []client.Object{
queue.MarkDeleted(context.Background(), fake.ClusterRoleBindingObject()),
},
eventObjs: []client.Object{
queue.MarkDeleted(context.Background(), fake.ClusterRoleBindingObject()),
},
expectedObjs: []client.Object{
// TODO: Upgrade FakeClient to increment UID after deletion
fake.ClusterRoleBindingObject(syncertest.ManagementEnabled,
core.UID("1"), core.ResourceVersion("1"), core.Generation(1),
),
},
},
{
name: "revert watch filter label removal",
existingObjs: []client.Object{
fake.ClusterRoleBindingObject(syncertest.ManagementEnabled,
core.Label("example-label", "example-value")),
},
declaredObjs: []client.Object{
fake.ClusterRoleBindingObject(syncertest.ManagementEnabled,
core.Label("example-label", "example-value")),
},
changedObjs: []client.Object{
// Update object to remove label
fake.ClusterRoleBindingObject(syncertest.ManagementEnabled),
},
eventObjs: []client.Object{
// Watch server treats update to remove required label as a delete.
// Delete event includes previous object state.
queue.MarkDeleted(context.Background(),
fake.ClusterRoleBindingObject(syncertest.ManagementEnabled,
core.Label("example-label", "example-value"))),
},
expectedObjs: []client.Object{
fake.ClusterRoleBindingObject(syncertest.ManagementEnabled,
core.Label("example-label", "example-value"),
core.UID("1"), core.ResourceVersion("3"), core.Generation(1),
),
},
},
{
name: "revert update",
existingObjs: []client.Object{
fake.ClusterRoleObject(syncertest.ManagementEnabled),
},
declaredObjs: []client.Object{
fake.ClusterRoleObject(syncertest.ManagementEnabled),
},
changedObjs: []client.Object{
fake.ClusterRoleObject(syncertest.ManagementEnabled,
core.Label("new", "label")),
},
eventObjs: []client.Object{
fake.ClusterRoleObject(syncertest.ManagementEnabled,
core.Label("new", "label")),
},
expectedObjs: []client.Object{
// Role change should be reverted
fake.ClusterRoleObject(syncertest.ManagementEnabled,
core.UID("1"), core.ResourceVersion("3"), core.Generation(1),
),
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()

q := queue.New("test")
defer q.ShutDown()

c := testingfake.NewClient(t, core.Scheme, existingObjs...)
q := queue.New("test")
defer q.ShutDown()

d := makeDeclared(t, randomCommitHash(), declaredObjs...)
w := NewWorker(declared.RootScope, configsync.RootSyncName, c.Applier(configsync.FieldManager), q, d, syncertestfake.NewFightHandler())
c := testingfake.NewClient(t, core.Scheme, tc.existingObjs...)

ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Run worker in the background
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
w.Run(ctx)
}()
d := makeDeclared(t, randomCommitHash(), tc.declaredObjs...)
w := NewWorker(declared.RootScope, configsync.RootSyncName, c.Applier(configsync.FieldManager), q, d, syncertestfake.NewFightHandler())

// Execute runtime changes
for _, obj := range changedObjs {
if deletedObj, ok := obj.(*queue.Deleted); ok {
if err := c.Delete(ctx, deletedObj.Object); err != nil {
t.Fatalf("Failed to delete object in fake client: %v", err)
}
} else {
if err := c.Update(ctx, obj, client.FieldOwner(testingfake.FieldManager)); err != nil {
t.Fatalf("Failed to update object in fake client: %v", err)
ctx, cancel := context.WithCancel(ctx)
defer cancel()

// Run worker in the background
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
w.Run(ctx)
}()

// Execute runtime changes
for _, obj := range tc.changedObjs {
if deletedObj, ok := obj.(*queue.Deleted); ok {
if err := c.Delete(ctx, deletedObj.Object); err != nil {
t.Fatalf("Failed to delete object in fake client: %v", err)
}
} else {
if err := c.Update(ctx, obj, client.FieldOwner(testingfake.FieldManager)); err != nil {
t.Fatalf("Failed to update object in fake client: %v", err)
}
}
}
}
}

// Simulate watch events to add the objects to the queue
for _, obj := range changedObjs {
q.Add(obj)
}

// Give the worker a few seconds to remediate
// TODO: use client.Watch to watch for the desired changes (requires FakeClient to impl Watch).
time.Sleep(2 * time.Second)
cancel()
// Simulate watch events to add the objects to the queue
for _, obj := range tc.eventObjs {
q.Add(obj)
}

// Wait for worker to exit or timeout
timeout := time.NewTimer(5 * time.Second)
defer timeout.Stop()
select {
case <-timeout.C:
// fail
t.Error("Run() failed to return when context was cancelled")
case <-doneCh:
// pass
c.Check(t, expectedObjs...)
// Give the worker a few seconds to remediate
// TODO: use client.Watch to watch for the desired changes (requires FakeClient to impl Watch).
time.Sleep(2 * time.Second)
cancel()

// Wait for worker to exit or timeout
timeout := time.NewTimer(5 * time.Second)
defer timeout.Stop()
select {
case <-timeout.C:
// fail
t.Error("Run() failed to return when context was cancelled")
case <-doneCh:
// pass
c.Check(t, tc.expectedObjs...)
}
})
}
}

Expand Down

0 comments on commit b478dcd

Please sign in to comment.