From 8bc920847fab10a0b5e0fa1a3dfaccea9258b18b Mon Sep 17 00:00:00 2001 From: Alexander Matyushentsev Date: Tue, 14 Jul 2020 15:20:58 -0700 Subject: [PATCH] fix: sync hooks should be deleted after sync phase/wave completion --- pkg/sync/sync_context.go | 30 +++++----- pkg/sync/sync_context_test.go | 101 ++++++++++++++++++++++++++++++++++ pkg/sync/sync_task.go | 9 ++- pkg/sync/sync_task_test.go | 22 +++++--- 4 files changed, 138 insertions(+), 24 deletions(-) diff --git a/pkg/sync/sync_context.go b/pkg/sync/sync_context.go index ca585f431..41f6da0ea 100644 --- a/pkg/sync/sync_context.go +++ b/pkg/sync/sync_context.go @@ -10,7 +10,6 @@ import ( log "github.com/sirupsen/logrus" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" - "k8s.io/apimachinery/pkg/api/errors" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -301,16 +300,7 @@ func (sc *syncContext) Sync() { sc.setResourceResult(task, "", common.OperationError, fmt.Sprintf("failed to get resource health: %v", err)) } else { sc.setResourceResult(task, "", operationState, message) - - // maybe delete the hook - if task.needsDeleting() { - err := sc.deleteResource(task) - if err != nil && !errors.IsNotFound(err) { - sc.setResourceResult(task, "", common.OperationError, fmt.Sprintf("failed to delete resource: %v", err)) - } - } } - } else { // this must be calculated on the live object healthStatus, err := health.GetResourceHealth(task.liveObj, sc.healthOverride) @@ -341,6 +331,19 @@ func (sc *syncContext) Sync() { return } + // delete all completed hooks which have appropriate delete policy + hooksPendingDeletion := tasks.Filter(func(task *syncTask) bool { + return task.isHook() && task.liveObj != nil && !task.running() && task.deleteOnPhaseCompletion() + }) + for _, task := range hooksPendingDeletion { + if task.deleteOnPhaseCompletion() { + err := sc.deleteResource(task) + if err != nil && !apierr.IsNotFound(err) { + sc.setResourceResult(task, "", common.OperationError, fmt.Sprintf("failed to delete resource: %v", err)) + } + } + } + // syncFailTasks only run during failure, so separate them from regular tasks syncFailTasks, tasks := tasks.Split(func(t *syncTask) bool { return t.phase == common.SyncPhaseSyncFail }) @@ -388,7 +391,7 @@ func (sc *syncContext) Sync() { } default: sc.setRunningPhase(tasks.Filter(func(task *syncTask) bool { - return task.needsDeleting() + return task.deleteOnPhaseCompletion() }), true) } } @@ -768,10 +771,11 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState { wg.Wait() } + hooksPendingDeletion := createTasks.Filter(func(t *syncTask) bool { return t.deleteBeforeCreation() }) // delete anything that need deleting - if runState == successful && createTasks.Any(func(t *syncTask) bool { return t.needsDeleting() }) { + if runState == successful && hooksPendingDeletion.Len() > 0 { var wg sync.WaitGroup - for _, task := range createTasks.Filter(func(t *syncTask) bool { return t.needsDeleting() }) { + for _, task := range hooksPendingDeletion { wg.Add(1) go func(t *syncTask) { defer wg.Done() diff --git a/pkg/sync/sync_context_test.go b/pkg/sync/sync_context_test.go index e87d0969d..6c8e25c9e 100644 --- a/pkg/sync/sync_context_test.go +++ b/pkg/sync/sync_context_test.go @@ -15,6 +15,7 @@ import ( "k8s.io/client-go/rest" testcore "k8s.io/client-go/testing" + "github.com/argoproj/gitops-engine/pkg/health" synccommon "github.com/argoproj/gitops-engine/pkg/sync/common" "github.com/argoproj/gitops-engine/pkg/utils/kube" "github.com/argoproj/gitops-engine/pkg/utils/kube/kubetest" @@ -593,6 +594,106 @@ func TestRunSyncFailHooksFailed(t *testing.T) { assert.Equal(t, synccommon.ResultCodeSynced, resources[2].Status) } +type resourceNameHealthOverride map[string]health.HealthStatusCode + +func (r resourceNameHealthOverride) GetResourceHealth(obj *unstructured.Unstructured) (*health.HealthStatus, error) { + if status, ok := r[obj.GetName()]; ok { + return &health.HealthStatus{Status: status, Message: "test"}, nil + } + return nil, nil +} + +func TestRunSync_HooksNotDeletedIfPhaseNotCompleted(t *testing.T) { + completedHook := newHook(synccommon.HookTypePreSync) + completedHook.SetName("completed-hook") + completedHook.SetNamespace(FakeArgoCDNamespace) + _ = Annotate(completedHook, synccommon.AnnotationKeyHookDeletePolicy, "HookSucceeded") + + inProgressHook := newHook(synccommon.HookTypePreSync) + inProgressHook.SetNamespace(FakeArgoCDNamespace) + inProgressHook.SetName("in-progress-hook") + _ = Annotate(inProgressHook, synccommon.AnnotationKeyHookDeletePolicy, "HookSucceeded") + + syncCtx := newTestSyncCtx( + WithHealthOverride(resourceNameHealthOverride(map[string]health.HealthStatusCode{ + inProgressHook.GetName(): health.HealthStatusProgressing, + })), + WithInitialState(synccommon.OperationRunning, "", []synccommon.ResourceSyncResult{{ + ResourceKey: kube.GetResourceKey(completedHook), + HookPhase: synccommon.OperationSucceeded, + SyncPhase: synccommon.SyncPhasePreSync, + }, { + ResourceKey: kube.GetResourceKey(inProgressHook), + HookPhase: synccommon.OperationRunning, + SyncPhase: synccommon.SyncPhasePreSync, + }})) + fakeDynamicClient := fake.NewSimpleDynamicClient(runtime.NewScheme()) + syncCtx.dynamicIf = fakeDynamicClient + deletedCount := 0 + fakeDynamicClient.PrependReactor("delete", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) { + deletedCount += 1 + return true, nil, nil + }) + syncCtx.resources = groupResources(ReconciliationResult{ + Live: []*unstructured.Unstructured{completedHook, inProgressHook}, + Target: []*unstructured.Unstructured{nil, nil}, + }) + syncCtx.hooks = []*unstructured.Unstructured{completedHook, inProgressHook} + + syncCtx.kubectl = &kubetest.MockKubectlCmd{ + Commands: map[string]kubetest.KubectlOutput{}, + } + + syncCtx.Sync() + + assert.Equal(t, synccommon.OperationRunning, syncCtx.phase) + assert.Equal(t, 0, deletedCount) +} + +func TestRunSync_HooksDeletedAfterPhaseCompleted(t *testing.T) { + completedHook1 := newHook(synccommon.HookTypePreSync) + completedHook1.SetName("completed-hook1") + completedHook1.SetNamespace(FakeArgoCDNamespace) + _ = Annotate(completedHook1, synccommon.AnnotationKeyHookDeletePolicy, "HookSucceeded") + + completedHook2 := newHook(synccommon.HookTypePreSync) + completedHook2.SetNamespace(FakeArgoCDNamespace) + completedHook2.SetName("completed-hook2") + _ = Annotate(completedHook2, synccommon.AnnotationKeyHookDeletePolicy, "HookSucceeded") + + syncCtx := newTestSyncCtx( + WithInitialState(synccommon.OperationRunning, "", []synccommon.ResourceSyncResult{{ + ResourceKey: kube.GetResourceKey(completedHook1), + HookPhase: synccommon.OperationSucceeded, + SyncPhase: synccommon.SyncPhasePreSync, + }, { + ResourceKey: kube.GetResourceKey(completedHook2), + HookPhase: synccommon.OperationSucceeded, + SyncPhase: synccommon.SyncPhasePreSync, + }})) + fakeDynamicClient := fake.NewSimpleDynamicClient(runtime.NewScheme()) + syncCtx.dynamicIf = fakeDynamicClient + deletedCount := 0 + fakeDynamicClient.PrependReactor("delete", "*", func(action testcore.Action) (handled bool, ret runtime.Object, err error) { + deletedCount += 1 + return true, nil, nil + }) + syncCtx.resources = groupResources(ReconciliationResult{ + Live: []*unstructured.Unstructured{completedHook1, completedHook2}, + Target: []*unstructured.Unstructured{nil, nil}, + }) + syncCtx.hooks = []*unstructured.Unstructured{completedHook1, completedHook2} + + syncCtx.kubectl = &kubetest.MockKubectlCmd{ + Commands: map[string]kubetest.KubectlOutput{}, + } + + syncCtx.Sync() + + assert.Equal(t, synccommon.OperationSucceeded, syncCtx.phase) + assert.Equal(t, 2, deletedCount) +} + func Test_syncContext_liveObj(t *testing.T) { type fields struct { compareResult ReconciliationResult diff --git a/pkg/sync/sync_task.go b/pkg/sync/sync_task.go index 672276ede..4a4ab7664 100644 --- a/pkg/sync/sync_task.go +++ b/pkg/sync/sync_task.go @@ -128,8 +128,11 @@ func (t *syncTask) hasHookDeletePolicy(policy common.HookDeletePolicy) bool { return false } -func (t *syncTask) needsDeleting() bool { - return t.liveObj != nil && (t.pending() && t.hasHookDeletePolicy(common.HookDeletePolicyBeforeHookCreation) || - t.successful() && t.hasHookDeletePolicy(common.HookDeletePolicyHookSucceeded) || +func (t *syncTask) deleteBeforeCreation() bool { + return t.liveObj != nil && t.pending() && t.hasHookDeletePolicy(common.HookDeletePolicyBeforeHookCreation) +} + +func (t *syncTask) deleteOnPhaseCompletion() bool { + return t.liveObj != nil && (t.successful() && t.hasHookDeletePolicy(common.HookDeletePolicyHookSucceeded) || t.failed() && t.hasHookDeletePolicy(common.HookDeletePolicyHookFailed)) } diff --git a/pkg/sync/sync_task_test.go b/pkg/sync/sync_task_test.go index 36924cc45..c331cecf8 100644 --- a/pkg/sync/sync_task_test.go +++ b/pkg/sync/sync_task_test.go @@ -52,16 +52,22 @@ func Test_syncTask_hasHookDeletePolicy(t *testing.T) { assert.True(t, (&syncTask{targetObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "HookFailed")}).hasHookDeletePolicy(common.HookDeletePolicyHookFailed)) } -func Test_syncTask_needsDeleting(t *testing.T) { - assert.False(t, (&syncTask{liveObj: NewPod()}).needsDeleting()) +func Test_syncTask_deleteOnPhaseCompletion(t *testing.T) { + assert.False(t, (&syncTask{liveObj: NewPod()}).deleteOnPhaseCompletion()) // must be hook - assert.False(t, (&syncTask{liveObj: Annotate(NewPod(), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).needsDeleting()) + assert.True(t, (&syncTask{operationState: common.OperationSucceeded, liveObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "HookSucceeded")}).deleteOnPhaseCompletion()) + assert.True(t, (&syncTask{operationState: common.OperationFailed, liveObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "HookFailed")}).deleteOnPhaseCompletion()) +} + +func Test_syncTask_deleteBeforeCreation(t *testing.T) { + assert.False(t, (&syncTask{liveObj: NewPod()}).deleteBeforeCreation()) + // must be hook + assert.False(t, (&syncTask{liveObj: Annotate(NewPod(), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).deleteBeforeCreation()) // no need to delete if no live obj - assert.False(t, (&syncTask{targetObj: Annotate(Annotate(NewPod(), "argoocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).needsDeleting()) - assert.True(t, (&syncTask{liveObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).needsDeleting()) - assert.True(t, (&syncTask{liveObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).needsDeleting()) - assert.True(t, (&syncTask{operationState: common.OperationSucceeded, liveObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "HookSucceeded")}).needsDeleting()) - assert.True(t, (&syncTask{operationState: common.OperationFailed, liveObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "HookFailed")}).needsDeleting()) + assert.False(t, (&syncTask{targetObj: Annotate(Annotate(NewPod(), "argoocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).deleteBeforeCreation()) + assert.True(t, (&syncTask{liveObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).deleteBeforeCreation()) + assert.True(t, (&syncTask{liveObj: Annotate(Annotate(NewPod(), "argocd.argoproj.io/hook", "Sync"), "argocd.argoproj.io/hook-delete-policy", "BeforeHookCreation")}).deleteBeforeCreation()) + } func Test_syncTask_wave(t *testing.T) {