From 4aefd250dbe3b87edab7f9e384fba9847dfbe078 Mon Sep 17 00:00:00 2001 From: Daniil Fedotov Date: Fri, 8 Dec 2023 16:34:18 -0500 Subject: [PATCH] Retry on transient error when waiting for snapshot to be ready (#2508) * Retry on transient error when waiting for snapshot to be ready CSI snapshot controller might add errors to the snapshot status which it will recover from. Make snapshotter.WaitOnReadyToUse retry (up to 100 times) on those errors. Backoff mechanism makes it so 100 retries is minutes, hopefuly should be enough for most cases. Unfortunately CSI snapshotter uses strings to inform of error reason and does not provide error code or type when reporting, hence for now we use regexp to match on transient error. If CSI snapshotter uses better error format in the future, we can also change that. * Refactor snapshotter.WaitOnReadyToUse to deduplicate retries Move retry logic to snapshot.go * Formatting and code cleanup * Formatting --------- Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- pkg/kube/snapshot/snapshot.go | 33 +++++ pkg/kube/snapshot/snapshot_alpha.go | 29 ++--- pkg/kube/snapshot/snapshot_beta.go | 38 +++--- pkg/kube/snapshot/snapshot_stable.go | 2 +- pkg/kube/snapshot/snapshot_test.go | 186 +++++++++++++++++++++++++++ 5 files changed, 248 insertions(+), 40 deletions(-) diff --git a/pkg/kube/snapshot/snapshot.go b/pkg/kube/snapshot/snapshot.go index 8e9f6b7a74..f489ca8662 100644 --- a/pkg/kube/snapshot/snapshot.go +++ b/pkg/kube/snapshot/snapshot.go @@ -16,9 +16,12 @@ package snapshot import ( "context" + "regexp" v1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" @@ -26,6 +29,7 @@ import ( "github.com/kanisterio/kanister/pkg/kube" "github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1alpha1" "github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1beta1" + "github.com/kanisterio/kanister/pkg/poll" ) // Snapshotter is an interface that describes snapshot operations @@ -153,3 +157,32 @@ func NewSnapshotter(kubeCli kubernetes.Interface, dynCli dynamic.Interface) (Sna } return nil, errors.New("Snapshot resources not supported") } + +// We use regexp to match because errors written in vs.Status.Error.Message are strings +// and we don't have any status code or other metadata in there. +var transientErrorRegexp = regexp.MustCompile("the object has been modified; please apply your changes to the latest version and try again") + +// Use regexp to detect resource conflict error +// If CSI snapshotter changes error reporting to use more structured errors, +// we can improve this function to parse and recognise error codes or types. +func isTransientError(err error) bool { + return transientErrorRegexp.MatchString(err.Error()) +} + +func waitOnReadyToUse( + ctx context.Context, + dynCli dynamic.Interface, + snapGVR schema.GroupVersionResource, + snapshotName, + namespace string, + isReadyFunc func(*unstructured.Unstructured) (bool, error), +) error { + retries := 100 + return poll.WaitWithRetries(ctx, retries, isTransientError, func(context.Context) (bool, error) { + us, err := dynCli.Resource(snapGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{}) + if err != nil { + return false, err + } + return isReadyFunc(us) + }) +} diff --git a/pkg/kube/snapshot/snapshot_alpha.go b/pkg/kube/snapshot/snapshot_alpha.go index 4b9528b3fa..e1bc3c70e2 100644 --- a/pkg/kube/snapshot/snapshot_alpha.go +++ b/pkg/kube/snapshot/snapshot_alpha.go @@ -33,7 +33,6 @@ import ( "github.com/kanisterio/kanister/pkg/blockstorage" "github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1alpha1" - "github.com/kanisterio/kanister/pkg/poll" ) const ( @@ -318,24 +317,22 @@ func (sna *SnapshotAlpha) CreateContentFromSource(ctx context.Context, source *S return nil } +func isReadyToUseAlpha(us *unstructured.Unstructured) (bool, error) { + vs := v1alpha1.VolumeSnapshot{} + if err := TransformUnstructured(us, &vs); err != nil { + return false, err + } + // Error can be set while waiting for creation + if vs.Status.Error != nil { + return false, errors.New(vs.Status.Error.Message) + } + return (vs.Status.ReadyToUse && vs.Status.CreationTime != nil), nil +} + // WaitOnReadyToUse will block until the Volumesnapshot in namespace 'namespace' with name 'snapshotName' // has status 'ReadyToUse' or 'ctx.Done()' is signalled. func (sna *SnapshotAlpha) WaitOnReadyToUse(ctx context.Context, snapshotName, namespace string) error { - return poll.Wait(ctx, func(context.Context) (bool, error) { - us, err := sna.dynCli.Resource(v1alpha1.VolSnapGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{}) - if err != nil { - return false, err - } - vs := v1alpha1.VolumeSnapshot{} - if err := TransformUnstructured(us, &vs); err != nil { - return false, err - } - // Error can be set while waiting for creation - if vs.Status.Error != nil { - return false, errors.New(vs.Status.Error.Message) - } - return (vs.Status.ReadyToUse && vs.Status.CreationTime != nil), nil - }) + return waitOnReadyToUse(ctx, sna.dynCli, v1alpha1.VolSnapGVR, snapshotName, namespace, isReadyToUseAlpha) } func (sna *SnapshotAlpha) GroupVersion(ctx context.Context) schema.GroupVersion { diff --git a/pkg/kube/snapshot/snapshot_beta.go b/pkg/kube/snapshot/snapshot_beta.go index 2860ec8b81..2cbd35478d 100644 --- a/pkg/kube/snapshot/snapshot_beta.go +++ b/pkg/kube/snapshot/snapshot_beta.go @@ -32,7 +32,6 @@ import ( "github.com/kanisterio/kanister/pkg/blockstorage" "github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1beta1" - "github.com/kanisterio/kanister/pkg/poll" ) type SnapshotBeta struct { @@ -100,7 +99,7 @@ func createSnapshot(ctx context.Context, dynCli dynamic.Interface, kubeCli kuber return nil } - if err := waitOnReadyToUse(ctx, dynCli, snapGVR, name, namespace); err != nil { + if err := waitOnReadyToUse(ctx, dynCli, snapGVR, name, namespace, isReadyToUseBeta); err != nil { return err } @@ -301,29 +300,22 @@ func (sna *SnapshotBeta) CreateContentFromSource(ctx context.Context, source *So // WaitOnReadyToUse will block until the Volumesnapshot in 'namespace' with name 'snapshotName' // has status 'ReadyToUse' or 'ctx.Done()' is signalled. func (sna *SnapshotBeta) WaitOnReadyToUse(ctx context.Context, snapshotName, namespace string) error { - return waitOnReadyToUse(ctx, sna.dynCli, v1beta1.VolSnapGVR, snapshotName, namespace) + return waitOnReadyToUse(ctx, sna.dynCli, v1beta1.VolSnapGVR, snapshotName, namespace, isReadyToUseBeta) } -func waitOnReadyToUse(ctx context.Context, dynCli dynamic.Interface, snapGVR schema.GroupVersionResource, snapshotName, namespace string) error { - return poll.Wait(ctx, func(context.Context) (bool, error) { - us, err := dynCli.Resource(snapGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{}) - if err != nil { - return false, err - } - vs := v1beta1.VolumeSnapshot{} - err = TransformUnstructured(us, &vs) - if err != nil { - return false, err - } - if vs.Status == nil { - return false, nil - } - // Error can be set while waiting for creation - if vs.Status.Error != nil { - return false, errors.New(*vs.Status.Error.Message) - } - return (vs.Status.ReadyToUse != nil && *vs.Status.ReadyToUse && vs.Status.CreationTime != nil), nil - }) +func isReadyToUseBeta(us *unstructured.Unstructured) (bool, error) { + vs := v1beta1.VolumeSnapshot{} + if err := TransformUnstructured(us, &vs); err != nil { + return false, err + } + if vs.Status == nil { + return false, nil + } + // Error can be set while waiting for creation + if vs.Status.Error != nil { + return false, errors.New(*vs.Status.Error.Message) + } + return (vs.Status.ReadyToUse != nil && *vs.Status.ReadyToUse && vs.Status.CreationTime != nil), nil } func getSnapshotContent(ctx context.Context, dynCli dynamic.Interface, snapContentGVR schema.GroupVersionResource, contentName string) (*v1.VolumeSnapshotContent, error) { diff --git a/pkg/kube/snapshot/snapshot_stable.go b/pkg/kube/snapshot/snapshot_stable.go index 88996ec296..eec973f6ae 100644 --- a/pkg/kube/snapshot/snapshot_stable.go +++ b/pkg/kube/snapshot/snapshot_stable.go @@ -176,7 +176,7 @@ func (sna *SnapshotStable) CreateContentFromSource(ctx context.Context, source * // WaitOnReadyToUse will block until the Volumesnapshot in 'namespace' with name 'snapshotName' // has status 'ReadyToUse' or 'ctx.Done()' is signalled. func (sna *SnapshotStable) WaitOnReadyToUse(ctx context.Context, snapshotName, namespace string) error { - return waitOnReadyToUse(ctx, sna.dynCli, VolSnapGVR, snapshotName, namespace) + return waitOnReadyToUse(ctx, sna.dynCli, VolSnapGVR, snapshotName, namespace, isReadyToUseBeta) } func (sna *SnapshotStable) GroupVersion(ctx context.Context) schema.GroupVersion { diff --git a/pkg/kube/snapshot/snapshot_test.go b/pkg/kube/snapshot/snapshot_test.go index 9335f0e16a..72e48fc848 100644 --- a/pkg/kube/snapshot/snapshot_test.go +++ b/pkg/kube/snapshot/snapshot_test.go @@ -364,6 +364,192 @@ func (s *SnapshotTestSuite) TestVolumeSnapshotCloneFake(c *C) { } } +func (s *SnapshotTestSuite) TestWaitOnReadyToUse(c *C) { + snapshotNameBase := "snap-1-fake" + volName := "pvc-1-fake" + scheme := runtime.NewScheme() + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "snapshot.storage.k8s.io", Version: "v1alpha1", Kind: "VolumeSnapshotClassList"}, &unstructured.UnstructuredList{}) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "snapshot.storage.k8s.io", Version: "v1beta1", Kind: "VolumeSnapshotClassList"}, &unstructured.UnstructuredList{}) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "snapshot.storage.k8s.io", Version: "v1", Kind: "VolumeSnapshotClassList"}, &unstructured.UnstructuredList{}) + + fakeCli := fake.NewSimpleClientset() + + size, err := resource.ParseQuantity("1Gi") + c.Assert(err, IsNil) + + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: volName, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: size, + }, + }, + }, + } + _, err = fakeCli.CoreV1().PersistentVolumeClaims(defaultNamespace).Create(context.TODO(), pvc, metav1.CreateOptions{}) + c.Assert(err, IsNil) + + dynCli := dynfake.NewSimpleDynamicClient(scheme) + + for _, fakeSs := range []snapshot.Snapshotter{ + snapshot.NewSnapshotAlpha(fakeCli, dynCli), + snapshot.NewSnapshotBeta(fakeCli, dynCli), + snapshot.NewSnapshotStable(fakeCli, dynCli), + } { + ctx := context.Background() + + var volumeSnapshotGVR schema.GroupVersionResource + var snapshotName string + switch fakeSs.(type) { + case *snapshot.SnapshotAlpha: + volumeSnapshotGVR = v1alpha1.VolSnapGVR + snapshotName = snapshotNameBase + "-alpha" + case *snapshot.SnapshotBeta: + volumeSnapshotGVR = v1beta1.VolSnapGVR + snapshotName = snapshotNameBase + "-beta" + case *snapshot.SnapshotStable: + volumeSnapshotGVR = snapshot.VolSnapGVR + snapshotName = snapshotNameBase + "-stable" + } + + err = fakeSs.Create(ctx, snapshotName, defaultNamespace, volName, &fakeClass, false, nil) + c.Assert(err, IsNil) + + // This function should timeout + timeout := 500 * time.Millisecond + bgTimeout := 5 * time.Second + // We don't have readyToUse and no error, waiting indefinitely + err = waitOnReadyToUseWithTimeout(c, ctx, fakeSs, snapshotName, defaultNamespace, timeout) + c.Assert(err, NotNil) + c.Assert(err.Error(), Matches, ".*context deadline exceeded*") + + reply := waitOnReadyToUseInBackground(c, ctx, fakeSs, snapshotName, defaultNamespace, bgTimeout) + setReadyStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace) + select { + case err = <-reply: + c.Assert(err, IsNil) + case <-time.After(2 * time.Second): + c.Error("timeout waiting on ready to use") + } + + setVolumeSnapshotStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace, nil) + + // Set non-transient error + message := "some error" + setErrorStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace, message) + + // If there is non-transient error, exit right away + err = waitOnReadyToUseWithTimeout(c, ctx, fakeSs, snapshotName, defaultNamespace, timeout) + c.Assert(err, NotNil) + c.Assert(err.Error(), Matches, ".*some error.*") + + // Set transient error + message = "the object has been modified; please apply your changes to the latest version and try again" + setErrorStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace, message) + + // If there is a transient error, wait with exp backoff which is long + err = waitOnReadyToUseWithTimeout(c, ctx, fakeSs, snapshotName, defaultNamespace, timeout) + c.Assert(err, NotNil) + c.Assert(err.Error(), Matches, ".*context deadline exceeded*") + + reply = waitOnReadyToUseInBackground(c, ctx, fakeSs, snapshotName, defaultNamespace, bgTimeout) + setReadyStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace) + select { + case err = <-reply: + c.Assert(err, IsNil) + case <-time.After(2 * time.Second): + c.Error("timeout waiting on ready to use") + } + } +} + +// Helpers to work with volume snapshot status used in TestWaitOnReadyToUse +// ---------------------------------------------------------------------------- + +func waitOnReadyToUseInBackground( + c *C, + ctx context.Context, + fakeSs snapshot.Snapshotter, + snapshotName string, + namespace string, + timeout time.Duration, +) chan error { + reply := make(chan error) + go func() { + err := waitOnReadyToUseWithTimeout(c, ctx, fakeSs, snapshotName, namespace, timeout) + reply <- err + }() + return reply +} + +func waitOnReadyToUseWithTimeout( + c *C, + ctx context.Context, + fakeSs snapshot.Snapshotter, + snapshotName string, + namespace string, + timeout time.Duration, +) error { + deadline := time.Now().Add(timeout) + deadlineCtx, cancel := context.WithDeadline(ctx, deadline) + defer cancel() + + err := fakeSs.WaitOnReadyToUse(deadlineCtx, snapshotName, defaultNamespace) + return err +} + +func setReadyStatus( + c *C, + dynCli *dynfake.FakeDynamicClient, + volumeSnapshotGVR schema.GroupVersionResource, + snapshotName string, + namespace string, +) { + status := make(map[string]interface{}) + status["readyToUse"] = true + status["creationTime"] = time.Now().Format(time.RFC3339) + + setVolumeSnapshotStatus(c, dynCli, volumeSnapshotGVR, snapshotName, namespace, status) +} + +func setErrorStatus( + c *C, + dynCli *dynfake.FakeDynamicClient, + volumeSnapshotGVR schema.GroupVersionResource, + snapshotName string, + namespace string, + message string, +) { + status := make(map[string]interface{}) + status["Error"] = map[string]interface{}{ + "Message": message, + } + setVolumeSnapshotStatus(c, dynCli, volumeSnapshotGVR, snapshotName, namespace, status) +} + +func setVolumeSnapshotStatus( + c *C, + dynCli *dynfake.FakeDynamicClient, + volumeSnapshotGVR schema.GroupVersionResource, + snapshotName string, + namespace string, + status map[string]interface{}, +) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + us, err := dynCli.Resource(volumeSnapshotGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{}) + c.Assert(err, IsNil) + us.Object["status"] = status + _, err = dynCli.Resource(volumeSnapshotGVR).Namespace(namespace).UpdateStatus(ctx, us, metav1.UpdateOptions{}) + c.Assert(err, IsNil) +} + +// ---------------------------------------------------------------------------- + func (s *SnapshotTestSuite) TestVolumeSnapshotAlpha(c *C) { if s.snapshotClassAlpha == nil { c.Skip("No v1alpha1 Volumesnapshotclass in the cluster")