Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Update Syncing condition to include all errors #260

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions pkg/applier/kpt_applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func handleApplySkippedEvent(obj *unstructured.Unstructured, id core.ID, err err
return SkipErrorForResource(err, id, actuation.ActuationStrategyApply)
}

// processPruneEvent handles PruneEvents from the Applier
func processPruneEvent(ctx context.Context, e event.PruneEvent, s *stats.PruneEventStats, objectStatusMap ObjectStatusMap, cs *clientSet) status.Error {
id := idFrom(e.Identifier)
klog.V(4).Infof("prune %v for object: %v", e.Status, id)
Expand Down Expand Up @@ -411,7 +412,7 @@ func (a *Applier) applyInner(ctx context.Context, objs []client.Object) (map[sch
// through annotation.
enabledObjs, disabledObjs := partitionObjs(objs)
if len(disabledObjs) > 0 {
klog.Infof("%v objects to be disabled: %v", len(disabledObjs), core.GKNNs(disabledObjs))
klog.Infof("Applier disabling %d objects: %v", len(disabledObjs), core.GKNNs(disabledObjs))
disabledCount, err := cs.handleDisabledObjects(ctx, a.inventory, disabledObjs)
if err != nil {
a.addError(err)
Expand All @@ -422,7 +423,7 @@ func (a *Applier) applyInner(ctx context.Context, objs []client.Object) (map[sch
Succeeded: disabledCount,
}
}
klog.Infof("%v objects to be applied: %v", len(enabledObjs), core.GKNNs(enabledObjs))
klog.Infof("Applier applying %d objects: %v", len(enabledObjs), core.GKNNs(enabledObjs))
resources, err := toUnstructured(enabledObjs)
if err != nil {
a.addError(err)
Expand Down Expand Up @@ -460,7 +461,7 @@ func (a *Applier) applyInner(ctx context.Context, objs []client.Object) (map[sch
switch e.Type {
case event.InitType:
for _, ag := range e.InitEvent.ActionGroups {
klog.Info("InitEvent", ag)
klog.Info("Applier InitEvent", ag)
}
case event.ActionGroupType:
klog.Info(e.ActionGroupEvent)
Expand Down Expand Up @@ -502,7 +503,7 @@ func (a *Applier) applyInner(ctx context.Context, objs []client.Object) (map[sch
klog.V(4).Info(logEvent)
a.addError(processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap, cs))
default:
klog.V(4).Infof("skipped %v event", e.Type)
klog.V(4).Infof("Applier skipped %v event", e.Type)
}
}

Expand All @@ -517,12 +518,12 @@ func (a *Applier) applyInner(ctx context.Context, objs []client.Object) (map[sch

errs := a.Errors()
if errs == nil {
klog.V(4).Infof("all resources are up to date.")
klog.V(4).Infof("Applier completed without error")
}
if s.Empty() {
klog.V(4).Infof("The applier made no new progress")
klog.V(4).Infof("Applier made no new progress")
} else {
klog.Infof("The applier made new progress: %s", s.String())
klog.Infof("Applier made new progress: %s", s.String())
objStatusMap.Log(klog.V(0))
}
return gvks, errs
Expand Down Expand Up @@ -558,6 +559,7 @@ func (a *Applier) invalidateErrors() {
a.errs = nil
}

// Apply all managed resource objects and return any errors.
// Apply implements Interface.
func (a *Applier) Apply(ctx context.Context, desiredResource []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) {
a.applyMux.Lock()
Expand Down
4 changes: 3 additions & 1 deletion pkg/declared/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ type Resources struct {
objectSet map[core.ID]*unstructured.Unstructured
}

// Update performs an atomic update on the resource declaration set.
// Update performs an atomic update on the resource declaration set, converts
// the objects to Unstructured and validating that not all namespaces are
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and validating -> and validates

// deleted at once.
func (r *Resources) Update(ctx context.Context, objects []client.Object) ([]client.Object, status.Error) {
// First build up the new map using a local pointer/reference.
newSet := make(map[core.ID]*unstructured.Unstructured)
Expand Down
2 changes: 1 addition & 1 deletion pkg/metrics/tagkeys.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func StatusTagKey(err error) string {
// StatusTagValueFromSummary returns error if the summary indicates at least 1
// error, otherwise success.
func StatusTagValueFromSummary(summary *v1beta1.ErrorSummary) string {
if summary.TotalCount == 0 {
if summary == nil || summary.TotalCount == 0 {
return StatusSuccess
}
return StatusError
Expand Down
51 changes: 30 additions & 21 deletions pkg/parse/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
)

// NewNamespaceRunner creates a new runnable parser for parsing a Namespace repo.
// TODO: replace with builder pattern to avoid too many arguments.
func NewNamespaceRunner(clusterName, syncName, reconcilerName string, scope declared.Scope, fileReader reader.Reader, c client.Client, pollingPeriod, resyncPeriod, retryPeriod, statusUpdatePeriod time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.KptApplier, rem remediator.Interface) (Parser, error) {
converter, err := declared.NewValueConverter(dc)
if err != nil {
Expand Down Expand Up @@ -161,13 +162,14 @@ func (p *namespace) setSourceStatusWithRetries(ctx context.Context, newStatus so
currentRS := rs.DeepCopy()

setSourceStatusFields(&rs.Status.Source, p, newStatus, denominator)
errorSources, errorSummary := summarizeErrors(rs.Status.Status, newStatus.commit)
syncErrorCount := totalErrors(errorSummary)

continueSyncing := (rs.Status.Source.ErrorSummary.TotalCount == 0)
var errorSource []v1beta1.ErrorSource
if len(rs.Status.Source.Errors) > 0 {
errorSource = []v1beta1.ErrorSource{v1beta1.SourceError}
}
reposync.SetSyncing(&rs, continueSyncing, "Source", "Source", newStatus.commit, errorSource, rs.Status.Source.ErrorSummary, newStatus.lastUpdate)
// Syncing is stopped/prevented if there are any errors from the same commit.
// TODO: Handle non-blocking errors
syncing := (syncErrorCount == 0)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previously, syncing only depends on the source error. Now, it is changed to include all errors, including the former render or sync errors with the same commit.

It affects how the notification sends out alerts.

The current proposal is to leverage the Syncing condition to determine the sync status and send out alerts.

If the Syncing condition is false, it indicates the current apply loop is done. If there're no errors, then we send out success notification. Otherwise, it is failed.

In this case, if there're errors from the same commit, even if it might be fixed with the new loop, there will be a failure notification.

We can include previous errors with the same commit in the ErrorSummary to surface the errors earlier, but can we keep syncing only depend on the errors from the current phase (source, render, or sync)?

@sdowell FYI

Copy link
Contributor Author

@karlkfi karlkfi Nov 29, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can include previous errors with the same commit in the ErrorSummary to surface the errors earlier, but can we keep syncing only depend on the errors from the current phase (source, render, or sync)?

It's not super clear what the Syncing condition was/is supposed to represent.

Internally, the phases are effectively Fetch/Read -> Render/Read -> Parse -> Update (Validate/Apply/Watch/Reconcile).

It's confusing to have the status.sync field represent just the errors from the Updater (which wraps the Applier) and then have the Syncing condition, which is set to True by the previous source and rendering phases.

This PR effectively assumes that since the source and rendering phases were setting the Syncing condition to True that the Syncing condition must include the entire pipeline, and not just the Update/Apply phase.

If that is NOT what we want, then we need to document the proposed change and implement it consistently.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, it's worth noting that the inputs to a sync/apply are not just Git. They also can include Helm values in the spec and other config values, like Git dir, URL, cluster name, etc. So it doesn't make sense to have a unique pipeline execution depend on just a commit. Even just a generation would be technically insufficient, because the previous run may fail, perhaps due to some ephemeral network issue, triggering a retry. Today we don't indicate what generation or attempt/retry the Syncing condition represents, which makes it hard to use for triggering discrete alerts.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Syncing condition represents the current/latest fetch -> render -> parse -> update pipeline, while status.source, status.rendering, and status.sync represent the statuses of the last processed commit in each phase.

It is possible that the Syncing condition only includes the status of a partial pipeline while the loop is still in progress.

For example,

  1. The first pipeline failed at commit A in the apply phase, status.sync will have errors for commit A.
  2. The second pipeline failed to fetch commit A, status.source will have errors for commit A, and status.sync still contains the former apply errors.
  3. The current pipeline succeeded in fetching and rendering commit A, source errors from the second pipeline will get cleared from status.source. Then the parseAndUpdate function will call p.setSourceStatus, which calls the setSourceStatusWithRetries function. The setSourceStatusWithRetries function should set Syncing condition to true because it will continue applying the parsed objects next. However, this PR sets the syncing condition to false because of the sync errors from the first pipeline.


reposync.SetSyncing(&rs, syncing, "Source", "Source", newStatus.commit, errorSources, errorSummary, newStatus.lastUpdate)

// Avoid unnecessary status updates.
if !currentRS.Status.Source.LastUpdate.IsZero() && cmp.Equal(currentRS.Status, rs.Status, compare.IgnoreTimestampUpdates) {
Expand All @@ -191,7 +193,8 @@ func (p *namespace) setSourceStatusWithRetries(ctx context.Context, newStatus so
if err := p.client.Status().Update(ctx, &rs); err != nil {
// If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry.
if isRequestTooLargeError(err) {
klog.Infof("Failed to update RepoSync source status (total error count: %d, denominator: %d): %s.", rs.Status.Source.ErrorSummary.TotalCount, denominator, err)
klog.Infof("Failed to update RepoSync source status (total error count: %d, denominator: %d): %s.",
syncErrorCount, denominator, err)
return p.setSourceStatusWithRetries(ctx, newStatus, denominator*2)
}
return status.APIServerError(err, "failed to update RepoSync source status from parser")
Expand All @@ -201,7 +204,7 @@ func (p *namespace) setSourceStatusWithRetries(ctx context.Context, newStatus so

// setRenderingStatus implements the Parser interface
func (p *namespace) setRenderingStatus(ctx context.Context, oldStatus, newStatus renderingStatus) error {
if oldStatus.equal(newStatus) {
if oldStatus.Equal(newStatus) {
return nil
}

Expand All @@ -223,13 +226,14 @@ func (p *namespace) setRenderingStatusWithRetires(ctx context.Context, newStatus
currentRS := rs.DeepCopy()

setRenderingStatusFields(&rs.Status.Rendering, p, newStatus, denominator)
errorSources, errorSummary := summarizeErrors(rs.Status.Status, newStatus.commit)
syncErrorCount := totalErrors(errorSummary)

continueSyncing := (rs.Status.Rendering.ErrorSummary.TotalCount == 0)
var errorSource []v1beta1.ErrorSource
if len(rs.Status.Rendering.Errors) > 0 {
errorSource = []v1beta1.ErrorSource{v1beta1.RenderingError}
}
reposync.SetSyncing(&rs, continueSyncing, "Rendering", newStatus.message, newStatus.commit, errorSource, rs.Status.Rendering.ErrorSummary, newStatus.lastUpdate)
// Syncing is stopped/prevented if there are any errors from the same commit.
// TODO: Handle non-blocking errors
syncing := (syncErrorCount == 0)

reposync.SetSyncing(&rs, syncing, "Rendering", newStatus.message, newStatus.commit, errorSources, errorSummary, newStatus.lastUpdate)

// Avoid unnecessary status updates.
if !currentRS.Status.Rendering.LastUpdate.IsZero() && cmp.Equal(currentRS.Status, rs.Status, compare.IgnoreTimestampUpdates) {
Expand All @@ -253,7 +257,8 @@ func (p *namespace) setRenderingStatusWithRetires(ctx context.Context, newStatus
if err := p.client.Status().Update(ctx, &rs); err != nil {
// If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry.
if isRequestTooLargeError(err) {
klog.Infof("Failed to update RepoSync rendering status (total error count: %d, denominator: %d): %s.", rs.Status.Rendering.ErrorSummary.TotalCount, denominator, err)
klog.Infof("Failed to update RepoSync rendering status (total error count: %d, denominator: %d): %s.",
syncErrorCount, denominator, err)
return p.setRenderingStatusWithRetires(ctx, newStatus, denominator*2)
}
return status.APIServerError(err, "failed to update RepoSync rendering status from parser")
Expand Down Expand Up @@ -283,16 +288,19 @@ func (p *namespace) setSyncStatusWithRetries(ctx context.Context, newStatus sync
currentRS := rs.DeepCopy()

setSyncStatusFields(&rs.Status.Status, newStatus, denominator)
errorSources, errorSummary := summarizeErrors(rs.Status.Status, newStatus.commit)
syncErrorCount := totalErrors(errorSummary)

errorSources, errorSummary := summarizeErrors(rs.Status.Source, rs.Status.Sync)
var message string
if newStatus.syncing {
reposync.SetSyncing(rs, true, "Sync", "Syncing", rs.Status.Sync.Commit, errorSources, errorSummary, rs.Status.Sync.LastUpdate)
message = "Syncing"
} else {
if errorSummary.TotalCount == 0 {
message = "Sync Completed"
if syncErrorCount == 0 {
rs.Status.LastSyncedCommit = rs.Status.Sync.Commit
}
reposync.SetSyncing(rs, false, "Sync", "Sync Completed", rs.Status.Sync.Commit, errorSources, errorSummary, rs.Status.Sync.LastUpdate)
}
reposync.SetSyncing(rs, newStatus.syncing, "Sync", message, rs.Status.Sync.Commit, errorSources, errorSummary, rs.Status.Sync.LastUpdate)

// Avoid unnecessary status updates.
if !currentRS.Status.Sync.LastUpdate.IsZero() && cmp.Equal(currentRS.Status, rs.Status, compare.IgnoreTimestampUpdates) {
Expand All @@ -312,14 +320,15 @@ func (p *namespace) setSyncStatusWithRetries(ctx context.Context, newStatus sync
}

if klog.V(5).Enabled() {
klog.Infof("Updating status for RepoSync %s/%s:\nDiff (- Expected, + Actual):\n%s",
klog.Infof("Updating sync status for RepoSync %s/%s:\nDiff (- Old, + New):\n%s",
rs.Namespace, rs.Name, cmp.Diff(currentRS.Status, rs.Status))
}

if err := p.client.Status().Update(ctx, rs); err != nil {
// If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry.
if isRequestTooLargeError(err) {
klog.Infof("Failed to update RepoSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err)
klog.Infof("Failed to update RepoSync sync status (total error count: %d, denominator: %d): %s.",
syncErrorCount, denominator, err)
return p.setSyncStatusWithRetries(ctx, newStatus, denominator*2)
}
return status.APIServerError(err, fmt.Sprintf("failed to update the RepoSync sync status for the %v namespace", p.scope))
Expand Down
Loading