Reduce test flakiness
 * Use a KubeClient in EnvTest that always hits the API server, avoiding
   the cache latency and inconsistency issues that can cause races and
   intermittent test failures (see the sketch just after this list). See
   kubernetes-sigs/controller-runtime#1464 and
   kubernetes-sigs/controller-runtime#343 for details.
 * Write Status after Spec. Because the Spec write lands first, a test
   that is waiting on a status update cannot observe it early enough to
   issue a write that conflicts with the controller's Spec write (a
   distilled sketch follows the changed-files summary below).
 * Improve log messages to be clearer (aids in test debugging).
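
The first bullet in practice, as a minimal sketch (not part of the commit): the
manager is built with a NewClient hook that ignores the informer cache, so every
read in a test goes straight to the API server and is strongly consistent with
the controller's writes. The hook signature matches the controller-runtime
version used in this change; client.New stands in for the testcommon.NewTestClient
wrapper that the commit actually adds.

package testcommon_example // hypothetical package name; sketch only

import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/rest"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cache"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// newUncachedTestManager builds a manager whose client bypasses the cache.
func newUncachedTestManager(kubeConfig *rest.Config, scheme *runtime.Scheme) (ctrl.Manager, error) {
	return ctrl.NewManager(kubeConfig, ctrl.Options{
		Scheme: scheme,
		NewClient: func(_ cache.Cache, config *rest.Config, options client.Options, _ ...client.Object) (client.Client, error) {
			// client.New always talks to the API server directly; the commit
			// wraps it so returned objects also get their GVK repopulated.
			return client.New(config, options)
		},
	})
}
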
matthchr committed Nov 8, 2021
1 parent 940aaba commit 1b963e4
Showing 4 changed files with 152 additions and 65 deletions.
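
The second bullet's ordering argument, distilled from the CommitUpdate change in
azure_generic_arm_reconciler.go below. This is a sketch under the same
assumptions as that code (a client.Client and an object implementing
client.Object), not a drop-in replacement for it.

package reconcilers_example // hypothetical package name; sketch only

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"
)

// commitUpdate writes the spec (and metadata) first, then the status. A test
// that blocks on the Ready condition can only observe the status change after
// the controller's spec write has already landed, so the test's follow-up
// write cannot conflict with it.
func commitUpdate(ctx context.Context, c client.Client, obj client.Object) error {
	// Update a clone rather than obj: the API server's response to the spec
	// update can overwrite status fields on obj that we still need to send.
	clone := obj.DeepCopyObject().(client.Object)
	if err := c.Update(ctx, clone); err != nil { // 1. spec first
		return err
	}

	// Carry the new resourceVersion across so the status write doesn't conflict.
	obj.SetResourceVersion(clone.GetResourceVersion())

	return c.Status().Update(ctx, obj) // 2. status last
}
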
6 changes: 2 additions & 4 deletions v2/internal/controllers/generic_controller.go
@@ -169,7 +169,7 @@ func register(
For(obj).
// Note: These predicates prevent status updates from triggering a reconcile.
// to learn more look at https://pkg.go.dev/sigs.k8s.io/controller-runtime/pkg/predicate#GenerationChangedPredicate
WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{}, predicate.AnnotationChangedPredicate{})).
WithEventFilter(predicate.Or(predicate.GenerationChangedPredicate{})).
WithOptions(options.Options).
Complete(reconciler)
if err != nil {
@@ -252,9 +252,7 @@ func (gr *GenericReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
} else if reconcilerNamespace == "" && gr.Config.PodNamespace != "" {
genruntime.AddAnnotation(metaObj, NamespaceAnnotation, gr.Config.PodNamespace)
// Setting the annotation will trigger another reconcile so we
// don't need to requeue explicitly.
return ctrl.Result{}, gr.KubeClient.Client.Update(ctx, obj)
return ctrl.Result{Requeue: true}, gr.KubeClient.Client.Update(ctx, obj)
}

// TODO: We need some factory-lookup here
102 changes: 45 additions & 57 deletions v2/internal/reconcilers/azure_generic_arm_reconciler.go
@@ -53,8 +53,8 @@ type CreateOrUpdateAction string
const (
CreateOrUpdateActionNoAction = CreateOrUpdateAction("NoAction")
CreateOrUpdateActionManageOwnership = CreateOrUpdateAction("ManageOwnership")
CreateOrUpdateActionBeginCreation = CreateOrUpdateAction("BeginCreation")
CreateOrUpdateActionMonitorCreation = CreateOrUpdateAction("MonitorCreation")
CreateOrUpdateActionBeginCreation = CreateOrUpdateAction("BeginCreateOrUpdate")
CreateOrUpdateActionMonitorCreation = CreateOrUpdateAction("MonitorCreateOrUpdate")
)

type DeleteAction string
@@ -137,7 +137,7 @@ func (r *AzureDeploymentReconciler) Reconcile(ctx context.Context) (ctrl.Result,
}

func (r *AzureDeploymentReconciler) CreateOrUpdate(ctx context.Context) (ctrl.Result, error) {
r.logObj("reconciling resource")
r.logObj("reconciling resource", r.obj)

action, actionFunc, err := r.DetermineCreateOrUpdateAction()
if err != nil {
@@ -161,7 +161,7 @@ func (r *AzureDeploymentReconciler) CreateOrUpdate(ctx context.Context) (ctrl.Re
}

func (r *AzureDeploymentReconciler) Delete(ctx context.Context) (ctrl.Result, error) {
r.logObj("reconciling resource")
r.logObj("reconciling resource", r.obj)

action, actionFunc, err := r.DetermineDeleteAction()
if err != nil {
@@ -289,23 +289,17 @@ func (r *AzureDeploymentReconciler) makeReadyConditionFromError(cloudError *gene
return r.PositiveConditions.MakeFalseCondition(conditions.ConditionTypeReady, severity, r.obj.GetGeneration(), errorDetails.Code, errorDetails.Message)
}

func (r *AzureDeploymentReconciler) AddInitialResourceState(resourceID string) (bool, error) {
hasFinalizer := controllerutil.ContainsFinalizer(r.obj, GenericControllerFinalizer)
_, hasResourceID := genruntime.GetResourceID(r.obj)
_, hasSig := r.GetResourceSignature()

specChanged := !hasFinalizer || !hasSig || !hasResourceID

func (r *AzureDeploymentReconciler) AddInitialResourceState(resourceID string) error {
controllerutil.AddFinalizer(r.obj, GenericControllerFinalizer)
sig, err := r.SpecSignature() // nolint:govet
if err != nil {
return false, errors.Wrap(err, "failed to compute resource spec hash")
return errors.Wrap(err, "failed to compute resource spec hash")
}
r.SetResourceSignature(sig)
conditions.SetCondition(r.obj, r.PositiveConditions.Ready.Reconciling(r.obj.GetGeneration()))
genruntime.SetResourceID(r.obj, resourceID) // TODO: This is sorta weird because we can actually just get it via resolver... so this is a cached value only?

return specChanged, nil
return nil
}

func (r *AzureDeploymentReconciler) DetermineDeleteAction() (DeleteAction, DeleteActionFunc, error) {
@@ -366,7 +360,7 @@ func (r *AzureDeploymentReconciler) DetermineCreateOrUpdateAction() (CreateOrUpd
return CreateOrUpdateActionManageOwnership, r.ManageOwnership, nil
}

return CreateOrUpdateActionBeginCreation, r.BeginCreateResource, nil
return CreateOrUpdateActionBeginCreation, r.BeginCreateOrUpdateResource, nil
}

//////////////////////////////////////////
@@ -464,14 +458,14 @@ func (r *AzureDeploymentReconciler) MonitorDelete(ctx context.Context) (ctrl.Res
return ctrl.Result{}, err
}

func (r *AzureDeploymentReconciler) BeginCreateResource(ctx context.Context) (ctrl.Result, error) {
func (r *AzureDeploymentReconciler) BeginCreateOrUpdateResource(ctx context.Context) (ctrl.Result, error) {
armResource, err := r.ConvertResourceToARMResource(ctx)
if err != nil {
return ctrl.Result{}, err
}
r.log.V(Status).Info("Deploying new resource to Azure")
r.log.V(Status).Info("About to send resource to Azure")

changed, err := r.AddInitialResourceState(armResource.GetID())
err = r.AddInitialResourceState(armResource.GetID())
if err != nil {
return ctrl.Result{}, err
}
@@ -485,16 +479,11 @@ func (r *AzureDeploymentReconciler) BeginCreateResource(ctx context.Context) (ct
if err != nil {
return ctrl.Result{}, err
}
if changed {
// If adding the initial resource state changed the resource, we don't need to requeue
result.Requeue = false
result.RequeueAfter = zeroDuration
}
return result, nil
}

r.log.V(Status).Info("Began creating resource in Azure", "id", armResource.GetID())
r.recorder.Eventf(r.obj, v1.EventTypeNormal, string(CreateOrUpdateActionBeginCreation), "Began creating new resource in Azure with ID %q", armResource.GetID())
r.log.V(Status).Info("Successfully sent resource to Azure", "id", armResource.GetID())
r.recorder.Eventf(r.obj, v1.EventTypeNormal, string(CreateOrUpdateActionBeginCreation), "Successfully sent resource to Azure with ID %q", armResource.GetID())

// If we are done here it means the deployment succeeded immediately. It can't have failed because if it did
// we would have taken the err path above.
@@ -516,16 +505,7 @@ func (r *AzureDeploymentReconciler) BeginCreateResource(ctx context.Context) (ct
return ctrl.Result{}, client.IgnoreNotFound(err)
}

result := ctrl.Result{}
// TODO: Right now, because we're adding spec signature and other annotations, another event will
// TODO: be triggered. As such, we don't want to requeue this event. If we stop modifying spec we
// TODO: WILL need to requeue this event. For determinism though we really only want one event
// TODO: active at once, so commenting this out for now.
//if !deployment.IsTerminalProvisioningState() {
// result = ctrl.Result{Requeue: true}
//}

return result, nil
return ctrl.Result{Requeue: true}, nil
}

func (r *AzureDeploymentReconciler) handlePollerFailed(ctx context.Context, err error) (ctrl.Result, error) {
@@ -542,7 +522,6 @@ func (r *AzureDeploymentReconciler) handlePollerFailed(ctx context.Context, err
conditions.SetCondition(r.obj, ready)
}

_, _, hasResumeToken := r.GetPollerResumeToken()
r.SetPollerResumeToken("", "")

err = r.CommitUpdate(ctx)
@@ -553,8 +532,7 @@ func (r *AzureDeploymentReconciler) handlePollerFailed(ctx context.Context, err
}

// We probably just modified spec so don't need to requeue this
needRequeue := !hasResumeToken
return ctrl.Result{Requeue: needRequeue}, nil
return ctrl.Result{Requeue: true}, nil
}

func (r *AzureDeploymentReconciler) handlePollerSuccess(ctx context.Context) (ctrl.Result, error) {
@@ -595,7 +573,7 @@ func (r *AzureDeploymentReconciler) handlePollerSuccess(ctx context.Context) (ct
// The correct handling is just to ignore it and we will get an event shortly with the updated version to patch
return ctrl.Result{}, client.IgnoreNotFound(err)
}
// We just modified spec so don't need to requeue this

return ctrl.Result{}, nil
}

@@ -708,13 +686,13 @@ func (r *AzureDeploymentReconciler) getStatus(ctx context.Context, id string) (g
}

// logObj logs the r.obj JSON payload
func (r *AzureDeploymentReconciler) logObj(note string) {
func (r *AzureDeploymentReconciler) logObj(note string, obj genruntime.MetaObject) {
if r.log.V(Debug).Enabled() {
// This could technically select annotations from other Azure operators, but for now that's ok.
// In the future when we no longer use annotations as heavily as we do now we can remove this or
// scope it to a finite set of annotations.
ourAnnotations := make(map[string]string)
for key, value := range r.obj.GetAnnotations() {
for key, value := range obj.GetAnnotations() {
if strings.HasSuffix(key, ".azure.com") {
ourAnnotations[key] = value
}
@@ -724,44 +702,54 @@ func (r *AzureDeploymentReconciler) BeginCreateResource(ctx context.Context) (ct
// due to possible risk of disclosing secrets or other data that is "private" and users may
// not want in logs.
r.log.V(Debug).Info(note,
"kind", r.obj.GetObjectKind(),
"resourceVersion", r.obj.GetResourceVersion(),
"generation", r.obj.GetGeneration(),
"uid", r.obj.GetUID(),
"owner", r.obj.Owner(),
"ownerReferences", r.obj.GetOwnerReferences(),
"creationTimestamp", r.obj.GetCreationTimestamp(),
"finalizers", r.obj.GetFinalizers(),
"kind", obj.GetObjectKind(),
"resourceVersion", obj.GetResourceVersion(),
"generation", obj.GetGeneration(),
"uid", obj.GetUID(),
"owner", obj.Owner(),
"ownerReferences", obj.GetOwnerReferences(),
"creationTimestamp", obj.GetCreationTimestamp(),
"finalizers", obj.GetFinalizers(),
"annotations", ourAnnotations,
// Use fmt here to ensure the output uses the String() method, which log.Info doesn't seem to do by default
"conditions", fmt.Sprintf("%s", r.obj.GetConditions()))
"conditions", fmt.Sprintf("%s", obj.GetConditions()))
}
}

// CommitUpdate persists the contents of r.obj to etcd by using the Kubernetes client.
// Note that after this method has been called, r.obj contains the result of the update
// from APIServer (including an updated resourceVersion).
func (r *AzureDeploymentReconciler) CommitUpdate(ctx context.Context) error {
// Order of updates (spec first or status first) matters here.
// If the status is updated first: clients that are waiting on status
// Condition Ready == true might see that quickly enough, and make a spec
// update fast enough, to conflict with the second write (that of the spec).
// This will trigger extra requests to Azure and fail our recording tests but is
// otherwise harmless in an actual deployment.
// We update the spec first to avoid the above problem.

// We must clone here because the result of this update could contain
// fields such as status.location that may not be set but are not omitempty.
// This will cause the contents we have in Status.Location to be overwritten.
clone := r.obj.DeepCopyObject().(client.Object)
err := r.KubeClient.Client.Status().Update(ctx, clone)

// TODO: We should stop updating spec at all, except maybe for the finalizer.
// TODO: See: https://github.com/Azure/azure-service-operator/issues/1744
err := r.KubeClient.Client.Update(ctx, clone)
if err != nil {
return errors.Wrap(err, "updating resource status")
return errors.Wrap(err, "updating resource")
}

// TODO: This is a hack so that we can update 2x in a row.
// TODO: Do away with this if/when we stop modifying spec.
r.obj.SetResourceVersion(clone.GetResourceVersion())

// TODO: We should stop updating spec at all, see: https://github.com/Azure/azure-service-operator/issues/1744
err = r.KubeClient.Client.Update(ctx, r.obj)
// Note that subsequent calls to GET (if using a cached client) can miss the updates we've just done.
// See: https://github.com/kubernetes-sigs/controller-runtime/issues/1464.
err = r.KubeClient.Client.Status().Update(ctx, r.obj)
if err != nil {
return errors.Wrap(err, "updating resource")
return errors.Wrap(err, "updating resource status")
}

r.logObj("updated resource")
r.logObj("updated resource in etcd", r.obj)

return nil
}
86 changes: 86 additions & 0 deletions v2/internal/testcommon/direct_client.go
@@ -0,0 +1,86 @@
/*
Copyright (c) Microsoft Corporation.
Licensed under the MIT license.
*/

package testcommon

import (
"context"

"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)

type directClient struct {
inner client.Client
}

// NewTestClient is a thin wrapper around controller-runtime client.New, except that we
// repopulate the object's GVK since for some reason they do that in the cached client and not
// in the direct one...
func NewTestClient(config *rest.Config, options client.Options) (client.Client, error) {
inner, err := client.New(config, options)
if err != nil {
return nil, err
}

return &directClient{
inner: inner,
}, nil
}

func (d *directClient) Get(ctx context.Context, key client.ObjectKey, obj client.Object) error {
gvk, err := apiutil.GVKForObject(obj, d.inner.Scheme())
if err != nil {
return err
}
err = d.inner.Get(ctx, key, obj)
if err != nil {
return err
}

obj.GetObjectKind().SetGroupVersionKind(gvk)
return nil
}

func (d *directClient) List(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
return d.inner.List(ctx, list, opts...)
}

func (d *directClient) Create(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
return d.inner.Create(ctx, obj, opts...)
}

func (d *directClient) Delete(ctx context.Context, obj client.Object, opts ...client.DeleteOption) error {
return d.inner.Delete(ctx, obj, opts...)
}

func (d *directClient) Update(ctx context.Context, obj client.Object, opts ...client.UpdateOption) error {
return d.inner.Update(ctx, obj, opts...)
}

func (d *directClient) Patch(ctx context.Context, obj client.Object, patch client.Patch, opts ...client.PatchOption) error {
return d.inner.Patch(ctx, obj, patch, opts...)
}

func (d *directClient) DeleteAllOf(ctx context.Context, obj client.Object, opts ...client.DeleteAllOfOption) error {
return d.inner.DeleteAllOf(ctx, obj, opts...)
}

func (d *directClient) Status() client.StatusWriter {
return d.inner.Status()
}

func (d *directClient) Scheme() *runtime.Scheme {
return d.inner.Scheme()
}

func (d *directClient) RESTMapper() meta.RESTMapper {
return d.inner.RESTMapper()
}

var _ client.Client = &directClient{}
23 changes: 19 additions & 4 deletions v2/internal/testcommon/kube_test_context_envtest.go
@@ -21,6 +21,7 @@ import (
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/envtest"

@@ -65,10 +66,24 @@ func createSharedEnvTest(cfg testConfig, namespaceResources *namespaceResources)

log.Println("Creating & starting controller-runtime manager")
mgr, err := ctrl.NewManager(kubeConfig, ctrl.Options{
Scheme: controllers.CreateScheme(),
CertDir: environment.WebhookInstallOptions.LocalServingCertDir,
Port: environment.WebhookInstallOptions.LocalServingPort,
EventBroadcaster: record.NewBroadcasterForTests(1 * time.Second),
Scheme: controllers.CreateScheme(),
CertDir: environment.WebhookInstallOptions.LocalServingCertDir,
Port: environment.WebhookInstallOptions.LocalServingPort,
EventBroadcaster: record.NewBroadcasterForTests(1 * time.Second),
NewClient: func(_ cache.Cache, config *rest.Config, options client.Options, _ ...client.Object) (client.Client, error) {
// We bypass the caching client for tests, see https://github.com/kubernetes-sigs/controller-runtime/issues/343 and
// https://github.com/kubernetes-sigs/controller-runtime/issues/1464 for details. Specifically:
// https://github.com/kubernetes-sigs/controller-runtime/issues/343#issuecomment-469435686 which states:
// "ah, yeah, this is probably a bit of a confusing statement,
// but don't use the manager client in tests. The manager-provided client is designed
// to do the right thing for controllers by default (which is to read from caches, meaning that it's not strongly consistent),
// which means it probably does the wrong thing for tests (which almost certainly want strong consistency)."

// It's possible that if we do https://github.com/Azure/azure-service-operator/issues/1891, we can go back
// to using the default (cached) client, as the main problem with using it is that it can introduce inconsistency
// in test request counts that cause intermittent test failures.
return NewTestClient(config, options)
},
MetricsBindAddress: "0", // disable serving metrics, or else we get conflicts listening on same port 8080
NewCache: cacheFunc,
})
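
Finally, a hypothetical usage sketch (not part of the commit) showing how a test
might consume the NewTestClient helper added above. The ConfigMap type and the
"default"/"example" names are placeholders; the point is that a Get issued right
after a write observes that write, because nothing is served from a cache.

package testcommon // sketch only; placed alongside NewTestClient for simplicity

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/rest"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// readBackDirectly performs a strongly consistent read via the direct client.
func readBackDirectly(ctx context.Context, cfg *rest.Config) (*corev1.ConfigMap, error) {
	c, err := NewTestClient(cfg, client.Options{Scheme: scheme.Scheme})
	if err != nil {
		return nil, err
	}

	cm := &corev1.ConfigMap{}
	err = c.Get(ctx, types.NamespacedName{Namespace: "default", Name: "example"}, cm)
	return cm, err
}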