Skip to content

Commit

Permalink
Update Syncing condition to include all errors
Browse files Browse the repository at this point in the history
- Update the Syncing condition to reflect errors from all phases
  with the current commit. This prevents hiding/dropping errors,
  especially non-blocking errors.
- Add/clarify some comments
- Set ErrorSummary and ErrorSources to nil when empty (no errors)

Change-Id: I28d767388ef9da2f7bb6484d6bad8e903e23e461
  • Loading branch information
karlkfi committed Nov 18, 2022
1 parent 498a6da commit e851d04
Show file tree
Hide file tree
Showing 9 changed files with 983 additions and 409 deletions.
421 changes: 318 additions & 103 deletions e2e/testcases/hydration_test.go

Large diffs are not rendered by default.

16 changes: 9 additions & 7 deletions pkg/applier/kpt_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func handleApplySkippedEvent(obj *unstructured.Unstructured, id core.ID, err err
return SkipErrorForResource(err, id, actuation.ActuationStrategyApply)
}

// processPruneEvent handles PruneEvents from the Applier
func processPruneEvent(ctx context.Context, e event.PruneEvent, s *stats.PruneEventStats, objectStatusMap ObjectStatusMap, cs *clientSet) status.Error {
id := idFrom(e.Identifier)
klog.V(4).Infof("prune %v for object: %v", e.Status, id)
Expand Down Expand Up @@ -411,7 +412,7 @@ func (a *Applier) applyInner(ctx context.Context, objs []client.Object) (map[sch
// through annotation.
enabledObjs, disabledObjs := partitionObjs(objs)
if len(disabledObjs) > 0 {
klog.Infof("%v objects to be disabled: %v", len(disabledObjs), core.GKNNs(disabledObjs))
klog.Infof("Applier disabling %d objects: %v", len(disabledObjs), core.GKNNs(disabledObjs))
disabledCount, err := cs.handleDisabledObjects(ctx, a.inventory, disabledObjs)
if err != nil {
a.addError(err)
Expand All @@ -422,7 +423,7 @@ func (a *Applier) applyInner(ctx context.Context, objs []client.Object) (map[sch
Succeeded: disabledCount,
}
}
klog.Infof("%v objects to be applied: %v", len(enabledObjs), core.GKNNs(enabledObjs))
klog.Infof("Applier applying %d objects: %v", len(enabledObjs), core.GKNNs(enabledObjs))
resources, err := toUnstructured(enabledObjs)
if err != nil {
a.addError(err)
Expand Down Expand Up @@ -460,7 +461,7 @@ func (a *Applier) applyInner(ctx context.Context, objs []client.Object) (map[sch
switch e.Type {
case event.InitType:
for _, ag := range e.InitEvent.ActionGroups {
klog.Info("InitEvent", ag)
klog.Info("Applier InitEvent", ag)
}
case event.ActionGroupType:
klog.Info(e.ActionGroupEvent)
Expand Down Expand Up @@ -502,7 +503,7 @@ func (a *Applier) applyInner(ctx context.Context, objs []client.Object) (map[sch
klog.V(4).Info(logEvent)
a.addError(processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap, cs))
default:
klog.V(4).Infof("skipped %v event", e.Type)
klog.V(4).Infof("Applier skipped %v event", e.Type)
}
}

Expand All @@ -517,12 +518,12 @@ func (a *Applier) applyInner(ctx context.Context, objs []client.Object) (map[sch

errs := a.Errors()
if errs == nil {
klog.V(4).Infof("all resources are up to date.")
klog.V(4).Infof("Applier completed without error")
}
if s.Empty() {
klog.V(4).Infof("The applier made no new progress")
klog.V(4).Infof("Applier made no new progress")
} else {
klog.Infof("The applier made new progress: %s", s.String())
klog.Infof("Applier made new progress: %s", s.String())
objStatusMap.Log(klog.V(0))
}
return gvks, errs
Expand Down Expand Up @@ -558,6 +559,7 @@ func (a *Applier) invalidateErrors() {
a.errs = nil
}

// Apply all managed resource objects and return any errors.
// Apply implements Interface.
func (a *Applier) Apply(ctx context.Context, desiredResource []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) {
a.applyMux.Lock()
Expand Down
4 changes: 3 additions & 1 deletion pkg/declared/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ type Resources struct {
objectSet map[core.ID]*unstructured.Unstructured
}

// Update performs an atomic update on the resource declaration set.
// Update performs an atomic update on the resource declaration set, converts
// the objects to Unstructured and validating that not all namespaces are
// deleted at once.
func (r *Resources) Update(ctx context.Context, objects []client.Object) ([]client.Object, status.Error) {
// First build up the new map using a local pointer/reference.
newSet := make(map[core.ID]*unstructured.Unstructured)
Expand Down
2 changes: 1 addition & 1 deletion pkg/metrics/tagkeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func StatusTagKey(err error) string {
// StatusTagValueFromSummary returns error if the summary indicates at least 1
// error, otherwise success.
func StatusTagValueFromSummary(summary *v1beta1.ErrorSummary) string {
if summary.TotalCount == 0 {
if summary == nil || summary.TotalCount == 0 {
return StatusSuccess
}
return StatusError
Expand Down
51 changes: 30 additions & 21 deletions pkg/parse/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
)

// NewNamespaceRunner creates a new runnable parser for parsing a Namespace repo.
// TODO: replace with builder pattern to avoid too many arguments.
func NewNamespaceRunner(clusterName, syncName, reconcilerName string, scope declared.Scope, fileReader reader.Reader, c client.Client, pollingPeriod, resyncPeriod, retryPeriod, statusUpdatePeriod time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.KptApplier, rem remediator.Interface) (Parser, error) {
converter, err := declared.NewValueConverter(dc)
if err != nil {
Expand Down Expand Up @@ -161,13 +162,14 @@ func (p *namespace) setSourceStatusWithRetries(ctx context.Context, newStatus so
currentRS := rs.DeepCopy()

setSourceStatusFields(&rs.Status.Source, p, newStatus, denominator)
errorSources, errorSummary := summarizeErrors(rs.Status.Status, newStatus.commit)
syncErrorCount := totalErrors(errorSummary)

continueSyncing := (rs.Status.Source.ErrorSummary.TotalCount == 0)
var errorSource []v1beta1.ErrorSource
if len(rs.Status.Source.Errors) > 0 {
errorSource = []v1beta1.ErrorSource{v1beta1.SourceError}
}
reposync.SetSyncing(&rs, continueSyncing, "Source", "Source", newStatus.commit, errorSource, rs.Status.Source.ErrorSummary, newStatus.lastUpdate)
// Syncing is stopped/prevented if there are any errors from the same commit.
// TODO: Handle non-blocking errors
syncing := (syncErrorCount == 0)

reposync.SetSyncing(&rs, syncing, "Source", "Source", newStatus.commit, errorSources, errorSummary, newStatus.lastUpdate)

// Avoid unnecessary status updates.
if !currentRS.Status.Source.LastUpdate.IsZero() && cmp.Equal(currentRS.Status, rs.Status, compare.IgnoreTimestampUpdates) {
Expand All @@ -191,7 +193,8 @@ func (p *namespace) setSourceStatusWithRetries(ctx context.Context, newStatus so
if err := p.client.Status().Update(ctx, &rs); err != nil {
// If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry.
if isRequestTooLargeError(err) {
klog.Infof("Failed to update RepoSync source status (total error count: %d, denominator: %d): %s.", rs.Status.Source.ErrorSummary.TotalCount, denominator, err)
klog.Infof("Failed to update RepoSync source status (total error count: %d, denominator: %d): %s.",
syncErrorCount, denominator, err)
return p.setSourceStatusWithRetries(ctx, newStatus, denominator*2)
}
return status.APIServerError(err, "failed to update RepoSync source status from parser")
Expand All @@ -201,7 +204,7 @@ func (p *namespace) setSourceStatusWithRetries(ctx context.Context, newStatus so

// setRenderingStatus implements the Parser interface
func (p *namespace) setRenderingStatus(ctx context.Context, oldStatus, newStatus renderingStatus) error {
if oldStatus.equal(newStatus) {
if oldStatus.Equal(newStatus) {
return nil
}

Expand All @@ -223,13 +226,14 @@ func (p *namespace) setRenderingStatusWithRetires(ctx context.Context, newStatus
currentRS := rs.DeepCopy()

setRenderingStatusFields(&rs.Status.Rendering, p, newStatus, denominator)
errorSources, errorSummary := summarizeErrors(rs.Status.Status, newStatus.commit)
syncErrorCount := totalErrors(errorSummary)

continueSyncing := (rs.Status.Rendering.ErrorSummary.TotalCount == 0)
var errorSource []v1beta1.ErrorSource
if len(rs.Status.Rendering.Errors) > 0 {
errorSource = []v1beta1.ErrorSource{v1beta1.RenderingError}
}
reposync.SetSyncing(&rs, continueSyncing, "Rendering", newStatus.message, newStatus.commit, errorSource, rs.Status.Rendering.ErrorSummary, newStatus.lastUpdate)
// Syncing is stopped/prevented if there are any errors from the same commit.
// TODO: Handle non-blocking errors
syncing := (syncErrorCount == 0)

reposync.SetSyncing(&rs, syncing, "Rendering", newStatus.message, newStatus.commit, errorSources, errorSummary, newStatus.lastUpdate)

// Avoid unnecessary status updates.
if !currentRS.Status.Rendering.LastUpdate.IsZero() && cmp.Equal(currentRS.Status, rs.Status, compare.IgnoreTimestampUpdates) {
Expand All @@ -253,7 +257,8 @@ func (p *namespace) setRenderingStatusWithRetires(ctx context.Context, newStatus
if err := p.client.Status().Update(ctx, &rs); err != nil {
// If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry.
if isRequestTooLargeError(err) {
klog.Infof("Failed to update RepoSync rendering status (total error count: %d, denominator: %d): %s.", rs.Status.Rendering.ErrorSummary.TotalCount, denominator, err)
klog.Infof("Failed to update RepoSync rendering status (total error count: %d, denominator: %d): %s.",
syncErrorCount, denominator, err)
return p.setRenderingStatusWithRetires(ctx, newStatus, denominator*2)
}
return status.APIServerError(err, "failed to update RepoSync rendering status from parser")
Expand Down Expand Up @@ -283,16 +288,19 @@ func (p *namespace) setSyncStatusWithRetries(ctx context.Context, newStatus sync
currentRS := rs.DeepCopy()

setSyncStatusFields(&rs.Status.Status, newStatus, denominator)
errorSources, errorSummary := summarizeErrors(rs.Status.Status, newStatus.commit)
syncErrorCount := totalErrors(errorSummary)

errorSources, errorSummary := summarizeErrors(rs.Status.Source, rs.Status.Sync)
var message string
if newStatus.syncing {
reposync.SetSyncing(rs, true, "Sync", "Syncing", rs.Status.Sync.Commit, errorSources, errorSummary, rs.Status.Sync.LastUpdate)
message = "Syncing"
} else {
if errorSummary.TotalCount == 0 {
message = "Sync Completed"
if syncErrorCount == 0 {
rs.Status.LastSyncedCommit = rs.Status.Sync.Commit
}
reposync.SetSyncing(rs, false, "Sync", "Sync Completed", rs.Status.Sync.Commit, errorSources, errorSummary, rs.Status.Sync.LastUpdate)
}
reposync.SetSyncing(rs, newStatus.syncing, "Sync", message, rs.Status.Sync.Commit, errorSources, errorSummary, rs.Status.Sync.LastUpdate)

// Avoid unnecessary status updates.
if !currentRS.Status.Sync.LastUpdate.IsZero() && cmp.Equal(currentRS.Status, rs.Status, compare.IgnoreTimestampUpdates) {
Expand All @@ -312,14 +320,15 @@ func (p *namespace) setSyncStatusWithRetries(ctx context.Context, newStatus sync
}

if klog.V(5).Enabled() {
klog.Infof("Updating status for RepoSync %s/%s:\nDiff (- Expected, + Actual):\n%s",
klog.Infof("Updating sync status for RepoSync %s/%s:\nDiff (- Old, + New):\n%s",
rs.Namespace, rs.Name, cmp.Diff(currentRS.Status, rs.Status))
}

if err := p.client.Status().Update(ctx, rs); err != nil {
// If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry.
if isRequestTooLargeError(err) {
klog.Infof("Failed to update RepoSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err)
klog.Infof("Failed to update RepoSync sync status (total error count: %d, denominator: %d): %s.",
syncErrorCount, denominator, err)
return p.setSyncStatusWithRetries(ctx, newStatus, denominator*2)
}
return status.APIServerError(err, fmt.Sprintf("failed to update the RepoSync sync status for the %v namespace", p.scope))
Expand Down
Loading

0 comments on commit e851d04

Please sign in to comment.