diff --git a/cmd/hydration-controller/main.go b/cmd/hydration-controller/main.go index 805d9e6b23..127e54d850 100644 --- a/cmd/hydration-controller/main.go +++ b/cmd/hydration-controller/main.go @@ -19,16 +19,17 @@ import ( "flag" "os" "strings" - "time" "k8s.io/klog/v2" "k8s.io/klog/v2/klogr" + "kpt.dev/configsync/pkg/api/configsync" "kpt.dev/configsync/pkg/api/configsync/v1beta1" "kpt.dev/configsync/pkg/hydrate" "kpt.dev/configsync/pkg/importer/filesystem/cmpath" "kpt.dev/configsync/pkg/kmetrics" "kpt.dev/configsync/pkg/profiler" "kpt.dev/configsync/pkg/reconcilermanager" + "kpt.dev/configsync/pkg/reconcilermanager/controllers" "kpt.dev/configsync/pkg/util/log" ctrl "sigs.k8s.io/controller-runtime" ) @@ -55,15 +56,18 @@ var ( syncDir = flag.String("sync-dir", os.Getenv(reconcilermanager.SyncDirKey), "Relative path of the root directory within the repo.") - hydrationPollingPeriodStr = flag.String("polling-period", os.Getenv(reconcilermanager.HydrationPollingPeriod), + // TODO: rename to `polling-delay` to match internal name. + pollingDelay = flag.Duration("polling-period", + controllers.PollingPeriod(reconcilermanager.HydrationPollingPeriod, configsync.DefaultHydrationPollingDelay), "Period of time between checking the filesystem for rendering the DRY configs.") - // rehydratePeriod sets the hydration-controller to re-run the hydration process + // rehydrateDelay sets the hydration-controller to re-run the hydration process // periodically when errors happen. It retries on both transient errors and permanent errors. // Other ways to trigger the hydration process are: // - push a new commit // - delete the done file from the hydration-controller. - rehydratePeriod = flag.Duration("rehydrate-period", 30*time.Minute, + // TODO: rename to `rehydrate-delay` to match internal name. + rehydrateDelay = flag.Duration("rehydrate-period", configsync.DefaultHydrationRetryDelay, "Period of time between rehydrating on errors.") reconcilerName = flag.String("reconciler-name", os.Getenv(reconcilermanager.ReconcilerNameKey), @@ -107,22 +111,17 @@ func main() { dir := strings.TrimPrefix(*syncDir, "/") relSyncDir := cmpath.RelativeOS(dir) - hydrationPollingPeriod, err := time.ParseDuration(*hydrationPollingPeriodStr) - if err != nil { - klog.Fatalf("Failed to get hydration polling period: %v", err) - } - hydrator := &hydrate.Hydrator{ - DonePath: absDonePath, - SourceType: v1beta1.SourceType(*sourceType), - SourceRoot: absSourceRootDir, - HydratedRoot: absHydratedRootDir, - SourceLink: *sourceLinkDir, - HydratedLink: *hydratedLinkDir, - SyncDir: relSyncDir, - PollingFrequency: hydrationPollingPeriod, - RehydrateFrequency: *rehydratePeriod, - ReconcilerName: *reconcilerName, + DonePath: absDonePath, + SourceType: v1beta1.SourceType(*sourceType), + SourceRoot: absSourceRootDir, + HydratedRoot: absHydratedRootDir, + SourceLink: *sourceLinkDir, + HydratedLink: *hydratedLinkDir, + SyncDir: relSyncDir, + PollingDelay: *pollingDelay, + RehydrateDelay: *rehydrateDelay, + ReconcilerName: *reconcilerName, } hydrator.Run(context.Background()) diff --git a/cmd/reconciler-manager/main.go b/cmd/reconciler-manager/main.go index 51379f2530..aa7bf555c6 100644 --- a/cmd/reconciler-manager/main.go +++ b/cmd/reconciler-manager/main.go @@ -41,10 +41,12 @@ var ( clusterName = flag.String("cluster-name", os.Getenv(reconcilermanager.ClusterNameKey), "Cluster name to use for Cluster selection") - reconcilerPollingPeriod = flag.Duration("reconciler-polling-period", controllers.PollingPeriod(reconcilermanager.ReconcilerPollingPeriod, configsync.DefaultReconcilerPollingPeriod), + // TODO: rename to `reconciler-polling-delay` to match internal name. + reconcilerPollingDelay = flag.Duration("reconciler-polling-period", controllers.PollingPeriod(reconcilermanager.ReconcilerPollingPeriod, configsync.DefaultReconcilerPollingDelay), "How often the reconciler should poll the filesystem for updates to the source or rendered configs.") - hydrationPollingPeriod = flag.Duration("hydration-polling-period", controllers.PollingPeriod(reconcilermanager.HydrationPollingPeriod, configsync.DefaultHydrationPollingPeriod), + // TODO: rename to `hydration-polling-delay` to match internal name. + hydrationPollingDelay = flag.Duration("hydration-polling-period", controllers.PollingPeriod(reconcilermanager.HydrationPollingPeriod, configsync.DefaultHydrationPollingDelay), "How often the hydration-controller should poll the filesystem for rendering the DRY configs.") setupLog = ctrl.Log.WithName("setup") @@ -78,7 +80,7 @@ func main() { watchFleetMembership := fleetMembershipCRDExists(mgr.GetConfig(), mgr.GetRESTMapper()) - repoSync := controllers.NewRepoSyncReconciler(*clusterName, *reconcilerPollingPeriod, *hydrationPollingPeriod, mgr.GetClient(), + repoSync := controllers.NewRepoSyncReconciler(*clusterName, *reconcilerPollingDelay, *hydrationPollingDelay, mgr.GetClient(), ctrl.Log.WithName("controllers").WithName(configsync.RepoSyncKind), mgr.GetScheme()) if err := repoSync.SetupWithManager(mgr, watchFleetMembership); err != nil { @@ -86,7 +88,7 @@ func main() { os.Exit(1) } - rootSync := controllers.NewRootSyncReconciler(*clusterName, *reconcilerPollingPeriod, *hydrationPollingPeriod, mgr.GetClient(), + rootSync := controllers.NewRootSyncReconciler(*clusterName, *reconcilerPollingDelay, *hydrationPollingDelay, mgr.GetClient(), ctrl.Log.WithName("controllers").WithName(configsync.RootSyncKind), mgr.GetScheme()) if err := rootSync.SetupWithManager(mgr, watchFleetMembership); err != nil { diff --git a/cmd/reconciler/main.go b/cmd/reconciler/main.go index b078023971..ead45244ac 100644 --- a/cmd/reconciler/main.go +++ b/cmd/reconciler/main.go @@ -72,11 +72,14 @@ var ( fightDetectionThreshold = flag.Float64( "fight-detection-threshold", 5.0, "The rate of updates per minute to an API Resource at which the Syncer logs warnings about too many updates to the resource.") + // TODO: rename to `resync-delay` to match internal name resyncPeriod = flag.Duration("resync-period", time.Hour, "Period of time between forced re-syncs from source (even without a new commit).") workers = flag.Int("workers", 1, "Number of concurrent remediator workers to run at once.") - filesystemPollingPeriod = flag.Duration("filesystem-polling-period", controllers.PollingPeriod(reconcilermanager.ReconcilerPollingPeriod, configsync.DefaultReconcilerPollingPeriod), + // TODO: rename to `filesystem-polling-delay` to match internal name. + pollingDelay = flag.Duration("filesystem-polling-period", + controllers.PollingPeriod(reconcilermanager.ReconcilerPollingPeriod, configsync.DefaultReconcilerPollingDelay), "Period of time between checking the filesystem for updates to the source or rendered configs.") // Root-Repo-only flags. If set for a Namespace-scoped Reconciler, causes the Reconciler to fail immediately. @@ -161,26 +164,28 @@ func main() { } opts := reconciler.Options{ - ClusterName: *clusterName, - FightDetectionThreshold: *fightDetectionThreshold, - NumWorkers: *workers, - ReconcilerScope: declared.Scope(*scope), - ResyncPeriod: *resyncPeriod, - FilesystemPollingFrequency: *filesystemPollingPeriod, - SourceRoot: absSourceDir, - RepoRoot: absRepoRoot, - HydratedRoot: *hydratedRootDir, - HydratedLink: *hydratedLinkDir, - SourceRev: *sourceRev, - SourceBranch: *sourceBranch, - SourceType: v1beta1.SourceType(*sourceType), - SourceRepo: *sourceRepo, - SyncDir: relSyncDir, - SyncName: *syncName, - ReconcilerName: *reconcilerName, - StatusMode: *statusMode, - ReconcileTimeout: *reconcileTimeout, - APIServerTimeout: *apiServerTimeout, + ClusterName: *clusterName, + FightDetectionThreshold: *fightDetectionThreshold, + NumWorkers: *workers, + ReconcilerScope: declared.Scope(*scope), + PollingDelay: *pollingDelay, + ResyncDelay: *resyncPeriod, + RetryDelay: configsync.DefaultReconcilerRetryDelay, + StatusUpdateDelay: configsync.DefaultSyncStatusUpdateDelay, + SourceRoot: absSourceDir, + RepoRoot: absRepoRoot, + HydratedRoot: *hydratedRootDir, + HydratedLink: *hydratedLinkDir, + SourceRev: *sourceRev, + SourceBranch: *sourceBranch, + SourceType: v1beta1.SourceType(*sourceType), + SourceRepo: *sourceRepo, + SyncDir: relSyncDir, + SyncName: *syncName, + ReconcilerName: *reconcilerName, + StatusMode: *statusMode, + ReconcileTimeout: *reconcileTimeout, + APIServerTimeout: *apiServerTimeout, } if declared.Scope(*scope) == declared.RootReconciler { diff --git a/e2e/nomostest/metrics/validate.go b/e2e/nomostest/metrics/validate.go index 8ec4864253..2a0a036c39 100644 --- a/e2e/nomostest/metrics/validate.go +++ b/e2e/nomostest/metrics/validate.go @@ -15,6 +15,7 @@ package metrics import ( + "fmt" "strconv" "github.com/google/go-cmp/cmp" @@ -276,12 +277,18 @@ func (csm ConfigSyncMetrics) ValidateReconcilerErrors(podName string, sourceValu sourceMetrics := podMetrics.FilterByComponent("source") err := sourceMetrics.validateMetric(metric, valueEquals(metric, sourceValue)) if err != nil { + // Source errors should always be recorded before Sync errors, so don't + // ignore NotFound. return errors.Wrapf(err, `for pod=%q and component="source"`, podName) } syncMetrics := podMetrics.FilterByComponent("sync") err = syncMetrics.validateMetric(metric, valueEquals(metric, syncValue)) if err != nil { - return errors.Wrapf(err, `for pod=%q and component="sync"`, podName) + // Sync errors may not be recorded if there were Source errors. + // So ignore NotFound if Source errors are expected and Sync errors are not. + if !(errors.Is(err, &MetricNotFoundError{Name: metric}) && sourceValue > 0 && syncValue == 0) { + return errors.Wrapf(err, `for pod=%q and component="sync"`, podName) + } } return nil } @@ -411,15 +418,19 @@ func (csm ConfigSyncMetrics) validateMetric(name string, validations ...Validati return true } - if entries, ok := csm[name]; ok { - for _, e := range entries { - if allValidated(e, validations) { - return nil - } + entries, ok := csm[name] + if !ok { + // Use a typed error so it can be caught by the caller. + err := &MetricNotFoundError{Name: name} + return errors.Wrapf(err, "validating metric %q", name) + } + + for _, e := range entries { + if allValidated(e, validations) { + return nil } - return errors.Wrapf(errs, "validating metric %q", name) } - return errors.Errorf("validating metric %q: metric not found", name) + return errors.Wrapf(errs, "validating metric %q", name) } // hasTags checks that the measurement contains all the expected tags. @@ -495,3 +506,23 @@ func valueGTE(name string, value int) Validation { return nil } } + +// MetricNotFoundError means the metric being validated was not found. +type MetricNotFoundError struct { + Name string +} + +// Error returns the error message string. +func (mnfe *MetricNotFoundError) Error() string { + return fmt.Sprintf("metric not found: %q", mnfe.Name) +} + +// Is returns true if the specified error is a *MetricNotFoundError and has the +// same Name. +func (mnfe *MetricNotFoundError) Is(err error) bool { + tErr, ok := err.(*MetricNotFoundError) + if !ok { + return false + } + return tErr.Name == mnfe.Name +} diff --git a/e2e/nomostest/wait_for_sync.go b/e2e/nomostest/wait_for_sync.go index 90fc0bbe06..5465242630 100644 --- a/e2e/nomostest/wait_for_sync.go +++ b/e2e/nomostest/wait_for_sync.go @@ -244,7 +244,7 @@ func (nt *NT) WaitForRootSyncRenderingError(rsName, code string, message string, } // WaitForRootSyncSyncError waits until the given error (code and message) is present on the RootSync resource -func (nt *NT) WaitForRootSyncSyncError(rsName, code string, message string, ignoreSyncingCondition bool, opts ...WaitOption) { +func (nt *NT) WaitForRootSyncSyncError(rsName, code string, message string, opts ...WaitOption) { Wait(nt.T, fmt.Sprintf("RootSync %s rendering error code %s", rsName, code), nt.DefaultWaitTimeout, func() error { rs := fake.RootSyncObjectV1Beta1(rsName) @@ -252,9 +252,6 @@ func (nt *NT) WaitForRootSyncSyncError(rsName, code string, message string, igno if err != nil { return err } - if ignoreSyncingCondition { - return validateError(rs.Status.Sync.Errors, code, message) - } syncingCondition := rootsync.GetCondition(rs.Status.Conditions, v1beta1.RootSyncSyncing) return validateRootSyncError(rs.Status.Sync.Errors, syncingCondition, code, message, []v1beta1.ErrorSource{v1beta1.SyncError}) }, diff --git a/e2e/testcases/hydration_test.go b/e2e/testcases/hydration_test.go index 6c7acf6448..d5b67895a4 100644 --- a/e2e/testcases/hydration_test.go +++ b/e2e/testcases/hydration_test.go @@ -19,26 +19,30 @@ import ( "reflect" "testing" - "github.com/pkg/errors" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" rbacv1 "k8s.io/api/rbac/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - nomosstatus "kpt.dev/configsync/cmd/nomos/status" "kpt.dev/configsync/e2e/nomostest" "kpt.dev/configsync/e2e/nomostest/ntopts" nomostesting "kpt.dev/configsync/e2e/nomostest/testing" v1 "kpt.dev/configsync/pkg/api/configmanagement/v1" "kpt.dev/configsync/pkg/api/configsync" "kpt.dev/configsync/pkg/api/configsync/v1beta1" + "kpt.dev/configsync/pkg/core" "kpt.dev/configsync/pkg/declared" "kpt.dev/configsync/pkg/importer/analyzer/validation/nonhierarchical" "kpt.dev/configsync/pkg/metadata" + "kpt.dev/configsync/pkg/parse" "kpt.dev/configsync/pkg/reconcilermanager" + "kpt.dev/configsync/pkg/rootsync" "kpt.dev/configsync/pkg/status" "kpt.dev/configsync/pkg/testing/fake" + "sigs.k8s.io/cli-utils/pkg/testutil" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -63,12 +67,9 @@ func TestHydrateKustomizeComponents(t *testing.T) { nt.WaitForRepoSyncs(nomostest.WithSyncDirectoryMap(map[types.NamespacedName]string{nomostest.DefaultRootRepoNamespacedName: "kustomize-components"})) rs = getUpdatedRootSync(nt, configsync.RootSyncName, configsync.ControllerNamespace) - rsCommit, rsStatus, rsErrorSummary := getRootSyncCommitStatusErrorSummary(rs, nil, false) - latestCommit := nt.RootRepos[configsync.RootSyncName].Hash() - expectedStatus := "SYNCED" - if err := validateRootSyncRepoState(latestCommit, rsCommit, expectedStatus, rsStatus, rsErrorSummary); err != nil { - nt.T.Error(err) - } + validateRootSyncSyncCompleted(nt, rs, + nt.RootRepos[configsync.RootSyncName].Hash(), + parse.RenderingSucceeded) validateAllTenants(nt, string(declared.RootReconciler), "base", "tenant-a", "tenant-b", "tenant-c") @@ -78,12 +79,9 @@ func TestHydrateKustomizeComponents(t *testing.T) { nt.WaitForRootSyncRenderingError(configsync.RootSyncName, status.ActionableHydrationErrorCode, "") rs = getUpdatedRootSync(nt, configsync.RootSyncName, configsync.ControllerNamespace) - rsCommit, rsStatus, rsErrorSummary = getRootSyncCommitStatusErrorSummary(rs, nil, false) - latestCommit = nt.RootRepos[configsync.RootSyncName].Hash() - expectedStatus = "ERROR" - if err := validateRootSyncRepoState(latestCommit, rsCommit, expectedStatus, rsStatus, rsErrorSummary); err != nil { - nt.T.Errorf("%v\nExpected error: Kustomization should be missing.\n", err) - } + validateRootSyncRenderingErrors(nt, rs, + nt.RootRepos[configsync.RootSyncName].Hash(), + status.ActionableHydrationErrorCode) nt.T.Log("Add kustomization.yaml back") nt.RootRepos[configsync.RootSyncName].Copy("../testdata/hydration/kustomize-components/kustomization.yml", "./kustomize-components/kustomization.yml") @@ -91,12 +89,9 @@ func TestHydrateKustomizeComponents(t *testing.T) { nt.WaitForRepoSyncs(nomostest.WithSyncDirectoryMap(map[types.NamespacedName]string{nomostest.DefaultRootRepoNamespacedName: "kustomize-components"})) rs = getUpdatedRootSync(nt, configsync.RootSyncName, configsync.ControllerNamespace) - rsCommit, rsStatus, rsErrorSummary = getRootSyncCommitStatusErrorSummary(rs, nil, false) - latestCommit = nt.RootRepos[configsync.RootSyncName].Hash() - expectedStatus = "SYNCED" - if err := validateRootSyncRepoState(latestCommit, rsCommit, expectedStatus, rsStatus, rsErrorSummary); err != nil { - nt.T.Error(err) - } + validateRootSyncSyncCompleted(nt, rs, + nt.RootRepos[configsync.RootSyncName].Hash(), + parse.RenderingSucceeded) nt.T.Log("Make kustomization.yaml invalid") nt.RootRepos[configsync.RootSyncName].Copy("../testdata/hydration/invalid-kustomization.yaml", "./kustomize-components/kustomization.yml") @@ -104,12 +99,9 @@ func TestHydrateKustomizeComponents(t *testing.T) { nt.WaitForRootSyncRenderingError(configsync.RootSyncName, status.ActionableHydrationErrorCode, "") rs = getUpdatedRootSync(nt, configsync.RootSyncName, configsync.ControllerNamespace) - rsCommit, rsStatus, rsErrorSummary = getRootSyncCommitStatusErrorSummary(rs, nil, false) - latestCommit = nt.RootRepos[configsync.RootSyncName].Hash() - expectedStatus = "ERROR" - if err := validateRootSyncRepoState(latestCommit, rsCommit, expectedStatus, rsStatus, rsErrorSummary); err != nil { - nt.T.Errorf("%v\nExpected error: Should fail to run kustomize build.\n", err) - } + validateRootSyncRenderingErrors(nt, rs, + nt.RootRepos[configsync.RootSyncName].Hash(), + status.ActionableHydrationErrorCode) } func TestHydrateHelmComponents(t *testing.T) { @@ -135,12 +127,9 @@ func TestHydrateHelmComponents(t *testing.T) { nt.WaitForRepoSyncs(nomostest.WithSyncDirectoryMap(map[types.NamespacedName]string{nomostest.DefaultRootRepoNamespacedName: "helm-components"})) rs = getUpdatedRootSync(nt, configsync.RootSyncName, configsync.ControllerNamespace) - rsCommit, rsStatus, rsErrorSummary := getRootSyncCommitStatusErrorSummary(rs, nil, false) - latestCommit := nt.RootRepos[configsync.RootSyncName].Hash() - expectedStatus := "SYNCED" - if err := validateRootSyncRepoState(latestCommit, rsCommit, expectedStatus, rsStatus, rsErrorSummary); err != nil { - nt.T.Error(err) - } + validateRootSyncSyncCompleted(nt, rs, + nt.RootRepos[configsync.RootSyncName].Hash(), + parse.RenderingSucceeded) validateHelmComponents(nt, string(declared.RootReconciler)) @@ -155,12 +144,9 @@ func TestHydrateHelmComponents(t *testing.T) { } rs = getUpdatedRootSync(nt, configsync.RootSyncName, configsync.ControllerNamespace) - rsCommit, rsStatus, rsErrorSummary = getRootSyncCommitStatusErrorSummary(rs, nil, false) - latestCommit = nt.RootRepos[configsync.RootSyncName].Hash() - expectedStatus = "SYNCED" - if err := validateRootSyncRepoState(latestCommit, rsCommit, expectedStatus, rsStatus, rsErrorSummary); err != nil { - nt.T.Error(err) - } + validateRootSyncSyncCompleted(nt, rs, + nt.RootRepos[configsync.RootSyncName].Hash(), + parse.RenderingSucceeded) nt.T.Log("Use the render-helm-chart function to render the charts") nt.RootRepos[configsync.RootSyncName].Copy("../testdata/hydration/krm-function-helm-components-kustomization.yaml", "./helm-components/kustomization.yaml") @@ -172,12 +158,9 @@ func TestHydrateHelmComponents(t *testing.T) { nt.T.Fatal(err) } rs = getUpdatedRootSync(nt, configsync.RootSyncName, configsync.ControllerNamespace) - rsCommit, rsStatus, rsErrorSummary = getRootSyncCommitStatusErrorSummary(rs, nil, false) - latestCommit = nt.RootRepos[configsync.RootSyncName].Hash() - expectedStatus = "SYNCED" - if err := validateRootSyncRepoState(latestCommit, rsCommit, expectedStatus, rsStatus, rsErrorSummary); err != nil { - nt.T.Error(err) - } + validateRootSyncSyncCompleted(nt, rs, + nt.RootRepos[configsync.RootSyncName].Hash(), + parse.RenderingSucceeded) nt.T.Log("Use the render-helm-chart function to render the charts with multiple remote values.yaml files") nt.RootRepos[configsync.RootSyncName].Copy("../testdata/hydration/krm-function-helm-components-remote-values-kustomization.yaml", "./helm-components/kustomization.yaml") @@ -189,12 +172,9 @@ func TestHydrateHelmComponents(t *testing.T) { nt.T.Fatal(err) } rs = getUpdatedRootSync(nt, configsync.RootSyncName, configsync.ControllerNamespace) - rsCommit, rsStatus, rsErrorSummary = getRootSyncCommitStatusErrorSummary(rs, nil, false) - latestCommit = nt.RootRepos[configsync.RootSyncName].Hash() - expectedStatus = "SYNCED" - if err := validateRootSyncRepoState(latestCommit, rsCommit, expectedStatus, rsStatus, rsErrorSummary); err != nil { - nt.T.Error(err) - } + validateRootSyncSyncCompleted(nt, rs, + nt.RootRepos[configsync.RootSyncName].Hash(), + parse.RenderingSucceeded) } func TestHydrateHelmOverlay(t *testing.T) { @@ -230,12 +210,9 @@ func TestHydrateHelmOverlay(t *testing.T) { } rs = getUpdatedRootSync(nt, configsync.RootSyncName, configsync.ControllerNamespace) - rsCommit, rsStatus, rsErrorSummary := getRootSyncCommitStatusErrorSummary(rs, nil, false) - latestCommit := nt.RootRepos[configsync.RootSyncName].Hash() - expectedStatus := "SYNCED" - if err := validateRootSyncRepoState(latestCommit, rsCommit, expectedStatus, rsStatus, rsErrorSummary); err != nil { - nt.T.Error(err) - } + validateRootSyncSyncCompleted(nt, rs, + nt.RootRepos[configsync.RootSyncName].Hash(), + parse.RenderingSucceeded) nt.T.Log("Make the hydration fail by checking in an invalid kustomization.yaml") nt.RootRepos[configsync.RootSyncName].Copy("../testdata/hydration/resource-duplicate/kustomization.yaml", "./helm-overlay/kustomization.yaml") @@ -244,12 +221,9 @@ func TestHydrateHelmOverlay(t *testing.T) { nt.WaitForRootSyncRenderingError(configsync.RootSyncName, status.ActionableHydrationErrorCode, "") rs = getUpdatedRootSync(nt, configsync.RootSyncName, configsync.ControllerNamespace) - rsCommit, rsStatus, rsErrorSummary = getRootSyncCommitStatusErrorSummary(rs, nil, false) - latestCommit = nt.RootRepos[configsync.RootSyncName].Hash() - expectedStatus = "ERROR" - if err := validateRootSyncRepoState(latestCommit, rsCommit, expectedStatus, rsStatus, rsErrorSummary); err != nil { - nt.T.Errorf("%v\nExpected errors: kustomization should be invalid.\n", err) - } + validateRootSyncRenderingErrors(nt, rs, + nt.RootRepos[configsync.RootSyncName].Hash(), + status.ActionableHydrationErrorCode) nt.T.Log("Make the parsing fail by checking in a deprecated group and kind") nt.RootRepos[configsync.RootSyncName].Copy("../testdata/hydration/deprecated-GK/kustomization.yaml", "./helm-overlay/kustomization.yaml") @@ -257,12 +231,9 @@ func TestHydrateHelmOverlay(t *testing.T) { nt.WaitForRootSyncSourceError(configsync.RootSyncName, nonhierarchical.DeprecatedGroupKindErrorCode, "") rs = getUpdatedRootSync(nt, configsync.RootSyncName, configsync.ControllerNamespace) - rsCommit, rsStatus, rsErrorSummary = getRootSyncCommitStatusErrorSummary(rs, nil, false) - latestCommit = nt.RootRepos[configsync.RootSyncName].Hash() - expectedStatus = "ERROR" - if err := validateRootSyncRepoState(latestCommit, rsCommit, expectedStatus, rsStatus, rsErrorSummary); err != nil { - nt.T.Errorf("%v\nExpected errors: group and kind should be deprecated.\n", err) - } + validateRootSyncSourceErrors(nt, rs, + nt.RootRepos[configsync.RootSyncName].Hash(), + nonhierarchical.DeprecatedGroupKindErrorCode) nt.T.Log("Use the render-helm-chart function to render the charts") nt.RootRepos[configsync.RootSyncName].Copy("../testdata/hydration/helm-overlay/kustomization.yaml", "./helm-overlay/kustomization.yaml") @@ -271,12 +242,9 @@ func TestHydrateHelmOverlay(t *testing.T) { nt.WaitForRepoSyncs(nomostest.WithSyncDirectoryMap(map[types.NamespacedName]string{nomostest.DefaultRootRepoNamespacedName: "helm-overlay"})) rs = getUpdatedRootSync(nt, configsync.RootSyncName, configsync.ControllerNamespace) - rsCommit, rsStatus, rsErrorSummary = getRootSyncCommitStatusErrorSummary(rs, nil, false) - latestCommit = nt.RootRepos[configsync.RootSyncName].Hash() - expectedStatus = "SYNCED" - if err := validateRootSyncRepoState(latestCommit, rsCommit, expectedStatus, rsStatus, rsErrorSummary); err != nil { - nt.T.Error(err) - } + validateRootSyncSyncCompleted(nt, rs, + nt.RootRepos[configsync.RootSyncName].Hash(), + parse.RenderingSucceeded) nt.T.Log("Make the parsing fail again by checking in a deprecated group and kind with the render-helm-chart function") nt.RootRepos[configsync.RootSyncName].Copy("../testdata/hydration/krm-function-deprecated-GK-kustomization.yaml", "./helm-overlay/kustomization.yaml") @@ -284,12 +252,9 @@ func TestHydrateHelmOverlay(t *testing.T) { nt.WaitForRootSyncSourceError(configsync.RootSyncName, nonhierarchical.DeprecatedGroupKindErrorCode, "") rs = getUpdatedRootSync(nt, configsync.RootSyncName, configsync.ControllerNamespace) - rsCommit, rsStatus, rsErrorSummary = getRootSyncCommitStatusErrorSummary(rs, nil, false) - latestCommit = nt.RootRepos[configsync.RootSyncName].Hash() - expectedStatus = "ERROR" - if err := validateRootSyncRepoState(latestCommit, rsCommit, expectedStatus, rsStatus, rsErrorSummary); err != nil { - nt.T.Errorf("%v\nExpected errors: group and kind should be deprecated.\n", err) - } + validateRootSyncSourceErrors(nt, rs, + nt.RootRepos[configsync.RootSyncName].Hash(), + nonhierarchical.DeprecatedGroupKindErrorCode) } func TestHydrateRemoteResources(t *testing.T) { @@ -309,6 +274,7 @@ func TestHydrateRemoteResources(t *testing.T) { nt.T.Log("Add the remote-base root directory") nt.RootRepos[configsync.RootSyncName].Copy("../testdata/hydration/remote-base", ".") nt.RootRepos[configsync.RootSyncName].CommitAndPush("add DRY configs to the repository") + nt.T.Log("Update RootSync to sync from the remote-base directory without enable shell in hydration controller") rs := fake.RootSyncObjectV1Beta1(configsync.RootSyncName) nt.MustMergePatch(rs, `{"spec": {"git": {"dir": "remote-base"}}}`) @@ -317,11 +283,13 @@ func TestHydrateRemoteResources(t *testing.T) { nt.T.Log("Enable shell in hydration controller") nt.MustMergePatch(rs, `{"spec": {"override": {"enableShellInRendering": true}}}`) nt.WaitForRepoSyncs(nomostest.WithSyncDirectoryMap(map[types.NamespacedName]string{nomostest.DefaultRootRepoNamespacedName: "remote-base"})) + err = nt.Validate(nomostest.DefaultRootReconcilerName, v1.NSConfigManagementSystem, &appsv1.Deployment{}, nomostest.HasExactlyImage(reconcilermanager.HydrationController, reconcilermanager.HydrationControllerWithShell, "", "")) if err != nil { nt.T.Fatal(err) } + expectedOrigin := "path: base/namespace.yaml\nrepo: https://github.com/config-sync-examples/kustomize-components\nref: main\n" nt.T.Log("Validate resources are synced") var expectedNamespaces = []string{"tenant-a"} @@ -351,10 +319,12 @@ func TestHydrateRemoteResources(t *testing.T) { nt.T.Log("Disable shell in hydration controller") nt.MustMergePatch(rs, `{"spec": {"override": {"enableShellInRendering": false}, "git": {"dir": "acme"}}}`) nt.WaitForRepoSyncs() + nt.RootRepos[configsync.RootSyncName].Copy("../testdata/hydration/remote-base", ".") nt.RootRepos[configsync.RootSyncName].CommitAndPush("add DRY configs to the repository") nt.T.Log("Update RootSync to sync from the remote-base directory when disable shell in hydration controller") nt.MustMergePatch(rs, `{"spec": {"git": {"dir": "remote-base"}}}`) + nt.WaitForRootSyncRenderingError(configsync.RootSyncName, status.ActionableHydrationErrorCode, "") err = nt.Validate(nomostest.DefaultRootReconcilerName, v1.NSConfigManagementSystem, &appsv1.Deployment{}, nomostest.HasExactlyImage(reconcilermanager.HydrationController, reconcilermanager.HydrationController, "", "")) @@ -381,12 +351,9 @@ func TestHydrateResourcesInRelativePath(t *testing.T) { nt.WaitForRepoSyncs(nomostest.WithSyncDirectoryMap(map[types.NamespacedName]string{nomostest.DefaultRootRepoNamespacedName: "relative-path/overlays/dev"})) rs = getUpdatedRootSync(nt, configsync.RootSyncName, configsync.ControllerNamespace) - rsCommit, rsStatus, rsErrorSummary := getRootSyncCommitStatusErrorSummary(rs, nil, false) - latestCommit := nt.RootRepos[configsync.RootSyncName].Hash() - expectedStatus := "SYNCED" - if err := validateRootSyncRepoState(latestCommit, rsCommit, expectedStatus, rsStatus, rsErrorSummary); err != nil { - nt.T.Error(err) - } + validateRootSyncSyncCompleted(nt, rs, + nt.RootRepos[configsync.RootSyncName].Hash(), + parse.RenderingSucceeded) nt.T.Log("Validating resources are synced") if err := nt.Validate("foo", "", &corev1.Namespace{}, nomostest.HasAnnotation(metadata.KustomizeOrigin, "path: ../../base/foo/namespace.yaml\n")); err != nil { @@ -424,6 +391,9 @@ func validateNamespaces(nt *nomostest.NT, expectedNamespaces []string, expectedO if !reflect.DeepEqual(actualNamespaces, expectedNamespaces) { nt.T.Errorf("expected namespaces: %v, but got: %v", expectedNamespaces, actualNamespaces) } + if nt.T.Failed() { + nt.T.FailNow() + } } func containerImagePullPolicy(policy string) nomostest.Predicate { @@ -441,13 +411,6 @@ func containerImagePullPolicy(policy string) nomostest.Predicate { } } -// getRootSyncCommitStatusErrorSummary converts the given rootSync into a RepoState, then into a string -func getRootSyncCommitStatusErrorSummary(rootSync *v1beta1.RootSync, rg *unstructured.Unstructured, syncingConditionSupported bool) (string, string, string) { - rs := nomosstatus.RootRepoStatus(rootSync, rg, syncingConditionSupported) - - return rs.GetCommit(), rs.GetStatus(), fmt.Sprintf("%v", rs.GetErrorSummary()) -} - // getUpdatedRootSync gets the most recent RootSync from Client func getUpdatedRootSync(nt *nomostest.NT, name string, namespace string) *v1beta1.RootSync { rs := &v1beta1.RootSync{} @@ -457,15 +420,6 @@ func getUpdatedRootSync(nt *nomostest.NT, name string, namespace string) *v1beta return rs } -// validateRootSyncRepoState verifies the output from getRootSyncCommitStatusErrorSummary is as expected. -func validateRootSyncRepoState(expectedCommit string, commit string, expectedStatus string, status string, errorSummary string) error { - if expectedCommit != commit || expectedStatus != status { - return errors.Errorf("Error: rootSync does not match expected. Got: commit: %v, status: %v\nError Summary: %v\nExpected: commit: %v, status: %v\n", - commit, status, errorSummary, expectedCommit, expectedStatus) - } - return nil -} - // validateAllTenants validates if resources for all tenants are rendered, created and managed by the reconciler. func validateAllTenants(nt *nomostest.NT, reconcilerScope, baseRelPath string, tenants ...string) { nt.T.Logf("Validate resources are synced for all tenants %s", tenants) @@ -496,6 +450,9 @@ func validateTenant(nt *nomostest.NT, reconcilerScope, tenant, baseRelPath strin nomostest.HasAnnotation(metadata.ResourceManagerKey, reconcilerScope)); err != nil { nt.T.Error(err) } + if nt.T.Failed() { + nt.T.FailNow() + } } // validateHelmComponents validates if all resources are rendered, created and managed by the reconciler. @@ -513,4 +470,262 @@ func validateHelmComponents(nt *nomostest.NT, reconcilerScope string) { nomostest.HasAnnotation(metadata.ResourceManagerKey, reconcilerScope)); err != nil { nt.T.Error(err) } + if nt.T.Failed() { + nt.T.FailNow() + } +} + +// gitRevisionOrDefault returns the specified Revision or the default value. +func gitRevisionOrDefault(git v1beta1.Git) string { + if git.Revision == "" { + return "HEAD" + } + return git.Revision +} + +func validateRootSyncSyncCompleted(nt *nomostest.NT, rs *v1beta1.RootSync, commit, renderingMessage string) { + nt.T.Helper() + + // Use a custom asserter so we can ignore hard-to-test fields. + // Testing whole structs makes debugging easier by printing the full + // expected and actual values, but it will also print any ignored fields. + asserter := testutil.NewAsserter( + cmpopts.IgnoreFields(v1beta1.SourceStatus{}, "LastUpdate"), + cmpopts.IgnoreFields(v1beta1.RenderingStatus{}, "LastUpdate"), + cmpopts.IgnoreFields(v1beta1.SyncStatus{}, "LastUpdate"), + cmpopts.IgnoreFields(v1beta1.RootSyncCondition{}, "LastUpdateTime", "LastTransitionTime", "Message"), + cmpopts.IgnoreFields(v1beta1.ConfigSyncError{}, "ErrorMessage", "Resources"), + ) + + expectedRootSyncStatus := v1beta1.Status{ + ObservedGeneration: rs.Generation, + Reconciler: core.RootReconcilerName(rs.Name), + LastSyncedCommit: commit, + Source: v1beta1.SourceStatus{ + Git: &v1beta1.GitStatus{ + Repo: rs.Spec.Repo, + Revision: gitRevisionOrDefault(*rs.Spec.Git), + Branch: rs.Spec.Git.Branch, + Dir: rs.Spec.Git.Dir, + }, + // LastUpdate ignored + Commit: commit, + Errors: nil, + ErrorSummary: nil, + }, + Rendering: v1beta1.RenderingStatus{ + Git: &v1beta1.GitStatus{ + Repo: rs.Spec.Repo, + Revision: gitRevisionOrDefault(*rs.Spec.Git), + Branch: rs.Spec.Git.Branch, + Dir: rs.Spec.Git.Dir, + }, + // LastUpdate ignored + Message: renderingMessage, // RenderingSucceeded/RenderingSkipped + Commit: commit, + Errors: nil, + ErrorSummary: nil, + }, + Sync: v1beta1.SyncStatus{ + Git: &v1beta1.GitStatus{ + Repo: rs.Spec.Repo, + Revision: gitRevisionOrDefault(*rs.Spec.Git), + Branch: rs.Spec.Git.Branch, + Dir: rs.Spec.Git.Dir, + }, + // LastUpdate ignored + Commit: commit, + Errors: nil, + ErrorSummary: nil, + }, + } + assertEqual(nt, asserter, expectedRootSyncStatus, rs.Status.Status, + "RootSync .status") + + // Validate Syncing condition fields + rsSyncingCondition := rootsync.GetCondition(rs.Status.Conditions, v1beta1.RootSyncSyncing) + expectedSyncingCondition := &v1beta1.RootSyncCondition{ + Type: v1beta1.RootSyncSyncing, + Status: metav1.ConditionFalse, + // LastUpdateTime ignored + // LastTransitionTime ignored + Reason: "Sync", + Message: "Sync Completed", + Commit: commit, + // Errors unused by the Syncing condition (always nil) + ErrorSourceRefs: nil, + ErrorSummary: nil, + } + assertEqual(nt, asserter, expectedSyncingCondition, rsSyncingCondition, + "RootSync .status.conditions[.status=%q]", v1beta1.RootSyncSyncing) + + if nt.T.Failed() { + nt.T.FailNow() + } +} + +func validateRootSyncRenderingErrors(nt *nomostest.NT, rs *v1beta1.RootSync, commit string, errCodes ...string) { + nt.T.Helper() + + if len(errCodes) == 0 { + nt.T.Fatal("Invalid test: expected specific errors to validate, but none were specified") + } + + // Use a custom asserter so we can ignore hard-to-test fields. + // Testing whole structs makes debugging easier by printing the full + // expected and actual values, but it will also print any ignored fields. + asserter := testutil.NewAsserter( + cmpopts.IgnoreFields(v1beta1.RenderingStatus{}, "LastUpdate"), + cmpopts.IgnoreFields(v1beta1.RootSyncCondition{}, "LastUpdateTime", "LastTransitionTime", "Message"), + cmpopts.IgnoreFields(v1beta1.ConfigSyncError{}, "ErrorMessage", "Resources"), + // Ignore the current Syncing condition status. Retry will flip it back to True. + cmpopts.IgnoreFields(v1beta1.RootSyncCondition{}, "Status"), + ) + + // Build untruncated ErrorSummary & fake Errors list from error codes + var errorList []v1beta1.ConfigSyncError + var errorSummary *v1beta1.ErrorSummary + var errorSources []v1beta1.ErrorSource + + errorSources = append(errorSources, v1beta1.RenderingError) + errorSummary = &v1beta1.ErrorSummary{ + TotalCount: len(errCodes), + Truncated: false, + ErrorCountAfterTruncation: len(errCodes), + } + for _, errCode := range errCodes { + errorList = append(errorList, v1beta1.ConfigSyncError{ + Code: errCode, + // ErrorMessage ignored + // Resources ignored + }) + } + + // Validate .status.rendering fields. + expectedRenderingStatus := v1beta1.RenderingStatus{ + Git: &v1beta1.GitStatus{ + Repo: rs.Spec.Repo, + Revision: gitRevisionOrDefault(*rs.Spec.Git), + Branch: rs.Spec.Git.Branch, + Dir: rs.Spec.Git.Dir, + }, + // LastUpdate ignored + Message: parse.RenderingFailed, + Commit: commit, + Errors: errorList, + ErrorSummary: errorSummary, + } + assertEqual(nt, asserter, expectedRenderingStatus, rs.Status.Rendering, + "RootSync .status.rendering") + + // Validate Syncing condition fields + rsSyncingCondition := rootsync.GetCondition(rs.Status.Conditions, v1beta1.RootSyncSyncing) + expectedSyncingCondition := &v1beta1.RootSyncCondition{ + Type: v1beta1.RootSyncSyncing, + // Status ignored + // LastUpdateTime ignored + // LastTransitionTime ignored + Reason: "Rendering", + Message: "Rendering failed", + Commit: commit, + // Errors unused by the Syncing condition (always nil) + ErrorSourceRefs: errorSources, + ErrorSummary: errorSummary, + } + assertEqual(nt, asserter, expectedSyncingCondition, rsSyncingCondition, + "RootSync .status.conditions[.status=%q]", v1beta1.RootSyncSyncing) + + if nt.T.Failed() { + nt.T.FailNow() + } +} + +func validateRootSyncSourceErrors(nt *nomostest.NT, rs *v1beta1.RootSync, commit string, errCodes ...string) { + nt.T.Helper() + + if len(errCodes) == 0 { + nt.T.Fatal("Invalid test: expected specific errors to validate, but none were specified") + } + + // Use a custom asserter so we can ignore hard-to-test fields. + // Testing whole structs makes debugging easier by printing the full + // expected and actual values, but it will also print any ignored fields. + asserter := testutil.NewAsserter( + cmpopts.IgnoreFields(v1beta1.SourceStatus{}, "LastUpdate"), + cmpopts.IgnoreFields(v1beta1.RootSyncCondition{}, "LastUpdateTime", "LastTransitionTime", "Message"), + cmpopts.IgnoreFields(v1beta1.ConfigSyncError{}, "ErrorMessage", "Resources"), + // Ignore the current Syncing condition status. Retry will flip it back to True. + cmpopts.IgnoreFields(v1beta1.RootSyncCondition{}, "Status"), + ) + + // Build untruncated ErrorSummary & fake Errors list from error codes + var errorList []v1beta1.ConfigSyncError + var errorSummary *v1beta1.ErrorSummary + var errorSources []v1beta1.ErrorSource + + errorSources = append(errorSources, v1beta1.SourceError) + errorSummary = &v1beta1.ErrorSummary{ + TotalCount: len(errCodes), + Truncated: false, + ErrorCountAfterTruncation: len(errCodes), + } + for _, errCode := range errCodes { + errorList = append(errorList, v1beta1.ConfigSyncError{ + Code: errCode, + // ErrorMessage ignored + // Resources ignored + }) + } + + // Validate .status.source fields. + expectedRenderingStatus := v1beta1.SourceStatus{ + Git: &v1beta1.GitStatus{ + Repo: rs.Spec.Repo, + Revision: gitRevisionOrDefault(*rs.Spec.Git), + Branch: rs.Spec.Git.Branch, + Dir: rs.Spec.Git.Dir, + }, + // LastUpdate ignored + Commit: commit, + Errors: errorList, + ErrorSummary: errorSummary, + } + assertEqual(nt, asserter, expectedRenderingStatus, rs.Status.Source, + "RootSync .status.source") + + // Validate Syncing condition fields + rsSyncingCondition := rootsync.GetCondition(rs.Status.Conditions, v1beta1.RootSyncSyncing) + expectedSyncingCondition := &v1beta1.RootSyncCondition{ + Type: v1beta1.RootSyncSyncing, + // Status ignored + // LastUpdateTime ignored + // LastTransitionTime ignored + Reason: "Source", + Message: "Source", + Commit: commit, + // Errors unused by the Syncing condition (always nil) + ErrorSourceRefs: errorSources, + ErrorSummary: errorSummary, + } + assertEqual(nt, asserter, expectedSyncingCondition, rsSyncingCondition, + "RootSync .status.conditions[.status=%q]", v1beta1.RootSyncSyncing) + + if nt.T.Failed() { + nt.T.FailNow() + } +} + +// assertEqual simulates testutil.AssertEqual, but works with our fake Testing +// interface. +func assertEqual(nt *nomostest.NT, asserter *testutil.Asserter, expected, actual interface{}, msgAndArgs ...interface{}) { + nt.T.Helper() + matcher := asserter.EqualMatcher(expected) + match, err := matcher.Match(actual) + if err != nil { + nt.T.Fatalf("errored testing equality: %v", err) + return + } + if !match { + assert.Fail(nt.T, matcher.FailureMessage(actual), msgAndArgs...) + } } diff --git a/e2e/testcases/multi_sync_test.go b/e2e/testcases/multi_sync_test.go index c3f8f66578..0572dbc455 100644 --- a/e2e/testcases/multi_sync_test.go +++ b/e2e/testcases/multi_sync_test.go @@ -399,10 +399,21 @@ func TestConflictingDefinitions_RootToRoot(t *testing.T) { nt.RootRepos[rootSync2].CommitAndPush("add conflicting pod owner role") nt.T.Logf("The admission webhook should deny the update request in Root %s", rootSync2) - nt.WaitForRootSyncSyncError(rootSync2, applier.ApplierErrorCode, "denied the request", false) - nt.T.Logf("Root %s should also get surfaced with the conflict error", configsync.RootSyncName) - // Ignore the Syncing condition because the sync status is updated by another reconciler, which doesn't update the SyncingCondition. - nt.WaitForRootSyncSyncError(configsync.RootSyncName, status.ManagementConflictErrorCode, "declared in another repository", true) + nt.WaitForRootSyncSyncError(rootSync2, applier.ApplierErrorCode, "denied the request") + + // The first RootSync to apply the object will apply its manager annotation, + // which causes the webhook to reject attempts to update the object by other + // RootSyncs, the manager annotation won't change, so the first RootSync + // won't get any management conflict errors. + // The first RootSync would only get an error if the webhook was disabled. + + // TODO: Should adoption fights happen with the webhook enabled? + // The remediator doesn't return a KptManagementConflictError, so the + // following error validation will never succeed. + + // nt.T.Logf("Root %s should also get surfaced with the conflict error", configsync.RootSyncName) + // nt.WaitForRootSyncSyncError(configsync.RootSyncName, status.ManagementConflictErrorCode, "declared in another repository") + nt.T.Logf("The Role resource version should not be changed") if err := nt.Validate("pods", testNs, &rbacv1.Role{}, nomostest.ResourceVersionEquals(nt, roleResourceVersion)); err != nil { @@ -411,6 +422,7 @@ func TestConflictingDefinitions_RootToRoot(t *testing.T) { nt.T.Logf("Stop the admission webhook, the remediator should report the conflicts") nomostest.StopWebhook(nt) + nt.T.Logf("The Role resource version should be changed because two reconcilers are fighting with each other") if _, err := nomostest.Retry(90*time.Second, func() error { return nt.Validate("pods", testNs, &rbacv1.Role{}, @@ -418,10 +430,10 @@ func TestConflictingDefinitions_RootToRoot(t *testing.T) { }); err != nil { nt.T.Fatal(err) } + nt.T.Logf("Both of the two RootSyncs still report problems because the remediators detect the conflicts") - // Ignore the Syncing condition because the sync status might be updated by another reconciler, which doesn't update the SyncingCondition. - nt.WaitForRootSyncSyncError(configsync.RootSyncName, status.ManagementConflictErrorCode, "declared in another repository", true) - nt.WaitForRootSyncSyncError(rootSync2, status.ManagementConflictErrorCode, "declared in another repository", true) + nt.WaitForRootSyncSyncError(configsync.RootSyncName, status.ManagementConflictErrorCode, "declared in another repository") + nt.WaitForRootSyncSyncError(rootSync2, status.ManagementConflictErrorCode, "declared in another repository") nt.T.Logf("Remove the declaration from one Root repo %s", configsync.RootSyncName) nt.RootRepos[configsync.RootSyncName].Remove(podRoleFilePath) diff --git a/e2e/testcases/sync_ordering_test.go b/e2e/testcases/sync_ordering_test.go index df17ec5f37..4d6016406b 100644 --- a/e2e/testcases/sync_ordering_test.go +++ b/e2e/testcases/sync_ordering_test.go @@ -156,7 +156,7 @@ func TestMultiDependencies(t *testing.T) { nt.RootRepos[configsync.RootSyncName].Add("acme/cm0.yaml", fake.ConfigMapObject(core.Name(cm0Name), core.Namespace(namespaceName), core.Annotation(dependson.Annotation, "/namespaces/bookstore/ConfigMap/cm2"))) nt.RootRepos[configsync.RootSyncName].CommitAndPush("Create a cyclic dependency between cm0, cm1, and cm2") - nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "cyclic dependency", false) + nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "cyclic dependency") nt.T.Log("Verify that cm0 does not have the dependsOn annotation") if err := nt.Validate(cm0Name, namespaceName, &corev1.ConfigMap{}, nomostest.MissingAnnotation(dependson.Annotation)); err != nil { @@ -331,7 +331,7 @@ func TestExternalDependencyError(t *testing.T) { nt.WaitForRepoSyncs() nt.RootRepos[configsync.RootSyncName].Remove("acme/cm0.yaml") nt.RootRepos[configsync.RootSyncName].CommitAndPush("Removing cm0 from the git repo") - nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "dependency", false) + nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "dependency") // TestCase: cm1 depends on cm0; both are managed by ConfigSync. // Both exist in the repo and in the cluster. @@ -345,7 +345,7 @@ func TestExternalDependencyError(t *testing.T) { nt.RootRepos[configsync.RootSyncName].Add("acme/cm0.yaml", fake.ConfigMapObject(core.Name(cm0Name), core.Namespace(namespaceName), core.Annotation(metadata.ResourceManagementKey, metadata.ResourceManagementDisabled))) nt.RootRepos[configsync.RootSyncName].CommitAndPush("Disabling management for cm0 in the git repo") - nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "external dependency", false) + nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "external dependency") // TestCase: cm1 depends on cm0; cm0 is disabled. // Neither exists in the cluster. @@ -364,7 +364,7 @@ func TestExternalDependencyError(t *testing.T) { nt.RootRepos[configsync.RootSyncName].Add("acme/cm1.yaml", fake.ConfigMapObject(core.Name(cm1Name), core.Namespace(namespaceName), core.Annotation(dependson.Annotation, "/namespaces/bookstore/ConfigMap/cm0"))) nt.RootRepos[configsync.RootSyncName].CommitAndPush("Adding cm1 and cm0: cm1 depends on cm0, cm0 is disabled") - nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "external dependency", false) + nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "external dependency") // TestCase: cm1 depends on object that is not in the repo or cluster. // Expected an ExternalDependencyError @@ -376,7 +376,7 @@ func TestExternalDependencyError(t *testing.T) { nt.RootRepos[configsync.RootSyncName].Add("acme/cm1.yaml", fake.ConfigMapObject(core.Name(cm1Name), core.Namespace(namespaceName), core.Annotation(dependson.Annotation, "/namespaces/bookstore/ConfigMap/cm-not-exist"))) nt.RootRepos[configsync.RootSyncName].CommitAndPush("Adding cm1: cm1 depends on a resource that doesn't exist in either the repo or in cluster") - nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "external dependency", false) + nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "external dependency") // TestCase: cm1 depends on an object that is not in the repo, but in the cluster // Expected an ExternalDependencyError @@ -391,7 +391,7 @@ func TestExternalDependencyError(t *testing.T) { nt.RootRepos[configsync.RootSyncName].Add("acme/cm1.yaml", fake.ConfigMapObject(core.Name(cm1Name), core.Namespace(namespaceName), core.Annotation(dependson.Annotation, "/namespaces/bookstore/ConfigMap/cm4"))) nt.RootRepos[configsync.RootSyncName].CommitAndPush("Adding cm1: cm1 depends on a resource that only exists in the cluster") - nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "external dependency", false) + nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "external dependency") nt.T.Log("Cleaning up") nt.RootRepos[configsync.RootSyncName].Remove("acme/cm1.yaml") nt.RootRepos[configsync.RootSyncName].CommitAndPush("remove cm1") @@ -492,7 +492,7 @@ func TestDependencyWithReconciliation(t *testing.T) { core.Annotation(dependson.Annotation, "/namespaces/bookstore/Pod/pod3"))) nt.RootRepos[configsync.RootSyncName].CommitAndPush("Add pod3 and pod4 (pod4 depends on pod3 and pod3 won't be reconciled)") nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, - "skipped apply of Pod, bookstore/pod4: dependency apply reconcile timeout: bookstore_pod3__Pod", false) + "skipped apply of Pod, bookstore/pod4: dependency apply reconcile timeout: bookstore_pod3__Pod") _, err = nomostest.Retry(20, func() error { pod3 := &corev1.Pod{} @@ -546,7 +546,7 @@ func TestDependencyWithReconciliation(t *testing.T) { nt.T.Logf("Remove pod5 from the repo") nt.RootRepos[configsync.RootSyncName].Remove("acme/pod5.yaml") nt.RootRepos[configsync.RootSyncName].CommitAndPush("Remove pod5") - nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "dependency", false) + nt.WaitForRootSyncSyncError(configsync.RootSyncName, applier.ApplierErrorCode, "dependency") _, err = nomostest.Retry(20, func() error { pod5 := &corev1.Pod{} diff --git a/pkg/api/configsync/register.go b/pkg/api/configsync/register.go index 872e0317fa..dde0738b6d 100644 --- a/pkg/api/configsync/register.go +++ b/pkg/api/configsync/register.go @@ -46,13 +46,29 @@ const ( // DefaultPeriodSecs is the default value in seconds between consecutive syncs. DefaultPeriodSecs = 15 - // DefaultReconcilerPollingPeriod defines how often the reconciler should poll - // the filesystem for updates to the source or rendered configs. - DefaultReconcilerPollingPeriod = 5 * time.Second + // DefaultReconcilerPollingDelay is the time delay between polling the + // filesystem for config updates to sync. + DefaultReconcilerPollingDelay = 5 * time.Second - // DefaultHydrationPollingPeriod defines how often the hydration controller - // should poll the filesystem for rendering the DRY configs. - DefaultHydrationPollingPeriod = 5 * time.Second + // DefaultHydrationPollingDelay is the time delay between polling the + // filesystem for config updates to render. + DefaultHydrationPollingDelay = 5 * time.Second + + // DefaultHydrationRetryDelay is the time delay between attempts to + // re-render config after an error. + // TODO: replace with retry-backoff strategy + DefaultHydrationRetryDelay = 30 * time.Minute + + // DefaultReconcilerRetryDelay is the time delay between parser runs when + // the last run errored or remediator watches need updating or there are + // management conflicts. + // TODO: replace with retry-backoff strategy + DefaultReconcilerRetryDelay = time.Second + + // DefaultSyncStatusUpdateDelay is the time delay between status updates + // when the parser is not running. These updates report new management + // conflict errors from the remediator, if there are any. + DefaultSyncStatusUpdateDelay = 5 * time.Second // DefaultReconcileTimeout defines the timeout of kpt applier reconcile/prune task DefaultReconcileTimeout = 5 * time.Minute diff --git a/pkg/applier/kpt_applier.go b/pkg/applier/kpt_applier.go index 51e4f93517..c1ba4fb627 100644 --- a/pkg/applier/kpt_applier.go +++ b/pkg/applier/kpt_applier.go @@ -74,30 +74,37 @@ type Applier struct { client client.Client // configFlags for creating clients configFlags *genericclioptions.ConfigFlags - // errs tracks all the errors the applier encounters. - // This field is cleared at the start of the `Applier.Apply` method - errs status.MultiError - // syncing indicates whether the applier is syncing. - syncing bool // syncKind is the Kind of the RSync object: RootSync or RepoSync syncKind string // syncName is the name of RSync object syncName string // syncNamespace is the namespace of RSync object syncNamespace string - // mux is an Applier-level mutext to prevent concurrent Apply() and Refresh() - mux sync.Mutex // statusMode controls if the applier injects the acutation status into the // ResourceGroup object statusMode string // reconcileTimeout controls the reconcile and prune timeout reconcileTimeout time.Duration + + // syncingLock protects reading `syncing` while changing its value. + syncingLock sync.RWMutex + // syncing indicates whether the applier or destroyer is running. + syncing bool + + // errsLock protects reading `errs` while changing its value. + errsLock sync.RWMutex + // errs tracks all the errors the applier encounters. + // This field is cleared at the start of an Apply + errs status.MultiError + + // syncLock prevents multiple Apply methods from running concurrently. + syncLock sync.Mutex } -// Interface is a fake-able subset of the interface Applier implements. +// KptApplier is a fake-able subset of the interface Applier implements. // // Placed here to make discovering the production implementation (above) easier. -type Interface interface { +type KptApplier interface { // Apply updates the resource API server with the latest parsed git resource. // This is called when a new change in the git resource is detected. It also // returns a map of the GVKs which were successfully applied by the Applier. @@ -108,7 +115,7 @@ type Interface interface { Syncing() bool } -var _ Interface = &Applier{} +var _ KptApplier = &Applier{} // NewNamespaceApplier initializes an applier that fetches a certain namespace's resources from // the API server. @@ -278,6 +285,7 @@ func handleApplySkippedEvent(obj *unstructured.Unstructured, id core.ID, err err return SkipErrorForResource(err, id, actuation.ActuationStrategyApply) } +// processPruneEvent handles PruneEvents from the Applier func processPruneEvent(ctx context.Context, e event.PruneEvent, s *stats.PruneEventStats, objectStatusMap ObjectStatusMap, cs *clientSet) status.Error { id := idFrom(e.Identifier) klog.V(4).Infof("prune %v for object: %v", e.Status, id) @@ -394,11 +402,12 @@ func (a *Applier) checkInventoryObjectSize(ctx context.Context, c client.Client) } } -// sync triggers a kpt live apply library call to apply a set of resources. -func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) { +// applyInner triggers a kpt live apply library call to apply a set of resources. +func (a *Applier) applyInner(ctx context.Context, objs []client.Object) map[schema.GroupVersionKind]struct{} { cs, err := a.clientSetFunc(a.client, a.configFlags, a.statusMode) if err != nil { - return nil, Error(err) + a.addError(err) + return nil } a.checkInventoryObjectSize(ctx, cs.client) @@ -408,21 +417,22 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr // through annotation. enabledObjs, disabledObjs := partitionObjs(objs) if len(disabledObjs) > 0 { - klog.Infof("%v objects to be disabled: %v", len(disabledObjs), core.GKNNs(disabledObjs)) + klog.Infof("Applier disabling %d objects: %v", len(disabledObjs), core.GKNNs(disabledObjs)) disabledCount, err := cs.handleDisabledObjects(ctx, a.inventory, disabledObjs) if err != nil { - a.errs = status.Append(a.errs, err) - return nil, a.errs + a.addError(err) + return nil } s.DisableObjs = &stats.DisabledObjStats{ Total: uint64(len(disabledObjs)), Succeeded: disabledCount, } } - klog.Infof("%v objects to be applied: %v", len(enabledObjs), core.GKNNs(enabledObjs)) - resources, toUnsErrs := toUnstructured(enabledObjs) - if toUnsErrs != nil { - return nil, toUnsErrs + klog.Infof("Applier applying %d objects: %v", len(enabledObjs), core.GKNNs(enabledObjs)) + resources, err := toUnstructured(enabledObjs) + if err != nil { + a.addError(err) + return nil } unknownTypeResources := make(map[core.ID]struct{}) @@ -446,7 +456,8 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr // This allows for picking up CRD changes. mapper, err := a.configFlags.ToRESTMapper() if err != nil { - return nil, status.Append(nil, err) + a.addError(err) + return nil } meta.MaybeResetRESTMapper(mapper) @@ -455,16 +466,16 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr switch e.Type { case event.InitType: for _, ag := range e.InitEvent.ActionGroups { - klog.Info("InitEvent", ag) + klog.Info("Applier InitEvent", ag) } case event.ActionGroupType: klog.Info(e.ActionGroupEvent) case event.ErrorType: klog.V(4).Info(e.ErrorEvent) if util.IsRequestTooLargeError(e.ErrorEvent.Err) { - a.errs = status.Append(a.errs, largeResourceGroupError(e.ErrorEvent.Err, idFromInventory(a.inventory))) + a.addError(largeResourceGroupError(e.ErrorEvent.Err, idFromInventory(a.inventory))) } else { - a.errs = status.Append(a.errs, Error(e.ErrorEvent.Err)) + a.addError(e.ErrorEvent.Err) } s.ErrorTypeEvents++ case event.WaitType: @@ -475,7 +486,10 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr // a reconciled object may become pending before a wait task times out. // Record the objs that have been reconciled. klog.V(4).Info(e.WaitEvent) - a.errs = status.Append(a.errs, processWaitEvent(e.WaitEvent, s.WaitEvent, objStatusMap)) + err := processWaitEvent(e.WaitEvent, s.WaitEvent, objStatusMap) + if err != nil { + a.addError(err) + } case event.ApplyType: logEvent := event.ApplyEvent{ GroupName: e.ApplyEvent.GroupName, @@ -485,7 +499,10 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr Error: e.ApplyEvent.Error, } klog.V(4).Info(logEvent) - a.errs = status.Append(a.errs, processApplyEvent(ctx, e.ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources)) + err := processApplyEvent(ctx, e.ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources) + if err != nil { + a.addError(err) + } case event.PruneType: logEvent := event.PruneEvent{ GroupName: e.PruneEvent.GroupName, @@ -495,9 +512,12 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr Error: e.PruneEvent.Error, } klog.V(4).Info(logEvent) - a.errs = status.Append(a.errs, processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap, cs)) + err := processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap, cs) + if err != nil { + a.addError(err) + } default: - klog.V(4).Infof("skipped %v event", e.Type) + klog.V(4).Infof("Applier skipped %v event", e.Type) } } @@ -509,55 +529,79 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr } gvks[resource.GetObjectKind().GroupVersionKind()] = struct{}{} } - if a.errs == nil { - klog.V(4).Infof("all resources are up to date.") + if a.Errors() == nil { + klog.V(4).Infof("Applier completed without error") } if s.Empty() { - klog.V(4).Infof("The applier made no new progress") + klog.V(4).Infof("Applier made no new progress") } else { - klog.Infof("The applier made new progress: %s", s.String()) + klog.Infof("Applier made new progress: %s", s.String()) objStatusMap.Log(klog.V(0)) } - return gvks, a.errs + return gvks } -// Errors implements Interface. // Errors returns the errors encountered during apply. +// Errors implements Interface. func (a *Applier) Errors() status.MultiError { + a.errsLock.RLock() + defer a.errsLock.RUnlock() return a.errs } -// Syncing implements Interface. +func (a *Applier) addError(err error) { + // Ignore nil errors, to allow the caller to skip the nil check + if err == nil { + return + } + a.errsLock.Lock() + defer a.errsLock.Unlock() + + // Default to an applier.Error + _, isStatusErr := err.(status.Error) + _, isMultiErr := err.(status.MultiError) + if !isStatusErr && !isMultiErr { + err = Error(err) + } + a.errs = status.Append(a.errs, err) +} + +func (a *Applier) clearErrors() { + a.errsLock.Lock() + defer a.errsLock.Unlock() + a.errs = nil +} + // Syncing returns whether the applier is syncing. +// Syncing implements Interface. func (a *Applier) Syncing() bool { + a.syncingLock.RLock() + defer a.syncingLock.RUnlock() return a.syncing } +func (a *Applier) setSyncing(syncing bool) { + a.syncingLock.Lock() + defer a.syncingLock.Unlock() + a.syncing = syncing +} + +// Apply all managed resource objects and return any errors. // Apply implements Interface. func (a *Applier) Apply(ctx context.Context, desiredResource []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) { - // Clear the `errs` field at the start. - a.errs = nil - // Set the `syncing` field to `true` at the start. - a.syncing = true - - defer func() { - // Make sure to clear the `syncing` field before `Apply` returns. - a.syncing = false - }() - - a.mux.Lock() - defer a.mux.Unlock() - - // Pull the actual resources from the API server to compare against the - // declared resources. Note that we do not immediately return on error here - // because the Applier needs to try to do as much work as it can on each - // cycle. We collect and return all errors at the end. Some of those errors - // are transient and resolve in future cycles based on partial work completed - // in a previous cycle (eg ignore an error about a CR so that we can apply the - // CRD, then a future cycle is able to apply the CR). - // TODO: Here and elsewhere, pass the MultiError as a parameter. - return a.sync(ctx, desiredResource) + a.syncLock.Lock() + defer a.syncLock.Unlock() + + a.setSyncing(true) + defer a.setSyncing(false) + + // Clear the errors at the start of a new apply. + // TODO: Keep track of old errors and new errors and merge them, until applyInner returns. + a.clearErrors() + + objectStatusMap := a.applyInner(ctx, desiredResource) + return objectStatusMap, a.Errors() } // newInventoryUnstructured creates an inventory object as an unstructured. diff --git a/pkg/applier/kpt_applier_test.go b/pkg/applier/kpt_applier_test.go index ab8cb0944d..1746f35411 100644 --- a/pkg/applier/kpt_applier_test.go +++ b/pkg/applier/kpt_applier_test.go @@ -71,7 +71,7 @@ func (a *fakeApplier) Run(_ context.Context, _ inventory.Info, _ object.Unstruct return events } -func TestSync(t *testing.T) { +func TestApply(t *testing.T) { deploymentObj := newDeploymentObj() deploymentID := object.UnstructuredToObjMetadata(deploymentObj) @@ -263,7 +263,7 @@ func TestSync(t *testing.T) { } else { applier.clientSetFunc = applierFunc var gvks map[schema.GroupVersionKind]struct{} - gvks, errs = applier.sync(context.Background(), objs) + gvks, errs = applier.Apply(context.Background(), objs) if diff := cmp.Diff(tc.gvks, gvks, cmpopts.EquateEmpty()); diff != "" { t.Errorf("%s: Diff of GVK map from Apply(): %s", tc.name, diff) } diff --git a/pkg/hydrate/controller.go b/pkg/hydrate/controller.go index 8bf3aef6ea..7a1755bf80 100644 --- a/pkg/hydrate/controller.go +++ b/pkg/hydrate/controller.go @@ -59,10 +59,10 @@ type Hydrator struct { HydratedLink string // SyncDir is the relative path to the configs within the Git repository. SyncDir cmpath.Relative - // PollingFrequency is the period of time between checking the filesystem for rendering the DRY configs. - PollingFrequency time.Duration - // RehydrateFrequency is the period of time between rehydrating on errors. - RehydrateFrequency time.Duration + // PollingDelay is the time delay between checking the filesystem for rendering the DRY configs. + PollingDelay time.Duration + // RehydrateDelay is the time delay between rehydrating on errors. + RehydrateDelay time.Duration // ReconcilerName is the name of the reconciler. ReconcilerName string } @@ -72,9 +72,9 @@ func (h *Hydrator) Run(ctx context.Context) { // Use timers, not tickers. // Tickers can cause memory leaks and continuous execution, when execution // takes longer than the tick duration. - runTimer := time.NewTimer(h.PollingFrequency) + runTimer := time.NewTimer(h.PollingDelay) defer runTimer.Stop() - rehydrateTimer := time.NewTimer(h.RehydrateFrequency) + rehydrateTimer := time.NewTimer(h.RehydrateDelay) defer rehydrateTimer.Stop() absSourceDir := h.SourceRoot.Join(cmpath.RelativeSlash(h.SourceLink)) for { @@ -88,7 +88,7 @@ func (h *Hydrator) Run(ctx context.Context) { } else { h.rehydrateOnError(commit, syncDir.OSPath()) } - rehydrateTimer.Reset(h.RehydrateFrequency) // Schedule rehydrate attempt + rehydrateTimer.Reset(h.RehydrateDelay) // Schedule rehydrate attempt case <-runTimer.C: commit, syncDir, err := SourceCommitAndDir(h.SourceType, absSourceDir, h.SyncDir, h.ReconcilerName) if err != nil { @@ -102,7 +102,7 @@ func (h *Hydrator) Run(ctx context.Context) { klog.Errorf("failed to complete the rendering execution for commit %q: %v", commit, err) } } - runTimer.Reset(h.PollingFrequency) // Schedule re-run attempt + runTimer.Reset(h.PollingDelay) // Schedule re-run attempt } } } diff --git a/pkg/metrics/tagkeys.go b/pkg/metrics/tagkeys.go index ed3e9f5e85..c87730c27a 100644 --- a/pkg/metrics/tagkeys.go +++ b/pkg/metrics/tagkeys.go @@ -107,7 +107,7 @@ func StatusTagKey(err error) string { // StatusTagValueFromSummary returns error if the summary indicates at least 1 // error, otherwise success. func StatusTagValueFromSummary(summary *v1beta1.ErrorSummary) string { - if summary.TotalCount == 0 { + if summary == nil || summary.TotalCount == 0 { return StatusSuccess } return StatusError diff --git a/pkg/parse/namespace.go b/pkg/parse/namespace.go index 10be9df7a3..6ae667968d 100644 --- a/pkg/parse/namespace.go +++ b/pkg/parse/namespace.go @@ -41,7 +41,8 @@ import ( ) // NewNamespaceRunner creates a new runnable parser for parsing a Namespace repo. -func NewNamespaceRunner(clusterName, syncName, reconcilerName string, scope declared.Scope, fileReader reader.Reader, c client.Client, pollingFrequency time.Duration, resyncPeriod time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.Interface, rem remediator.Interface) (Parser, error) { +// TODO: replace with builder pattern to avoid too many arguments. +func NewNamespaceRunner(clusterName, syncName, reconcilerName string, scope declared.Scope, fileReader reader.Reader, c client.Client, pollingDelay, resyncDelay, retryDelay, statusUpdateDelay time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.KptApplier, rem remediator.Interface) (Parser, error) { converter, err := declared.NewValueConverter(dc) if err != nil { return nil, err @@ -49,14 +50,16 @@ func NewNamespaceRunner(clusterName, syncName, reconcilerName string, scope decl return &namespace{ opts: opts{ - clusterName: clusterName, - client: c, - syncName: syncName, - reconcilerName: reconcilerName, - pollingFrequency: pollingFrequency, - resyncPeriod: resyncPeriod, - files: files{FileSource: fs}, - parser: filesystem.NewParser(fileReader), + clusterName: clusterName, + client: c, + syncName: syncName, + reconcilerName: reconcilerName, + pollingDelay: pollingDelay, + resyncDelay: resyncDelay, + retryDelay: retryDelay, + statusUpdateDelay: statusUpdateDelay, + files: files{FileSource: fs}, + parser: filesystem.NewParser(fileReader), updater: updater{ scope: scope, resources: resources, @@ -160,14 +163,16 @@ func (p *namespace) setSourceStatusWithRetries(ctx context.Context, newStatus so currentRS := rs.DeepCopy() - setSourceStatus(&rs.Status.Source, p, newStatus, denominator) + setSourceStatusFields(&rs.Status.Source, p, newStatus, denominator) - continueSyncing := (rs.Status.Source.ErrorSummary.TotalCount == 0) - var errorSource []v1beta1.ErrorSource - if len(rs.Status.Source.Errors) > 0 { - errorSource = []v1beta1.ErrorSource{v1beta1.SourceError} + sourceErrorCount := 0 + if rs.Status.Source.ErrorSummary != nil { + sourceErrorCount = rs.Status.Source.ErrorSummary.TotalCount } - reposync.SetSyncing(&rs, continueSyncing, "Source", "Source", newStatus.commit, errorSource, rs.Status.Source.ErrorSummary, newStatus.lastUpdate) + syncing := (sourceErrorCount == 0) // TODO: Syncing=True if errors are non-blocking + + errorSources, errorSummary := summarizeErrors(rs.Status.Status) + reposync.SetSyncing(&rs, syncing, "Source", "Source", newStatus.commit, errorSources, errorSummary, newStatus.lastUpdate) // Avoid unnecessary status updates. if !currentRS.Status.Source.LastUpdate.IsZero() && cmp.Equal(currentRS.Status, rs.Status, compare.IgnoreTimestampUpdates) { @@ -191,7 +196,8 @@ func (p *namespace) setSourceStatusWithRetries(ctx context.Context, newStatus so if err := p.client.Status().Update(ctx, &rs); err != nil { // If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { - klog.Infof("Failed to update RepoSync source status (total error count: %d, denominator: %d): %s.", rs.Status.Source.ErrorSummary.TotalCount, denominator, err) + klog.Infof("Failed to update RepoSync source status (total error count: %d, denominator: %d): %s.", + sourceErrorCount, denominator, err) return p.setSourceStatusWithRetries(ctx, newStatus, denominator*2) } return status.APIServerError(err, "failed to update RepoSync source status from parser") @@ -222,14 +228,16 @@ func (p *namespace) setRenderingStatusWithRetires(ctx context.Context, newStatus currentRS := rs.DeepCopy() - setRenderingStatus(&rs.Status.Rendering, p, newStatus, denominator) + setRenderingStatusFields(&rs.Status.Rendering, p, newStatus, denominator) - continueSyncing := (rs.Status.Rendering.ErrorSummary.TotalCount == 0) - var errorSource []v1beta1.ErrorSource - if len(rs.Status.Rendering.Errors) > 0 { - errorSource = []v1beta1.ErrorSource{v1beta1.RenderingError} + renderingErrorCount := 0 + if rs.Status.Rendering.ErrorSummary != nil { + renderingErrorCount = rs.Status.Rendering.ErrorSummary.TotalCount } - reposync.SetSyncing(&rs, continueSyncing, "Rendering", newStatus.message, newStatus.commit, errorSource, rs.Status.Rendering.ErrorSummary, newStatus.lastUpdate) + syncing := (renderingErrorCount == 0) + + errorSources, errorSummary := summarizeErrors(rs.Status.Status) + reposync.SetSyncing(&rs, syncing, "Rendering", newStatus.message, newStatus.commit, errorSources, errorSummary, newStatus.lastUpdate) // Avoid unnecessary status updates. if !currentRS.Status.Rendering.LastUpdate.IsZero() && cmp.Equal(currentRS.Status, rs.Status, compare.IgnoreTimestampUpdates) { @@ -260,7 +268,8 @@ func (p *namespace) setRenderingStatusWithRetires(ctx context.Context, newStatus if err := p.client.Status().Update(ctx, &rs); err != nil { // If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { - klog.Infof("Failed to update RepoSync rendering status (total error count: %d, denominator: %d): %s.", rs.Status.Rendering.ErrorSummary.TotalCount, denominator, err) + klog.Infof("Failed to update RepoSync rendering status (total error count: %d, denominator: %d): %s.", + renderingErrorCount, denominator, err) return p.setRenderingStatusWithRetires(ctx, newStatus, denominator*2) } return status.APIServerError(err, "failed to update RepoSync rendering status from parser") @@ -268,16 +277,15 @@ func (p *namespace) setRenderingStatusWithRetires(ctx context.Context, newStatus return nil } -// SetSyncStatus implements the Parser interface -// SetSyncStatus sets the RepoSync sync status. -// `errs` includes the errors encountered during the apply step; -func (p *namespace) SetSyncStatus(ctx context.Context, errs status.MultiError) error { +// setSyncStatus updates RepoSync `.status.sync` and Syncing condition. +// setSyncStatus implements the Parser interface. +func (p *namespace) setSyncStatus(ctx context.Context, syncing bool, errs status.MultiError) error { p.mux.Lock() defer p.mux.Unlock() - return p.setSyncStatusWithRetries(ctx, errs, defaultDenominator) + return p.setSyncStatusWithRetries(ctx, syncing, errs, defaultDenominator) } -func (p *namespace) setSyncStatusWithRetries(ctx context.Context, errs status.MultiError, denominator int) error { +func (p *namespace) setSyncStatusWithRetries(ctx context.Context, syncing bool, errs status.MultiError, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } @@ -289,20 +297,24 @@ func (p *namespace) setSyncStatusWithRetries(ctx context.Context, errs status.Mu currentRS := rs.DeepCopy() - // syncing indicates whether the applier is syncing. - syncing := p.applier.Syncing() + setSyncStatusFields(&rs.Status.Status, status.ToCSE(errs), denominator) - setSyncStatus(&rs.Status.Status, status.ToCSE(errs), denominator) + syncErrorCount := 0 + if rs.Status.Sync.ErrorSummary != nil { + syncErrorCount = rs.Status.Sync.ErrorSummary.TotalCount + } - errorSources, errorSummary := summarizeErrors(rs.Status.Source, rs.Status.Sync) + errorSources, errorSummary := summarizeErrors(rs.Status.Status) + var message string if syncing { - reposync.SetSyncing(rs, true, "Sync", "Syncing", rs.Status.Sync.Commit, errorSources, errorSummary, rs.Status.Sync.LastUpdate) + message = "Syncing" } else { - if errorSummary.TotalCount == 0 { + message = "Sync Completed" + if errorSummary == nil || errorSummary.TotalCount == 0 { rs.Status.LastSyncedCommit = rs.Status.Sync.Commit } - reposync.SetSyncing(rs, false, "Sync", "Sync Completed", rs.Status.Sync.Commit, errorSources, errorSummary, rs.Status.Sync.LastUpdate) } + reposync.SetSyncing(rs, syncing, "Sync", message, rs.Status.Sync.Commit, errorSources, errorSummary, rs.Status.Sync.LastUpdate) // Avoid unnecessary status updates. if !currentRS.Status.Sync.LastUpdate.IsZero() && cmp.Equal(currentRS.Status, rs.Status, compare.IgnoreTimestampUpdates) { @@ -322,32 +334,18 @@ func (p *namespace) setSyncStatusWithRetries(ctx context.Context, errs status.Mu } if klog.V(5).Enabled() { - klog.Infof("Updating status for RepoSync %s/%s:\nDiff (- Expected, + Actual):\n%s", + klog.Infof("Updating sync status for RepoSync %s/%s:\nDiff (- Old, + New):\n%s", rs.Namespace, rs.Name, cmp.Diff(currentRS.Status, rs.Status)) } if err := p.client.Status().Update(ctx, rs); err != nil { // If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { - klog.Infof("Failed to update RepoSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err) - return p.setSyncStatusWithRetries(ctx, errs, denominator*2) + klog.Infof("Failed to update RepoSync sync status (total error count: %d, denominator: %d): %s.", + syncErrorCount, denominator, err) + return p.setSyncStatusWithRetries(ctx, syncing, errs, denominator*2) } return status.APIServerError(err, fmt.Sprintf("failed to update the RepoSync sync status for the %v namespace", p.scope)) } return nil } - -// ApplierErrors implements the Parser interface -func (p *namespace) ApplierErrors() status.MultiError { - return p.applier.Errors() -} - -// RemediatorConflictErrors implements the Parser interface -func (p *namespace) RemediatorConflictErrors() []status.ManagementConflictError { - return p.remediator.ConflictErrors() -} - -// K8sClient implements the Parser interface -func (p *namespace) K8sClient() client.Client { - return p.client -} diff --git a/pkg/parse/opts.go b/pkg/parse/opts.go index 134a9f24a0..767e37df5d 100644 --- a/pkg/parse/opts.go +++ b/pkg/parse/opts.go @@ -43,14 +43,20 @@ type opts struct { // syncName is the name of the RootSync or RepoSync object. syncName string - // pollingFrequency is how often to re-import configuration from the filesystem. - // - // For tests, use zero as it will poll continuously. - pollingFrequency time.Duration + // pollingDelay is how long the parser waits between run attempts. + // Each attempt re-reads the latest configuration from the filesystem. + pollingDelay time.Duration - // ResyncPeriod is the period of time between forced re-sync from source (even - // without a new commit). - resyncPeriod time.Duration + // resyncDelay is how long the parser waits between full re-sync from source + // (even without a new commit). + resyncDelay time.Duration + + // retryDelay is how long the parser waits between retries, after an error. + retryDelay time.Duration + + // statusUpdateDelay is how long the parser waits between updates of the + // sync status, to account for management conflict errors from the remediator. + statusUpdateDelay time.Duration // discoveryInterface is how the parser learns what types are currently // available on the cluster. @@ -60,9 +66,6 @@ type opts struct { // objects in Git. converter *declared.ValueConverter - // reconciling indicates whether the reconciler is reconciling a change. - reconciling bool - // mux prevents status update conflicts. mux *sync.Mutex @@ -75,18 +78,8 @@ type Parser interface { parseSource(ctx context.Context, state sourceState) ([]ast.FileObject, status.MultiError) setSourceStatus(ctx context.Context, newStatus sourceStatus) error setRenderingStatus(ctx context.Context, oldStatus, newStatus renderingStatus) error - SetSyncStatus(ctx context.Context, errs status.MultiError) error + setSyncStatus(ctx context.Context, syncing bool, errs status.MultiError) error options() *opts - // SetReconciling sets the field indicating whether the reconciler is reconciling a change. - SetReconciling(value bool) - // Reconciling returns whether the reconciler is reconciling a change. - Reconciling() bool - // ApplierErrors returns the errors surfaced by the applier. - ApplierErrors() status.MultiError - // RemediatorConflictErrors returns the conflict errors detected by the remediator. - RemediatorConflictErrors() []status.ManagementConflictError - // K8sClient returns the Kubernetes client that talks to the API server. - K8sClient() client.Client } func (o *opts) k8sClient() client.Client { @@ -96,11 +89,3 @@ func (o *opts) k8sClient() client.Client { func (o *opts) discoveryClient() discovery.ServerResourcer { return o.discoveryInterface } - -func (o *opts) SetReconciling(value bool) { - o.reconciling = value -} - -func (o *opts) Reconciling() bool { - return o.reconciling -} diff --git a/pkg/parse/root.go b/pkg/parse/root.go index 7218489d51..d4b954055c 100644 --- a/pkg/parse/root.go +++ b/pkg/parse/root.go @@ -51,32 +51,37 @@ import ( ) // NewRootRunner creates a new runnable parser for parsing a Root repository. -func NewRootRunner(clusterName, syncName, reconcilerName string, format filesystem.SourceFormat, fileReader reader.Reader, c client.Client, pollingFrequency time.Duration, resyncPeriod time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.Interface, rem remediator.Interface) (Parser, error) { +// TODO: replace with builder pattern to avoid too many arguments. +func NewRootRunner(clusterName, syncName, reconcilerName string, format filesystem.SourceFormat, fileReader reader.Reader, c client.Client, pollingDelay, resyncDelay, retryDelay, statusUpdateDelay time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.KptApplier, rem remediator.Interface) (Parser, error) { converter, err := declared.NewValueConverter(dc) if err != nil { return nil, err } - opts := opts{ - clusterName: clusterName, - syncName: syncName, - reconcilerName: reconcilerName, - client: c, - pollingFrequency: pollingFrequency, - resyncPeriod: resyncPeriod, - files: files{FileSource: fs}, - parser: filesystem.NewParser(fileReader), - updater: updater{ - scope: declared.RootReconciler, - resources: resources, - applier: app, - remediator: rem, + return &root{ + opts: opts{ + clusterName: clusterName, + syncName: syncName, + reconcilerName: reconcilerName, + client: c, + pollingDelay: pollingDelay, + resyncDelay: resyncDelay, + retryDelay: retryDelay, + statusUpdateDelay: statusUpdateDelay, + files: files{FileSource: fs}, + parser: filesystem.NewParser(fileReader), + updater: updater{ + scope: declared.RootReconciler, + resources: resources, + applier: app, + remediator: rem, + }, + discoveryInterface: dc, + converter: converter, + mux: &sync.Mutex{}, }, - discoveryInterface: dc, - converter: converter, - mux: &sync.Mutex{}, - } - return &root{opts: opts, sourceFormat: format}, nil + sourceFormat: format, + }, nil } type root struct { @@ -171,14 +176,16 @@ func (p *root) setSourceStatusWithRetries(ctx context.Context, newStatus sourceS currentRS := rs.DeepCopy() - setSourceStatus(&rs.Status.Source, p, newStatus, denominator) + setSourceStatusFields(&rs.Status.Source, p, newStatus, denominator) - continueSyncing := (rs.Status.Source.ErrorSummary.TotalCount == 0) - var errorSource []v1beta1.ErrorSource - if len(rs.Status.Source.Errors) > 0 { - errorSource = []v1beta1.ErrorSource{v1beta1.SourceError} + sourceErrorCount := 0 + if rs.Status.Source.ErrorSummary != nil { + sourceErrorCount = rs.Status.Source.ErrorSummary.TotalCount } - rootsync.SetSyncing(&rs, continueSyncing, "Source", "Source", newStatus.commit, errorSource, rs.Status.Source.ErrorSummary, newStatus.lastUpdate) + syncing := (sourceErrorCount == 0) // TODO: Syncing=True if errors are non-blocking + + errorSources, errorSummary := summarizeErrors(rs.Status.Status) + rootsync.SetSyncing(&rs, syncing, "Source", "Source", newStatus.commit, errorSources, errorSummary, newStatus.lastUpdate) // Avoid unnecessary status updates. if !currentRS.Status.Source.LastUpdate.IsZero() && cmp.Equal(currentRS.Status, rs.Status, compare.IgnoreTimestampUpdates) { @@ -195,14 +202,15 @@ func (p *root) setSourceStatusWithRetries(ctx context.Context, newStatus sourceS } if klog.V(5).Enabled() { - klog.Infof("Updating source status for RootSync %s/%s:\nDiff (- Expected, + Actual):\n%s", + klog.Infof("Updating source status for RootSync %s/%s:\nDiff (- Old, + New):\n%s", rs.Namespace, rs.Name, cmp.Diff(currentRS.Status, rs.Status)) } if err := p.client.Status().Update(ctx, &rs); err != nil { // If the update failure was caused by the size of the RootSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { - klog.Infof("Failed to update RootSync source status (total error count: %d, denominator: %d): %s.", rs.Status.Source.ErrorSummary.TotalCount, denominator, err) + klog.Infof("Failed to update RootSync source status (total error count: %d, denominator: %d): %s.", + sourceErrorCount, denominator, err) return p.setSourceStatusWithRetries(ctx, newStatus, denominator*2) } return status.APIServerError(err, "failed to update RootSync source status from parser") @@ -210,7 +218,7 @@ func (p *root) setSourceStatusWithRetries(ctx context.Context, newStatus sourceS return nil } -func setSourceStatus(source *v1beta1.SourceStatus, p Parser, newStatus sourceStatus, denominator int) { +func setSourceStatusFields(source *v1beta1.SourceStatus, p Parser, newStatus sourceStatus, denominator int) { cse := status.ToCSE(newStatus.errs) source.Commit = newStatus.commit switch p.options().SourceType { @@ -239,13 +247,17 @@ func setSourceStatus(source *v1beta1.SourceStatus, p Parser, newStatus sourceSta source.Git = nil source.Oci = nil } - errorSummary := &v1beta1.ErrorSummary{ - TotalCount: len(cse), - Truncated: denominator != 1, - ErrorCountAfterTruncation: len(cse) / denominator, + if len(cse) > 0 { + source.Errors = cse[0 : len(cse)/denominator] + source.ErrorSummary = &v1beta1.ErrorSummary{ + TotalCount: len(cse), + Truncated: denominator != 1, + ErrorCountAfterTruncation: len(cse) / denominator, + } + } else { + source.Errors = nil + source.ErrorSummary = nil } - source.Errors = cse[0 : len(cse)/denominator] - source.ErrorSummary = errorSummary source.LastUpdate = newStatus.lastUpdate } @@ -272,14 +284,16 @@ func (p *root) setRenderingStatusWithRetires(ctx context.Context, newStatus rend currentRS := rs.DeepCopy() - setRenderingStatus(&rs.Status.Rendering, p, newStatus, denominator) + setRenderingStatusFields(&rs.Status.Rendering, p, newStatus, denominator) - continueSyncing := (rs.Status.Rendering.ErrorSummary.TotalCount == 0) - var errorSource []v1beta1.ErrorSource - if len(rs.Status.Rendering.Errors) > 0 { - errorSource = []v1beta1.ErrorSource{v1beta1.RenderingError} + renderingErrorCount := 0 + if rs.Status.Rendering.ErrorSummary != nil { + renderingErrorCount = rs.Status.Rendering.ErrorSummary.TotalCount } - rootsync.SetSyncing(&rs, continueSyncing, "Rendering", newStatus.message, newStatus.commit, errorSource, rs.Status.Rendering.ErrorSummary, newStatus.lastUpdate) + syncing := (renderingErrorCount == 0) + + errorSources, errorSummary := summarizeErrors(rs.Status.Status) + rootsync.SetSyncing(&rs, syncing, "Rendering", newStatus.message, newStatus.commit, errorSources, errorSummary, newStatus.lastUpdate) // Avoid unnecessary status updates. if !currentRS.Status.Rendering.LastUpdate.IsZero() && cmp.Equal(currentRS.Status, rs.Status, compare.IgnoreTimestampUpdates) { @@ -303,14 +317,15 @@ func (p *root) setRenderingStatusWithRetires(ctx context.Context, newStatus rend } if klog.V(5).Enabled() { - klog.Infof("Updating rendering status for RootSync %s/%s:\nDiff (- Expected, + Actual):\n%s", + klog.Infof("Updating rendering status for RootSync %s/%s:\nDiff (- Old, + New):\n%s", rs.Namespace, rs.Name, cmp.Diff(currentRS.Status, rs.Status)) } if err := p.client.Status().Update(ctx, &rs); err != nil { // If the update failure was caused by the size of the RootSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { - klog.Infof("Failed to update RootSync rendering status (total error count: %d, denominator: %d): %s.", rs.Status.Rendering.ErrorSummary.TotalCount, denominator, err) + klog.Infof("Failed to update RootSync rendering status (total error count: %d, denominator: %d): %s.", + renderingErrorCount, denominator, err) return p.setRenderingStatusWithRetires(ctx, newStatus, denominator*2) } return status.APIServerError(err, "failed to update RootSync rendering status from parser") @@ -318,7 +333,7 @@ func (p *root) setRenderingStatusWithRetires(ctx context.Context, newStatus rend return nil } -func setRenderingStatus(rendering *v1beta1.RenderingStatus, p Parser, newStatus renderingStatus, denominator int) { +func setRenderingStatusFields(rendering *v1beta1.RenderingStatus, p Parser, newStatus renderingStatus, denominator int) { cse := status.ToCSE(newStatus.errs) rendering.Commit = newStatus.commit switch p.options().SourceType { @@ -348,32 +363,29 @@ func setRenderingStatus(rendering *v1beta1.RenderingStatus, p Parser, newStatus rendering.Oci = nil } rendering.Message = newStatus.message - errorSummary := &v1beta1.ErrorSummary{ - TotalCount: len(cse), - Truncated: denominator != 1, + if len(cse) > 0 { + rendering.Errors = cse[0 : len(cse)/denominator] + rendering.ErrorSummary = &v1beta1.ErrorSummary{ + TotalCount: len(cse), + Truncated: denominator != 1, + ErrorCountAfterTruncation: len(cse) / denominator, + } + } else { + rendering.Errors = nil + rendering.ErrorSummary = nil } - rendering.Errors = cse[0 : len(cse)/denominator] - rendering.ErrorSummary = errorSummary rendering.LastUpdate = newStatus.lastUpdate } -// SetSyncStatus implements the Parser interface -// SetSyncStatus sets the RootSync sync status. -// `errs` includes the errors encountered during the apply step; -func (p *root) SetSyncStatus(ctx context.Context, errs status.MultiError) error { +// setSyncStatus updates RootSync `.status.sync` and Syncing condition. +// setSyncStatus implements the Parser interface. +func (p *root) setSyncStatus(ctx context.Context, syncing bool, errs status.MultiError) error { p.mux.Lock() defer p.mux.Unlock() - var allErrs status.MultiError - remediatorErrs := p.remediator.ConflictErrors() - for _, e := range remediatorErrs { - allErrs = status.Append(allErrs, e) - } - // Add conflicting errors before other apply errors. - allErrs = status.Append(allErrs, errs) - return p.setSyncStatusWithRetries(ctx, allErrs, defaultDenominator) + return p.setSyncStatusWithRetries(ctx, syncing, errs, defaultDenominator) } -func (p *root) setSyncStatusWithRetries(ctx context.Context, errs status.MultiError, denominator int) error { +func (p *root) setSyncStatusWithRetries(ctx context.Context, syncing bool, errs status.MultiError, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } @@ -385,20 +397,24 @@ func (p *root) setSyncStatusWithRetries(ctx context.Context, errs status.MultiEr currentRS := rs.DeepCopy() - // syncing indicates whether the applier is syncing. - syncing := p.applier.Syncing() + setSyncStatusFields(&rs.Status.Status, status.ToCSE(errs), denominator) - setSyncStatus(&rs.Status.Status, status.ToCSE(errs), denominator) + syncErrorCount := 0 + if rs.Status.Sync.ErrorSummary != nil { + syncErrorCount = rs.Status.Sync.ErrorSummary.TotalCount + } - errorSources, errorSummary := summarizeErrors(rs.Status.Source, rs.Status.Sync) + errorSources, errorSummary := summarizeErrors(rs.Status.Status) + var message string if syncing { - rootsync.SetSyncing(rs, true, "Sync", "Syncing", rs.Status.Sync.Commit, errorSources, errorSummary, rs.Status.Sync.LastUpdate) + message = "Syncing" } else { - if errorSummary.TotalCount == 0 { + message = "Sync Completed" + if errorSummary == nil || errorSummary.TotalCount == 0 { rs.Status.LastSyncedCommit = rs.Status.Sync.Commit } - rootsync.SetSyncing(rs, false, "Sync", "Sync Completed", rs.Status.Sync.Commit, errorSources, errorSummary, rs.Status.Sync.LastUpdate) } + rootsync.SetSyncing(rs, syncing, "Sync", message, rs.Status.Sync.Commit, errorSources, errorSummary, rs.Status.Sync.LastUpdate) // Avoid unnecessary status updates. if !currentRS.Status.Sync.LastUpdate.IsZero() && cmp.Equal(currentRS.Status, rs.Status, compare.IgnoreTimestampUpdates) { @@ -418,56 +434,77 @@ func (p *root) setSyncStatusWithRetries(ctx context.Context, errs status.MultiEr } if klog.V(5).Enabled() { - klog.Infof("Updating sync status for RootSync %s/%s:\nDiff (- Expected, + Actual):\n%s", + klog.Infof("Updating sync status for RootSync %s/%s:\nDiff (- Old, + New):\n%s", rs.Namespace, rs.Name, cmp.Diff(currentRS.Status, rs.Status)) } if err := p.client.Status().Update(ctx, rs); err != nil { // If the update failure was caused by the size of the RootSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { - klog.Infof("Failed to update RootSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err) - return p.setSyncStatusWithRetries(ctx, errs, denominator*2) + klog.Infof("Failed to update RootSync sync status (total error count: %d, denominator: %d): %s.", + syncErrorCount, denominator, err) + return p.setSyncStatusWithRetries(ctx, syncing, errs, denominator*2) } return status.APIServerError(err, "failed to update RootSync sync status") } return nil } -func setSyncStatus(syncStatus *v1beta1.Status, syncErrs []v1beta1.ConfigSyncError, denominator int) { - syncStatus.Sync.Commit = syncStatus.Source.Commit - syncStatus.Sync.Git = syncStatus.Source.Git - syncStatus.Sync.Oci = syncStatus.Source.Oci - syncStatus.Sync.Helm = syncStatus.Source.Helm - syncStatus.Sync.ErrorSummary = &v1beta1.ErrorSummary{ - TotalCount: len(syncErrs), - Truncated: denominator != 1, - } - syncStatus.Sync.Errors = syncErrs[0 : len(syncErrs)/denominator] - syncStatus.Sync.LastUpdate = metav1.Now() +func setSyncStatusFields(rsStatus *v1beta1.Status, cse []v1beta1.ConfigSyncError, denominator int) { + rsStatus.Sync.Commit = rsStatus.Source.Commit + rsStatus.Sync.Git = rsStatus.Source.Git + rsStatus.Sync.Oci = rsStatus.Source.Oci + rsStatus.Sync.Helm = rsStatus.Source.Helm + if len(cse) > 0 { + rsStatus.Sync.Errors = cse[0 : len(cse)/denominator] + rsStatus.Sync.ErrorSummary = &v1beta1.ErrorSummary{ + TotalCount: len(cse), + Truncated: denominator != 1, + ErrorCountAfterTruncation: len(cse) / denominator, + } + } else { + rsStatus.Sync.Errors = nil + rsStatus.Sync.ErrorSummary = nil + } + rsStatus.Sync.LastUpdate = metav1.Now() } -// summarizeErrors summarizes the errors from `sourceStatus` and `syncStatus`, and returns an ErrorSource slice and an ErrorSummary. -func summarizeErrors(sourceStatus v1beta1.SourceStatus, syncStatus v1beta1.SyncStatus) ([]v1beta1.ErrorSource, *v1beta1.ErrorSummary) { +// summarizeErrors summarizes the rendering, source, and sync. +// Returns an ErrorSource slice and ErrorSummary. +func summarizeErrors(rsStatus v1beta1.Status) ([]v1beta1.ErrorSource, *v1beta1.ErrorSummary) { + var errorSummary v1beta1.ErrorSummary var errorSources []v1beta1.ErrorSource - if len(sourceStatus.Errors) > 0 { + if len(rsStatus.Rendering.Errors) > 0 { + errorSources = append(errorSources, v1beta1.RenderingError) + summary := rsStatus.Rendering.ErrorSummary + if summary != nil { + errorSummary.TotalCount += summary.TotalCount + errorSummary.ErrorCountAfterTruncation += summary.ErrorCountAfterTruncation + errorSummary.Truncated = errorSummary.Truncated || summary.Truncated + } + } + if len(rsStatus.Source.Errors) > 0 { errorSources = append(errorSources, v1beta1.SourceError) + summary := rsStatus.Source.ErrorSummary + if summary != nil { + errorSummary.TotalCount += summary.TotalCount + errorSummary.ErrorCountAfterTruncation += summary.ErrorCountAfterTruncation + errorSummary.Truncated = errorSummary.Truncated || summary.Truncated + } } - if len(syncStatus.Errors) > 0 { + if len(rsStatus.Sync.Errors) > 0 { errorSources = append(errorSources, v1beta1.SyncError) - } - - errorSummary := &v1beta1.ErrorSummary{} - for _, summary := range []*v1beta1.ErrorSummary{sourceStatus.ErrorSummary, syncStatus.ErrorSummary} { - if summary == nil { - continue - } - errorSummary.TotalCount += summary.TotalCount - errorSummary.ErrorCountAfterTruncation += summary.ErrorCountAfterTruncation - if summary.Truncated { - errorSummary.Truncated = true + summary := rsStatus.Sync.ErrorSummary + if summary != nil { + errorSummary.TotalCount += summary.TotalCount + errorSummary.ErrorCountAfterTruncation += summary.ErrorCountAfterTruncation + errorSummary.Truncated = errorSummary.Truncated || summary.Truncated } } - return errorSources, errorSummary + if errorSummary.TotalCount > 0 { + return errorSources, &errorSummary + } + return nil, nil } // addImplicitNamespaces hydrates the given FileObjects by injecting implicit @@ -532,66 +569,3 @@ func (p *root) addImplicitNamespaces(objs []ast.FileObject) ([]ast.FileObject, s return objs, errs } - -// ApplierErrors implements the Parser interface -func (p *root) ApplierErrors() status.MultiError { - return p.applier.Errors() -} - -// RemediatorConflictErrors implements the Parser interface -func (p *root) RemediatorConflictErrors() []status.ManagementConflictError { - return p.remediator.ConflictErrors() -} - -// K8sClient implements the Parser interface -func (p *root) K8sClient() client.Client { - return p.client -} - -// prependRootSyncRemediatorStatus adds the conflict error detected by the remediator to the front of the sync errors. -func prependRootSyncRemediatorStatus(ctx context.Context, client client.Client, syncName string, conflictErrs []status.ManagementConflictError, denominator int) error { - if denominator <= 0 { - return fmt.Errorf("The denominator must be a positive number") - } - - var rs v1beta1.RootSync - if err := client.Get(ctx, rootsync.ObjectKey(syncName), &rs); err != nil { - return status.APIServerError(err, "failed to get RootSync: "+syncName) - } - - var errs []v1beta1.ConfigSyncError - for _, conflictErr := range conflictErrs { - conflictCSEError := conflictErr.ToCSE() - conflictPairCSEError := conflictErr.CurrentManagerError().ToCSE() - errorFound := false - for _, e := range rs.Status.Sync.Errors { - // Dedup the same remediator conflict error. - if e.Code == status.ManagementConflictErrorCode && (e.ErrorMessage == conflictCSEError.ErrorMessage || e.ErrorMessage == conflictPairCSEError.ErrorMessage) { - errorFound = true - break - } - } - if !errorFound { - errs = append(errs, conflictCSEError) - } - } - - // No new errors, so no update - if len(errs) == 0 { - return nil - } - - // Add the remeditor conflict errors before other sync errors for more visibility. - errs = append(errs, rs.Status.Sync.Errors...) - setSyncStatus(&rs.Status.Status, errs, denominator) - - if err := client.Status().Update(ctx, &rs); err != nil { - // If the update failure was caused by the size of the RootSync object, we would truncate the errors and retry. - if isRequestTooLargeError(err) { - klog.Infof("Failed to update RootSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err) - return prependRootSyncRemediatorStatus(ctx, client, syncName, conflictErrs, denominator*2) - } - return status.APIServerError(err, "failed to update RootSync sync status") - } - return nil -} diff --git a/pkg/parse/root_test.go b/pkg/parse/root_test.go index 64bf5dec8d..d8161a317a 100644 --- a/pkg/parse/root_test.go +++ b/pkg/parse/root_test.go @@ -937,32 +937,85 @@ func (a *fakeApplier) Syncing() bool { func TestSummarizeErrors(t *testing.T) { testCases := []struct { name string - sourceStatus v1beta1.SourceStatus - syncStatus v1beta1.SyncStatus + rsStatus v1beta1.Status expectedErrorSources []v1beta1.ErrorSource expectedErrorSummary *v1beta1.ErrorSummary }{ { - name: "both sourceStatus and syncStatus are empty", - sourceStatus: v1beta1.SourceStatus{}, - syncStatus: v1beta1.SyncStatus{}, + name: "all status fields are empty", + rsStatus: v1beta1.Status{ + Rendering: v1beta1.RenderingStatus{}, + Source: v1beta1.SourceStatus{}, + Sync: v1beta1.SyncStatus{}, + }, expectedErrorSources: nil, - expectedErrorSummary: &v1beta1.ErrorSummary{}, + expectedErrorSummary: nil, }, { - name: "sourceStatus is not empty (no trucation), syncStatus is empty", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, + name: "rendering is not empty (no trucation), source and sync are empty", + rsStatus: v1beta1.Status{ + Rendering: v1beta1.RenderingStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1001", ErrorMessage: "1001-error-message"}, + {Code: "1002", ErrorMessage: "1002-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, + Source: v1beta1.SourceStatus{}, + Sync: v1beta1.SyncStatus{}, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.RenderingError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, + }, + { + name: "rendering is not empty and trucates errors, source and sync are empty", + rsStatus: v1beta1.Status{ + Rendering: v1beta1.RenderingStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1001", ErrorMessage: "1001-error-message"}, + {Code: "1002", ErrorMessage: "1002-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + Source: v1beta1.SourceStatus{}, + Sync: v1beta1.SyncStatus{}, + }, + expectedErrorSources: []v1beta1.ErrorSource{v1beta1.RenderingError}, + expectedErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, + }, + { + name: "source is not empty (no trucation), sync is empty", + rsStatus: v1beta1.Status{ + Rendering: v1beta1.RenderingStatus{}, + Source: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, }, + Sync: v1beta1.SyncStatus{}, }, - syncStatus: v1beta1.SyncStatus{}, expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError}, expectedErrorSummary: &v1beta1.ErrorSummary{ TotalCount: 2, @@ -971,19 +1024,22 @@ func TestSummarizeErrors(t *testing.T) { }, }, { - name: "sourceStatus is not empty and trucates errors, syncStatus is empty", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, + name: "source is not empty and trucates errors, sync is empty", + rsStatus: v1beta1.Status{ + Rendering: v1beta1.RenderingStatus{}, + Source: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, }, + Sync: v1beta1.SyncStatus{}, }, - syncStatus: v1beta1.SyncStatus{}, expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError}, expectedErrorSummary: &v1beta1.ErrorSummary{ TotalCount: 100, @@ -992,17 +1048,20 @@ func TestSummarizeErrors(t *testing.T) { }, }, { - name: "sourceStatus is empty, syncStatus is not empty (no trucation)", - sourceStatus: v1beta1.SourceStatus{}, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, + name: "source is empty, sync is not empty (no trucation)", + rsStatus: v1beta1.Status{ + Rendering: v1beta1.RenderingStatus{}, + Source: v1beta1.SourceStatus{}, + Sync: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, }, }, expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SyncError}, @@ -1013,17 +1072,20 @@ func TestSummarizeErrors(t *testing.T) { }, }, { - name: "sourceStatus is empty, syncStatus is not empty and trucates errors", - sourceStatus: v1beta1.SourceStatus{}, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, + name: "source is empty, sync is not empty and trucates errors", + rsStatus: v1beta1.Status{ + Rendering: v1beta1.RenderingStatus{}, + Source: v1beta1.SourceStatus{}, + Sync: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, }, }, expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SyncError}, @@ -1034,27 +1096,30 @@ func TestSummarizeErrors(t *testing.T) { }, }, { - name: "neither sourceStatus nor syncStatus is empty or trucates errors", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, + name: "neither source nor sync is empty or trucates errors", + rsStatus: v1beta1.Status{ + Rendering: v1beta1.RenderingStatus{}, + Source: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, + Sync: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, }, }, expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, @@ -1065,27 +1130,30 @@ func TestSummarizeErrors(t *testing.T) { }, }, { - name: "neither sourceStatus nor syncStatus is empty, sourceStatus trucates errors", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, - }, - }, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, + name: "neither source nor sync is empty, source trucates errors", + rsStatus: v1beta1.Status{ + Rendering: v1beta1.RenderingStatus{}, + Source: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, + Sync: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, }, }, expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, @@ -1096,28 +1164,30 @@ func TestSummarizeErrors(t *testing.T) { }, }, { - name: "neither sourceStatus nor syncStatus is empty, syncStatus trucates errors", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 2, - Truncated: false, - ErrorCountAfterTruncation: 2, - }, - }, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, + name: "neither source nor sync is empty, sync trucates errors", + rsStatus: v1beta1.Status{ + Rendering: v1beta1.RenderingStatus{}, + Source: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 2, + Truncated: false, + ErrorCountAfterTruncation: 2, + }, }, - - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, + Sync: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, }, }, expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, @@ -1128,42 +1198,58 @@ func TestSummarizeErrors(t *testing.T) { }, }, { - name: "neither sourceStatus nor syncStatus is empty, both trucates errors", - sourceStatus: v1beta1.SourceStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "1021", ErrorMessage: "1021-error-message"}, - {Code: "1022", ErrorMessage: "1022-error-message"}, - }, - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, + name: "neither rendering nor source nor sync is empty, all trucate errors", + rsStatus: v1beta1.Status{ + Rendering: v1beta1.RenderingStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1001", ErrorMessage: "1001-error-message"}, + {Code: "1002", ErrorMessage: "1002-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, }, - }, - syncStatus: v1beta1.SyncStatus{ - Errors: []v1beta1.ConfigSyncError{ - {Code: "2009", ErrorMessage: "apiserver error"}, - {Code: "2009", ErrorMessage: "webhook error"}, + Source: v1beta1.SourceStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "1021", ErrorMessage: "1021-error-message"}, + {Code: "1022", ErrorMessage: "1022-error-message"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, }, - - ErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 100, - Truncated: true, - ErrorCountAfterTruncation: 2, + Sync: v1beta1.SyncStatus{ + Errors: []v1beta1.ConfigSyncError{ + {Code: "2009", ErrorMessage: "apiserver error"}, + {Code: "2009", ErrorMessage: "webhook error"}, + }, + ErrorSummary: &v1beta1.ErrorSummary{ + TotalCount: 100, + Truncated: true, + ErrorCountAfterTruncation: 2, + }, }, }, - expectedErrorSources: []v1beta1.ErrorSource{v1beta1.SourceError, v1beta1.SyncError}, + expectedErrorSources: []v1beta1.ErrorSource{ + v1beta1.RenderingError, + v1beta1.SourceError, + v1beta1.SyncError, + }, expectedErrorSummary: &v1beta1.ErrorSummary{ - TotalCount: 200, + TotalCount: 300, Truncated: true, - ErrorCountAfterTruncation: 4, + ErrorCountAfterTruncation: 6, }, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - gotErrorSources, gotErrorSummary := summarizeErrors(tc.sourceStatus, tc.syncStatus) + gotErrorSources, gotErrorSummary := summarizeErrors(tc.rsStatus) if diff := cmp.Diff(tc.expectedErrorSources, gotErrorSources); diff != "" { t.Errorf("summarizeErrors() got %v, expected %v", gotErrorSources, tc.expectedErrorSources) } diff --git a/pkg/parse/run.go b/pkg/parse/run.go index 8b5e4eb9b4..6d484f4222 100644 --- a/pkg/parse/run.go +++ b/pkg/parse/run.go @@ -21,13 +21,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" - "kpt.dev/configsync/pkg/declared" "kpt.dev/configsync/pkg/hydrate" "kpt.dev/configsync/pkg/importer/filesystem/cmpath" "kpt.dev/configsync/pkg/metrics" "kpt.dev/configsync/pkg/status" webhookconfiguration "kpt.dev/configsync/pkg/webhook/configuration" - "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -58,36 +56,43 @@ func Run(ctx context.Context, p Parser) { // Use timers, not tickers. // Tickers can cause memory leaks and continuous execution, when execution // takes longer than the tick duration. - runTimer := time.NewTimer(opts.pollingFrequency) - defer runTimer.Stop() - resyncTimer := time.NewTimer(opts.resyncPeriod) + // Timers are sorted in order of priority and expected period duration. + resyncTimer := time.NewTimer(opts.resyncDelay) defer resyncTimer.Stop() - retryTimer := time.NewTimer(time.Second) + + runTimer := time.NewTimer(opts.pollingDelay) + defer runTimer.Stop() + + retryTimer := time.NewTimer(opts.retryDelay) defer retryTimer.Stop() + + statusUpdateTimer := time.NewTimer(opts.statusUpdateDelay) + defer statusUpdateTimer.Stop() + state := &reconcilerState{} for { select { case <-ctx.Done(): return - // it is time to reapply the configuration even if no changes have been detected - // This case should be checked first since it resets the cache + // Re-apply even if no changes have been detected. + // This case should be checked first since it resets the cache. case <-resyncTimer.C: klog.Infof("It is time for a force-resync") // Reset the cache to make sure all the steps of a parse-apply-watch loop will run. // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. state.resetAllButSourceState() run(ctx, p, triggerResync, state) - resyncTimer.Reset(opts.resyncPeriod) // Schedule resync attempt - retryTimer.Reset(time.Second) // Schedule retry attempt + resyncTimer.Reset(opts.resyncDelay) // Schedule resync attempt + retryTimer.Reset(time.Second) // Schedule retry attempt - // it is time to re-import the configuration from the filesystem + // Re-import declared resources from the filesystem (from git-sync). case <-runTimer.C: run(ctx, p, triggerReimport, state) - runTimer.Reset(opts.pollingFrequency) // Schedule re-run attempt - retryTimer.Reset(time.Second) // Schedule retry attempt + runTimer.Reset(opts.pollingDelay) // Schedule re-run attempt + retryTimer.Reset(time.Second) // Schedule retry attempt - // it is time to check whether the last parse-apply-watch loop failed or any watches need to be updated + // Retry if there was an error, conflict, or any watches need to be updated case <-retryTimer.C: var trigger string if opts.managementConflict() { @@ -96,7 +101,7 @@ func Run(ctx context.Context, p Parser) { state.resetAllButSourceState() trigger = triggerManagementConflict // When conflict is detected, wait longer (same as the polling frequency) for the next retry. - time.Sleep(opts.pollingFrequency) + time.Sleep(opts.pollingDelay) } else if state.cache.needToRetry && state.cache.readyToRetry() { klog.Infof("The last reconciliation failed") trigger = triggerRetry @@ -108,17 +113,32 @@ func Run(ctx context.Context, p Parser) { continue } run(ctx, p, trigger, state) - retryTimer.Reset(time.Second) + retryTimer.Reset(opts.retryDelay) + + // Update the sync status to report management conflicts (from the remediator). + // Performing the `updateSyncStatus` in this loop prevents it from + // running while the `run` function is executing. The `run` function + // handles calling `updateSyncStatus` while it's running. + case <-statusUpdateTimer.C: + // Skip sync status update if there are source or rendering errors. + // This may hide new remediator errors for the old commit, but the + // alternative would be to erase new rendering errors for the new commit. + // TODO: Disable the remediator while a new commit is being rendered/parsed/synced. + // TODO: skip if opts.managementConflict() != true? + if state.sourceStatus.commit == state.renderingStatus.commit && + state.sourceStatus.errs == nil && state.renderingStatus.errs == nil { + klog.V(3).Info("Updating sync status (periodic while not syncing)") + syncing, syncErrs := p.options().status() + if err := setSyncStatus(ctx, p, state, syncing, syncErrs); err != nil { + klog.Warningf("failed to update sync status: %v", err) + } + } + statusUpdateTimer.Reset(opts.statusUpdateDelay) } } } func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) { - p.SetReconciling(true) - defer func() { - p.SetReconciling(false) - }() - var syncDir cmpath.Absolute gs := sourceStatus{} gs.commit, syncDir, gs.errs = hydrate.SourceCommitAndDir(p.options().SourceType, p.options().SourceDir, p.options().SyncDir, p.options().reconcilerName) @@ -353,73 +373,61 @@ func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconc // Create a new context with its cancellation function. ctxForUpdateSyncStatus, cancel := context.WithCancel(context.Background()) - go updateSyncStatus(ctxForUpdateSyncStatus, p) + go updateSyncStatusPeriodically(ctxForUpdateSyncStatus, p, state) start := time.Now() syncErrs := p.options().update(ctx, &state.cache) metrics.RecordParserDuration(ctx, trigger, "update", metrics.StatusTagKey(syncErrs), start) - // This is to terminate `updateSyncStatus`. + // Terminate `updateSyncStatusPeriodically`. cancel() + klog.V(3).Info("Updating sync status (after sync)") + if err := setSyncStatus(ctx, p, state, false, syncErrs); err != nil { + syncErrs = status.Append(syncErrs, err) + } + + return status.Append(sourceErrs, syncErrs) +} + +// setSyncStatus updates `.status.sync` and the Syncing condition, if needed, +// as well as `state.syncStatus` and `state.syncingConditionLastUpdate` if +// the update is successful. +func setSyncStatus(ctx context.Context, p Parser, state *reconcilerState, syncing bool, syncErrs status.MultiError) error { newSyncStatus := sourceStatus{ commit: state.cache.source.commit, errs: syncErrs, lastUpdate: metav1.Now(), } if state.needToSetSyncStatus(newSyncStatus) { - if err := p.SetSyncStatus(ctx, syncErrs); err != nil { - syncErrs = status.Append(syncErrs, err) - } else { - state.syncStatus = newSyncStatus - state.syncingConditionLastUpdate = newSyncStatus.lastUpdate + if err := p.setSyncStatus(ctx, syncing, syncErrs); err != nil { + return err } + state.syncStatus = newSyncStatus + state.syncingConditionLastUpdate = newSyncStatus.lastUpdate } - - return status.Append(sourceErrs, syncErrs) + return nil } -// updateSyncStatus update the sync status periodically until the cancellation function of the context is called. -func updateSyncStatus(ctx context.Context, p Parser) { - updatePeriod := 5 * time.Second +// updateSyncStatusPeriodically updates the sync status periodically until the +// context is cancelled. +func updateSyncStatusPeriodically(ctx context.Context, p Parser, state *reconcilerState) { + updatePeriod := p.options().statusUpdateDelay updateTimer := time.NewTimer(updatePeriod) defer updateTimer.Stop() for { select { case <-ctx.Done(): - // ctx.Done() is closed when the cancellation function of the context is called. + // ctx.Done() is closed when the context is cancelled. return case <-updateTimer.C: - if err := p.SetSyncStatus(ctx, p.options().updater.applier.Errors()); err != nil { + klog.V(3).Info("Updating sync status (periodic while syncing)") + syncing, syncErrs := p.options().status() + if err := setSyncStatus(ctx, p, state, syncing, syncErrs); err != nil { klog.Warningf("failed to update sync status: %v", err) } - remediatorErrs := p.options().updater.remediator.ConflictErrors() - if len(remediatorErrs) > 0 { - UpdateConflictManagerStatus(ctx, remediatorErrs, p.K8sClient()) - } updateTimer.Reset(updatePeriod) } } } - -// UpdateConflictManagerStatus reports the conflict in the conflicting manager. -func UpdateConflictManagerStatus(ctx context.Context, conflictErrs []status.ManagementConflictError, k8sClient client.Client) { - conflictingManagerErrors := map[string][]status.ManagementConflictError{} - for _, conflictError := range conflictErrs { - conflictingManager := conflictError.ConflictingManager() - err := conflictError.ConflictingManagerError() - conflictingManagerErrors[conflictingManager] = append(conflictingManagerErrors[conflictingManager], err) - } - - for conflictingManager, conflictErrors := range conflictingManagerErrors { - scope, name := declared.ManagerScopeAndName(conflictingManager) - if scope != declared.RootReconciler { - klog.Infof("No need to notify namespace reconciler for the conflict") - continue - } - if err := prependRootSyncRemediatorStatus(ctx, k8sClient, name, conflictErrors, defaultDenominator); err != nil { - klog.Warningf("failed to add the management conflict error to RootSync %s: %v", name, err) - } - } -} diff --git a/pkg/parse/updater.go b/pkg/parse/updater.go index accdb8d77f..a42ecf5630 100644 --- a/pkg/parse/updater.go +++ b/pkg/parse/updater.go @@ -16,6 +16,7 @@ package parse import ( "context" + "sync" "time" "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1" @@ -36,7 +37,13 @@ type updater struct { scope declared.Scope resources *declared.Resources remediator remediator.Interface - applier applier.Interface + applier applier.KptApplier + + updateLock sync.Mutex + statusLock sync.RWMutex + updating bool + validationErrs status.MultiError + watchErrs status.MultiError } func (u *updater) needToUpdateWatch() bool { @@ -67,6 +74,11 @@ func (u *updater) declaredCRDs() ([]*v1beta1.CustomResourceDefinition, status.Mu // update updates the declared resources in memory, applies the resources, and sets // up the watches. func (u *updater) update(ctx context.Context, cache *cacheForCommit) status.MultiError { + u.updateLock.Lock() + defer u.updateLock.Unlock() + u.setUpdating(true) + defer u.setUpdating(false) + var errs status.MultiError objs := filesystem.AsCoreObjects(cache.objsToApply) @@ -75,11 +87,11 @@ func (u *updater) update(ctx context.Context, cache *cacheForCommit) status.Mult if !cache.resourceDeclSetUpdated { var err status.MultiError objs, err = u.resources.Update(ctx, objs) + u.setValidationErrors(err) if err != nil { klog.Infof("Terminate the reconciliation (failed to update the declared resources): %v", err) return err } - if cache.parserErrs == nil { cache.resourceDeclSetUpdated = true } @@ -102,6 +114,45 @@ func (u *updater) update(ctx context.Context, cache *cacheForCommit) status.Mult // Update the Remediator's watches to start new ones and stop old ones. watchErrs := u.remediator.UpdateWatches(ctx, gvks) errs = status.Append(errs, watchErrs) + u.setWatchErrors(watchErrs) - return errs + // Prepend conflict errors to raise awareness + var allErrs status.MultiError + for _, e := range u.remediator.ConflictErrors() { + allErrs = status.Append(allErrs, e) + } + return status.Append(allErrs, errs) +} + +func (u *updater) setUpdating(updating bool) { + u.statusLock.Lock() + defer u.statusLock.Unlock() + u.updating = updating +} + +func (u *updater) setValidationErrors(errs status.MultiError) { + u.statusLock.Lock() + defer u.statusLock.Unlock() + u.validationErrs = errs +} + +func (u *updater) setWatchErrors(errs status.MultiError) { + u.statusLock.Lock() + defer u.statusLock.Unlock() + u.watchErrs = errs +} + +// status returns the current status of the updater. +// This method is thread-safe to call whether the update is running or not. +func (u *updater) status() (updating bool, errs status.MultiError) { + u.statusLock.RLock() + defer u.statusLock.RUnlock() + // Prepend conflict errors to raise awareness + for _, e := range u.remediator.ConflictErrors() { + errs = status.Append(errs, e) + } + errs = status.Append(errs, u.validationErrs) + errs = status.Append(errs, u.applier.Errors()) + errs = status.Append(errs, u.watchErrs) + return u.updating, errs } diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index 5d8351ca2c..2b51a64454 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -15,7 +15,6 @@ package reconciler import ( - "context" "time" "k8s.io/klog/v2" @@ -61,12 +60,17 @@ type Options struct { SyncName string // ReconcilerName is the name of the Reconciler Deployment. ReconcilerName string - // ResyncPeriod is the period of time between forced re-sync from source (even - // without a new commit). - ResyncPeriod time.Duration - // FilesystemPollingFrequency is how often to check the local source repository for - // changes. - FilesystemPollingFrequency time.Duration + // PollingDelay is how long the parser waits between run attempts. + // Each attempt re-reads the latest configuration from the filesystem. + PollingDelay time.Duration + // RetryDelay is how long the parser waits between retries, after an error. + RetryDelay time.Duration + // ResyncDelay is how long the parser waits between full re-sync from source + // (even without a new commit). + ResyncDelay time.Duration + // StatusUpdateDelay is how long the parser waits between updates of the + // sync status, to account for management conflict errors from the remediator. + StatusUpdateDelay time.Duration // SourceRoot is the absolute path to the source repository. // Usually contains a symlink that must be resolved every time before parsing. SourceRoot cmpath.Absolute @@ -203,13 +207,13 @@ func Run(opts Options) { } if opts.ReconcilerScope == declared.RootReconciler { parser, err = parse.NewRootRunner(opts.ClusterName, opts.SyncName, opts.ReconcilerName, opts.SourceFormat, &reader.File{}, cl, - opts.FilesystemPollingFrequency, opts.ResyncPeriod, fs, discoveryClient, decls, a, rem) + opts.PollingDelay, opts.ResyncDelay, opts.RetryDelay, opts.StatusUpdateDelay, fs, discoveryClient, decls, a, rem) if err != nil { klog.Fatalf("Instantiating Root Repository Parser: %v", err) } } else { parser, err = parse.NewNamespaceRunner(opts.ClusterName, opts.SyncName, opts.ReconcilerName, opts.ReconcilerScope, &reader.File{}, cl, - opts.FilesystemPollingFrequency, opts.ResyncPeriod, fs, discoveryClient, decls, a, rem) + opts.PollingDelay, opts.ResyncDelay, opts.RetryDelay, opts.StatusUpdateDelay, fs, discoveryClient, decls, a, rem) if err != nil { klog.Fatalf("Instantiating Namespace Repository Parser: %v", err) } @@ -220,14 +224,6 @@ func Run(opts Options) { // Start the Remediator (non-blocking). doneChanForRemediator := rem.Start(ctx) - // Start the StatusUpdater (non-blocking). - ctxForUpdateStatus, cancel := context.WithCancel(context.Background()) - doneChForUpdateStatus := make(chan struct{}) - go func() { - defer close(doneChForUpdateStatus) - updateStatus(ctxForUpdateStatus, parser) - }() - // Start the Parser (blocking). // This will not return until: // - the Context is cancelled, or @@ -235,37 +231,8 @@ func Run(opts Options) { parse.Run(ctx, parser) klog.Info("Parser exited") - // Stop the StatusUpdater - cancel() - // Wait for StatusUpdater to exit - <-doneChForUpdateStatus - klog.Info("StatusUpdater exited") - // Wait for Remediator to exit <-doneChanForRemediator klog.Info("Remediator exited") klog.Info("All controllers exited") } - -// updateStatus update the status periodically until the cancellation function of the context is called. -func updateStatus(ctx context.Context, p parse.Parser) { - updatePeriod := 5 * time.Second - updateTimer := time.NewTimer(updatePeriod) - defer updateTimer.Stop() - for { - select { - case <-ctx.Done(): - // ctx.Done() is closed when the cancellation function of the context is called. - return - case <-updateTimer.C: - if !p.Reconciling() { - if err := p.SetSyncStatus(ctx, p.ApplierErrors()); err != nil { - klog.Warningf("failed to update remediator errors: %v", err) - } - parse.UpdateConflictManagerStatus(ctx, p.RemediatorConflictErrors(), p.K8sClient()) - } - // else if `p.Reconciling` is true, `parse.Run` will update the status periodically. - updateTimer.Reset(updatePeriod) - } - } -} diff --git a/pkg/reconcilermanager/constants.go b/pkg/reconcilermanager/constants.go index 3699c73691..e82abeb5ec 100644 --- a/pkg/reconcilermanager/constants.go +++ b/pkg/reconcilermanager/constants.go @@ -96,10 +96,14 @@ const ( const ( // ReconcilerPollingPeriod defines how often the reconciler should poll the // filesystem for updates to the source or rendered configs. + // + // TODO: rename to `RECONCILER_POLLING_DELAY` to match internal name. ReconcilerPollingPeriod = "RECONCILER_POLLING_PERIOD" // HydrationPollingPeriod defines how often the hydration controller should // poll the filesystem for rendering the DRY configs. + // + // TODO: rename to `HYDRATION_POLLING_DELAY` to match internal name. HydrationPollingPeriod = "HYDRATION_POLLING_PERIOD" ) diff --git a/pkg/reconcilermanager/controllers/reconciler_base.go b/pkg/reconcilermanager/controllers/reconciler_base.go index f4741cdae7..e64d21a282 100644 --- a/pkg/reconcilermanager/controllers/reconciler_base.go +++ b/pkg/reconcilermanager/controllers/reconciler_base.go @@ -68,14 +68,14 @@ const ( // reconcilerBase provides common data and methods for the RepoSync and RootSync reconcilers type reconcilerBase struct { - clusterName string - client client.Client - log logr.Logger - scheme *runtime.Scheme - isAutopilotCluster *bool - reconcilerPollingPeriod time.Duration - hydrationPollingPeriod time.Duration - membership *hubv1.Membership + clusterName string + client client.Client + log logr.Logger + scheme *runtime.Scheme + isAutopilotCluster *bool + reconcilerPollingDelay time.Duration + hydrationPollingDelay time.Duration + membership *hubv1.Membership // syncKind is the kind of the sync object: RootSync or RepoSync. syncKind string diff --git a/pkg/reconcilermanager/controllers/reposync_controller.go b/pkg/reconcilermanager/controllers/reposync_controller.go index c05d0c1543..d01a7aad35 100644 --- a/pkg/reconcilermanager/controllers/reposync_controller.go +++ b/pkg/reconcilermanager/controllers/reposync_controller.go @@ -69,16 +69,16 @@ type RepoSyncReconciler struct { } // NewRepoSyncReconciler returns a new RepoSyncReconciler. -func NewRepoSyncReconciler(clusterName string, reconcilerPollingPeriod, hydrationPollingPeriod time.Duration, client client.Client, log logr.Logger, scheme *runtime.Scheme) *RepoSyncReconciler { +func NewRepoSyncReconciler(clusterName string, reconcilerPollingDelay, hydrationPollingDelay time.Duration, client client.Client, log logr.Logger, scheme *runtime.Scheme) *RepoSyncReconciler { return &RepoSyncReconciler{ reconcilerBase: reconcilerBase{ - clusterName: clusterName, - client: client, - log: log, - scheme: scheme, - reconcilerPollingPeriod: reconcilerPollingPeriod, - hydrationPollingPeriod: hydrationPollingPeriod, - syncKind: configsync.RepoSyncKind, + clusterName: clusterName, + client: client, + log: log, + scheme: scheme, + reconcilerPollingDelay: reconcilerPollingDelay, + hydrationPollingDelay: hydrationPollingDelay, + syncKind: configsync.RepoSyncKind, }, repoSyncs: make(map[types.NamespacedName]struct{}), } @@ -656,8 +656,8 @@ func requeueRepoSyncRequest(obj client.Object, rs *v1beta1.RepoSync) []reconcile func (r *RepoSyncReconciler) populateContainerEnvs(ctx context.Context, rs *v1beta1.RepoSync, reconcilerName string) map[string][]corev1.EnvVar { result := map[string][]corev1.EnvVar{ - reconcilermanager.HydrationController: hydrationEnvs(rs.Spec.SourceType, rs.Spec.Git, rs.Spec.Oci, declared.Scope(rs.Namespace), reconcilerName, r.hydrationPollingPeriod.String()), - reconcilermanager.Reconciler: reconcilerEnvs(r.clusterName, rs.Name, reconcilerName, declared.Scope(rs.Namespace), rs.Spec.SourceType, rs.Spec.Git, rs.Spec.Oci, reposync.GetHelmBase(rs.Spec.Helm), r.reconcilerPollingPeriod.String(), rs.Spec.SafeOverride().StatusMode, v1beta1.GetReconcileTimeout(rs.Spec.SafeOverride().ReconcileTimeout), v1beta1.GetAPIServerTimeout(rs.Spec.SafeOverride().APIServerTimeout)), + reconcilermanager.HydrationController: hydrationEnvs(rs.Spec.SourceType, rs.Spec.Git, rs.Spec.Oci, declared.Scope(rs.Namespace), reconcilerName, r.hydrationPollingDelay.String()), + reconcilermanager.Reconciler: reconcilerEnvs(r.clusterName, rs.Name, reconcilerName, declared.Scope(rs.Namespace), rs.Spec.SourceType, rs.Spec.Git, rs.Spec.Oci, reposync.GetHelmBase(rs.Spec.Helm), r.reconcilerPollingDelay.String(), rs.Spec.SafeOverride().StatusMode, v1beta1.GetReconcileTimeout(rs.Spec.SafeOverride().ReconcileTimeout), v1beta1.GetAPIServerTimeout(rs.Spec.SafeOverride().APIServerTimeout)), } switch v1beta1.SourceType(rs.Spec.SourceType) { case v1beta1.GitSource: diff --git a/pkg/reconcilermanager/controllers/rootsync_controller.go b/pkg/reconcilermanager/controllers/rootsync_controller.go index df5391994e..51ed864d96 100644 --- a/pkg/reconcilermanager/controllers/rootsync_controller.go +++ b/pkg/reconcilermanager/controllers/rootsync_controller.go @@ -77,16 +77,16 @@ type RootSyncReconciler struct { } // NewRootSyncReconciler returns a new RootSyncReconciler. -func NewRootSyncReconciler(clusterName string, reconcilerPollingPeriod, hydrationPollingPeriod time.Duration, client client.Client, log logr.Logger, scheme *runtime.Scheme) *RootSyncReconciler { +func NewRootSyncReconciler(clusterName string, reconcilerPollingDelay, hydrationPollingDelay time.Duration, client client.Client, log logr.Logger, scheme *runtime.Scheme) *RootSyncReconciler { return &RootSyncReconciler{ reconcilerBase: reconcilerBase{ - clusterName: clusterName, - client: client, - log: log, - scheme: scheme, - reconcilerPollingPeriod: reconcilerPollingPeriod, - hydrationPollingPeriod: hydrationPollingPeriod, - syncKind: configsync.RootSyncKind, + clusterName: clusterName, + client: client, + log: log, + scheme: scheme, + reconcilerPollingDelay: reconcilerPollingDelay, + hydrationPollingDelay: hydrationPollingDelay, + syncKind: configsync.RootSyncKind, }, } } @@ -460,8 +460,8 @@ func (r *RootSyncReconciler) mapSecretToRootSyncs(secret client.Object) []reconc func (r *RootSyncReconciler) populateContainerEnvs(ctx context.Context, rs *v1beta1.RootSync, reconcilerName string) map[string][]corev1.EnvVar { result := map[string][]corev1.EnvVar{ - reconcilermanager.HydrationController: hydrationEnvs(rs.Spec.SourceType, rs.Spec.Git, rs.Spec.Oci, declared.RootReconciler, reconcilerName, r.hydrationPollingPeriod.String()), - reconcilermanager.Reconciler: append(reconcilerEnvs(r.clusterName, rs.Name, reconcilerName, declared.RootReconciler, rs.Spec.SourceType, rs.Spec.Git, rs.Spec.Oci, rootsync.GetHelmBase(rs.Spec.Helm), r.reconcilerPollingPeriod.String(), rs.Spec.SafeOverride().StatusMode, v1beta1.GetReconcileTimeout(rs.Spec.SafeOverride().ReconcileTimeout), v1beta1.GetAPIServerTimeout(rs.Spec.SafeOverride().APIServerTimeout)), sourceFormatEnv(rs.Spec.SourceFormat)), + reconcilermanager.HydrationController: hydrationEnvs(rs.Spec.SourceType, rs.Spec.Git, rs.Spec.Oci, declared.RootReconciler, reconcilerName, r.hydrationPollingDelay.String()), + reconcilermanager.Reconciler: append(reconcilerEnvs(r.clusterName, rs.Name, reconcilerName, declared.RootReconciler, rs.Spec.SourceType, rs.Spec.Git, rs.Spec.Oci, rootsync.GetHelmBase(rs.Spec.Helm), r.reconcilerPollingDelay.String(), rs.Spec.SafeOverride().StatusMode, v1beta1.GetReconcileTimeout(rs.Spec.SafeOverride().ReconcileTimeout), v1beta1.GetAPIServerTimeout(rs.Spec.SafeOverride().APIServerTimeout)), sourceFormatEnv(rs.Spec.SourceFormat)), } switch v1beta1.SourceType(rs.Spec.SourceType) { case v1beta1.GitSource: diff --git a/pkg/reconcilermanager/controllers/util.go b/pkg/reconcilermanager/controllers/util.go index 028d568d0e..e658ec5113 100644 --- a/pkg/reconcilermanager/controllers/util.go +++ b/pkg/reconcilermanager/controllers/util.go @@ -35,7 +35,7 @@ import ( ) // hydrationEnvs returns environment variables for the hydration controller. -func hydrationEnvs(sourceType string, gitConfig *v1beta1.Git, ociConfig *v1beta1.Oci, scope declared.Scope, reconcilerName, pollPeriod string) []corev1.EnvVar { +func hydrationEnvs(sourceType string, gitConfig *v1beta1.Git, ociConfig *v1beta1.Oci, scope declared.Scope, reconcilerName, pollingPeriod string) []corev1.EnvVar { var result []corev1.EnvVar var syncDir string switch v1beta1.SourceType(sourceType) { @@ -68,16 +68,15 @@ func hydrationEnvs(sourceType string, gitConfig *v1beta1.Git, ociConfig *v1beta1 Name: reconcilermanager.SyncDirKey, Value: syncDir, }, - // Add Hydration Polling Period. corev1.EnvVar{ Name: reconcilermanager.HydrationPollingPeriod, - Value: pollPeriod, + Value: pollingPeriod, }) return result } // reconcilerEnvs returns environment variables for namespace reconciler. -func reconcilerEnvs(clusterName, syncName, reconcilerName string, reconcilerScope declared.Scope, sourceType string, gitConfig *v1beta1.Git, ociConfig *v1beta1.Oci, helmConfig *v1beta1.HelmBase, pollPeriod, statusMode string, reconcileTimeout string, apiServerTimeout string) []corev1.EnvVar { +func reconcilerEnvs(clusterName, syncName, reconcilerName string, reconcilerScope declared.Scope, sourceType string, gitConfig *v1beta1.Git, ociConfig *v1beta1.Oci, helmConfig *v1beta1.HelmBase, pollingPeriod, statusMode string, reconcileTimeout string, apiServerTimeout string) []corev1.EnvVar { var result []corev1.EnvVar if statusMode == "" { statusMode = applier.StatusEnabled @@ -154,10 +153,9 @@ func reconcilerEnvs(clusterName, syncName, reconcilerName string, reconcilerScop Name: reconcilermanager.ReconcileTimeout, Value: reconcileTimeout, }, - // Add Filesystem Polling Period. corev1.EnvVar{ Name: reconcilermanager.ReconcilerPollingPeriod, - Value: pollPeriod, + Value: pollingPeriod, }, corev1.EnvVar{ Name: reconcilermanager.APIServerTimeout, diff --git a/pkg/remediator/remediator.go b/pkg/remediator/remediator.go index be644190f7..a3ebdc6565 100644 --- a/pkg/remediator/remediator.go +++ b/pkg/remediator/remediator.go @@ -40,7 +40,7 @@ type Remediator struct { watchMgr *watch.Manager workers []*reconcile.Worker // The following fields are guarded by the mutex. - mux sync.Mutex + mux sync.RWMutex // conflictErrs tracks all the management conflicts the remediator encounters, // and report to RootSync|RepoSync status. conflictErrs []status.ManagementConflictError @@ -128,6 +128,8 @@ func (r *Remediator) ManagementConflict() bool { // ConflictErrors implements Interface. func (r *Remediator) ConflictErrors() []status.ManagementConflictError { + r.mux.RLock() + defer r.mux.RUnlock() return r.conflictErrs }