From 9124c33511ebec5e4d6e361fd902b5f6eff20c77 Mon Sep 17 00:00:00 2001 From: Karl Isenberg Date: Thu, 3 Nov 2022 16:16:44 -0700 Subject: [PATCH] Clean up Applier - Uses locks to protect the MultiError, Sycning, and Apply method independently. This is a hack to imporve threadsafety without changing the Applier interface. More work will probably be needed to actually fix the status inconsistency issues. - Modify all the places calling SetSyncStatus to also call UpdateConflictManagerStatus. This series of multiple updates is still a mess that needs more work, but at least it should be consistently flakey now. Every status update will remove the conflict errors, and then the conflict errors are prepended back in with another update. - Rename applier.Interface to KptApplier to make way for the KptDestroyer interface later. - TODO: I still don't understand why only RootSyncs have conflict errors from the remediator added to their status... Change-Id: I1c3373f98bb5d842ce113ba61dc0c87e1119c665 --- pkg/applier/kpt_applier.go | 158 ++++++++++++++++++++------------ pkg/applier/kpt_applier_test.go | 4 +- pkg/parse/namespace.go | 18 ++-- pkg/parse/opts.go | 4 +- pkg/parse/root.go | 18 ++-- pkg/parse/run.go | 28 ++++-- pkg/parse/updater.go | 2 +- pkg/reconciler/reconciler.go | 11 ++- 8 files changed, 158 insertions(+), 85 deletions(-) diff --git a/pkg/applier/kpt_applier.go b/pkg/applier/kpt_applier.go index 51e4f93517..c1ba4fb627 100644 --- a/pkg/applier/kpt_applier.go +++ b/pkg/applier/kpt_applier.go @@ -74,30 +74,37 @@ type Applier struct { client client.Client // configFlags for creating clients configFlags *genericclioptions.ConfigFlags - // errs tracks all the errors the applier encounters. - // This field is cleared at the start of the `Applier.Apply` method - errs status.MultiError - // syncing indicates whether the applier is syncing. - syncing bool // syncKind is the Kind of the RSync object: RootSync or RepoSync syncKind string // syncName is the name of RSync object syncName string // syncNamespace is the namespace of RSync object syncNamespace string - // mux is an Applier-level mutext to prevent concurrent Apply() and Refresh() - mux sync.Mutex // statusMode controls if the applier injects the acutation status into the // ResourceGroup object statusMode string // reconcileTimeout controls the reconcile and prune timeout reconcileTimeout time.Duration + + // syncingLock protects reading `syncing` while changing its value. + syncingLock sync.RWMutex + // syncing indicates whether the applier or destroyer is running. + syncing bool + + // errsLock protects reading `errs` while changing its value. + errsLock sync.RWMutex + // errs tracks all the errors the applier encounters. + // This field is cleared at the start of an Apply + errs status.MultiError + + // syncLock prevents multiple Apply methods from running concurrently. + syncLock sync.Mutex } -// Interface is a fake-able subset of the interface Applier implements. +// KptApplier is a fake-able subset of the interface Applier implements. // // Placed here to make discovering the production implementation (above) easier. -type Interface interface { +type KptApplier interface { // Apply updates the resource API server with the latest parsed git resource. // This is called when a new change in the git resource is detected. It also // returns a map of the GVKs which were successfully applied by the Applier. @@ -108,7 +115,7 @@ type Interface interface { Syncing() bool } -var _ Interface = &Applier{} +var _ KptApplier = &Applier{} // NewNamespaceApplier initializes an applier that fetches a certain namespace's resources from // the API server. @@ -278,6 +285,7 @@ func handleApplySkippedEvent(obj *unstructured.Unstructured, id core.ID, err err return SkipErrorForResource(err, id, actuation.ActuationStrategyApply) } +// processPruneEvent handles PruneEvents from the Applier func processPruneEvent(ctx context.Context, e event.PruneEvent, s *stats.PruneEventStats, objectStatusMap ObjectStatusMap, cs *clientSet) status.Error { id := idFrom(e.Identifier) klog.V(4).Infof("prune %v for object: %v", e.Status, id) @@ -394,11 +402,12 @@ func (a *Applier) checkInventoryObjectSize(ctx context.Context, c client.Client) } } -// sync triggers a kpt live apply library call to apply a set of resources. -func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) { +// applyInner triggers a kpt live apply library call to apply a set of resources. +func (a *Applier) applyInner(ctx context.Context, objs []client.Object) map[schema.GroupVersionKind]struct{} { cs, err := a.clientSetFunc(a.client, a.configFlags, a.statusMode) if err != nil { - return nil, Error(err) + a.addError(err) + return nil } a.checkInventoryObjectSize(ctx, cs.client) @@ -408,21 +417,22 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr // through annotation. enabledObjs, disabledObjs := partitionObjs(objs) if len(disabledObjs) > 0 { - klog.Infof("%v objects to be disabled: %v", len(disabledObjs), core.GKNNs(disabledObjs)) + klog.Infof("Applier disabling %d objects: %v", len(disabledObjs), core.GKNNs(disabledObjs)) disabledCount, err := cs.handleDisabledObjects(ctx, a.inventory, disabledObjs) if err != nil { - a.errs = status.Append(a.errs, err) - return nil, a.errs + a.addError(err) + return nil } s.DisableObjs = &stats.DisabledObjStats{ Total: uint64(len(disabledObjs)), Succeeded: disabledCount, } } - klog.Infof("%v objects to be applied: %v", len(enabledObjs), core.GKNNs(enabledObjs)) - resources, toUnsErrs := toUnstructured(enabledObjs) - if toUnsErrs != nil { - return nil, toUnsErrs + klog.Infof("Applier applying %d objects: %v", len(enabledObjs), core.GKNNs(enabledObjs)) + resources, err := toUnstructured(enabledObjs) + if err != nil { + a.addError(err) + return nil } unknownTypeResources := make(map[core.ID]struct{}) @@ -446,7 +456,8 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr // This allows for picking up CRD changes. mapper, err := a.configFlags.ToRESTMapper() if err != nil { - return nil, status.Append(nil, err) + a.addError(err) + return nil } meta.MaybeResetRESTMapper(mapper) @@ -455,16 +466,16 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr switch e.Type { case event.InitType: for _, ag := range e.InitEvent.ActionGroups { - klog.Info("InitEvent", ag) + klog.Info("Applier InitEvent", ag) } case event.ActionGroupType: klog.Info(e.ActionGroupEvent) case event.ErrorType: klog.V(4).Info(e.ErrorEvent) if util.IsRequestTooLargeError(e.ErrorEvent.Err) { - a.errs = status.Append(a.errs, largeResourceGroupError(e.ErrorEvent.Err, idFromInventory(a.inventory))) + a.addError(largeResourceGroupError(e.ErrorEvent.Err, idFromInventory(a.inventory))) } else { - a.errs = status.Append(a.errs, Error(e.ErrorEvent.Err)) + a.addError(e.ErrorEvent.Err) } s.ErrorTypeEvents++ case event.WaitType: @@ -475,7 +486,10 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr // a reconciled object may become pending before a wait task times out. // Record the objs that have been reconciled. klog.V(4).Info(e.WaitEvent) - a.errs = status.Append(a.errs, processWaitEvent(e.WaitEvent, s.WaitEvent, objStatusMap)) + err := processWaitEvent(e.WaitEvent, s.WaitEvent, objStatusMap) + if err != nil { + a.addError(err) + } case event.ApplyType: logEvent := event.ApplyEvent{ GroupName: e.ApplyEvent.GroupName, @@ -485,7 +499,10 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr Error: e.ApplyEvent.Error, } klog.V(4).Info(logEvent) - a.errs = status.Append(a.errs, processApplyEvent(ctx, e.ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources)) + err := processApplyEvent(ctx, e.ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources) + if err != nil { + a.addError(err) + } case event.PruneType: logEvent := event.PruneEvent{ GroupName: e.PruneEvent.GroupName, @@ -495,9 +512,12 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr Error: e.PruneEvent.Error, } klog.V(4).Info(logEvent) - a.errs = status.Append(a.errs, processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap, cs)) + err := processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap, cs) + if err != nil { + a.addError(err) + } default: - klog.V(4).Infof("skipped %v event", e.Type) + klog.V(4).Infof("Applier skipped %v event", e.Type) } } @@ -509,55 +529,79 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr } gvks[resource.GetObjectKind().GroupVersionKind()] = struct{}{} } - if a.errs == nil { - klog.V(4).Infof("all resources are up to date.") + if a.Errors() == nil { + klog.V(4).Infof("Applier completed without error") } if s.Empty() { - klog.V(4).Infof("The applier made no new progress") + klog.V(4).Infof("Applier made no new progress") } else { - klog.Infof("The applier made new progress: %s", s.String()) + klog.Infof("Applier made new progress: %s", s.String()) objStatusMap.Log(klog.V(0)) } - return gvks, a.errs + return gvks } -// Errors implements Interface. // Errors returns the errors encountered during apply. +// Errors implements Interface. func (a *Applier) Errors() status.MultiError { + a.errsLock.RLock() + defer a.errsLock.RUnlock() return a.errs } -// Syncing implements Interface. +func (a *Applier) addError(err error) { + // Ignore nil errors, to allow the caller to skip the nil check + if err == nil { + return + } + a.errsLock.Lock() + defer a.errsLock.Unlock() + + // Default to an applier.Error + _, isStatusErr := err.(status.Error) + _, isMultiErr := err.(status.MultiError) + if !isStatusErr && !isMultiErr { + err = Error(err) + } + a.errs = status.Append(a.errs, err) +} + +func (a *Applier) clearErrors() { + a.errsLock.Lock() + defer a.errsLock.Unlock() + a.errs = nil +} + // Syncing returns whether the applier is syncing. +// Syncing implements Interface. func (a *Applier) Syncing() bool { + a.syncingLock.RLock() + defer a.syncingLock.RUnlock() return a.syncing } +func (a *Applier) setSyncing(syncing bool) { + a.syncingLock.Lock() + defer a.syncingLock.Unlock() + a.syncing = syncing +} + +// Apply all managed resource objects and return any errors. // Apply implements Interface. func (a *Applier) Apply(ctx context.Context, desiredResource []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) { - // Clear the `errs` field at the start. - a.errs = nil - // Set the `syncing` field to `true` at the start. - a.syncing = true - - defer func() { - // Make sure to clear the `syncing` field before `Apply` returns. - a.syncing = false - }() - - a.mux.Lock() - defer a.mux.Unlock() - - // Pull the actual resources from the API server to compare against the - // declared resources. Note that we do not immediately return on error here - // because the Applier needs to try to do as much work as it can on each - // cycle. We collect and return all errors at the end. Some of those errors - // are transient and resolve in future cycles based on partial work completed - // in a previous cycle (eg ignore an error about a CR so that we can apply the - // CRD, then a future cycle is able to apply the CR). - // TODO: Here and elsewhere, pass the MultiError as a parameter. - return a.sync(ctx, desiredResource) + a.syncLock.Lock() + defer a.syncLock.Unlock() + + a.setSyncing(true) + defer a.setSyncing(false) + + // Clear the errors at the start of a new apply. + // TODO: Keep track of old errors and new errors and merge them, until applyInner returns. + a.clearErrors() + + objectStatusMap := a.applyInner(ctx, desiredResource) + return objectStatusMap, a.Errors() } // newInventoryUnstructured creates an inventory object as an unstructured. diff --git a/pkg/applier/kpt_applier_test.go b/pkg/applier/kpt_applier_test.go index ab8cb0944d..1746f35411 100644 --- a/pkg/applier/kpt_applier_test.go +++ b/pkg/applier/kpt_applier_test.go @@ -71,7 +71,7 @@ func (a *fakeApplier) Run(_ context.Context, _ inventory.Info, _ object.Unstruct return events } -func TestSync(t *testing.T) { +func TestApply(t *testing.T) { deploymentObj := newDeploymentObj() deploymentID := object.UnstructuredToObjMetadata(deploymentObj) @@ -263,7 +263,7 @@ func TestSync(t *testing.T) { } else { applier.clientSetFunc = applierFunc var gvks map[schema.GroupVersionKind]struct{} - gvks, errs = applier.sync(context.Background(), objs) + gvks, errs = applier.Apply(context.Background(), objs) if diff := cmp.Diff(tc.gvks, gvks, cmpopts.EquateEmpty()); diff != "" { t.Errorf("%s: Diff of GVK map from Apply(): %s", tc.name, diff) } diff --git a/pkg/parse/namespace.go b/pkg/parse/namespace.go index 10be9df7a3..0c534edefd 100644 --- a/pkg/parse/namespace.go +++ b/pkg/parse/namespace.go @@ -41,7 +41,7 @@ import ( ) // NewNamespaceRunner creates a new runnable parser for parsing a Namespace repo. -func NewNamespaceRunner(clusterName, syncName, reconcilerName string, scope declared.Scope, fileReader reader.Reader, c client.Client, pollingFrequency time.Duration, resyncPeriod time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.Interface, rem remediator.Interface) (Parser, error) { +func NewNamespaceRunner(clusterName, syncName, reconcilerName string, scope declared.Scope, fileReader reader.Reader, c client.Client, pollingFrequency time.Duration, resyncPeriod time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.KptApplier, rem remediator.Interface) (Parser, error) { converter, err := declared.NewValueConverter(dc) if err != nil { return nil, err @@ -271,13 +271,13 @@ func (p *namespace) setRenderingStatusWithRetires(ctx context.Context, newStatus // SetSyncStatus implements the Parser interface // SetSyncStatus sets the RepoSync sync status. // `errs` includes the errors encountered during the apply step; -func (p *namespace) SetSyncStatus(ctx context.Context, errs status.MultiError) error { +func (p *namespace) SetSyncStatus(ctx context.Context, syncing bool, errs status.MultiError) error { p.mux.Lock() defer p.mux.Unlock() - return p.setSyncStatusWithRetries(ctx, errs, defaultDenominator) + return p.setSyncStatusWithRetries(ctx, syncing, errs, defaultDenominator) } -func (p *namespace) setSyncStatusWithRetries(ctx context.Context, errs status.MultiError, denominator int) error { +func (p *namespace) setSyncStatusWithRetries(ctx context.Context, syncing bool, errs status.MultiError, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } @@ -289,9 +289,6 @@ func (p *namespace) setSyncStatusWithRetries(ctx context.Context, errs status.Mu currentRS := rs.DeepCopy() - // syncing indicates whether the applier is syncing. - syncing := p.applier.Syncing() - setSyncStatus(&rs.Status.Status, status.ToCSE(errs), denominator) errorSources, errorSummary := summarizeErrors(rs.Status.Source, rs.Status.Sync) @@ -330,13 +327,18 @@ func (p *namespace) setSyncStatusWithRetries(ctx context.Context, errs status.Mu // If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { klog.Infof("Failed to update RepoSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err) - return p.setSyncStatusWithRetries(ctx, errs, denominator*2) + return p.setSyncStatusWithRetries(ctx, syncing, errs, denominator*2) } return status.APIServerError(err, fmt.Sprintf("failed to update the RepoSync sync status for the %v namespace", p.scope)) } return nil } +// ApplierSyncing returns true if the applier is syncing +func (p *namespace) ApplierSyncing() bool { + return p.applier.Syncing() +} + // ApplierErrors implements the Parser interface func (p *namespace) ApplierErrors() status.MultiError { return p.applier.Errors() diff --git a/pkg/parse/opts.go b/pkg/parse/opts.go index 134a9f24a0..637f125008 100644 --- a/pkg/parse/opts.go +++ b/pkg/parse/opts.go @@ -75,7 +75,7 @@ type Parser interface { parseSource(ctx context.Context, state sourceState) ([]ast.FileObject, status.MultiError) setSourceStatus(ctx context.Context, newStatus sourceStatus) error setRenderingStatus(ctx context.Context, oldStatus, newStatus renderingStatus) error - SetSyncStatus(ctx context.Context, errs status.MultiError) error + SetSyncStatus(ctx context.Context, syncing bool, errs status.MultiError) error options() *opts // SetReconciling sets the field indicating whether the reconciler is reconciling a change. SetReconciling(value bool) @@ -83,6 +83,8 @@ type Parser interface { Reconciling() bool // ApplierErrors returns the errors surfaced by the applier. ApplierErrors() status.MultiError + // ApplierSyncing returns true if the Applier is syncing + ApplierSyncing() bool // RemediatorConflictErrors returns the conflict errors detected by the remediator. RemediatorConflictErrors() []status.ManagementConflictError // K8sClient returns the Kubernetes client that talks to the API server. diff --git a/pkg/parse/root.go b/pkg/parse/root.go index 7218489d51..2d2c99577f 100644 --- a/pkg/parse/root.go +++ b/pkg/parse/root.go @@ -51,7 +51,7 @@ import ( ) // NewRootRunner creates a new runnable parser for parsing a Root repository. -func NewRootRunner(clusterName, syncName, reconcilerName string, format filesystem.SourceFormat, fileReader reader.Reader, c client.Client, pollingFrequency time.Duration, resyncPeriod time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.Interface, rem remediator.Interface) (Parser, error) { +func NewRootRunner(clusterName, syncName, reconcilerName string, format filesystem.SourceFormat, fileReader reader.Reader, c client.Client, pollingFrequency time.Duration, resyncPeriod time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.KptApplier, rem remediator.Interface) (Parser, error) { converter, err := declared.NewValueConverter(dc) if err != nil { return nil, err @@ -360,7 +360,7 @@ func setRenderingStatus(rendering *v1beta1.RenderingStatus, p Parser, newStatus // SetSyncStatus implements the Parser interface // SetSyncStatus sets the RootSync sync status. // `errs` includes the errors encountered during the apply step; -func (p *root) SetSyncStatus(ctx context.Context, errs status.MultiError) error { +func (p *root) SetSyncStatus(ctx context.Context, syncing bool, errs status.MultiError) error { p.mux.Lock() defer p.mux.Unlock() var allErrs status.MultiError @@ -370,10 +370,10 @@ func (p *root) SetSyncStatus(ctx context.Context, errs status.MultiError) error } // Add conflicting errors before other apply errors. allErrs = status.Append(allErrs, errs) - return p.setSyncStatusWithRetries(ctx, allErrs, defaultDenominator) + return p.setSyncStatusWithRetries(ctx, syncing, allErrs, defaultDenominator) } -func (p *root) setSyncStatusWithRetries(ctx context.Context, errs status.MultiError, denominator int) error { +func (p *root) setSyncStatusWithRetries(ctx context.Context, syncing bool, errs status.MultiError, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } @@ -385,9 +385,6 @@ func (p *root) setSyncStatusWithRetries(ctx context.Context, errs status.MultiEr currentRS := rs.DeepCopy() - // syncing indicates whether the applier is syncing. - syncing := p.applier.Syncing() - setSyncStatus(&rs.Status.Status, status.ToCSE(errs), denominator) errorSources, errorSummary := summarizeErrors(rs.Status.Source, rs.Status.Sync) @@ -426,7 +423,7 @@ func (p *root) setSyncStatusWithRetries(ctx context.Context, errs status.MultiEr // If the update failure was caused by the size of the RootSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { klog.Infof("Failed to update RootSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err) - return p.setSyncStatusWithRetries(ctx, errs, denominator*2) + return p.setSyncStatusWithRetries(ctx, syncing, errs, denominator*2) } return status.APIServerError(err, "failed to update RootSync sync status") } @@ -533,6 +530,11 @@ func (p *root) addImplicitNamespaces(objs []ast.FileObject) ([]ast.FileObject, s return objs, errs } +// ApplierSyncing returns true if the applier is syncing +func (p *root) ApplierSyncing() bool { + return p.applier.Syncing() +} + // ApplierErrors implements the Parser interface func (p *root) ApplierErrors() status.MultiError { return p.applier.Errors() diff --git a/pkg/parse/run.go b/pkg/parse/run.go index 8b5e4eb9b4..462c1eddc5 100644 --- a/pkg/parse/run.go +++ b/pkg/parse/run.go @@ -28,6 +28,7 @@ import ( "kpt.dev/configsync/pkg/status" webhookconfiguration "kpt.dev/configsync/pkg/webhook/configuration" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/kind/pkg/errors" ) const ( @@ -368,12 +369,22 @@ func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconc lastUpdate: metav1.Now(), } if state.needToSetSyncStatus(newSyncStatus) { - if err := p.SetSyncStatus(ctx, syncErrs); err != nil { + // Set Syncing=False because the Applier has stopped. + if err := p.SetSyncStatus(ctx, false, syncErrs); err != nil { syncErrs = status.Append(syncErrs, err) } else { state.syncStatus = newSyncStatus state.syncingConditionLastUpdate = newSyncStatus.lastUpdate } + // Prepend any remediator errors + // TODO: combine this with SetSyncStatus so it's a single update + remediatorErrs := p.RemediatorConflictErrors() + if len(remediatorErrs) > 0 { + if err := UpdateConflictManagerStatus(ctx, remediatorErrs, p.K8sClient()); err != nil { + syncErrs = status.Append(syncErrs, err) + } + } + // TODO: Am I suppossed to update state.syncStatus or state.syncingConditionLastUpdate again? } return status.Append(sourceErrs, syncErrs) @@ -391,12 +402,16 @@ func updateSyncStatus(ctx context.Context, p Parser) { return case <-updateTimer.C: - if err := p.SetSyncStatus(ctx, p.options().updater.applier.Errors()); err != nil { + if err := p.SetSyncStatus(ctx, p.ApplierSyncing(), p.ApplierErrors()); err != nil { klog.Warningf("failed to update sync status: %v", err) } - remediatorErrs := p.options().updater.remediator.ConflictErrors() + // Prepend any remediator errors + // TODO: combine this with SetSyncStatus so it's a single update + remediatorErrs := p.RemediatorConflictErrors() if len(remediatorErrs) > 0 { - UpdateConflictManagerStatus(ctx, remediatorErrs, p.K8sClient()) + if err := UpdateConflictManagerStatus(ctx, remediatorErrs, p.K8sClient()); err != nil { + klog.Warningf("failed to update sync status with remediator conflicts: %v", err) + } } updateTimer.Reset(updatePeriod) } @@ -404,7 +419,7 @@ func updateSyncStatus(ctx context.Context, p Parser) { } // UpdateConflictManagerStatus reports the conflict in the conflicting manager. -func UpdateConflictManagerStatus(ctx context.Context, conflictErrs []status.ManagementConflictError, k8sClient client.Client) { +func UpdateConflictManagerStatus(ctx context.Context, conflictErrs []status.ManagementConflictError, k8sClient client.Client) error { conflictingManagerErrors := map[string][]status.ManagementConflictError{} for _, conflictError := range conflictErrs { conflictingManager := conflictError.ConflictingManager() @@ -419,7 +434,8 @@ func UpdateConflictManagerStatus(ctx context.Context, conflictErrs []status.Mana continue } if err := prependRootSyncRemediatorStatus(ctx, k8sClient, name, conflictErrors, defaultDenominator); err != nil { - klog.Warningf("failed to add the management conflict error to RootSync %s: %v", name, err) + return errors.Wrapf(err, "prepending management conflict errors to RootSync %s", name) } } + return nil } diff --git a/pkg/parse/updater.go b/pkg/parse/updater.go index accdb8d77f..671ad43569 100644 --- a/pkg/parse/updater.go +++ b/pkg/parse/updater.go @@ -36,7 +36,7 @@ type updater struct { scope declared.Scope resources *declared.Resources remediator remediator.Interface - applier applier.Interface + applier applier.KptApplier } func (u *updater) needToUpdateWatch() bool { diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index 5d8351ca2c..11f7f694f1 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -259,10 +259,17 @@ func updateStatus(ctx context.Context, p parse.Parser) { return case <-updateTimer.C: if !p.Reconciling() { - if err := p.SetSyncStatus(ctx, p.ApplierErrors()); err != nil { + if err := p.SetSyncStatus(ctx, p.ApplierSyncing(), p.ApplierErrors()); err != nil { klog.Warningf("failed to update remediator errors: %v", err) } - parse.UpdateConflictManagerStatus(ctx, p.RemediatorConflictErrors(), p.K8sClient()) + // Prepend any remediator errors + // TODO: combine this with SetSyncStatus so it's a single update + remediatorErrs := p.RemediatorConflictErrors() + if len(remediatorErrs) > 0 { + if err := parse.UpdateConflictManagerStatus(ctx, remediatorErrs, p.K8sClient()); err != nil { + klog.Warningf("failed to update sync status with remediator conflicts: %v", err) + } + } } // else if `p.Reconciling` is true, `parse.Run` will update the status periodically. updateTimer.Reset(updatePeriod)