Improve thread-safety of sync status updates #301

Merged
115 changes: 60 additions & 55 deletions pkg/applier/kpt_applier.go
@@ -77,38 +77,37 @@ type Applier struct {
// errs tracks all the errors the applier encounters.
// This field is cleared at the start of the `Applier.Apply` method
errs status.MultiError
// syncing indicates whether the applier is syncing.
syncing bool
Contributor
oh man was this another bool used for synchronization?

Contributor Author
yup. and set outside of the lock.
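For context, a minimal hypothetical sketch of the issue discussed here: a bool flag written and read from different goroutines without synchronization is a data race under the Go memory model. Guarding it behind a mutex (or dropping the flag entirely, as this PR does) avoids the race. The worker type and names below are illustrative, not code from this repository.

package main

import (
	"fmt"
	"sync"
	"time"
)

// worker guards its status flag with an RWMutex; reading or writing the
// bool directly from multiple goroutines would be a data race.
type worker struct {
	mu      sync.RWMutex
	syncing bool
}

// setSyncing updates the flag under the write lock.
func (w *worker) setSyncing(v bool) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.syncing = v
}

// Syncing reads the flag under the read lock.
func (w *worker) Syncing() bool {
	w.mu.RLock()
	defer w.mu.RUnlock()
	return w.syncing
}

func main() {
	w := &worker{}
	go func() {
		w.setSyncing(true)
		time.Sleep(10 * time.Millisecond)
		w.setSyncing(false)
	}()
	time.Sleep(5 * time.Millisecond)
	fmt.Println("syncing:", w.Syncing()) // safe concurrent read
}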

// syncKind is the Kind of the RSync object: RootSync or RepoSync
syncKind string
// syncName is the name of RSync object
syncName string
// syncNamespace is the namespace of RSync object
syncNamespace string
// mux is an Applier-level mutex to prevent concurrent Apply() and Refresh()
mux sync.Mutex
// statusMode controls if the applier injects the actuation status into the
// ResourceGroup object
statusMode string
// reconcileTimeout controls the reconcile and prune timeout
reconcileTimeout time.Duration

// applyMux prevents concurrent Apply() calls
applyMux sync.Mutex
// errorMux prevents concurrent modifications to the cached set of errors
errorMux sync.RWMutex
}

// Interface is a fake-able subset of the interface Applier implements.
// KptApplier is a fake-able subset of the interface Applier implements.
//
// Placed here to make discovering the production implementation (above) easier.
type Interface interface {
type KptApplier interface {
// Apply updates the resource API server with the latest parsed git resource.
// This is called when a new change in the git resource is detected. It also
// returns a map of the GVKs which were successfully applied by the Applier.
Apply(ctx context.Context, desiredResources []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError)
// Errors returns the errors encountered during apply.
Errors() status.MultiError
// Syncing indicates whether the applier is syncing.
Syncing() bool
}

var _ Interface = &Applier{}
var _ KptApplier = &Applier{}

// NewNamespaceApplier initializes an applier that fetches a certain namespace's resources from
// the API server.
@@ -397,11 +396,12 @@ func (a *Applier) checkInventoryObjectSize(ctx context.Context, c client.Client)
}
}

// sync triggers a kpt live apply library call to apply a set of resources.
func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) {
// applyInner triggers a kpt live apply library call to apply a set of resources.
func (a *Applier) applyInner(ctx context.Context, objs []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) {
cs, err := a.clientSetFunc(a.client, a.configFlags, a.statusMode)
if err != nil {
return nil, Error(err)
a.addError(err)
return nil, a.Errors()
}
a.checkInventoryObjectSize(ctx, cs.client)

@@ -414,18 +414,19 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
klog.Infof("%v objects to be disabled: %v", len(disabledObjs), core.GKNNs(disabledObjs))
disabledCount, err := cs.handleDisabledObjects(ctx, a.inventory, disabledObjs)
if err != nil {
a.errs = status.Append(a.errs, err)
return nil, a.errs
a.addError(err)
return nil, a.Errors()
}
s.DisableObjs = &stats.DisabledObjStats{
Total: uint64(len(disabledObjs)),
Succeeded: disabledCount,
}
}
klog.Infof("%v objects to be applied: %v", len(enabledObjs), core.GKNNs(enabledObjs))
resources, toUnsErrs := toUnstructured(enabledObjs)
if toUnsErrs != nil {
return nil, toUnsErrs
resources, err := toUnstructured(enabledObjs)
if err != nil {
a.addError(err)
return nil, a.Errors()
}

unknownTypeResources := make(map[core.ID]struct{})
@@ -449,7 +450,8 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
// This allows for picking up CRD changes.
mapper, err := a.configFlags.ToRESTMapper()
if err != nil {
return nil, status.Append(nil, err)
a.addError(err)
return nil, a.Errors()
}
meta.MaybeResetRESTMapper(mapper)

@@ -465,9 +467,9 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
case event.ErrorType:
klog.V(4).Info(e.ErrorEvent)
if util.IsRequestTooLargeError(e.ErrorEvent.Err) {
a.errs = status.Append(a.errs, largeResourceGroupError(e.ErrorEvent.Err, idFromInventory(a.inventory)))
a.addError(largeResourceGroupError(e.ErrorEvent.Err, idFromInventory(a.inventory)))
} else {
a.errs = status.Append(a.errs, Error(e.ErrorEvent.Err))
a.addError(e.ErrorEvent.Err)
}
s.ErrorTypeEvents++
case event.WaitType:
@@ -478,7 +480,7 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
// a reconciled object may become pending before a wait task times out.
// Record the objs that have been reconciled.
klog.V(4).Info(e.WaitEvent)
a.errs = status.Append(a.errs, processWaitEvent(e.WaitEvent, s.WaitEvent, objStatusMap))
a.addError(processWaitEvent(e.WaitEvent, s.WaitEvent, objStatusMap))
case event.ApplyType:
logEvent := event.ApplyEvent{
GroupName: e.ApplyEvent.GroupName,
@@ -488,7 +490,7 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
Error: e.ApplyEvent.Error,
}
klog.V(4).Info(logEvent)
a.errs = status.Append(a.errs, processApplyEvent(ctx, e.ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources))
a.addError(processApplyEvent(ctx, e.ApplyEvent, s.ApplyEvent, objStatusMap, unknownTypeResources))
case event.PruneType:
logEvent := event.PruneEvent{
GroupName: e.PruneEvent.GroupName,
@@ -498,7 +500,7 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
Error: e.PruneEvent.Error,
}
klog.V(4).Info(logEvent)
a.errs = status.Append(a.errs, processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap, cs))
a.addError(processPruneEvent(ctx, e.PruneEvent, s.PruneEvent, objStatusMap, cs))
default:
klog.V(4).Infof("skipped %v event", e.Type)
}
@@ -512,57 +514,60 @@ func (a *Applier) sync(ctx context.Context, objs []client.Object) (map[schema.Gr
}
gvks[resource.GetObjectKind().GroupVersionKind()] = struct{}{}
}
if a.errs == nil {

errs := a.Errors()
if errs == nil {
klog.V(4).Infof("all resources are up to date.")
}

if s.Empty() {
klog.V(4).Infof("The applier made no new progress")
} else {
klog.Infof("The applier made new progress: %s", s.String())
objStatusMap.Log(klog.V(0))
}
return gvks, a.errs
return gvks, errs
}

// Errors returns the errors encountered during the last apply or current apply
// if still running.
// Errors implements Interface.
// Errors returns the errors encountered during apply.
func (a *Applier) Errors() status.MultiError {
// TODO: Make read/write of a.errs thread-safe
// Return a copy
a.errorMux.RLock()
defer a.errorMux.RUnlock()

// Return a copy to avoid persisting caller modifications
return status.Append(nil, a.errs)
haiyanmeng marked this conversation as resolved.
}

// Syncing implements Interface.
// Syncing returns whether the applier is syncing.
func (a *Applier) Syncing() bool {
return a.syncing
func (a *Applier) addError(err error) {
a.errorMux.Lock()
defer a.errorMux.Unlock()

if _, ok := err.(status.Error); !ok {
// Wrap as an applier.Error to indicate the source of the error
err = Error(err)
}

a.errs = status.Append(a.errs, err)
}

func (a *Applier) invalidateErrors() {
a.errorMux.Lock()
defer a.errorMux.Unlock()

a.errs = nil
}

// Apply implements Interface.
func (a *Applier) Apply(ctx context.Context, desiredResource []client.Object) (map[schema.GroupVersionKind]struct{}, status.MultiError) {
// Clear the `errs` field at the start.
a.errs = nil
// Set the `syncing` field to `true` at the start.
a.syncing = true

defer func() {
// Make sure to clear the `syncing` field before `Apply` returns.
a.syncing = false
}()

a.mux.Lock()
defer a.mux.Unlock()

// Pull the actual resources from the API server to compare against the
// declared resources. Note that we do not immediately return on error here
// because the Applier needs to try to do as much work as it can on each
// cycle. We collect and return all errors at the end. Some of those errors
// are transient and resolve in future cycles based on partial work completed
// in a previous cycle (eg ignore an error about a CR so that we can apply the
// CRD, then a future cycle is able to apply the CR).
// TODO: Here and elsewhere, pass the MultiError as a parameter.
return a.sync(ctx, desiredResource)
a.applyMux.Lock()
defer a.applyMux.Unlock()

// Ideally we want to avoid invalidating errors that will continue to happen,
// but for now, invalidate all errors until they recur.
// TODO: improve error cache invalidation to make rsync status more stable
a.invalidateErrors()
Contributor
What's the plan for how we want to handle this in the long run?

Contributor Author
I'm writing a doc. But the TLDR is to do more careful invalidation by caching what event type and object added/owns the error, and not invalidating that error until we see that event again, or the apply ends.
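A rough sketch of that TLDR, purely illustrative: cache errors keyed by their source (for example, event type plus object ID) and only drop an entry when that source is seen again or the apply ends, instead of wiping the whole cache up front. The errorCache type and method names below are assumptions, not the eventual design.

package applier // hypothetical placement, standalone sketch

import "sync"

// errorCache keeps one error per source key so entries can be
// invalidated individually rather than wholesale.
type errorCache struct {
	mu   sync.Mutex
	errs map[string]error // key: event type + object ID
}

// Set records (or overwrites) the error produced by a given source.
func (c *errorCache) Set(key string, err error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.errs == nil {
		c.errs = make(map[string]error)
	}
	c.errs[key] = err
}

// Clear drops the cached error for a source once that source is observed
// again without an error, or once the apply finishes.
func (c *errorCache) Clear(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.errs, key)
}

// Errors returns a snapshot of the currently cached errors.
func (c *errorCache) Errors() []error {
	c.mu.Lock()
	defer c.mu.Unlock()
	out := make([]error, 0, len(c.errs))
	for _, err := range c.errs {
		out = append(out, err)
	}
	return out
}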

return a.applyInner(ctx, desiredResource)
}

// newInventoryUnstructured creates an inventory object as an unstructured.
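The net effect of splitting applyMux and errorMux above is that status reads never wait for a long-running apply, and Errors() hands back a copy so callers cannot mutate the cache mid-run. Below is a generic, self-contained sketch of that two-mutex pattern with hypothetical names, not code from this repository.

package main

import (
	"fmt"
	"sync"
	"time"
)

// syncer serializes the long operation with opMux while errMux guards the
// error cache, so readers are only blocked for the brief cache access.
type syncer struct {
	opMux  sync.Mutex   // serializes Run()
	errMux sync.RWMutex // guards errs
	errs   []error
}

func (s *syncer) addError(err error) {
	s.errMux.Lock()
	defer s.errMux.Unlock()
	s.errs = append(s.errs, err)
}

// Errors returns a copy so callers cannot modify the cached slice.
func (s *syncer) Errors() []error {
	s.errMux.RLock()
	defer s.errMux.RUnlock()
	return append([]error(nil), s.errs...)
}

// Run holds opMux for its whole duration but takes errMux only briefly,
// keeping concurrent Errors() calls responsive.
func (s *syncer) Run() {
	s.opMux.Lock()
	defer s.opMux.Unlock()
	for i := 0; i < 3; i++ {
		time.Sleep(10 * time.Millisecond)
		s.addError(fmt.Errorf("transient error %d", i))
	}
}

func main() {
	s := &syncer{}
	go s.Run()
	time.Sleep(25 * time.Millisecond)
	fmt.Println("errors so far:", s.Errors()) // safe mid-run read
}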
88 changes: 45 additions & 43 deletions pkg/applier/kpt_applier_test.go
@@ -71,7 +71,7 @@ func (a *fakeApplier) Run(_ context.Context, _ inventory.Info, _ object.Unstruct
return events
}

func TestSync(t *testing.T) {
func TestApply(t *testing.T) {
deploymentObj := newDeploymentObj()
deploymentID := object.UnstructuredToObjMetadata(deploymentObj)

@@ -242,54 +242,56 @@
}

for _, tc := range testcases {
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(kinds.RepoSyncV1Beta1())
u.SetNamespace("test-namespace")
u.SetName("rs")

fakeClient := testingfake.NewClient(t, core.Scheme, u)
configFlags := &genericclioptions.ConfigFlags{} // unused by test applier
applierFunc := func(c client.Client, _ *genericclioptions.ConfigFlags, _ string) (*clientSet, error) {
return &clientSet{
kptApplier: newFakeApplier(tc.initErr, tc.events),
client: fakeClient,
}, tc.initErr
}

var errs status.MultiError
applier, err := NewNamespaceApplier(fakeClient, configFlags, "test-namespace", "rs", "", 5*time.Minute)
if err != nil {
errs = Error(err)
} else {
applier.clientSetFunc = applierFunc
var gvks map[schema.GroupVersionKind]struct{}
gvks, errs = applier.sync(context.Background(), objs)
if diff := cmp.Diff(tc.gvks, gvks, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s: Diff of GVK map from Apply(): %s", tc.name, diff)
t.Run(tc.name, func(t *testing.T) {
u := &unstructured.Unstructured{}
u.SetGroupVersionKind(kinds.RepoSyncV1Beta1())
u.SetNamespace("test-namespace")
u.SetName("rs")

fakeClient := testingfake.NewClient(t, core.Scheme, u)
configFlags := &genericclioptions.ConfigFlags{} // unused by test applier
applierFunc := func(c client.Client, _ *genericclioptions.ConfigFlags, _ string) (*clientSet, error) {
return &clientSet{
kptApplier: newFakeApplier(tc.initErr, tc.events),
client: fakeClient,
}, tc.initErr
}
}
if tc.multiErr == nil {
if errs != nil {
t.Errorf("%s: unexpected error %v", tc.name, errs)

var errs status.MultiError
applier, err := NewNamespaceApplier(fakeClient, configFlags, "test-namespace", "rs", "", 5*time.Minute)
if err != nil {
errs = Error(err)
} else {
applier.clientSetFunc = applierFunc
var gvks map[schema.GroupVersionKind]struct{}
gvks, errs = applier.Apply(context.Background(), objs)
if diff := cmp.Diff(tc.gvks, gvks, cmpopts.EquateEmpty()); diff != "" {
t.Errorf("%s: Diff of GVK map from Apply(): %s", tc.name, diff)
}
}
} else if errs == nil {
t.Errorf("%s: expected some error, but not happened", tc.name)
} else {
actualErrs := errs.Errors()
expectedErrs := tc.multiErr.Errors()
if len(actualErrs) != len(expectedErrs) {
t.Errorf("%s: number of error is not as expected %v", tc.name, actualErrs)
if tc.multiErr == nil {
if errs != nil {
t.Errorf("%s: unexpected error %v", tc.name, errs)
}
} else if errs == nil {
t.Errorf("%s: expected some error, but not happened", tc.name)
} else {
for i, actual := range actualErrs {
expected := expectedErrs[i]
if !strings.Contains(actual.Error(), expected.Error()) || reflect.TypeOf(expected) != reflect.TypeOf(actual) {
t.Errorf("%s:\nexpected error:\n%v\nbut got:\n%v", tc.name,
indent(expected.Error(), 1),
indent(actual.Error(), 1))
actualErrs := errs.Errors()
expectedErrs := tc.multiErr.Errors()
if len(actualErrs) != len(expectedErrs) {
t.Errorf("%s: number of error is not as expected %v", tc.name, actualErrs)
} else {
for i, actual := range actualErrs {
expected := expectedErrs[i]
if !strings.Contains(actual.Error(), expected.Error()) || reflect.TypeOf(expected) != reflect.TypeOf(actual) {
t.Errorf("%s:\nexpected error:\n%v\nbut got:\n%v", tc.name,
indent(expected.Error(), 1),
indent(actual.Error(), 1))
}
}
}
}
}
})
}
}
