Improve applier and status synchronization
- Fix Syncing condition errors to include errors from all reconciler
  pipeline phases: rendering, source, & sync.
- Centralize aggregation of sync errors in `updater.status()` (see the
  sketch below)
- Move the periodic sync status update from the reconciler into the
  parser. This removes the need for some synchronization that wasn't
  previously thread-safe.
- Use separate locks to protect the errs MultiError, the syncing flag,
  and the Apply method independently. This is a hack to improve
  thread-safety without changing the Applier interface. In the future,
  the interface should probably be rewritten to use channels instead.
- Remove obsolete parser methods
- Clean up period constants
- Rename applier.Interface to KptApplier to make way for the
  KptDestroyer interface later.
- Set ErrorSummary fields to nil when empty (no errors)

Change-Id: I28d767388ef9da2f7bb6484d6bad8e903e23e461
karlkfi committed Nov 10, 2022
1 parent db1aca0 commit 705661c
Showing 16 changed files with 971 additions and 629 deletions.
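
Before the file diffs, a minimal sketch of the centralized error aggregation described above. `updater.status()` itself is not among the files shown on this page, so the receiver and the per-phase accessors (`renderingErrs`, `sourceErrs`, `syncErrs`) are hypothetical:

// Hypothetical sketch (not from this commit): aggregate errors from
// all reconciler pipeline phases into one MultiError for the Syncing
// condition. status.Append ignores nil inputs, so each phase can be
// appended unconditionally.
func (u *updater) status() status.MultiError {
	var errs status.MultiError
	errs = status.Append(errs, u.renderingErrs()) // rendering phase
	errs = status.Append(errs, u.sourceErrs())    // source phase
	errs = status.Append(errs, u.syncErrs())      // sync (apply) phase
	return errs
}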
1 change: 1 addition & 0 deletions cmd/reconciler/main.go
@@ -168,6 +168,7 @@ func main() {
ResyncPeriod: *resyncPeriod,
PollingPeriod: *pollingPeriod,
RetryPeriod: configsync.DefaultReconcilerRetryPeriod,
StatusUpdatePeriod: configsync.DefaultReconcilerSyncStatusUpdatePeriod,
SourceRoot: absSourceDir,
RepoRoot: absRepoRoot,
HydratedRoot: *hydratedRootDir,
421 changes: 318 additions & 103 deletions e2e/testcases/hydration_test.go

Large diffs are not rendered by default.

18 changes: 16 additions & 2 deletions e2e/testcases/multi_sync_test.go
@@ -400,8 +400,20 @@ func TestConflictingDefinitions_RootToRoot(t *testing.T) {

nt.T.Logf("The admission webhook should deny the update request in Root %s", rootSync2)
nt.WaitForRootSyncSyncError(rootSync2, applier.ApplierErrorCode, "denied the request")
nt.T.Logf("Root %s should also get surfaced with the conflict error", configsync.RootSyncName)
nt.WaitForRootSyncSyncError(configsync.RootSyncName, status.ManagementConflictErrorCode, "declared in another repository")

// The first RootSync to apply the object applies its manager annotation,
// which causes the webhook to reject update attempts from other RootSyncs.
// Since the manager annotation doesn't change, the first RootSync won't
// get any management conflict errors.
// The first RootSync would only get an error if the webhook were disabled.

// TODO: Should adoption fights happen with the webhook enabled?
// The remediator doesn't return a KptManagementConflictError, so the
// following error validation will never succeed.

// nt.T.Logf("Root %s should also get surfaced with the conflict error", configsync.RootSyncName)
// nt.WaitForRootSyncSyncError(configsync.RootSyncName, status.ManagementConflictErrorCode, "declared in another repository")

nt.T.Logf("The Role resource version should not be changed")
if err := nt.Validate("pods", testNs, &rbacv1.Role{},
nomostest.ResourceVersionEquals(nt, roleResourceVersion)); err != nil {
@@ -410,13 +422,15 @@ func TestConflictingDefinitions_RootToRoot(t *testing.T) {

nt.T.Logf("Stop the admission webhook, the remediator should report the conflicts")
nomostest.StopWebhook(nt)

nt.T.Logf("The Role resource version should be changed because two reconcilers are fighting with each other")
if _, err := nomostest.Retry(90*time.Second, func() error {
return nt.Validate("pods", testNs, &rbacv1.Role{},
nomostest.ResourceVersionNotEquals(nt, roleResourceVersion))
}); err != nil {
nt.T.Fatal(err)
}

nt.T.Logf("Both of the two RootSyncs still report problems because the remediators detect the conflicts")
nt.WaitForRootSyncSyncError(configsync.RootSyncName, status.ManagementConflictErrorCode, "declared in another repository")
nt.WaitForRootSyncSyncError(rootSync2, status.ManagementConflictErrorCode, "declared in another repository")
5 changes: 5 additions & 0 deletions pkg/api/configsync/register.go
@@ -70,6 +70,11 @@ const (
// TODO: replace with retry-backoff strategy
DefaultReconcilerRetryPeriod = time.Second

// DefaultReconcilerSyncStatusUpdatePeriod is the time delay between async
// status updates by the reconciler. These updates report new management
// conflict errors from the remediator, if there are any.
DefaultReconcilerSyncStatusUpdatePeriod = 5 * time.Second

// DefaultReconcileTimeout is the default wait timeout used by the applier
// when waiting for reconciliation after actuation.
// For Apply, it waits for Current status.
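
A minimal sketch of how the parser-side periodic status update might consume this constant. The actual loop added by this commit lives in the parser and is not shown on this page; `parser` and `setSyncStatus` are hypothetical names:

// Hypothetical sketch: periodically update the sync status until the
// context is cancelled, so new management conflict errors from the
// remediator get reported between full sync cycles.
func startSyncStatusUpdater(ctx context.Context, p *parser) {
	ticker := time.NewTicker(configsync.DefaultReconcilerSyncStatusUpdatePeriod)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			if err := p.setSyncStatus(ctx); err != nil {
				klog.Warningf("failed to update sync status: %v", err)
			}
		}
	}
}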
158 changes: 101 additions & 57 deletions pkg/applier/kpt_applier.go
@@ -74,30 +74,37 @@ type Applier struct {
client client.Client
// configFlags for creating clients
configFlags *genericclioptions.ConfigFlags
// errs tracks all the errors the applier encounters.
// This field is cleared at the start of the `Applier.Apply` method
errs status.MultiError
// syncing indicates whether the applier is syncing.
syncing bool
// syncKind is the Kind of the RSync object: RootSync or RepoSync
syncKind string
// syncName is the name of RSync object
syncName string
// syncNamespace is the namespace of RSync object
syncNamespace string
// mux is an Applier-level mutex to prevent concurrent Apply() and Refresh()
mux sync.Mutex
// statusMode controls whether the applier injects the actuation status into the
// ResourceGroup object
statusMode string
// reconcileTimeout controls the reconcile and prune timeout
reconcileTimeout time.Duration

// syncingLock protects reading `syncing` while changing its value.
syncingLock sync.RWMutex
// syncing indicates whether the applier or destroyer is running.
syncing bool

// errsLock protects reading `errs` while changing its value.
errsLock sync.RWMutex
// errs tracks all the errors the applier encounters.
// This field is cleared at the start of an Apply
errs status.MultiError

// syncLock prevents multiple Apply calls from running concurrently.
syncLock sync.Mutex
}

// Interface is a fake-able subset of the interface Applier implements.
// KptApplier is a fake-able subset of the interface Applier implements.
//
// Placed here to make discovering the production implementation (above) easier.
type Interface interface {
type KptApplier interface {
// Apply updates the resource API server with the latest parsed git resource.
// This is called when a new change in the git resource is detected. It also
// returns a map of the GVKs which were successfully applied by the Applier.
@@ -108,7 +115,7 @@ type Interface interface {
Syncing() bool
}

var _ Interface = &Applier{}
var _ KptApplier = &Applier{}
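
With the three separate locks, Syncing() and Errors() are safe to call from another goroutine while Apply runs. A hypothetical usage sketch (not part of this commit), assuming `a` is the *Applier in use:

// Poll the applier from a separate goroutine while Apply runs.
// Syncing() and Errors() take their own read locks, so this never
// blocks on the syncLock held by Apply.
go func() {
	for a.Syncing() {
		if errs := a.Errors(); errs != nil {
			klog.Infof("apply in progress, errors so far: %v", errs)
		}
		time.Sleep(time.Second)
	}
}()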

// NewNamespaceApplier initializes an applier that fetches a certain namespace's resources from
// the API server.
@@ -278,6 +285,7 @@ func handleApplySkippedEvent(obj *unstructured.Unstructured, id core.ID, err error
return SkipErrorForResource(err, id, actuation.ActuationStrategyApply)
}

// processPruneEvent handles PruneEvents from the Applier
func processPruneEvent(ctx context.Context, e event.PruneEvent, s *stats.PruneEventStats, objectStatusMap ObjectStatusMap, cs *clientSet) status.Error {
id := idFrom(e.Identifier)
klog.V(4).Infof("prune %v for object: %v", e.Status, id)
@@ -394,11 +402,12 @@ func (a *Applier) checkInventoryObjectSize(ctx context.Context, c client.Client)
}
}

// sync triggers a kpt live apply library call to apply a set of resources.
func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) {
// applyInner triggers a kpt live apply library call to apply a set of resources.
func (a *Applier) applyInner(ctx context.Context, objs []client.Object) map[schema.GroupVersionKind]struct{} {
cs, err := a.clientSetFunc(a.client, a.configFlags, a.statusMode)
if err != nil {
return nil, Error(err)
a.addError(err)
return nil
}
a.checkInventoryObjectSize(ctx, cs.client)

@@ -408,21 +417,22 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
// through annotation.
enabledObjs, disabledObjs := partitionObjs(objs)
if len(disabledObjs) > 0 {
klog.Infof("%v objects to be disabled: %v", len(disabledObjs), core.GKNNs(disabledObjs))
klog.Infof("Applier disabling %d objects: %v", len(disabledObjs), core.GKNNs(disabledObjs))
disabledCount, err := cs.handleDisabledObjects(ctx, a.inventory, disabledObjs)
if err != nil {
a.errs = status.Append(a.errs, err)
return nil, a.errs
a.addError(err)
return nil
}
s.DisableObjs = &stats.DisabledObjStats{
Total: uint64(len(disabledObjs)),
Succeeded: disabledCount,
}
}
klog.Infof("%v objects to be applied: %v", len(enabledObjs), core.GKNNs(enabledObjs))
resources, toUnsErrs := toUnstructured(enabledObjs)
if toUnsErrs != nil {
return nil, toUnsErrs
klog.Infof("Applier applying %d objects: %v", len(enabledObjs), core.GKNNs(enabledObjs))
resources, err := toUnstructured(enabledObjs)
if err != nil {
a.addError(err)
return nil
}

unknownTypeResources := make(map[core.ID]struct{})
@@ -446,7 +456,8 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
// This allows for picking up CRD changes.
mapper, err := a.configFlags.ToRESTMapper()
if err != nil {
return nil, status.Append(nil, err)
a.addError(err)
return nil
}
meta.MaybeResetRESTMapper(mapper)

@@ -455,16 +466,16 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
switch e.Type {
case event.InitType:
for _, ag := range e.InitEvent.ActionGroups {
klog.Info("InitEvent", ag)
klog.Info("Applier InitEvent", ag)
}
case event.ActionGroupType:
klog.Info(e.ActionGroupEvent)
case event.ErrorType:
klog.V(4).Info(e.ErrorEvent)
if util.IsRequestTooLargeError(e.ErrorEvent.Err) {
a.errs = status.Append(a.errs, largeResourceGroupError(e.ErrorEvent.Err, idFromInventory(a.inventory)))
a.addError(largeResourceGroupError(e.ErrorEvent.Err, idFromInventory(a.inventory)))
} else {
a.errs = status.Append(a.errs, Error(e.ErrorEvent.Err))
a.addError(e.ErrorEvent.Err)
}
s.ErrorTypeEvents++
case event.WaitType:
@@ -475,7 +486,10 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
// a reconciled object may become pending before a wait task times out.
// Record the objs that have been reconciled.
klog.V(4).Info(e.WaitEvent)
a.errs = status.Append(a.errs, processWaitEvent(e.WaitEvent, s.WaitEvent, objStatusMap))
err := processWaitEvent(e.WaitEvent, s.WaitEvent, objStatusMap)
if err != nil {
a.addError(err)
}
case event.ApplyType:
logEvent := event.ApplyEvent{
GroupName: e.ApplyEvent.GroupName,
@@ -485,7 +499,10 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
Error: e.ApplyEvent.Error,
}
klog.V(4).Info(logEvent)
a.errs = status.Append(a.errs, processApplyEvent(ctx, e.ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources))
err := processApplyEvent(ctx, e.ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources)
if err != nil {
a.addError(err)
}
case event.PruneType:
logEvent := event.PruneEvent{
GroupName: e.PruneEvent.GroupName,
@@ -495,9 +512,12 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
Error: e.PruneEvent.Error,
}
klog.V(4).Info(logEvent)
a.errs = status.Append(a.errs, processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap, cs))
err := processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap, cs)
if err != nil {
a.addError(err)
}
default:
klog.V(4).Infof("skipped %v event", e.Type)
klog.V(4).Infof("Applier skipped %v event", e.Type)
}
}

@@ -509,55 +529,79 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
}
gvks[resource.GetObjectKind().GroupVersionKind()] = struct{}{}
}
if a.errs == nil {
klog.V(4).Infof("all resources are up to date.")
if a.Errors() == nil {
klog.V(4).Infof("Applier completed without error")
}

if s.Empty() {
klog.V(4).Infof("The applier made no new progress")
klog.V(4).Infof("Applier made no new progress")
} else {
klog.Infof("The applier made new progress: %s", s.String())
klog.Infof("Applier made new progress: %s", s.String())
objStatusMap.Log(klog.V(0))
}
return gvks, a.errs
return gvks
}

// Errors returns the errors encountered during apply.
// Errors implements Interface.
func (a *Applier) Errors() status.MultiError {
a.errsLock.RLock()
defer a.errsLock.RUnlock()
return a.errs
}

// addError appends an error to the Applier's MultiError, wrapping it
// with Error() unless it is already a status error or MultiError.
func (a *Applier) addError(err error) {
// Ignore nil errors, to allow the caller to skip the nil check
if err == nil {
return
}
a.errsLock.Lock()
defer a.errsLock.Unlock()

// Default to an applier.Error
_, isStatusErr := err.(status.Error)
_, isMultiErr := err.(status.MultiError)
if !isStatusErr && !isMultiErr {
err = Error(err)
}
a.errs = status.Append(a.errs, err)
}

// clearErrors resets the MultiError. Called at the start of a new Apply.
func (a *Applier) clearErrors() {
a.errsLock.Lock()
defer a.errsLock.Unlock()
a.errs = nil
}

// Syncing returns whether the applier is syncing.
// Syncing implements Interface.
func (a *Applier) Syncing() bool {
a.syncingLock.RLock()
defer a.syncingLock.RUnlock()
return a.syncing
}

func (a *Applier) setSyncing(syncing bool) {
a.syncingLock.Lock()
defer a.syncingLock.Unlock()
a.syncing = syncing
}

// Apply all managed resource objects and return any errors.
// Apply implements Interface.
func (a *Applier) Apply(ctx context.Context, desiredResource []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) {
// Clear the `errs` field at the start.
a.errs = nil
// Set the `syncing` field to `true` at the start.
a.syncing = true

defer func() {
// Make sure to clear the `syncing` field before `Apply` returns.
a.syncing = false
}()

a.mux.Lock()
defer a.mux.Unlock()

// Pull the actual resources from the API server to compare against the
// declared resources. Note that we do not immediately return on error here
// because the Applier needs to try to do as much work as it can on each
// cycle. We collect and return all errors at the end. Some of those errors
// are transient and resolve in future cycles based on partial work completed
// in a previous cycle (eg ignore an error about a CR so that we can apply the
// CRD, then a future cycle is able to apply the CR).
// TODO: Here and elsewhere, pass the MultiError as a parameter.
return a.sync(ctx, desiredResource)
a.syncLock.Lock()
defer a.syncLock.Unlock()

a.setSyncing(true)
defer a.setSyncing(false)

// Clear the errors at the start of a new apply.
// TODO: Keep track of old errors and new errors and merge them, until applyInner returns.
a.clearErrors()

gvks := a.applyInner(ctx, desiredResource)
return gvks, a.Errors()
}

// newInventoryUnstructured creates an inventory object as an unstructured.
4 changes: 2 additions & 2 deletions pkg/applier/kpt_applier_test.go
@@ -71,7 +71,7 @@ func (a *fakeApplier) Run(_ context.Context, _ inventory.Info, _ object.Unstruct
return events
}

func TestSync(t *testing.T) {
func TestApply(t *testing.T) {
deploymentObj := newDeploymentObj()
deploymentID := object.UnstructuredToObjMetadata(deploymentObj)

@@ -263,7 +263,7 @@ func TestSync(t *testing.T) {
} else {
applier.clientSetFunc = applierFunc
var gvks map[schema.GroupVersionKind]struct{}
gvks, errs = applier.sync(context.Background(), objs)
gvks, errs = applier.Apply(context.Background(), objs)
if diff := cmp.Diff(tc.gvks, gvks, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s: Diff of GVK map from Apply(): %s", tc.name, diff)
}
4 changes: 3 additions & 1 deletion pkg/declared/resources.go
@@ -41,7 +41,9 @@ type Resources struct {
objectSet map[core.ID]*unstructured.Unstructured
}

// Update performs an atomic update on the resource declaration set.
// Update performs an atomic update on the resource declaration set,
// converting the objects to Unstructured and validating that not all
// Namespaces are being deleted at once.
func (r *Resources) Update(ctx context.Context, objects []client.Object) ([]client.Object, status.Error) {
// First build up the new map using a local pointer/reference.
newSet := make(map[core.ID]*unstructured.Unstructured)
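
The atomicity in Update comes from building the new map locally and publishing it in a single step. A minimal sketch of that copy-then-swap pattern, assuming a sync.RWMutex guards objectSet (the struct's actual synchronization primitive is not shown on this page):

// Hypothetical sketch: readers see either the old complete set or the
// new complete set, never a partially built one.
func (r *Resources) swap(newSet map[core.ID]*unstructured.Unstructured) {
	r.mux.Lock()
	defer r.mux.Unlock()
	r.objectSet = newSet
}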
2 changes: 1 addition & 1 deletion pkg/metrics/tagkeys.go
@@ -107,7 +107,7 @@ func StatusTagKey(err error) string {
// StatusTagValueFromSummary returns StatusError if the summary indicates
// at least one error, otherwise StatusSuccess.
func StatusTagValueFromSummary(summary *v1beta1.ErrorSummary) string {
if summary.TotalCount == 0 {
if summary == nil || summary.TotalCount == 0 {
return StatusSuccess
}
return StatusError
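
Since empty ErrorSummary fields are now set to nil (see the commit message), this nil guard keeps the tag helper from dereferencing a nil pointer. A hypothetical regression test, not part of this commit:

func TestStatusTagValueFromSummary_Nil(t *testing.T) {
	// A nil summary now means "no errors", so it must map to the
	// success tag instead of panicking.
	if got := StatusTagValueFromSummary(nil); got != StatusSuccess {
		t.Errorf("StatusTagValueFromSummary(nil) = %q, want %q", got, StatusSuccess)
	}
}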
