diff --git a/pkg/applier/kpt_applier.go b/pkg/applier/kpt_applier.go index 51e4f93517..c1ba4fb627 100644 --- a/pkg/applier/kpt_applier.go +++ b/pkg/applier/kpt_applier.go @@ -74,30 +74,37 @@ type Applier struct { client client.Client // configFlags for creating clients configFlags *genericclioptions.ConfigFlags - // errs tracks all the errors the applier encounters. - // This field is cleared at the start of the `Applier.Apply` method - errs status.MultiError - // syncing indicates whether the applier is syncing. - syncing bool // syncKind is the Kind of the RSync object: RootSync or RepoSync syncKind string // syncName is the name of RSync object syncName string // syncNamespace is the namespace of RSync object syncNamespace string - // mux is an Applier-level mutext to prevent concurrent Apply() and Refresh() - mux sync.Mutex // statusMode controls if the applier injects the actuation status into the // ResourceGroup object statusMode string // reconcileTimeout controls the reconcile and prune timeout reconcileTimeout time.Duration + + // syncingLock protects reading `syncing` while changing its value. + syncingLock sync.RWMutex + // syncing indicates whether the applier or destroyer is running. + syncing bool + + // errsLock protects reading `errs` while changing its value. + errsLock sync.RWMutex + // errs tracks all the errors the applier encounters. + // This field is cleared at the start of each Apply + errs status.MultiError + + // syncLock prevents multiple Apply methods from running concurrently. + syncLock sync.Mutex } -// Interface is a fake-able subset of the interface Applier implements. +// KptApplier is a fake-able subset of the interface Applier implements. // // Placed here to make discovering the production implementation (above) easier. -type Interface interface { +type KptApplier interface { // Apply updates the resource API server with the latest parsed git resource. // This is called when a new change in the git resource is detected. It also // returns a map of the GVKs which were successfully applied by the Applier. @@ -108,7 +115,7 @@ type Interface interface { Syncing() bool } -var _ Interface = &Applier{} +var _ KptApplier = &Applier{} // NewNamespaceApplier initializes an applier that fetches a certain namespace's resources from // the API server. @@ -278,6 +285,7 @@ func handleApplySkippedEvent(obj *unstructured.Unstructured, id core.ID, err err return SkipErrorForResource(err, id, actuation.ActuationStrategyApply) } +// processPruneEvent handles PruneEvents from the Applier func processPruneEvent(ctx context.Context, e event.PruneEvent, s *stats.PruneEventStats, objectStatusMap ObjectStatusMap, cs *clientSet) status.Error { id := idFrom(e.Identifier) klog.V(4).Infof("prune %v for object: %v", e.Status, id) @@ -394,11 +402,12 @@ func (a *Applier) checkInventoryObjectSize(ctx context.Context, c client.Client) } } -// sync triggers a kpt live apply library call to apply a set of resources. +// applyInner triggers a kpt live apply library call to apply a set of resources.
+func (a *Applier) applyInner(ctx context.Context, objs []client.Object) map[schema.GroupVersionKind]struct{} { cs, err := a.clientSetFunc(a.client, a.configFlags, a.statusMode) if err != nil { - return nil, Error(err) + a.addError(err) + return nil } a.checkInventoryObjectSize(ctx, cs.client) @@ -408,21 +417,22 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr // through annotation. enabledObjs, disabledObjs := partitionObjs(objs) if len(disabledObjs) > 0 { - klog.Infof("%v objects to be disabled: %v", len(disabledObjs), core.GKNNs(disabledObjs)) + klog.Infof("Applier disabling %d objects: %v", len(disabledObjs), core.GKNNs(disabledObjs)) disabledCount, err := cs.handleDisabledObjects(ctx, a.inventory, disabledObjs) if err != nil { - a.errs = status.Append(a.errs, err) - return nil, a.errs + a.addError(err) + return nil } s.DisableObjs = &stats.DisabledObjStats{ Total: uint64(len(disabledObjs)), Succeeded: disabledCount, } } - klog.Infof("%v objects to be applied: %v", len(enabledObjs), core.GKNNs(enabledObjs)) - resources, toUnsErrs := toUnstructured(enabledObjs) - if toUnsErrs != nil { - return nil, toUnsErrs + klog.Infof("Applier applying %d objects: %v", len(enabledObjs), core.GKNNs(enabledObjs)) + resources, err := toUnstructured(enabledObjs) + if err != nil { + a.addError(err) + return nil } unknownTypeResources := make(map[core.ID]struct{}) @@ -446,7 +456,8 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr // This allows for picking up CRD changes. mapper, err := a.configFlags.ToRESTMapper() if err != nil { - return nil, status.Append(nil, err) + a.addError(err) + return nil } meta.MaybeResetRESTMapper(mapper) @@ -455,16 +466,16 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr switch e.Type { case event.InitType: for _, ag := range e.InitEvent.ActionGroups { - klog.Info("InitEvent", ag) + klog.Info("Applier InitEvent", ag) } case event.ActionGroupType: klog.Info(e.ActionGroupEvent) case event.ErrorType: klog.V(4).Info(e.ErrorEvent) if util.IsRequestTooLargeError(e.ErrorEvent.Err) { - a.errs = status.Append(a.errs, largeResourceGroupError(e.ErrorEvent.Err, idFromInventory(a.inventory))) + a.addError(largeResourceGroupError(e.ErrorEvent.Err, idFromInventory(a.inventory))) } else { - a.errs = status.Append(a.errs, Error(e.ErrorEvent.Err)) + a.addError(e.ErrorEvent.Err) } s.ErrorTypeEvents++ case event.WaitType: @@ -475,7 +486,10 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr // a reconciled object may become pending before a wait task times out. // Record the objs that have been reconciled. 
klog.V(4).Info(e.WaitEvent) - a.errs = status.Append(a.errs, processWaitEvent(e.WaitEvent, s.WaitEvent, objStatusMap)) + err := processWaitEvent(e.WaitEvent, s.WaitEvent, objStatusMap) + if err != nil { + a.addError(err) + } case event.ApplyType: logEvent := event.ApplyEvent{ GroupName: e.ApplyEvent.GroupName, Identifier: e.ApplyEvent.Identifier, Status: e.ApplyEvent.Status, Error: e.ApplyEvent.Error, } klog.V(4).Info(logEvent) - a.errs = status.Append(a.errs, processApplyEvent(ctx, e.ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources)) + err := processApplyEvent(ctx, e.ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources) + if err != nil { + a.addError(err) + } case event.PruneType: logEvent := event.PruneEvent{ GroupName: e.PruneEvent.GroupName, Identifier: e.PruneEvent.Identifier, Status: e.PruneEvent.Status, Error: e.PruneEvent.Error, } klog.V(4).Info(logEvent) - a.errs = status.Append(a.errs, processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap, cs)) + err := processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap, cs) + if err != nil { + a.addError(err) + } default: - klog.V(4).Infof("skipped %v event", e.Type) + klog.V(4).Infof("Applier skipped %v event", e.Type) } } @@ -509,55 +529,79 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr } gvks[resource.GetObjectKind().GroupVersionKind()] = struct{}{} } - if a.errs == nil { - klog.V(4).Infof("all resources are up to date.") + if a.Errors() == nil { + klog.V(4).Infof("Applier completed without error") } if s.Empty() { - klog.V(4).Infof("The applier made no new progress") + klog.V(4).Infof("Applier made no new progress") } else { - klog.Infof("The applier made new progress: %s", s.String()) + klog.Infof("Applier made new progress: %s", s.String()) objStatusMap.Log(klog.V(0)) } - return gvks, a.errs + return gvks } -// Errors implements Interface. // Errors returns the errors encountered during apply. +// Errors implements KptApplier. func (a *Applier) Errors() status.MultiError { + a.errsLock.RLock() + defer a.errsLock.RUnlock() return a.errs } -// Syncing implements Interface. +func (a *Applier) addError(err error) { + // Ignore nil errors, to allow the caller to skip the nil check + if err == nil { + return + } + a.errsLock.Lock() + defer a.errsLock.Unlock() + + // Default to an applier.Error + _, isStatusErr := err.(status.Error) + _, isMultiErr := err.(status.MultiError) + if !isStatusErr && !isMultiErr { + err = Error(err) + } + a.errs = status.Append(a.errs, err) +} + +func (a *Applier) clearErrors() { + a.errsLock.Lock() + defer a.errsLock.Unlock() + a.errs = nil +} + // Syncing returns whether the applier is syncing. +// Syncing implements KptApplier. func (a *Applier) Syncing() bool { + a.syncingLock.RLock() + defer a.syncingLock.RUnlock() return a.syncing } +func (a *Applier) setSyncing(syncing bool) { + a.syncingLock.Lock() + defer a.syncingLock.Unlock() + a.syncing = syncing +} + +// Apply all managed resource objects and return any errors. // Apply implements Interface. func (a *Applier) Apply(ctx context.Context, desiredResource []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) { - // Clear the `errs` field at the start. - a.errs = nil - // Set the `syncing` field to `true` at the start. - a.syncing = true - - defer func() { - // Make sure to clear the `syncing` field before `Apply` returns.
- a.syncing = false - }() - - a.mux.Lock() - defer a.mux.Unlock() - - // Pull the actual resources from the API server to compare against the - // declared resources. Note that we do not immediately return on error here - // because the Applier needs to try to do as much work as it can on each - // cycle. We collect and return all errors at the end. Some of those errors - // are transient and resolve in future cycles based on partial work completed - // in a previous cycle (eg ignore an error about a CR so that we can apply the - // CRD, then a future cycle is able to apply the CR). - // TODO: Here and elsewhere, pass the MultiError as a parameter. - return a.sync(ctx, desiredResource) + a.syncLock.Lock() + defer a.syncLock.Unlock() + + a.setSyncing(true) + defer a.setSyncing(false) + + // Clear the errors at the start of a new apply. + // TODO: Keep track of old errors and new errors and merge them until applyInner returns. + a.clearErrors() + + gvks := a.applyInner(ctx, desiredResource) + return gvks, a.Errors() } // newInventoryUnstructured creates an inventory object as an unstructured. diff --git a/pkg/applier/kpt_applier_test.go b/pkg/applier/kpt_applier_test.go index ab8cb0944d..1746f35411 100644 --- a/pkg/applier/kpt_applier_test.go +++ b/pkg/applier/kpt_applier_test.go @@ -71,7 +71,7 @@ func (a *fakeApplier) Run(_ context.Context, _ inventory.Info, _ object.Unstruct return events } -func TestSync(t *testing.T) { +func TestApply(t *testing.T) { deploymentObj := newDeploymentObj() deploymentID := object.UnstructuredToObjMetadata(deploymentObj) @@ -263,7 +263,7 @@ func TestSync(t *testing.T) { } else { applier.clientSetFunc = applierFunc var gvks map[schema.GroupVersionKind]struct{} - gvks, errs = applier.sync(context.Background(), objs) + gvks, errs = applier.Apply(context.Background(), objs) if diff := cmp.Diff(tc.gvks, gvks, cmpopts.EquateEmpty()); diff != "" { t.Errorf("%s: Diff of GVK map from Apply(): %s", tc.name, diff) } diff --git a/pkg/parse/namespace.go b/pkg/parse/namespace.go index 10be9df7a3..0c534edefd 100644 --- a/pkg/parse/namespace.go +++ b/pkg/parse/namespace.go @@ -41,7 +41,7 @@ import ( ) // NewNamespaceRunner creates a new runnable parser for parsing a Namespace repo. -func NewNamespaceRunner(clusterName, syncName, reconcilerName string, scope declared.Scope, fileReader reader.Reader, c client.Client, pollingFrequency time.Duration, resyncPeriod time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.Interface, rem remediator.Interface) (Parser, error) { +func NewNamespaceRunner(clusterName, syncName, reconcilerName string, scope declared.Scope, fileReader reader.Reader, c client.Client, pollingFrequency time.Duration, resyncPeriod time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.KptApplier, rem remediator.Interface) (Parser, error) { converter, err := declared.NewValueConverter(dc) if err != nil { return nil, err } @@ -271,13 +271,13 @@ func (p *namespace) setRenderingStatusWithRetires(ctx context.Context, newStatus // SetSyncStatus implements the Parser interface // SetSyncStatus sets the RepoSync sync status.
// `errs` includes the errors encountered during the apply step; -func (p *namespace) SetSyncStatus(ctx context.Context, errs status.MultiError) error { +func (p *namespace) SetSyncStatus(ctx context.Context, syncing bool, errs status.MultiError) error { p.mux.Lock() defer p.mux.Unlock() - return p.setSyncStatusWithRetries(ctx, errs, defaultDenominator) + return p.setSyncStatusWithRetries(ctx, syncing, errs, defaultDenominator) } -func (p *namespace) setSyncStatusWithRetries(ctx context.Context, errs status.MultiError, denominator int) error { +func (p *namespace) setSyncStatusWithRetries(ctx context.Context, syncing bool, errs status.MultiError, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } @@ -289,9 +289,6 @@ func (p *namespace) setSyncStatusWithRetries(ctx context.Context, errs status.Mu currentRS := rs.DeepCopy() - // syncing indicates whether the applier is syncing. - syncing := p.applier.Syncing() - setSyncStatus(&rs.Status.Status, status.ToCSE(errs), denominator) errorSources, errorSummary := summarizeErrors(rs.Status.Source, rs.Status.Sync) @@ -330,13 +327,18 @@ func (p *namespace) setSyncStatusWithRetries(ctx context.Context, errs status.Mu // If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry. if isRequestTooLargeError(err) { klog.Infof("Failed to update RepoSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err) - return p.setSyncStatusWithRetries(ctx, errs, denominator*2) + return p.setSyncStatusWithRetries(ctx, syncing, errs, denominator*2) } return status.APIServerError(err, fmt.Sprintf("failed to update the RepoSync sync status for the %v namespace", p.scope)) } return nil } +// ApplierSyncing returns true if the applier is syncing +func (p *namespace) ApplierSyncing() bool { + return p.applier.Syncing() +} + // ApplierErrors implements the Parser interface func (p *namespace) ApplierErrors() status.MultiError { return p.applier.Errors() diff --git a/pkg/parse/opts.go b/pkg/parse/opts.go index 134a9f24a0..637f125008 100644 --- a/pkg/parse/opts.go +++ b/pkg/parse/opts.go @@ -75,7 +75,7 @@ type Parser interface { parseSource(ctx context.Context, state sourceState) ([]ast.FileObject, status.MultiError) setSourceStatus(ctx context.Context, newStatus sourceStatus) error setRenderingStatus(ctx context.Context, oldStatus, newStatus renderingStatus) error - SetSyncStatus(ctx context.Context, errs status.MultiError) error + SetSyncStatus(ctx context.Context, syncing bool, errs status.MultiError) error options() *opts // SetReconciling sets the field indicating whether the reconciler is reconciling a change. SetReconciling(value bool) @@ -83,6 +83,8 @@ type Parser interface { Reconciling() bool // ApplierErrors returns the errors surfaced by the applier. ApplierErrors() status.MultiError + // ApplierSyncing returns true if the Applier is syncing + ApplierSyncing() bool // RemediatorConflictErrors returns the conflict errors detected by the remediator. RemediatorConflictErrors() []status.ManagementConflictError // K8sClient returns the Kubernetes client that talks to the API server. diff --git a/pkg/parse/root.go b/pkg/parse/root.go index 7218489d51..2d2c99577f 100644 --- a/pkg/parse/root.go +++ b/pkg/parse/root.go @@ -51,7 +51,7 @@ import ( ) // NewRootRunner creates a new runnable parser for parsing a Root repository. 
-func NewRootRunner(clusterName, syncName, reconcilerName string, format filesystem.SourceFormat, fileReader reader.Reader, c client.Client, pollingFrequency time.Duration, resyncPeriod time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.Interface, rem remediator.Interface) (Parser, error) { +func NewRootRunner(clusterName, syncName, reconcilerName string, format filesystem.SourceFormat, fileReader reader.Reader, c client.Client, pollingFrequency time.Duration, resyncPeriod time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.KptApplier, rem remediator.Interface) (Parser, error) { converter, err := declared.NewValueConverter(dc) if err != nil { return nil, err @@ -360,7 +360,7 @@ func setRenderingStatus(rendering *v1beta1.RenderingStatus, p Parser, newStatus // SetSyncStatus implements the Parser interface // SetSyncStatus sets the RootSync sync status. // `errs` includes the errors encountered during the apply step; -func (p *root) SetSyncStatus(ctx context.Context, errs status.MultiError) error { +func (p *root) SetSyncStatus(ctx context.Context, syncing bool, errs status.MultiError) error { p.mux.Lock() defer p.mux.Unlock() var allErrs status.MultiError @@ -370,10 +370,10 @@ func (p *root) SetSyncStatus(ctx context.Context, errs status.MultiError) error } // Add conflicting errors before other apply errors. allErrs = status.Append(allErrs, errs) - return p.setSyncStatusWithRetries(ctx, allErrs, defaultDenominator) + return p.setSyncStatusWithRetries(ctx, syncing, allErrs, defaultDenominator) } -func (p *root) setSyncStatusWithRetries(ctx context.Context, errs status.MultiError, denominator int) error { +func (p *root) setSyncStatusWithRetries(ctx context.Context, syncing bool, errs status.MultiError, denominator int) error { if denominator <= 0 { return fmt.Errorf("The denominator must be a positive number") } @@ -385,9 +385,6 @@ func (p *root) setSyncStatusWithRetries(ctx context.Context, errs status.MultiEr currentRS := rs.DeepCopy() - // syncing indicates whether the applier is syncing. - syncing := p.applier.Syncing() - setSyncStatus(&rs.Status.Status, status.ToCSE(errs), denominator) errorSources, errorSummary := summarizeErrors(rs.Status.Source, rs.Status.Sync) @@ -426,7 +423,7 @@ func (p *root) setSyncStatusWithRetries(ctx context.Context, errs status.MultiEr // If the update failure was caused by the size of the RootSync object, we would truncate the errors and retry. 
if isRequestTooLargeError(err) { klog.Infof("Failed to update RootSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err) - return p.setSyncStatusWithRetries(ctx, errs, denominator*2) + return p.setSyncStatusWithRetries(ctx, syncing, errs, denominator*2) } return status.APIServerError(err, "failed to update RootSync sync status") } @@ -533,6 +530,11 @@ func (p *root) addImplicitNamespaces(objs []ast.FileObject) ([]ast.FileObject, s return objs, errs } +// ApplierSyncing returns true if the applier is syncing +func (p *root) ApplierSyncing() bool { + return p.applier.Syncing() +} + // ApplierErrors implements the Parser interface func (p *root) ApplierErrors() status.MultiError { return p.applier.Errors() diff --git a/pkg/parse/run.go b/pkg/parse/run.go index 8b5e4eb9b4..462c1eddc5 100644 --- a/pkg/parse/run.go +++ b/pkg/parse/run.go @@ -28,6 +28,7 @@ import ( "kpt.dev/configsync/pkg/status" webhookconfiguration "kpt.dev/configsync/pkg/webhook/configuration" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/pkg/errors" ) const ( @@ -368,12 +369,22 @@ func parseAndUpdate(ctx context.Context, p Parser, trigger string, state *reconc lastUpdate: metav1.Now(), } if state.needToSetSyncStatus(newSyncStatus) { - if err := p.SetSyncStatus(ctx, syncErrs); err != nil { + // Set Syncing=False because the Applier has stopped. + if err := p.SetSyncStatus(ctx, false, syncErrs); err != nil { syncErrs = status.Append(syncErrs, err) } else { state.syncStatus = newSyncStatus state.syncingConditionLastUpdate = newSyncStatus.lastUpdate } + // Prepend any remediator errors + // TODO: combine this with SetSyncStatus so it's a single update + remediatorErrs := p.RemediatorConflictErrors() + if len(remediatorErrs) > 0 { + if err := UpdateConflictManagerStatus(ctx, remediatorErrs, p.K8sClient()); err != nil { + syncErrs = status.Append(syncErrs, err) + } + } + // TODO: Determine whether state.syncStatus or state.syncingConditionLastUpdate needs to be updated again here. } return status.Append(sourceErrs, syncErrs) @@ -391,12 +402,16 @@ func updateSyncStatus(ctx context.Context, p Parser) { return case <-updateTimer.C: - if err := p.SetSyncStatus(ctx, p.options().updater.applier.Errors()); err != nil { + if err := p.SetSyncStatus(ctx, p.ApplierSyncing(), p.ApplierErrors()); err != nil { klog.Warningf("failed to update sync status: %v", err) } - remediatorErrs := p.options().updater.remediator.ConflictErrors() + // Prepend any remediator errors + // TODO: combine this with SetSyncStatus so it's a single update + remediatorErrs := p.RemediatorConflictErrors() if len(remediatorErrs) > 0 { - UpdateConflictManagerStatus(ctx, remediatorErrs, p.K8sClient()) + if err := UpdateConflictManagerStatus(ctx, remediatorErrs, p.K8sClient()); err != nil { + klog.Warningf("failed to update sync status with remediator conflicts: %v", err) + } } updateTimer.Reset(updatePeriod) } @@ -404,7 +419,7 @@ func updateSyncStatus(ctx context.Context, p Parser) { } // UpdateConflictManagerStatus reports the conflict in the conflicting manager.
-func UpdateConflictManagerStatus(ctx context.Context, conflictErrs []status.ManagementConflictError, k8sClient client.Client) { +func UpdateConflictManagerStatus(ctx context.Context, conflictErrs []status.ManagementConflictError, k8sClient client.Client) error { conflictingManagerErrors := map[string][]status.ManagementConflictError{} for _, conflictError := range conflictErrs { conflictingManager := conflictError.ConflictingManager() @@ -419,7 +434,8 @@ func UpdateConflictManagerStatus(ctx context.Context, conflictErrs []status.Mana continue } if err := prependRootSyncRemediatorStatus(ctx, k8sClient, name, conflictErrors, defaultDenominator); err != nil { - klog.Warningf("failed to add the management conflict error to RootSync %s: %v", name, err) + return errors.Wrapf(err, "prepending management conflict errors to RootSync %s", name) } } + return nil } diff --git a/pkg/parse/updater.go b/pkg/parse/updater.go index accdb8d77f..671ad43569 100644 --- a/pkg/parse/updater.go +++ b/pkg/parse/updater.go @@ -36,7 +36,7 @@ type updater struct { scope declared.Scope resources *declared.Resources remediator remediator.Interface - applier applier.Interface + applier applier.KptApplier } func (u *updater) needToUpdateWatch() bool { diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index 5d8351ca2c..11f7f694f1 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -259,10 +259,17 @@ func updateStatus(ctx context.Context, p parse.Parser) { return case <-updateTimer.C: if !p.Reconciling() { - if err := p.SetSyncStatus(ctx, p.ApplierErrors()); err != nil { + if err := p.SetSyncStatus(ctx, p.ApplierSyncing(), p.ApplierErrors()); err != nil { klog.Warningf("failed to update remediator errors: %v", err) } - parse.UpdateConflictManagerStatus(ctx, p.RemediatorConflictErrors(), p.K8sClient()) + // Prepend any remediator errors + // TODO: combine this with SetSyncStatus so it's a single update + remediatorErrs := p.RemediatorConflictErrors() + if len(remediatorErrs) > 0 { + if err := parse.UpdateConflictManagerStatus(ctx, remediatorErrs, p.K8sClient()); err != nil { + klog.Warningf("failed to update sync status with remediator conflicts: %v", err) + } + } } // else if `p.Reconciling` is true, `parse.Run` will update the status periodically. updateTimer.Reset(updatePeriod)
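Reviewer note: the locking scheme this change introduces (an outer mutex serializing Apply, plus two RWMutexes so the syncing flag and accumulated errors can be polled mid-apply) is easier to see outside the diff. Below is a minimal, self-contained sketch of that pattern, not the production code: it uses a plain error slice in place of status.MultiError, a statusError stand-in for status.Error/applier.Error, and a fabricated failure in place of the kpt live apply work.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// statusError stands in for status.Error; addError wraps anything else,
// mirroring the "default to an applier.Error" behavior in the diff.
type statusError struct{ error }

// applier stands in for the production Applier.
type applier struct {
	syncLock sync.Mutex // serializes Apply calls

	syncingLock sync.RWMutex // guards syncing
	syncing     bool

	errsLock sync.RWMutex // guards errs
	errs     []error
}

func (a *applier) setSyncing(syncing bool) {
	a.syncingLock.Lock()
	defer a.syncingLock.Unlock()
	a.syncing = syncing
}

// Syncing is safe to poll from another goroutine while Apply runs.
func (a *applier) Syncing() bool {
	a.syncingLock.RLock()
	defer a.syncingLock.RUnlock()
	return a.syncing
}

// addError ignores nil so callers can skip the nil check.
func (a *applier) addError(err error) {
	if err == nil {
		return
	}
	a.errsLock.Lock()
	defer a.errsLock.Unlock()
	if _, ok := err.(statusError); !ok {
		err = statusError{fmt.Errorf("applier error: %w", err)}
	}
	a.errs = append(a.errs, err)
}

// Errors is safe to poll from another goroutine while Apply runs.
func (a *applier) Errors() []error {
	a.errsLock.RLock()
	defer a.errsLock.RUnlock()
	return a.errs
}

func (a *applier) clearErrors() {
	a.errsLock.Lock()
	defer a.errsLock.Unlock()
	a.errs = nil
}

func (a *applier) Apply() []error {
	a.syncLock.Lock()
	defer a.syncLock.Unlock()

	a.setSyncing(true)
	defer a.setSyncing(false)

	// Errors are cleared at the start of each apply attempt.
	a.clearErrors()

	// The inner work reports errors as it goes instead of returning them.
	a.addError(errors.New("example apply failure"))
	return a.Errors()
}

func main() {
	a := &applier{}
	fmt.Println(a.Apply(), a.Syncing()) // [applier error: example apply failure] false
}

Splitting the outer syncLock from the two RWMutexes is what lets a status-reporting goroutine observe partial errors and the live syncing state mid-apply, instead of waiting for Apply to return as the old single mux required.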
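The consumer of that mid-apply visibility is the timer loop in pkg/parse/run.go, which now passes p.ApplierSyncing() and p.ApplierErrors() to SetSyncStatus on each tick. A minimal sketch of that polling side follows; the updatePeriod value and the inline print standing in for p.SetSyncStatus are illustrative, not the real signatures.

package main

import (
	"context"
	"fmt"
	"time"
)

// updatePeriod is illustrative; the real period is configured elsewhere.
const updatePeriod = 100 * time.Millisecond

// updateSyncStatus polls the applier's live state until the context is
// cancelled, mirroring the timer loop in pkg/parse/run.go.
func updateSyncStatus(ctx context.Context, syncing func() bool) {
	updateTimer := time.NewTimer(updatePeriod)
	defer updateTimer.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-updateTimer.C:
			// Stand-in for p.SetSyncStatus(ctx, p.ApplierSyncing(), p.ApplierErrors()).
			fmt.Println("syncing:", syncing())
			updateTimer.Reset(updatePeriod)
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()
	updateSyncStatus(ctx, func() bool { return true })
}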