Clean up Applier
- Uses separate locks to protect the MultiError, the Syncing flag, and
  the Apply method independently (see the sketch below). This is a hack
  to improve thread safety without changing the Applier interface. More
  work will probably be needed to actually fix the status inconsistency
  issues.
- Modify all the places calling SetSyncStatus to also call
  UpdateConflictManagerStatus (sketched after the Parser interface diff
  below). This series of multiple updates is still a mess that needs
  more work, but at least it should be consistently flaky now. Every
  status update removes the conflict errors, and then the conflict
  errors are prepended back in with another update.
- Rename applier.Interface to KptApplier to make way for the
  KptDestroyer interface later.
- TODO: I still don't understand why only RootSyncs have conflict
  errors from the remediator added to their status...

Change-Id: I1c3373f98bb5d842ce113ba61dc0c87e1119c665
karlkfi committed Nov 3, 2022
1 parent 8d9c21b commit 9124c33
Showing 8 changed files with 158 additions and 85 deletions.
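To make the locking layout from the first bullet concrete, here is a minimal, self-contained sketch of the pattern. The names are illustrative stand-ins, not the production Applier: status.MultiError is simplified to an error slice, and the apply work is faked.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// applier mirrors the three-lock layout: an RWMutex guarding the syncing
// flag, an RWMutex guarding the accumulated errors, and a plain Mutex that
// serializes Apply calls. Each field is touched only under its own lock,
// so Syncing() and Errors() can be read while an Apply is in flight.
type applier struct {
	syncingLock sync.RWMutex
	syncing     bool

	errsLock sync.RWMutex
	errs     []error

	syncLock sync.Mutex
}

func (a *applier) Syncing() bool {
	a.syncingLock.RLock()
	defer a.syncingLock.RUnlock()
	return a.syncing
}

func (a *applier) setSyncing(v bool) {
	a.syncingLock.Lock()
	defer a.syncingLock.Unlock()
	a.syncing = v
}

func (a *applier) Errors() []error {
	a.errsLock.RLock()
	defer a.errsLock.RUnlock()
	return append([]error(nil), a.errs...) // return a copy so callers never race on the slice
}

func (a *applier) addError(err error) {
	if err == nil { // lets callers skip the nil check, like the real addError
		return
	}
	a.errsLock.Lock()
	defer a.errsLock.Unlock()
	a.errs = append(a.errs, err)
}

func (a *applier) clearErrors() {
	a.errsLock.Lock()
	defer a.errsLock.Unlock()
	a.errs = nil
}

func (a *applier) Apply() []error {
	a.syncLock.Lock() // at most one Apply runs at a time
	defer a.syncLock.Unlock()

	a.setSyncing(true)
	defer a.setSyncing(false)

	a.clearErrors()                         // errors are cleared at the start of each apply
	a.addError(errors.New("example error")) // stand-in for real apply work
	return a.Errors()
}

func main() {
	a := &applier{}
	fmt.Println(a.Apply(), a.Syncing()) // [example error] false
}
```

This also shows why the commit message calls the approach a hack: each field is individually consistent under its own lock, but the fields are not updated atomically, so a reader can observe syncing == true while the errors are still empty mid-apply.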
158 changes: 101 additions & 57 deletions pkg/applier/kpt_applier.go
@@ -74,30 +74,37 @@ type Applier struct {
client client.Client
// configFlags for creating clients
configFlags *genericclioptions.ConfigFlags
// errs tracks all the errors the applier encounters.
// This field is cleared at the start of the `Applier.Apply` method
errs status.MultiError
// syncing indicates whether the applier is syncing.
syncing bool
// syncKind is the Kind of the RSync object: RootSync or RepoSync
syncKind string
// syncName is the name of RSync object
syncName string
// syncNamespace is the namespace of RSync object
syncNamespace string
// mux is an Applier-level mutex to prevent concurrent Apply() and Refresh()
mux sync.Mutex
// statusMode controls whether the applier injects the actuation status into the
// ResourceGroup object
statusMode string
// reconcileTimeout controls the reconcile and prune timeout
reconcileTimeout time.Duration

// syncingLock protects reading `syncing` while changing its value.
syncingLock sync.RWMutex
// syncing indicates whether the applier or destroyer is running.
syncing bool

// errsLock protects reading `errs` while changing its value.
errsLock sync.RWMutex
// errs tracks all the errors the applier encounters.
// This field is cleared at the start of an Apply
errs status.MultiError

// syncLock prevents multiple Apply methods from running concurrently.
syncLock sync.Mutex
}

// Interface is a fake-able subset of the interface Applier implements.
// KptApplier is a fake-able subset of the interface Applier implements.
//
// Placed here to make discovering the production implementation (above) easier.
type Interface interface {
type KptApplier interface {
// Apply updates the resource API server with the latest parsed git resource.
// This is called when a new change in the git resource is detected. It also
// returns a map of the GVKs which were successfully applied by the Applier.
@@ -108,7 +115,7 @@ type Interface interface {
Syncing() bool
}

var _ Interface = &Applier{}
var _ KptApplier = &Applier{}

// NewNamespaceApplier initializes an applier that fetches a certain namespace's resources from
// the API server.
@@ -278,6 +285,7 @@ func handleApplySkippedEvent(obj *unstructured.Unstructured, id core.ID, err err
return SkipErrorForResource(err, id, actuation.ActuationStrategyApply)
}

// processPruneEvent handles PruneEvents from the Applier
func processPruneEvent(ctx context.Context, e event.PruneEvent, s *stats.PruneEventStats, objectStatusMap ObjectStatusMap, cs *clientSet) status.Error {
id := idFrom(e.Identifier)
klog.V(4).Infof("prune %v for object: %v", e.Status, id)
@@ -394,11 +402,12 @@ func (a *Applier) checkInventoryObjectSize(ctx context.Context, c client.Client)
}
}

// sync triggers a kpt live apply library call to apply a set of resources.
func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) {
// applyInner triggers a kpt live apply library call to apply a set of resources.
func (a *Applier) applyInner(ctx context.Context, objs []client.Object) map[schema.GroupVersionKind]struct{} {
cs, err := a.clientSetFunc(a.client, a.configFlags, a.statusMode)
if err != nil {
return nil, Error(err)
a.addError(err)
return nil
}
a.checkInventoryObjectSize(ctx, cs.client)

@@ -408,21 +417,22 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
// through annotation.
enabledObjs, disabledObjs := partitionObjs(objs)
if len(disabledObjs) > 0 {
klog.Infof("%v objects to be disabled: %v", len(disabledObjs), core.GKNNs(disabledObjs))
klog.Infof("Applier disabling %d objects: %v", len(disabledObjs), core.GKNNs(disabledObjs))
disabledCount, err := cs.handleDisabledObjects(ctx, a.inventory, disabledObjs)
if err != nil {
a.errs = status.Append(a.errs, err)
return nil, a.errs
a.addError(err)
return nil
}
s.DisableObjs = &stats.DisabledObjStats{
Total: uint64(len(disabledObjs)),
Succeeded: disabledCount,
}
}
klog.Infof("%v objects to be applied: %v", len(enabledObjs), core.GKNNs(enabledObjs))
resources, toUnsErrs := toUnstructured(enabledObjs)
if toUnsErrs != nil {
return nil, toUnsErrs
klog.Infof("Applier applying %d objects: %v", len(enabledObjs), core.GKNNs(enabledObjs))
resources, err := toUnstructured(enabledObjs)
if err != nil {
a.addError(err)
return nil
}

unknownTypeResources := make(map[core.ID]struct{})
@@ -446,7 +456,8 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
// This allows for picking up CRD changes.
mapper, err := a.configFlags.ToRESTMapper()
if err != nil {
return nil, status.Append(nil, err)
a.addError(err)
return nil
}
meta.MaybeResetRESTMapper(mapper)

@@ -455,16 +466,16 @@
switch e.Type {
case event.InitType:
for _, ag := range e.InitEvent.ActionGroups {
klog.Info("InitEvent", ag)
klog.Info("Applier InitEvent", ag)
}
case event.ActionGroupType:
klog.Info(e.ActionGroupEvent)
case event.ErrorType:
klog.V(4).Info(e.ErrorEvent)
if util.IsRequestTooLargeError(e.ErrorEvent.Err) {
a.errs = status.Append(a.errs, largeResourceGroupError(e.ErrorEvent.Err, idFromInventory(a.inventory)))
a.addError(largeResourceGroupError(e.ErrorEvent.Err, idFromInventory(a.inventory)))
} else {
a.errs = status.Append(a.errs, Error(e.ErrorEvent.Err))
a.addError(e.ErrorEvent.Err)
}
s.ErrorTypeEvents++
case event.WaitType:
@@ -475,7 +486,10 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
// a reconciled object may become pending before a wait task times out.
// Record the objs that have been reconciled.
klog.V(4).Info(e.WaitEvent)
a.errs = status.Append(a.errs, processWaitEvent(e.WaitEvent, s.WaitEvent, objStatusMap))
err := processWaitEvent(e.WaitEvent, s.WaitEvent, objStatusMap)
if err != nil {
a.addError(err)
}
case event.ApplyType:
logEvent := event.ApplyEvent{
GroupName: e.ApplyEvent.GroupName,
@@ -485,7 +499,10 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
Error: e.ApplyEvent.Error,
}
klog.V(4).Info(logEvent)
a.errs = status.Append(a.errs, processApplyEvent(ctx, e.ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources))
err := processApplyEvent(ctx, e.ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources)
if err != nil {
a.addError(err)
}
case event.PruneType:
logEvent := event.PruneEvent{
GroupName: e.PruneEvent.GroupName,
@@ -495,9 +512,12 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
Error: e.PruneEvent.Error,
}
klog.V(4).Info(logEvent)
a.errs = status.Append(a.errs, processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap, cs))
err := processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap, cs)
if err != nil {
a.addError(err)
}
default:
klog.V(4).Infof("skipped %v event", e.Type)
klog.V(4).Infof("Applier skipped %v event", e.Type)
}
}

@@ -509,55 +529,79 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
}
gvks[resource.GetObjectKind().GroupVersionKind()] = struct{}{}
}
if a.errs == nil {
klog.V(4).Infof("all resources are up to date.")
if a.Errors() == nil {
klog.V(4).Infof("Applier completed without error")
}

if s.Empty() {
klog.V(4).Infof("The applier made no new progress")
klog.V(4).Infof("Applier made no new progress")
} else {
klog.Infof("The applier made new progress: %s", s.String())
klog.Infof("Applier made new progress: %s", s.String())
objStatusMap.Log(klog.V(0))
}
return gvks, a.errs
return gvks
}

// Errors implements Interface.
// Errors returns the errors encountered during apply.
// Errors implements Interface.
func (a *Applier) Errors() status.MultiError {
a.errsLock.RLock()
defer a.errsLock.RUnlock()
return a.errs
}

// Syncing implements Interface.
func (a *Applier) addError(err error) {
// Ignore nil errors, to allow the caller to skip the nil check
if err == nil {
return
}
a.errsLock.Lock()
defer a.errsLock.Unlock()

// Default to an applier.Error
_, isStatusErr := err.(status.Error)
_, isMultiErr := err.(status.MultiError)
if !isStatusErr && !isMultiErr {
err = Error(err)
}
a.errs = status.Append(a.errs, err)
}

func (a *Applier) clearErrors() {
a.errsLock.Lock()
defer a.errsLock.Unlock()
a.errs = nil
}

// Syncing returns whether the applier is syncing.
// Syncing implements Interface.
func (a *Applier) Syncing() bool {
a.syncingLock.RLock()
defer a.syncingLock.RUnlock()
return a.syncing
}

func (a *Applier) setSyncing(syncing bool) {
a.syncingLock.Lock()
defer a.syncingLock.Unlock()
a.syncing = syncing
}

// Apply all managed resource objects and return any errors.
// Apply implements Interface.
func (a *Applier) Apply(ctx context.Context, desiredResource []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) {
// Clear the `errs` field at the start.
a.errs = nil
// Set the `syncing` field to `true` at the start.
a.syncing = true

defer func() {
// Make sure to clear the `syncing` field before `Apply` returns.
a.syncing = false
}()

a.mux.Lock()
defer a.mux.Unlock()

// Pull the actual resources from the API server to compare against the
// declared resources. Note that we do not immediately return on error here
// because the Applier needs to try to do as much work as it can on each
// cycle. We collect and return all errors at the end. Some of those errors
// are transient and resolve in future cycles based on partial work completed
// in a previous cycle (eg ignore an error about a CR so that we can apply the
// CRD, then a future cycle is able to apply the CR).
// TODO: Here and elsewhere, pass the MultiError as a parameter.
return a.sync(ctx, desiredResource)
a.syncLock.Lock()
defer a.syncLock.Unlock()

a.setSyncing(true)
defer a.setSyncing(false)

// Clear the errors at the start of a new apply.
// TODO: Keep track of old errors and new errors and merge them, until applyInner returns.
a.clearErrors()

objectStatusMap := a.applyInner(ctx, desiredResource)
return objectStatusMap, a.Errors()
}

// newInventoryUnstructured creates an inventory object as an unstructured.
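Since KptApplier is described above as fake-able, here is a hedged sketch of the consumer-side pattern the syncingLock supports: one goroutine runs Apply while another polls Syncing(), as a status updater would. The fake below is illustrative only and much simpler than the fakeApplier in the test file that follows (plain strings and errors instead of unstructured objects, GVK maps, and status.MultiError).

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// fakeKptApplier keeps the Apply/Syncing shape of the KptApplier interface,
// trimmed down so this sketch is self-contained.
type fakeKptApplier struct {
	mu      sync.RWMutex
	syncing bool
}

func (f *fakeKptApplier) Syncing() bool {
	f.mu.RLock()
	defer f.mu.RUnlock()
	return f.syncing
}

func (f *fakeKptApplier) setSyncing(v bool) {
	f.mu.Lock()
	defer f.mu.Unlock()
	f.syncing = v
}

func (f *fakeKptApplier) Apply(ctx context.Context, objs []string) error {
	f.setSyncing(true)
	defer f.setSyncing(false)
	select {
	case <-time.After(100 * time.Millisecond): // stand-in for real apply work
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	a := &fakeKptApplier{}
	done := make(chan error, 1)
	go func() { done <- a.Apply(context.Background(), []string{"deployment/hello"}) }()

	// Poll Syncing from a second goroutine, as the status updater does.
	for i := 0; i < 5; i++ {
		fmt.Println("syncing:", a.Syncing())
		time.Sleep(30 * time.Millisecond)
	}
	fmt.Println("apply error:", <-done)
}
```

Under `go run -race` this stays clean; without the RWMutex, the poll loop and Apply's writes to the syncing flag would be a data race.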
4 changes: 2 additions & 2 deletions pkg/applier/kpt_applier_test.go
@@ -71,7 +71,7 @@ func (a *fakeApplier) Run(_ context.Context, _ inventory.Info, _ object.Unstruct
return events
}

func TestSync(t *testing.T) {
func TestApply(t *testing.T) {
deploymentObj := newDeploymentObj()
deploymentID := object.UnstructuredToObjMetadata(deploymentObj)

@@ -263,7 +263,7 @@ func TestSync(t *testing.T) {
} else {
applier.clientSetFunc = applierFunc
var gvks map[schema.GroupVersionKind]struct{}
gvks, errs = applier.sync(context.Background(), objs)
gvks, errs = applier.Apply(context.Background(), objs)
if diff := cmp.Diff(tc.gvks, gvks, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s: Diff of GVK map from Apply(): %s", tc.name, diff)
}
18 changes: 10 additions & 8 deletions pkg/parse/namespace.go
@@ -41,7 +41,7 @@ import (
)

// NewNamespaceRunner creates a new runnable parser for parsing a Namespace repo.
func NewNamespaceRunner(clusterName, syncName, reconcilerName string, scope declared.Scope, fileReader reader.Reader, c client.Client, pollingFrequency time.Duration, resyncPeriod time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.Interface, rem remediator.Interface) (Parser, error) {
func NewNamespaceRunner(clusterName, syncName, reconcilerName string, scope declared.Scope, fileReader reader.Reader, c client.Client, pollingFrequency time.Duration, resyncPeriod time.Duration, fs FileSource, dc discovery.DiscoveryInterface, resources *declared.Resources, app applier.KptApplier, rem remediator.Interface) (Parser, error) {
converter, err := declared.NewValueConverter(dc)
if err != nil {
return nil, err
@@ -271,13 +271,13 @@ func (p *namespace) setRenderingStatusWithRetires(ctx context.Context, newStatus
// SetSyncStatus implements the Parser interface
// SetSyncStatus sets the RepoSync sync status.
// `errs` includes the errors encountered during the apply step;
func (p *namespace) SetSyncStatus(ctx context.Context, errs status.MultiError) error {
func (p *namespace) SetSyncStatus(ctx context.Context, syncing bool, errs status.MultiError) error {
p.mux.Lock()
defer p.mux.Unlock()
return p.setSyncStatusWithRetries(ctx, errs, defaultDenominator)
return p.setSyncStatusWithRetries(ctx, syncing, errs, defaultDenominator)
}

func (p *namespace) setSyncStatusWithRetries(ctx context.Context, errs status.MultiError, denominator int) error {
func (p *namespace) setSyncStatusWithRetries(ctx context.Context, syncing bool, errs status.MultiError, denominator int) error {
if denominator <= 0 {
return fmt.Errorf("The denominator must be a positive number")
}
@@ -289,9 +289,6 @@ func (p *namespace) setSyncStatusWithRetries(ctx context.Context, errs status.Mu

currentRS := rs.DeepCopy()

// syncing indicates whether the applier is syncing.
syncing := p.applier.Syncing()

setSyncStatus(&rs.Status.Status, status.ToCSE(errs), denominator)

errorSources, errorSummary := summarizeErrors(rs.Status.Source, rs.Status.Sync)
@@ -330,13 +327,18 @@ func (p *namespace) setSyncStatusWithRetries(ctx context.Context, errs status.Mu
// If the update failure was caused by the size of the RepoSync object, we would truncate the errors and retry.
if isRequestTooLargeError(err) {
klog.Infof("Failed to update RepoSync sync status (total error count: %d, denominator: %d): %s.", rs.Status.Sync.ErrorSummary.TotalCount, denominator, err)
return p.setSyncStatusWithRetries(ctx, errs, denominator*2)
return p.setSyncStatusWithRetries(ctx, syncing, errs, denominator*2)
}
return status.APIServerError(err, fmt.Sprintf("failed to update the RepoSync sync status for the %v namespace", p.scope))
}
return nil
}

// ApplierSyncing returns true if the applier is syncing
func (p *namespace) ApplierSyncing() bool {
return p.applier.Syncing()
}

// ApplierErrors implements the Parser interface
func (p *namespace) ApplierErrors() status.MultiError {
return p.applier.Errors()
4 changes: 3 additions & 1 deletion pkg/parse/opts.go
@@ -75,14 +75,16 @@ type Parser interface {
parseSource(ctx context.Context, state sourceState) ([]ast.FileObject, status.MultiError)
setSourceStatus(ctx context.Context, newStatus sourceStatus) error
setRenderingStatus(ctx context.Context, oldStatus, newStatus renderingStatus) error
SetSyncStatus(ctx context.Context, errs status.MultiError) error
SetSyncStatus(ctx context.Context, syncing bool, errs status.MultiError) error
options() *opts
// SetReconciling sets the field indicating whether the reconciler is reconciling a change.
SetReconciling(value bool)
// Reconciling returns whether the reconciler is reconciling a change.
Reconciling() bool
// ApplierErrors returns the errors surfaced by the applier.
ApplierErrors() status.MultiError
// ApplierSyncing returns true if the Applier is syncing
ApplierSyncing() bool
// RemediatorConflictErrors returns the conflict errors detected by the remediator.
RemediatorConflictErrors() []status.ManagementConflictError
// K8sClient returns the Kubernetes client that talks to the API server.
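To illustrate the SetSyncStatus flow from the second bullet of the commit message, here is a hedged sketch built only from the Parser methods shown above. status.MultiError and status.ManagementConflictError are simplified to plain errors so the sketch stands alone, and publishConflicts is a hypothetical stand-in for UpdateConflictManagerStatus, whose signature does not appear in this diff.

```go
package sketch

import "context"

// parser is a cut-down stand-in for the Parser interface above, with the
// status error types simplified to plain errors.
type parser interface {
	SetSyncStatus(ctx context.Context, syncing bool, errs error) error
	ApplierSyncing() bool
	ApplierErrors() error
	RemediatorConflictErrors() []error
}

// updateStatus shows why the flow is "consistently flaky": step 1 overwrites
// the sync status, clearing any previously reported conflict errors; step 2
// writes the conflict errors back with a second update. Between the two
// writes, a reader can observe a status with no conflict errors.
func updateStatus(ctx context.Context, p parser, publishConflicts func(context.Context, []error) error) error {
	// Step 1: publish the applier's syncing state and errors.
	if err := p.SetSyncStatus(ctx, p.ApplierSyncing(), p.ApplierErrors()); err != nil {
		return err
	}
	// Step 2: re-publish the remediator's conflicts. publishConflicts is a
	// hypothetical hook; the production code calls UpdateConflictManagerStatus.
	return publishConflicts(ctx, p.RemediatorConflictErrors())
}
```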