From b689d33afc93b9f0a001fe7f3654c10962adbbdf Mon Sep 17 00:00:00 2001 From: Karl Isenberg Date: Thu, 3 Nov 2022 16:16:44 -0700 Subject: [PATCH] Clean up Applier - Use locks to protect the MultiError, Syncing, and Apply methods independently. This is a hack to improve thread-safety without changing the Applier interface. In the future, the interface should probably be rewritten to use channels instead. - Replace `parser.SetSyncStatus` with `updateSyncStatus`, which handles prepending management conflict errors from the remediator. - Move the periodic sync status update from the reconciler into the parser. This removes the need for some synchronization that wasn't previously thread-safe. - Remove several parser methods which are obsolete with the above changes. - Rename applier.Interface to KptApplier to make way for the KptDestroyer interface later. - Route the retry delay and status update delay up through the parser and reconciler. These are now hard-coded with the rest of the constants. They could be made into options later, but I didn't want to expose new options until we need them exposed. - Rename -Frequency and -Period options to -Delay which more accurately reflects how they are used. Period is how long something takes. Frequency is how often something occurs. Delay is the time between occurrences. 
Change-Id: I1c3373f98bb5d842ce113ba61dc0c87e1119c665 --- cmd/hydration-controller/main.go | 37 ++++--- cmd/reconciler-manager/main.go | 4 +- cmd/reconciler/main.go | 47 +++++---- pkg/api/configsync/register.go | 28 +++-- pkg/applier/kpt_applier.go | 158 ++++++++++++++++++----------- pkg/applier/kpt_applier_test.go | 4 +- pkg/hydrate/controller.go | 16 +-- pkg/parse/namespace.go | 61 +++++------ pkg/parse/opts.go | 43 +++----- pkg/parse/root.go | 116 +++++---------------- pkg/parse/run.go | 90 +++++++--------- pkg/parse/updater.go | 2 +- pkg/reconciler/reconciler.go | 59 +++-------- pkg/reconcilermanager/constants.go | 2 + 14 files changed, 301 insertions(+), 366 deletions(-) diff --git a/cmd/hydration-controller/main.go b/cmd/hydration-controller/main.go index 805d9e6b23..127e54d850 100644 --- a/cmd/hydration-controller/main.go +++ b/cmd/hydration-controller/main.go @@ -19,16 +19,17 @@ import ( "flag" "os" "strings" - "time" "k8s.io/klog/v2" "k8s.io/klog/v2/klogr" + "kpt.dev/configsync/pkg/api/configsync" "kpt.dev/configsync/pkg/api/configsync/v1beta1" "kpt.dev/configsync/pkg/hydrate" "kpt.dev/configsync/pkg/importer/filesystem/cmpath" "kpt.dev/configsync/pkg/kmetrics" "kpt.dev/configsync/pkg/profiler" "kpt.dev/configsync/pkg/reconcilermanager" + "kpt.dev/configsync/pkg/reconcilermanager/controllers" "kpt.dev/configsync/pkg/util/log" ctrl "sigs.k8s.io/controller-runtime" ) @@ -55,15 +56,18 @@ var ( syncDir = flag.String("sync-dir", os.Getenv(reconcilermanager.SyncDirKey), "Relative path of the root directory within the repo.") - hydrationPollingPeriodStr = flag.String("polling-period", os.Getenv(reconcilermanager.HydrationPollingPeriod), + // TODO: rename to `polling-delay` to match internal name. 
+ pollingDelay = flag.Duration("polling-period", + controllers.PollingPeriod(reconcilermanager.HydrationPollingPeriod, configsync.DefaultHydrationPollingDelay), "Period of time between checking the filesystem for rendering the DRY configs.") - // rehydratePeriod sets the hydration-controller to re-run the hydration process + // rehydrateDelay sets the hydration-controller to re-run the hydration process // periodically when errors happen. It retries on both transient errors and permanent errors. // Other ways to trigger the hydration process are: // - push a new commit // - delete the done file from the hydration-controller. - rehydratePeriod = flag.Duration("rehydrate-period", 30*time.Minute, + // TODO: rename to `rehydrate-delay` to match internal name. + rehydrateDelay = flag.Duration("rehydrate-period", configsync.DefaultHydrationRetryDelay, "Period of time between rehydrating on errors.") reconcilerName = flag.String("reconciler-name", os.Getenv(reconcilermanager.ReconcilerNameKey), @@ -107,22 +111,17 @@ func main() { dir := strings.TrimPrefix(*syncDir, "/") relSyncDir := cmpath.RelativeOS(dir) - hydrationPollingPeriod, err := time.ParseDuration(*hydrationPollingPeriodStr) - if err != nil { - klog.Fatalf("Failed to get hydration polling period: %v", err) - } - hydrator := &hydrate.Hydrator{ - DonePath: absDonePath, - SourceType: v1beta1.SourceType(*sourceType), - SourceRoot: absSourceRootDir, - HydratedRoot: absHydratedRootDir, - SourceLink: *sourceLinkDir, - HydratedLink: *hydratedLinkDir, - SyncDir: relSyncDir, - PollingFrequency: hydrationPollingPeriod, - RehydrateFrequency: *rehydratePeriod, - ReconcilerName: *reconcilerName, + DonePath: absDonePath, + SourceType: v1beta1.SourceType(*sourceType), + SourceRoot: absSourceRootDir, + HydratedRoot: absHydratedRootDir, + SourceLink: *sourceLinkDir, + HydratedLink: *hydratedLinkDir, + SyncDir: relSyncDir, + PollingDelay: *pollingDelay, + RehydrateDelay: *rehydrateDelay, + ReconcilerName: *reconcilerName, } 
hydrator.Run(context.Background()) diff --git a/cmd/reconciler-manager/main.go b/cmd/reconciler-manager/main.go index 51379f2530..316db7f7a5 100644 --- a/cmd/reconciler-manager/main.go +++ b/cmd/reconciler-manager/main.go @@ -41,10 +41,10 @@ var ( clusterName = flag.String("cluster-name", os.Getenv(reconcilermanager.ClusterNameKey), "Cluster name to use for Cluster selection") - reconcilerPollingPeriod = flag.Duration("reconciler-polling-period", controllers.PollingPeriod(reconcilermanager.ReconcilerPollingPeriod, configsync.DefaultReconcilerPollingPeriod), + reconcilerPollingPeriod = flag.Duration("reconciler-polling-period", controllers.PollingPeriod(reconcilermanager.ReconcilerPollingPeriod, configsync.DefaultReconcilerPollingDelay), "How often the reconciler should poll the filesystem for updates to the source or rendered configs.") - hydrationPollingPeriod = flag.Duration("hydration-polling-period", controllers.PollingPeriod(reconcilermanager.HydrationPollingPeriod, configsync.DefaultHydrationPollingPeriod), + hydrationPollingPeriod = flag.Duration("hydration-polling-period", controllers.PollingPeriod(reconcilermanager.HydrationPollingPeriod, configsync.DefaultHydrationPollingDelay), "How often the hydration-controller should poll the filesystem for rendering the DRY configs.") setupLog = ctrl.Log.WithName("setup") diff --git a/cmd/reconciler/main.go b/cmd/reconciler/main.go index b078023971..ead45244ac 100644 --- a/cmd/reconciler/main.go +++ b/cmd/reconciler/main.go @@ -72,11 +72,14 @@ var ( fightDetectionThreshold = flag.Float64( "fight-detection-threshold", 5.0, "The rate of updates per minute to an API Resource at which the Syncer logs warnings about too many updates to the resource.") + // TODO: rename to `resync-delay` to match internal name resyncPeriod = flag.Duration("resync-period", time.Hour, "Period of time between forced re-syncs from source (even without a new commit).") workers = flag.Int("workers", 1, "Number of concurrent remediator workers to 
run at once.") - filesystemPollingPeriod = flag.Duration("filesystem-polling-period", controllers.PollingPeriod(reconcilermanager.ReconcilerPollingPeriod, configsync.DefaultReconcilerPollingPeriod), + // TODO: rename to `filesystem-polling-delay` to match internal name. + pollingDelay = flag.Duration("filesystem-polling-period", + controllers.PollingPeriod(reconcilermanager.ReconcilerPollingPeriod, configsync.DefaultReconcilerPollingDelay), "Period of time between checking the filesystem for updates to the source or rendered configs.") // Root-Repo-only flags. If set for a Namespace-scoped Reconciler, causes the Reconciler to fail immediately. @@ -161,26 +164,28 @@ func main() { } opts := reconciler.Options{ - ClusterName: *clusterName, - FightDetectionThreshold: *fightDetectionThreshold, - NumWorkers: *workers, - ReconcilerScope: declared.Scope(*scope), - ResyncPeriod: *resyncPeriod, - FilesystemPollingFrequency: *filesystemPollingPeriod, - SourceRoot: absSourceDir, - RepoRoot: absRepoRoot, - HydratedRoot: *hydratedRootDir, - HydratedLink: *hydratedLinkDir, - SourceRev: *sourceRev, - SourceBranch: *sourceBranch, - SourceType: v1beta1.SourceType(*sourceType), - SourceRepo: *sourceRepo, - SyncDir: relSyncDir, - SyncName: *syncName, - ReconcilerName: *reconcilerName, - StatusMode: *statusMode, - ReconcileTimeout: *reconcileTimeout, - APIServerTimeout: *apiServerTimeout, + ClusterName: *clusterName, + FightDetectionThreshold: *fightDetectionThreshold, + NumWorkers: *workers, + ReconcilerScope: declared.Scope(*scope), + PollingDelay: *pollingDelay, + ResyncDelay: *resyncPeriod, + RetryDelay: configsync.DefaultReconcilerRetryDelay, + StatusUpdateDelay: configsync.DefaultSyncStatusUpdateDelay, + SourceRoot: absSourceDir, + RepoRoot: absRepoRoot, + HydratedRoot: *hydratedRootDir, + HydratedLink: *hydratedLinkDir, + SourceRev: *sourceRev, + SourceBranch: *sourceBranch, + SourceType: v1beta1.SourceType(*sourceType), + SourceRepo: *sourceRepo, + SyncDir: relSyncDir, + 
SyncName: *syncName, + ReconcilerName: *reconcilerName, + StatusMode: *statusMode, + ReconcileTimeout: *reconcileTimeout, + APIServerTimeout: *apiServerTimeout, } if declared.Scope(*scope) == declared.RootReconciler { diff --git a/pkg/api/configsync/register.go b/pkg/api/configsync/register.go index 872e0317fa..dde0738b6d 100644 --- a/pkg/api/configsync/register.go +++ b/pkg/api/configsync/register.go @@ -46,13 +46,29 @@ const ( // DefaultPeriodSecs is the default value in seconds between consecutive syncs. DefaultPeriodSecs = 15 - // DefaultReconcilerPollingPeriod defines how often the reconciler should poll - // the filesystem for updates to the source or rendered configs. - DefaultReconcilerPollingPeriod = 5 * time.Second + // DefaultReconcilerPollingDelay is the time delay between polling the + // filesystem for config updates to sync. + DefaultReconcilerPollingDelay = 5 * time.Second - // DefaultHydrationPollingPeriod defines how often the hydration controller - // should poll the filesystem for rendering the DRY configs. - DefaultHydrationPollingPeriod = 5 * time.Second + // DefaultHydrationPollingDelay is the time delay between polling the + // filesystem for config updates to render. + DefaultHydrationPollingDelay = 5 * time.Second + + // DefaultHydrationRetryDelay is the time delay between attempts to + // re-render config after an error. + // TODO: replace with retry-backoff strategy + DefaultHydrationRetryDelay = 30 * time.Minute + + // DefaultReconcilerRetryDelay is the time delay between parser runs when + // the last run errored or remediator watches need updating or there are + // management conflicts. + // TODO: replace with retry-backoff strategy + DefaultReconcilerRetryDelay = time.Second + + // DefaultSyncStatusUpdateDelay is the time delay between status updates + // when the parser is not running. These updates report new management + // conflict errors from the remediator, if there are any. 
+ DefaultSyncStatusUpdateDelay = 5 * time.Second // DefaultReconcileTimeout defines the timeout of kpt applier reconcile/prune task DefaultReconcileTimeout = 5 * time.Minute diff --git a/pkg/applier/kpt_applier.go b/pkg/applier/kpt_applier.go index 51e4f93517..c1ba4fb627 100644 --- a/pkg/applier/kpt_applier.go +++ b/pkg/applier/kpt_applier.go @@ -74,30 +74,37 @@ type Applier struct { client client.Client // configFlags for creating clients configFlags *genericclioptions.ConfigFlags - // errs tracks all the errors the applier encounters. - // This field is cleared at the start of the `Applier.Apply` method - errs status.MultiError - // syncing indicates whether the applier is syncing. - syncing bool // syncKind is the Kind of the RSync object: RootSync or RepoSync syncKind string // syncName is the name of RSync object syncName string // syncNamespace is the namespace of RSync object syncNamespace string - // mux is an Applier-level mutext to prevent concurrent Apply() and Refresh() - mux sync.Mutex // statusMode controls if the applier injects the acutation status into the // ResourceGroup object statusMode string // reconcileTimeout controls the reconcile and prune timeout reconcileTimeout time.Duration + + // syncingLock protects reading `syncing` while changing its value. + syncingLock sync.RWMutex + // syncing indicates whether the applier or destroyer is running. + syncing bool + + // errsLock protects reading `errs` while changing its value. + errsLock sync.RWMutex + // errs tracks all the errors the applier encounters. + // This field is cleared at the start of an Apply + errs status.MultiError + + // syncLock prevents multiple Apply methods from running concurrently. + syncLock sync.Mutex } -// Interface is a fake-able subset of the interface Applier implements. +// KptApplier is a fake-able subset of the interface Applier implements. // // Placed here to make discovering the production implementation (above) easier. 
-type Interface interface { +type KptApplier interface { // Apply updates the resource API server with the latest parsed git resource. // This is called when a new change in the git resource is detected. It also // returns a map of the GVKs which were successfully applied by the Applier. @@ -108,7 +115,7 @@ type Interface interface { Syncing() bool } -var _ Interface = &Applier{} +var _ KptApplier = &Applier{} // NewNamespaceApplier initializes an applier that fetches a certain namespace's resources from // the API server. @@ -278,6 +285,7 @@ func handleApplySkippedEvent(obj *unstructured.Unstructured, id core.ID, err err return SkipErrorForResource(err, id, actuation.ActuationStrategyApply) } +// processPruneEvent handles PruneEvents from the Applier func processPruneEvent(ctx context.Context, e event.PruneEvent, s *stats.PruneEventStats, objectStatusMap ObjectStatusMap, cs *clientSet) status.Error { id := idFrom(e.Identifier) klog.V(4).Infof("prune %v for object: %v", e.Status, id) @@ -394,11 +402,12 @@ func (a *Applier) checkInventoryObjectSize(ctx context.Context, c client.Client) } } -// sync triggers a kpt live apply library call to apply a set of resources. -func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) { +// applyInner triggers a kpt live apply library call to apply a set of resources. +func (a *Applier) applyInner(ctx context.Context, objs []client.Object) map[schema.GroupVersionKind]struct{} { cs, err := a.clientSetFunc(a.client, a.configFlags, a.statusMode) if err != nil { - return nil, Error(err) + a.addError(err) + return nil } a.checkInventoryObjectSize(ctx, cs.client) @@ -408,21 +417,22 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr // through annotation. 
enabledObjs, disabledObjs := partitionObjs(objs) if len(disabledObjs) > 0 { - klog.Infof("%v objects to be disabled: %v", len(disabledObjs), core.GKNNs(disabledObjs)) + klog.Infof("Applier disabling %d objects: %v", len(disabledObjs), core.GKNNs(disabledObjs)) disabledCount, err := cs.handleDisabledObjects(ctx, a.inventory, disabledObjs) if err != nil { - a.errs = status.Append(a.errs, err) - return nil, a.errs + a.addError(err) + return nil } s.DisableObjs = &stats.DisabledObjStats{ Total: uint64(len(disabledObjs)), Succeeded: disabledCount, } } - klog.Infof("%v objects to be applied: %v", len(enabledObjs), core.GKNNs(enabledObjs)) - resources, toUnsErrs := toUnstructured(enabledObjs) - if toUnsErrs != nil { - return nil, toUnsErrs + klog.Infof("Applier applying %d objects: %v", len(enabledObjs), core.GKNNs(enabledObjs)) + resources, err := toUnstructured(enabledObjs) + if err != nil { + a.addError(err) + return nil } unknownTypeResources := make(map[core.ID]struct{}) @@ -446,7 +456,8 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr // This allows for picking up CRD changes. 
mapper, err := a.configFlags.ToRESTMapper() if err != nil { - return nil, status.Append(nil, err) + a.addError(err) + return nil } meta.MaybeResetRESTMapper(mapper) @@ -455,16 +466,16 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr switch e.Type { case event.InitType: for _, ag := range e.InitEvent.ActionGroups { - klog.Info("InitEvent", ag) + klog.Info("Applier InitEvent", ag) } case event.ActionGroupType: klog.Info(e.ActionGroupEvent) case event.ErrorType: klog.V(4).Info(e.ErrorEvent) if util.IsRequestTooLargeError(e.ErrorEvent.Err) { - a.errs = status.Append(a.errs, largeResourceGroupError(e.ErrorEvent.Err, idFromInventory(a.inventory))) + a.addError(largeResourceGroupError(e.ErrorEvent.Err, idFromInventory(a.inventory))) } else { - a.errs = status.Append(a.errs, Error(e.ErrorEvent.Err)) + a.addError(e.ErrorEvent.Err) } s.ErrorTypeEvents++ case event.WaitType: @@ -475,7 +486,10 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr // a reconciled object may become pending before a wait task times out. // Record the objs that have been reconciled. 
klog.V(4).Info(e.WaitEvent) - a.errs = status.Append(a.errs, processWaitEvent(e.WaitEvent, s.WaitEvent, objStatusMap)) + err := processWaitEvent(e.WaitEvent, s.WaitEvent, objStatusMap) + if err != nil { + a.addError(err) + } case event.ApplyType: logEvent := event.ApplyEvent{ GroupName: e.ApplyEvent.GroupName, @@ -485,7 +499,10 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr Error: e.ApplyEvent.Error, } klog.V(4).Info(logEvent) - a.errs = status.Append(a.errs, processApplyEvent(ctx, e.ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources)) + err := processApplyEvent(ctx, e.ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources) + if err != nil { + a.addError(err) + } case event.PruneType: logEvent := event.PruneEvent{ GroupName: e.PruneEvent.GroupName, @@ -495,9 +512,12 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr Error: e.PruneEvent.Error, } klog.V(4).Info(logEvent) - a.errs = status.Append(a.errs, processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap, cs)) + err := processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap, cs) + if err != nil { + a.addError(err) + } default: - klog.V(4).Infof("skipped %v event", e.Type) + klog.V(4).Infof("Applier skipped %v event", e.Type) } } @@ -509,55 +529,79 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr } gvks[resource.GetObjectKind().GroupVersionKind()] = struct{}{} } - if a.errs == nil { - klog.V(4).Infof("all resources are up to date.") + if a.Errors() == nil { + klog.V(4).Infof("Applier completed without error") } if s.Empty() { - klog.V(4).Infof("The applier made no new progress") + klog.V(4).Infof("Applier made no new progress") } else { - klog.Infof("The applier made new progress: %s", s.String()) + klog.Infof("Applier made new progress: %s", s.String()) objStatusMap.Log(klog.V(0)) } - return gvks, a.errs + return gvks } -// Errors implements Interface. 
// Errors returns the errors encountered during apply. +// Errors implements Interface. func (a *Applier) Errors() status.MultiError { + a.errsLock.RLock() + defer a.errsLock.RUnlock() return a.errs } -// Syncing implements Interface. +func (a *Applier) addError(err error) { + // Ignore nil errors, to allow the caller to skip the nil check + if err == nil { + return + } + a.errsLock.Lock() + defer a.errsLock.Unlock() + + // Default to an applier.Error + _, isStatusErr := err.(status.Error) + _, isMultiErr := err.(status.MultiError) + if !isStatusErr && !isMultiErr { + err = Error(err) + } + a.errs = status.Append(a.errs, err) +} + +func (a *Applier) clearErrors() { + a.errsLock.Lock() + defer a.errsLock.Unlock() + a.errs = nil +} + // Syncing returns whether the applier is syncing. +// Syncing implements Interface. func (a *Applier) Syncing() bool { + a.syncingLock.RLock() + defer a.syncingLock.RUnlock() return a.syncing } +func (a *Applier) setSyncing(syncing bool) { + a.syncingLock.Lock() + defer a.syncingLock.Unlock() + a.syncing = syncing +} + +// Apply all managed resource objects and return any errors. // Apply implements Interface. func (a *Applier) Apply(ctx context.Context, desiredResource []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) { - // Clear the `errs` field at the start. - a.errs = nil - // Set the `syncing` field to `true` at the start. - a.syncing = true - - defer func() { - // Make sure to clear the `syncing` field before `Apply` returns. - a.syncing = false - }() - - a.mux.Lock() - defer a.mux.Unlock() - - // Pull the actual resources from the API server to compare against the - // declared resources. Note that we do not immediately return on error here - // because the Applier needs to try to do as much work as it can on each - // cycle. We collect and return all errors at the end. 
Some of those errors - // are transient and resolve in future cycles based on partial work completed - // in a previous cycle (eg ignore an error about a CR so that we can apply the - // CRD, then a future cycle is able to apply the CR). - // TODO: Here and elsewhere, pass the MultiError as a parameter. - return a.sync(ctx, desiredResource) + a.syncLock.Lock() + defer a.syncLock.Unlock() + + a.setSyncing(true) + defer a.setSyncing(false) + + // Clear the errors at the start of a new apply. + // TODO: Keep track of old errors and new errors and merge them, until applyInner returns. + a.clearErrors() + + objectStatusMap := a.applyInner(ctx, desiredResource) + return objectStatusMap, a.Errors() } // newInventoryUnstructured creates an inventory object as an unstructured. diff --git a/pkg/applier/kpt_applier_test.go b/pkg/applier/kpt_applier_test.go index ab8cb0944d..1746f35411 100644 --- a/pkg/applier/kpt_applier_test.go +++ b/pkg/applier/kpt_applier_test.go @@ -71,7 +71,7 @@ func (a *fakeApplier) Run(_ context.Context, _ inventory.Info, _ object.Unstruct return events } -func TestSync(t *testing.T) { +func TestApply(t *testing.T) { deploymentObj := newDeploymentObj() deploymentID := object.UnstructuredToObjMetadata(deploymentObj) @@ -263,7 +263,7 @@ func TestSync(t *testing.T) { } else { applier.clientSetFunc = applierFunc var gvks map[schema.GroupVersionKind]struct{} - gvks, errs = applier.sync(context.Background(), objs) + gvks, errs = applier.Apply(context.Background(), objs) if diff := cmp.Diff(tc.gvks, gvks, cmpopts.EquateEmpty()); diff != "" { t.Errorf("%s: Diff of GVK map from Apply(): %s", tc.name, diff) } diff --git a/pkg/hydrate/controller.go b/pkg/hydrate/controller.go index 8bf3aef6ea..7a1755bf80 100644 --- a/pkg/hydrate/controller.go +++ b/pkg/hydrate/controller.go @@ -59,10 +59,10 @@ type Hydrator struct { HydratedLink string // SyncDir is the relative path to the configs within the Git repository. 
SyncDir cmpath.Relative - // PollingFrequency is the period of time between checking the filesystem for rendering the DRY configs. - PollingFrequency time.Duration - // RehydrateFrequency is the period of time between rehydrating on errors. - RehydrateFrequency time.Duration + // PollingDelay is the time delay between checking the filesystem for rendering the DRY configs. + PollingDelay time.Duration + // RehydrateDelay is the time delay between rehydrating on errors. + RehydrateDelay time.Duration // ReconcilerName is the name of the reconciler. ReconcilerName string } @@ -72,9 +72,9 @@ func (h *Hydrator) Run(ctx context.Context) { // Use timers, not tickers. // Tickers can cause memory leaks and continuous execution, when execution // takes longer than the tick duration. - runTimer := time.NewTimer(h.PollingFrequency) + runTimer := time.NewTimer(h.PollingDelay) defer runTimer.Stop() - rehydrateTimer := time.NewTimer(h.RehydrateFrequency) + rehydrateTimer := time.NewTimer(h.RehydrateDelay) defer rehydrateTimer.Stop() absSourceDir := h.SourceRoot.Join(cmpath.RelativeSlash(h.SourceLink)) for { @@ -88,7 +88,7 @@ func (h *Hydrator) Run(ctx context.Context) { } else { h.rehydrateOnError(commit, syncDir.OSPath()) } - rehydrateTimer.Reset(h.RehydrateFrequency) // Schedule rehydrate attempt + rehydrateTimer.Reset(h.RehydrateDelay) // Schedule rehydrate attempt case <-runTimer.C: commit, syncDir, err := SourceCommitAndDir(h.SourceType, absSourceDir, h.SyncDir, h.ReconcilerName) if err != nil { @@ -102,7 +102,7 @@ func (h *Hydrator) Run(ctx context.Context) { klog.Errorf("failed to complete the rendering execution for commit %q: %v", commit, err) } } - runTimer.Reset(h.PollingFrequency) // Schedule re-run attempt + runTimer.Reset(h.PollingDelay) // Schedule re-run attempt } } } diff --git a/pkg/parse/namespace.go b/pkg/parse/namespace.go index 10be9df7a3..930509cd68 100644 --- a/pkg/parse/namespace.go +++ b/pkg/parse/namespace.go @@ -41,7 +41,8 @@ import ( ) // 
NewNamespaceRunner creates a new runnable parser for parsing a Namespace repo. -func NewNamespaceRunner(clusterName, syncName, reconcilerName string, scope declared.Scope, fileReader reader.Reader, c client.Client, pollingFrequency time.Duration, resyncPeriod time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.Interface, rem remediator.Interface) (Parser, error) { +// TODO: replace with builder pattern to avoid too many arguments. +func NewNamespaceRunner(clusterName, syncName, reconcilerName string, scope declared.Scope, fileReader reader.Reader, c client.Client, pollingDelay, resyncDelay, retryDelay, statusUpdateDelay time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.KptApplier, rem remediator.Interface) (Parser, error) { converter, err := declared.NewValueConverter(dc) if err != nil { return nil, err @@ -49,14 +50,16 @@ func NewNamespaceRunner(clusterName, syncName, reconcilerName string, scope decl return &namespace{ opts: opts{ - clusterName: clusterName, - client: c, - syncName: syncName, - reconcilerName: reconcilerName, - pollingFrequency: pollingFrequency, - resyncPeriod: resyncPeriod, - files: files{FileSource: fs}, - parser: filesystem.NewParser(fileReader), + clusterName: clusterName, + client: c, + syncName: syncName, + reconcilerName: reconcilerName, + pollingDelay: pollingDelay, + resyncDelay: resyncDelay, + retryDelay: retryDelay, + statusUpdateDelay: statusUpdateDelay, + files: files{FileSource: fs}, + parser: filesystem.NewParser(fileReader), updater: updater{ scope: scope, resources: resources, @@ -268,16 +271,24 @@ func (p *namespace) setRenderingStatusWithRetires(ctx context.Context, newStatus return nil } -// SetSyncStatus implements the Parser interface -// SetSyncStatus sets the RepoSync sync status. 
-// `errs` includes the errors encountered during the apply step; -func (p *namespace) SetSyncStatus(ctx context.Context, errs status.MultiError) error { +// updateSyncStatus updates RepoSync `.status.sync` and Syncing condition. +// UpdateSyncStatus implements the Parser interface. +func (p *namespace) updateSyncStatus(ctx context.Context) error { p.mux.Lock() defer p.mux.Unlock() - return p.setSyncStatusWithRetries(ctx, errs, defaultDenominator) + + var errs status.MultiError + // Add management conflict errors from the remediator + for _, e := range p.remediator.ConflictErrors() { + errs = status.Append(errs, e) + } + // Add apply errors from the applier + errs = status.Append(errs, p.applier.Errors()) + + return p.setSyncStatusWithRetries(ctx, p.applier.Syncing(), errs, defaultDenominator) } -func (p *namespace) setSyncStatusWithRetries(ctx context.Context, errs status.MultiError, denominator int) error { +func (p *namespace) setSyncStatusWithRetries(ctx context.Context, syncing bool, errs status.MultiError, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } @@ -289,9 +300,6 @@ func (p *namespace) setSyncStatusWithRetries(ctx context.Context, errs status.Mu currentRS := rs.DeepCopy() - // syncing indicates whether the applier is syncing. - syncing := p.applier.Syncing() - setSyncStatus(&rs.Status.Status, status.ToCSE(errs), denominator) errorSources, errorSummary := summarizeErrors(rs.Status.Source, rs.Status.Sync) @@ -330,24 +338,9 @@ func (p *namespace) setSyncStatusWithRetries(ctx context.Context, errs status.Mu // If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry. 
if isRequestTooLargeError(err) { klog.Infof("Failed to update RepoSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err) - return p.setSyncStatusWithRetries(ctx, errs, denominator*2) + return p.setSyncStatusWithRetries(ctx, syncing, errs, denominator*2) } return status.APIServerError(err, fmt.Sprintf("failed to update the RepoSync sync status for the %v namespace", p.scope)) } return nil } - -// ApplierErrors implements the Parser interface -func (p *namespace) ApplierErrors() status.MultiError { - return p.applier.Errors() -} - -// RemediatorConflictErrors implements the Parser interface -func (p *namespace) RemediatorConflictErrors() []status.ManagementConflictError { - return p.remediator.ConflictErrors() -} - -// K8sClient implements the Parser interface -func (p *namespace) K8sClient() client.Client { - return p.client -} diff --git a/pkg/parse/opts.go b/pkg/parse/opts.go index 134a9f24a0..23efd233f7 100644 --- a/pkg/parse/opts.go +++ b/pkg/parse/opts.go @@ -43,14 +43,20 @@ type opts struct { // syncName is the name of the RootSync or RepoSync object. syncName string - // pollingFrequency is how often to re-import configuration from the filesystem. - // - // For tests, use zero as it will poll continuously. - pollingFrequency time.Duration + // pollingDelay is how long the parser waits between run attempts. + // Each attempt re-reads the latest configuration from the filesystem. + pollingDelay time.Duration - // ResyncPeriod is the period of time between forced re-sync from source (even - // without a new commit). - resyncPeriod time.Duration + // resyncDelay is how long the parser waits between full re-sync from source + // (even without a new commit). + resyncDelay time.Duration + + // retryDelay is how long the parser waits between retries, after an error. 
+ retryDelay time.Duration + + // statusUpdateDelay is how long the parser waits between updates of the + // sync status, to account for management conflict errors from the remediator. + statusUpdateDelay time.Duration // discoveryInterface is how the parser learns what types are currently // available on the cluster. @@ -60,9 +66,6 @@ type opts struct { // objects in Git. converter *declared.ValueConverter - // reconciling indicates whether the reconciler is reconciling a change. - reconciling bool - // mux prevents status update conflicts. mux *sync.Mutex @@ -75,18 +78,8 @@ type Parser interface { parseSource(ctx context.Context, state sourceState) ([]ast.FileObject, status.MultiError) setSourceStatus(ctx context.Context, newStatus sourceStatus) error setRenderingStatus(ctx context.Context, oldStatus, newStatus renderingStatus) error - SetSyncStatus(ctx context.Context, errs status.MultiError) error + updateSyncStatus(ctx context.Context) error options() *opts - // SetReconciling sets the field indicating whether the reconciler is reconciling a change. - SetReconciling(value bool) - // Reconciling returns whether the reconciler is reconciling a change. - Reconciling() bool - // ApplierErrors returns the errors surfaced by the applier. - ApplierErrors() status.MultiError - // RemediatorConflictErrors returns the conflict errors detected by the remediator. - RemediatorConflictErrors() []status.ManagementConflictError - // K8sClient returns the Kubernetes client that talks to the API server. 
- K8sClient() client.Client } func (o *opts) k8sClient() client.Client { @@ -96,11 +89,3 @@ func (o *opts) k8sClient() client.Client { func (o *opts) discoveryClient() discovery.ServerResourcer { return o.discoveryInterface } - -func (o *opts) SetReconciling(value bool) { - o.reconciling = value -} - -func (o *opts) Reconciling() bool { - return o.reconciling -} diff --git a/pkg/parse/root.go b/pkg/parse/root.go index 7218489d51..a9250d6ce4 100644 --- a/pkg/parse/root.go +++ b/pkg/parse/root.go @@ -51,21 +51,24 @@ import ( ) // NewRootRunner creates a new runnable parser for parsing a Root repository. -func NewRootRunner(clusterName, syncName, reconcilerName string, format filesystem.SourceFormat, fileReader reader.Reader, c client.Client, pollingFrequency time.Duration, resyncPeriod time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.Interface, rem remediator.Interface) (Parser, error) { +// TODO: replace with builder pattern to avoid too many arguments. 
+func NewRootRunner(clusterName, syncName, reconcilerName string, format filesystem.SourceFormat, fileReader reader.Reader, c client.Client, pollingDelay, resyncDelay, retryDelay, statusUpdateDelay time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.KptApplier, rem remediator.Interface) (Parser, error) { converter, err := declared.NewValueConverter(dc) if err != nil { return nil, err } opts := opts{ - clusterName: clusterName, - syncName: syncName, - reconcilerName: reconcilerName, - client: c, - pollingFrequency: pollingFrequency, - resyncPeriod: resyncPeriod, - files: files{FileSource: fs}, - parser: filesystem.NewParser(fileReader), + clusterName: clusterName, + syncName: syncName, + reconcilerName: reconcilerName, + client: c, + pollingDelay: pollingDelay, + resyncDelay: resyncDelay, + retryDelay: retryDelay, + statusUpdateDelay: statusUpdateDelay, + files: files{FileSource: fs}, + parser: filesystem.NewParser(fileReader), updater: updater{ scope: declared.RootReconciler, resources: resources, @@ -357,23 +360,24 @@ func setRenderingStatus(rendering *v1beta1.RenderingStatus, p Parser, newStatus rendering.LastUpdate = newStatus.lastUpdate } -// SetSyncStatus implements the Parser interface -// SetSyncStatus sets the RootSync sync status. -// `errs` includes the errors encountered during the apply step; -func (p *root) SetSyncStatus(ctx context.Context, errs status.MultiError) error { +// updateSyncStatus updates RootSync `.status.sync` and Syncing condition. +// updateSyncStatus implements the Parser interface. +func (p *root) updateSyncStatus(ctx context.Context) error { p.mux.Lock() defer p.mux.Unlock() - var allErrs status.MultiError - remediatorErrs := p.remediator.ConflictErrors() - for _, e := range remediatorErrs { - allErrs = status.Append(allErrs, e) - } - // Add conflicting errors before other apply errors. 
- allErrs = status.Append(allErrs, errs) - return p.setSyncStatusWithRetries(ctx, allErrs, defaultDenominator) + + var errs status.MultiError + // Add management conflict errors from the remediator + for _, e := range p.remediator.ConflictErrors() { + errs = status.Append(errs, e) + } + // Add apply errors from the applier + errs = status.Append(errs, p.applier.Errors()) + + return p.setSyncStatusWithRetries(ctx, p.applier.Syncing(), errs, defaultDenominator) } -func (p *root) setSyncStatusWithRetries(ctx context.Context, errs status.MultiError, denominator int) error { +func (p *root) setSyncStatusWithRetries(ctx context.Context, syncing bool, errs status.MultiError, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } @@ -385,9 +389,6 @@ func (p *root) setSyncStatusWithRetries(ctx context.Context, errs status.MultiEr currentRS := rs.DeepCopy() - // syncing indicates whether the applier is syncing. - syncing := p.applier.Syncing() - setSyncStatus(&rs.Status.Status, status.ToCSE(errs), denominator) errorSources, errorSummary := summarizeErrors(rs.Status.Source, rs.Status.Sync) @@ -426,7 +427,7 @@ func (p *root) setSyncStatusWithRetries(ctx context.Context, errs status.MultiEr // If the update failure was caused by the size of the RootSync object, we would truncate the errors and retry. 
if isRequestTooLargeError(err) { klog.Infof("Failed to update RootSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err) - return p.setSyncStatusWithRetries(ctx, errs, denominator*2) + return p.setSyncStatusWithRetries(ctx, syncing, errs, denominator*2) } return status.APIServerError(err, "failed to update RootSync sync status") } @@ -532,66 +533,3 @@ func (p *root) addImplicitNamespaces(objs []ast.FileObject) ([]ast.FileObject, s return objs, errs } - -// ApplierErrors implements the Parser interface -func (p *root) ApplierErrors() status.MultiError { - return p.applier.Errors() -} - -// RemediatorConflictErrors implements the Parser interface -func (p *root) RemediatorConflictErrors() []status.ManagementConflictError { - return p.remediator.ConflictErrors() -} - -// K8sClient implements the Parser interface -func (p *root) K8sClient() client.Client { - return p.client -} - -// prependRootSyncRemediatorStatus adds the conflict error detected by the remediator to the front of the sync errors. -func prependRootSyncRemediatorStatus(ctx context.Context, client client.Client, syncName string, conflictErrs []status.ManagementConflictError, denominator int) error { - if denominator <= 0 { - return fmt.Errorf("The denominator must be a positive number") - } - - var rs v1beta1.RootSync - if err := client.Get(ctx, rootsync.ObjectKey(syncName), &rs); err != nil { - return status.APIServerError(err, "failed to get RootSync: "+syncName) - } - - var errs []v1beta1.ConfigSyncError - for _, conflictErr := range conflictErrs { - conflictCSEError := conflictErr.ToCSE() - conflictPairCSEError := conflictErr.CurrentManagerError().ToCSE() - errorFound := false - for _, e := range rs.Status.Sync.Errors { - // Dedup the same remediator conflict error. 
- if e.Code == status.ManagementConflictErrorCode && (e.ErrorMessage == conflictCSEError.ErrorMessage || e.ErrorMessage == conflictPairCSEError.ErrorMessage) { - errorFound = true - break - } - } - if !errorFound { - errs = append(errs, conflictCSEError) - } - } - - // No new errors, so no update - if len(errs) == 0 { - return nil - } - - // Add the remeditor conflict errors before other sync errors for more visibility. - errs = append(errs, rs.Status.Sync.Errors...) - setSyncStatus(&rs.Status.Status, errs, denominator) - - if err := client.Status().Update(ctx, &rs); err != nil { - // If the update failure was caused by the size of the RootSync object, we would truncate the errors and retry. - if isRequestTooLargeError(err) { - klog.Infof("Failed to update RootSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err) - return prependRootSyncRemediatorStatus(ctx, client, syncName, conflictErrs, denominator*2) - } - return status.APIServerError(err, "failed to update RootSync sync status") - } - return nil -} diff --git a/pkg/parse/run.go b/pkg/parse/run.go index 8b5e4eb9b4..9f40065c4e 100644 --- a/pkg/parse/run.go +++ b/pkg/parse/run.go @@ -21,13 +21,11 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" - "kpt.dev/configsync/pkg/declared" "kpt.dev/configsync/pkg/hydrate" "kpt.dev/configsync/pkg/importer/filesystem/cmpath" "kpt.dev/configsync/pkg/metrics" "kpt.dev/configsync/pkg/status" webhookconfiguration "kpt.dev/configsync/pkg/webhook/configuration" - "sigs.k8s.io/controller-runtime/pkg/client" ) const ( @@ -58,34 +56,51 @@ func Run(ctx context.Context, p Parser) { // Use timers, not tickers. // Tickers can cause memory leaks and continuous execution, when execution // takes longer than the tick duration. 
- runTimer := time.NewTimer(opts.pollingFrequency) - defer runTimer.Stop() - resyncTimer := time.NewTimer(opts.resyncPeriod) + // Timers are sorted in order of priority and expected period duration. + resyncTimer := time.NewTimer(opts.resyncDelay) defer resyncTimer.Stop() - retryTimer := time.NewTimer(time.Second) + + runTimer := time.NewTimer(opts.pollingDelay) + defer runTimer.Stop() + + statusUpdateTimer := time.NewTimer(opts.statusUpdateDelay) + defer statusUpdateTimer.Stop() + + retryTimer := time.NewTimer(opts.retryDelay) defer retryTimer.Stop() + state := &reconcilerState{} for { select { case <-ctx.Done(): return - // it is time to reapply the configuration even if no changes have been detected - // This case should be checked first since it resets the cache + // Re-apply even if no changes have been detected. + // This case should be checked first since it resets the cache. case <-resyncTimer.C: klog.Infof("It is time for a force-resync") // Reset the cache to make sure all the steps of a parse-apply-watch loop will run. // The cached sourceState will not be reset to avoid reading all the source files unnecessarily. state.resetAllButSourceState() run(ctx, p, triggerResync, state) - resyncTimer.Reset(opts.resyncPeriod) // Schedule resync attempt - retryTimer.Reset(time.Second) // Schedule retry attempt + resyncTimer.Reset(opts.resyncDelay) // Schedule resync attempt + retryTimer.Reset(time.Second) // Schedule retry attempt - // it is time to re-import the configuration from the filesystem + // Re-import declared resources from the filesystem (from git-sync). case <-runTimer.C: run(ctx, p, triggerReimport, state) - runTimer.Reset(opts.pollingFrequency) // Schedule re-run attempt - retryTimer.Reset(time.Second) // Schedule retry attempt + runTimer.Reset(opts.pollingDelay) // Schedule re-run attempt + retryTimer.Reset(time.Second) // Schedule retry attempt + + // Update the sync status to report management conflicts (from the remediator). 
+ // Performing the `updateSyncStatus` in this loop prevents it from + // running while the `run` function is executing. The `run` function + // handles calling `updateSyncStatus` while it's running. + case <-statusUpdateTimer.C: + if err := p.updateSyncStatus(ctx); err != nil { + klog.Warningf("failed to update sync status: %v", err) + } + statusUpdateTimer.Reset(opts.statusUpdateDelay) // it is time to check whether the last parse-apply-watch loop failed or any watches need to be updated case <-retryTimer.C: @@ -96,7 +111,7 @@ func Run(ctx context.Context, p Parser) { state.resetAllButSourceState() trigger = triggerManagementConflict // When conflict is detected, wait longer (same as the polling frequency) for the next retry. - time.Sleep(opts.pollingFrequency) + time.Sleep(opts.pollingDelay) } else if state.cache.needToRetry && state.cache.readyToRetry() { klog.Infof("The last reconciliation failed") trigger = triggerRetry @@ -108,17 +123,12 @@ func Run(ctx context.Context, p Parser) { continue } run(ctx, p, trigger, state) - retryTimer.Reset(time.Second) + retryTimer.Reset(opts.retryDelay) } } } func run(ctx context.Context, p Parser, trigger string, state *reconcilerState) { - p.SetReconciling(true) - defer func() { - p.SetReconciling(false) - }() - var syncDir cmpath.Absolute gs := sourceStatus{} gs.commit, syncDir, gs.errs = hydrate.SourceCommitAndDir(p.options().SourceType, p.options().SourceDir, p.options().SyncDir, p.options().reconcilerName) @@ -353,7 +363,7 @@ func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconc // Create a new context with its cancellation function. 
ctxForUpdateSyncStatus, cancel := context.WithCancel(context.Background()) - go updateSyncStatus(ctxForUpdateSyncStatus, p) + go updateSyncStatusPeriodically(ctxForUpdateSyncStatus, p) start := time.Now() syncErrs := p.options().update(ctx, &state.cache) @@ -368,7 +378,7 @@ func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconc lastUpdate: metav1.Now(), } if state.needToSetSyncStatus(newSyncStatus) { - if err := p.SetSyncStatus(ctx, syncErrs); err != nil { + if err := p.updateSyncStatus(ctx); err != nil { syncErrs = status.Append(syncErrs, err) } else { state.syncStatus = newSyncStatus @@ -379,47 +389,23 @@ func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconc return status.Append(sourceErrs, syncErrs) } -// updateSyncStatus update the sync status periodically until the cancellation function of the context is called. -func updateSyncStatus(ctx context.Context, p Parser) { - updatePeriod := 5 * time.Second +// updateSyncStatusPeriodically updates the sync status periodically until the +// context is cancelled. +func updateSyncStatusPeriodically(ctx context.Context, p Parser) { + updatePeriod := p.options().statusUpdateDelay updateTimer := time.NewTimer(updatePeriod) defer updateTimer.Stop() for { select { case <-ctx.Done(): - // ctx.Done() is closed when the cancellation function of the context is called. + // ctx.Done() is closed when the context is cancelled. return case <-updateTimer.C: - if err := p.SetSyncStatus(ctx, p.options().updater.applier.Errors()); err != nil { + if err := p.updateSyncStatus(ctx); err != nil { klog.Warningf("failed to update sync status: %v", err) } - remediatorErrs := p.options().updater.remediator.ConflictErrors() - if len(remediatorErrs) > 0 { - UpdateConflictManagerStatus(ctx, remediatorErrs, p.K8sClient()) - } updateTimer.Reset(updatePeriod) } } } - -// UpdateConflictManagerStatus reports the conflict in the conflicting manager. 
-func UpdateConflictManagerStatus(ctx context.Context, conflictErrs []status.ManagementConflictError, k8sClient client.Client) { - conflictingManagerErrors := map[string][]status.ManagementConflictError{} - for _, conflictError := range conflictErrs { - conflictingManager := conflictError.ConflictingManager() - err := conflictError.ConflictingManagerError() - conflictingManagerErrors[conflictingManager] = append(conflictingManagerErrors[conflictingManager], err) - } - - for conflictingManager, conflictErrors := range conflictingManagerErrors { - scope, name := declared.ManagerScopeAndName(conflictingManager) - if scope != declared.RootReconciler { - klog.Infof("No need to notify namespace reconciler for the conflict") - continue - } - if err := prependRootSyncRemediatorStatus(ctx, k8sClient, name, conflictErrors, defaultDenominator); err != nil { - klog.Warningf("failed to add the management conflict error to RootSync %s: %v", name, err) - } - } -} diff --git a/pkg/parse/updater.go b/pkg/parse/updater.go index accdb8d77f..671ad43569 100644 --- a/pkg/parse/updater.go +++ b/pkg/parse/updater.go @@ -36,7 +36,7 @@ type updater struct { scope declared.Scope resources *declared.Resources remediator remediator.Interface - applier applier.Interface + applier applier.KptApplier } func (u *updater) needToUpdateWatch() bool { diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index 5d8351ca2c..2b51a64454 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -15,7 +15,6 @@ package reconciler import ( - "context" "time" "k8s.io/klog/v2" @@ -61,12 +60,17 @@ type Options struct { SyncName string // ReconcilerName is the name of the Reconciler Deployment. ReconcilerName string - // ResyncPeriod is the period of time between forced re-sync from source (even - // without a new commit). - ResyncPeriod time.Duration - // FilesystemPollingFrequency is how often to check the local source repository for - // changes. 
- FilesystemPollingFrequency time.Duration + // PollingDelay is how long the parser waits between run attempts. + // Each attempt re-reads the latest configuration from the filesystem. + PollingDelay time.Duration + // RetryDelay is how long the parser waits between retries, after an error. + RetryDelay time.Duration + // ResyncDelay is how long the parser waits between full re-sync from source + // (even without a new commit). + ResyncDelay time.Duration + // StatusUpdateDelay is how long the parser waits between updates of the + // sync status, to account for management conflict errors from the remediator. + StatusUpdateDelay time.Duration // SourceRoot is the absolute path to the source repository. // Usually contains a symlink that must be resolved every time before parsing. SourceRoot cmpath.Absolute @@ -203,13 +207,13 @@ func Run(opts Options) { } if opts.ReconcilerScope == declared.RootReconciler { parser, err = parse.NewRootRunner(opts.ClusterName, opts.SyncName, opts.ReconcilerName, opts.SourceFormat, &reader.File{}, cl, - opts.FilesystemPollingFrequency, opts.ResyncPeriod, fs, discoveryClient, decls, a, rem) + opts.PollingDelay, opts.ResyncDelay, opts.RetryDelay, opts.StatusUpdateDelay, fs, discoveryClient, decls, a, rem) if err != nil { klog.Fatalf("Instantiating Root Repository Parser: %v", err) } } else { parser, err = parse.NewNamespaceRunner(opts.ClusterName, opts.SyncName, opts.ReconcilerName, opts.ReconcilerScope, &reader.File{}, cl, - opts.FilesystemPollingFrequency, opts.ResyncPeriod, fs, discoveryClient, decls, a, rem) + opts.PollingDelay, opts.ResyncDelay, opts.RetryDelay, opts.StatusUpdateDelay, fs, discoveryClient, decls, a, rem) if err != nil { klog.Fatalf("Instantiating Namespace Repository Parser: %v", err) } @@ -220,14 +224,6 @@ func Run(opts Options) { // Start the Remediator (non-blocking). doneChanForRemediator := rem.Start(ctx) - // Start the StatusUpdater (non-blocking). 
- ctxForUpdateStatus, cancel := context.WithCancel(context.Background()) - doneChForUpdateStatus := make(chan struct{}) - go func() { - defer close(doneChForUpdateStatus) - updateStatus(ctxForUpdateStatus, parser) - }() - // Start the Parser (blocking). // This will not return until: // - the Context is cancelled, or @@ -235,37 +231,8 @@ func Run(opts Options) { parse.Run(ctx, parser) klog.Info("Parser exited") - // Stop the StatusUpdater - cancel() - // Wait for StatusUpdater to exit - <-doneChForUpdateStatus - klog.Info("StatusUpdater exited") - // Wait for Remediator to exit <-doneChanForRemediator klog.Info("Remediator exited") klog.Info("All controllers exited") } - -// updateStatus update the status periodically until the cancellation function of the context is called. -func updateStatus(ctx context.Context, p parse.Parser) { - updatePeriod := 5 * time.Second - updateTimer := time.NewTimer(updatePeriod) - defer updateTimer.Stop() - for { - select { - case <-ctx.Done(): - // ctx.Done() is closed when the cancellation function of the context is called. - return - case <-updateTimer.C: - if !p.Reconciling() { - if err := p.SetSyncStatus(ctx, p.ApplierErrors()); err != nil { - klog.Warningf("failed to update remediator errors: %v", err) - } - parse.UpdateConflictManagerStatus(ctx, p.RemediatorConflictErrors(), p.K8sClient()) - } - // else if `p.Reconciling` is true, `parse.Run` will update the status periodically. - updateTimer.Reset(updatePeriod) - } - } -} diff --git a/pkg/reconcilermanager/constants.go b/pkg/reconcilermanager/constants.go index 3699c73691..22e0ef0714 100644 --- a/pkg/reconcilermanager/constants.go +++ b/pkg/reconcilermanager/constants.go @@ -96,10 +96,12 @@ const ( const ( // ReconcilerPollingPeriod defines how often the reconciler should poll the // filesystem for updates to the source or rendered configs. + // TODO: rename to `RECONCILER_POLLING_DELAY` to match internal name. 
ReconcilerPollingPeriod = "RECONCILER_POLLING_PERIOD" // HydrationPollingPeriod defines how often the hydration controller should // poll the filesystem for rendering the DRY configs. + // TODO: rename to `HYDRATION_POLLING_DELAY` to match internal name. HydrationPollingPeriod = "HYDRATION_POLLING_PERIOD" )