diff --git a/pkg/kube/snapshot/snapshot.go b/pkg/kube/snapshot/snapshot.go index 8e9f6b7a74..f489ca8662 100644 --- a/pkg/kube/snapshot/snapshot.go +++ b/pkg/kube/snapshot/snapshot.go @@ -16,9 +16,12 @@ package snapshot import ( "context" + "regexp" v1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1" "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" @@ -26,6 +29,7 @@ import ( "github.com/kanisterio/kanister/pkg/kube" "github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1alpha1" "github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1beta1" + "github.com/kanisterio/kanister/pkg/poll" ) // Snapshotter is an interface that describes snapshot operations @@ -153,3 +157,32 @@ func NewSnapshotter(kubeCli kubernetes.Interface, dynCli dynamic.Interface) (Sna } return nil, errors.New("Snapshot resources not supported") } + +// We use regexp to match because errors written in vs.Status.Error.Message are strings +// and we don't have any status code or other metadata in there. +var transientErrorRegexp = regexp.MustCompile("the object has been modified; please apply your changes to the latest version and try again") + +// Use regexp to detect resource conflict error +// If CSI snapshotter changes error reporting to use more structured errors, +// we can improve this function to parse and recognise error codes or types. +func isTransientError(err error) bool { + return transientErrorRegexp.MatchString(err.Error()) +} + +func waitOnReadyToUse( + ctx context.Context, + dynCli dynamic.Interface, + snapGVR schema.GroupVersionResource, + snapshotName, + namespace string, + isReadyFunc func(*unstructured.Unstructured) (bool, error), +) error { + retries := 100 + return poll.WaitWithRetries(ctx, retries, isTransientError, func(context.Context) (bool, error) { + us, err := dynCli.Resource(snapGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{}) + if err != nil { + return false, err + } + return isReadyFunc(us) + }) +} diff --git a/pkg/kube/snapshot/snapshot_alpha.go b/pkg/kube/snapshot/snapshot_alpha.go index 4b9528b3fa..e1bc3c70e2 100644 --- a/pkg/kube/snapshot/snapshot_alpha.go +++ b/pkg/kube/snapshot/snapshot_alpha.go @@ -33,7 +33,6 @@ import ( "github.com/kanisterio/kanister/pkg/blockstorage" "github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1alpha1" - "github.com/kanisterio/kanister/pkg/poll" ) const ( @@ -318,24 +317,22 @@ func (sna *SnapshotAlpha) CreateContentFromSource(ctx context.Context, source *S return nil } +func isReadyToUseAlpha(us *unstructured.Unstructured) (bool, error) { + vs := v1alpha1.VolumeSnapshot{} + if err := TransformUnstructured(us, &vs); err != nil { + return false, err + } + // Error can be set while waiting for creation + if vs.Status.Error != nil { + return false, errors.New(vs.Status.Error.Message) + } + return (vs.Status.ReadyToUse && vs.Status.CreationTime != nil), nil +} + // WaitOnReadyToUse will block until the Volumesnapshot in namespace 'namespace' with name 'snapshotName' // has status 'ReadyToUse' or 'ctx.Done()' is signalled. func (sna *SnapshotAlpha) WaitOnReadyToUse(ctx context.Context, snapshotName, namespace string) error { - return poll.Wait(ctx, func(context.Context) (bool, error) { - us, err := sna.dynCli.Resource(v1alpha1.VolSnapGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{}) - if err != nil { - return false, err - } - vs := v1alpha1.VolumeSnapshot{} - if err := TransformUnstructured(us, &vs); err != nil { - return false, err - } - // Error can be set while waiting for creation - if vs.Status.Error != nil { - return false, errors.New(vs.Status.Error.Message) - } - return (vs.Status.ReadyToUse && vs.Status.CreationTime != nil), nil - }) + return waitOnReadyToUse(ctx, sna.dynCli, v1alpha1.VolSnapGVR, snapshotName, namespace, isReadyToUseAlpha) } func (sna *SnapshotAlpha) GroupVersion(ctx context.Context) schema.GroupVersion { diff --git a/pkg/kube/snapshot/snapshot_beta.go b/pkg/kube/snapshot/snapshot_beta.go index 2860ec8b81..2cbd35478d 100644 --- a/pkg/kube/snapshot/snapshot_beta.go +++ b/pkg/kube/snapshot/snapshot_beta.go @@ -32,7 +32,6 @@ import ( "github.com/kanisterio/kanister/pkg/blockstorage" "github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1beta1" - "github.com/kanisterio/kanister/pkg/poll" ) type SnapshotBeta struct { @@ -100,7 +99,7 @@ func createSnapshot(ctx context.Context, dynCli dynamic.Interface, kubeCli kuber return nil } - if err := waitOnReadyToUse(ctx, dynCli, snapGVR, name, namespace); err != nil { + if err := waitOnReadyToUse(ctx, dynCli, snapGVR, name, namespace, isReadyToUseBeta); err != nil { return err } @@ -301,29 +300,22 @@ func (sna *SnapshotBeta) CreateContentFromSource(ctx context.Context, source *So // WaitOnReadyToUse will block until the Volumesnapshot in 'namespace' with name 'snapshotName' // has status 'ReadyToUse' or 'ctx.Done()' is signalled. func (sna *SnapshotBeta) WaitOnReadyToUse(ctx context.Context, snapshotName, namespace string) error { - return waitOnReadyToUse(ctx, sna.dynCli, v1beta1.VolSnapGVR, snapshotName, namespace) + return waitOnReadyToUse(ctx, sna.dynCli, v1beta1.VolSnapGVR, snapshotName, namespace, isReadyToUseBeta) } -func waitOnReadyToUse(ctx context.Context, dynCli dynamic.Interface, snapGVR schema.GroupVersionResource, snapshotName, namespace string) error { - return poll.Wait(ctx, func(context.Context) (bool, error) { - us, err := dynCli.Resource(snapGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{}) - if err != nil { - return false, err - } - vs := v1beta1.VolumeSnapshot{} - err = TransformUnstructured(us, &vs) - if err != nil { - return false, err - } - if vs.Status == nil { - return false, nil - } - // Error can be set while waiting for creation - if vs.Status.Error != nil { - return false, errors.New(*vs.Status.Error.Message) - } - return (vs.Status.ReadyToUse != nil && *vs.Status.ReadyToUse && vs.Status.CreationTime != nil), nil - }) +func isReadyToUseBeta(us *unstructured.Unstructured) (bool, error) { + vs := v1beta1.VolumeSnapshot{} + if err := TransformUnstructured(us, &vs); err != nil { + return false, err + } + if vs.Status == nil { + return false, nil + } + // Error can be set while waiting for creation + if vs.Status.Error != nil { + return false, errors.New(*vs.Status.Error.Message) + } + return (vs.Status.ReadyToUse != nil && *vs.Status.ReadyToUse && vs.Status.CreationTime != nil), nil } func getSnapshotContent(ctx context.Context, dynCli dynamic.Interface, snapContentGVR schema.GroupVersionResource, contentName string) (*v1.VolumeSnapshotContent, error) { diff --git a/pkg/kube/snapshot/snapshot_stable.go b/pkg/kube/snapshot/snapshot_stable.go index 88996ec296..eec973f6ae 100644 --- a/pkg/kube/snapshot/snapshot_stable.go +++ b/pkg/kube/snapshot/snapshot_stable.go @@ -176,7 +176,7 @@ func (sna *SnapshotStable) CreateContentFromSource(ctx context.Context, source * // WaitOnReadyToUse will block until the Volumesnapshot in 'namespace' with name 'snapshotName' // has status 'ReadyToUse' or 'ctx.Done()' is signalled. func (sna *SnapshotStable) WaitOnReadyToUse(ctx context.Context, snapshotName, namespace string) error { - return waitOnReadyToUse(ctx, sna.dynCli, VolSnapGVR, snapshotName, namespace) + return waitOnReadyToUse(ctx, sna.dynCli, VolSnapGVR, snapshotName, namespace, isReadyToUseBeta) } func (sna *SnapshotStable) GroupVersion(ctx context.Context) schema.GroupVersion { diff --git a/pkg/kube/snapshot/snapshot_test.go b/pkg/kube/snapshot/snapshot_test.go index 9335f0e16a..72e48fc848 100644 --- a/pkg/kube/snapshot/snapshot_test.go +++ b/pkg/kube/snapshot/snapshot_test.go @@ -364,6 +364,192 @@ func (s *SnapshotTestSuite) TestVolumeSnapshotCloneFake(c *C) { } } +func (s *SnapshotTestSuite) TestWaitOnReadyToUse(c *C) { + snapshotNameBase := "snap-1-fake" + volName := "pvc-1-fake" + scheme := runtime.NewScheme() + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "snapshot.storage.k8s.io", Version: "v1alpha1", Kind: "VolumeSnapshotClassList"}, &unstructured.UnstructuredList{}) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "snapshot.storage.k8s.io", Version: "v1beta1", Kind: "VolumeSnapshotClassList"}, &unstructured.UnstructuredList{}) + scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "snapshot.storage.k8s.io", Version: "v1", Kind: "VolumeSnapshotClassList"}, &unstructured.UnstructuredList{}) + + fakeCli := fake.NewSimpleClientset() + + size, err := resource.ParseQuantity("1Gi") + c.Assert(err, IsNil) + + pvc := &corev1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: volName, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: size, + }, + }, + }, + } + _, err = fakeCli.CoreV1().PersistentVolumeClaims(defaultNamespace).Create(context.TODO(), pvc, metav1.CreateOptions{}) + c.Assert(err, IsNil) + + dynCli := dynfake.NewSimpleDynamicClient(scheme) + + for _, fakeSs := range []snapshot.Snapshotter{ + snapshot.NewSnapshotAlpha(fakeCli, dynCli), + snapshot.NewSnapshotBeta(fakeCli, dynCli), + snapshot.NewSnapshotStable(fakeCli, dynCli), + } { + ctx := context.Background() + + var volumeSnapshotGVR schema.GroupVersionResource + var snapshotName string + switch fakeSs.(type) { + case *snapshot.SnapshotAlpha: + volumeSnapshotGVR = v1alpha1.VolSnapGVR + snapshotName = snapshotNameBase + "-alpha" + case *snapshot.SnapshotBeta: + volumeSnapshotGVR = v1beta1.VolSnapGVR + snapshotName = snapshotNameBase + "-beta" + case *snapshot.SnapshotStable: + volumeSnapshotGVR = snapshot.VolSnapGVR + snapshotName = snapshotNameBase + "-stable" + } + + err = fakeSs.Create(ctx, snapshotName, defaultNamespace, volName, &fakeClass, false, nil) + c.Assert(err, IsNil) + + // This function should timeout + timeout := 500 * time.Millisecond + bgTimeout := 5 * time.Second + // We don't have readyToUse and no error, waiting indefinitely + err = waitOnReadyToUseWithTimeout(c, ctx, fakeSs, snapshotName, defaultNamespace, timeout) + c.Assert(err, NotNil) + c.Assert(err.Error(), Matches, ".*context deadline exceeded*") + + reply := waitOnReadyToUseInBackground(c, ctx, fakeSs, snapshotName, defaultNamespace, bgTimeout) + setReadyStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace) + select { + case err = <-reply: + c.Assert(err, IsNil) + case <-time.After(2 * time.Second): + c.Error("timeout waiting on ready to use") + } + + setVolumeSnapshotStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace, nil) + + // Set non-transient error + message := "some error" + setErrorStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace, message) + + // If there is non-transient error, exit right away + err = waitOnReadyToUseWithTimeout(c, ctx, fakeSs, snapshotName, defaultNamespace, timeout) + c.Assert(err, NotNil) + c.Assert(err.Error(), Matches, ".*some error.*") + + // Set transient error + message = "the object has been modified; please apply your changes to the latest version and try again" + setErrorStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace, message) + + // If there is a transient error, wait with exp backoff which is long + err = waitOnReadyToUseWithTimeout(c, ctx, fakeSs, snapshotName, defaultNamespace, timeout) + c.Assert(err, NotNil) + c.Assert(err.Error(), Matches, ".*context deadline exceeded*") + + reply = waitOnReadyToUseInBackground(c, ctx, fakeSs, snapshotName, defaultNamespace, bgTimeout) + setReadyStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace) + select { + case err = <-reply: + c.Assert(err, IsNil) + case <-time.After(2 * time.Second): + c.Error("timeout waiting on ready to use") + } + } +} + +// Helpers to work with volume snapshot status used in TestWaitOnReadyToUse +// ---------------------------------------------------------------------------- + +func waitOnReadyToUseInBackground( + c *C, + ctx context.Context, + fakeSs snapshot.Snapshotter, + snapshotName string, + namespace string, + timeout time.Duration, +) chan error { + reply := make(chan error) + go func() { + err := waitOnReadyToUseWithTimeout(c, ctx, fakeSs, snapshotName, namespace, timeout) + reply <- err + }() + return reply +} + +func waitOnReadyToUseWithTimeout( + c *C, + ctx context.Context, + fakeSs snapshot.Snapshotter, + snapshotName string, + namespace string, + timeout time.Duration, +) error { + deadline := time.Now().Add(timeout) + deadlineCtx, cancel := context.WithDeadline(ctx, deadline) + defer cancel() + + err := fakeSs.WaitOnReadyToUse(deadlineCtx, snapshotName, defaultNamespace) + return err +} + +func setReadyStatus( + c *C, + dynCli *dynfake.FakeDynamicClient, + volumeSnapshotGVR schema.GroupVersionResource, + snapshotName string, + namespace string, +) { + status := make(map[string]interface{}) + status["readyToUse"] = true + status["creationTime"] = time.Now().Format(time.RFC3339) + + setVolumeSnapshotStatus(c, dynCli, volumeSnapshotGVR, snapshotName, namespace, status) +} + +func setErrorStatus( + c *C, + dynCli *dynfake.FakeDynamicClient, + volumeSnapshotGVR schema.GroupVersionResource, + snapshotName string, + namespace string, + message string, +) { + status := make(map[string]interface{}) + status["Error"] = map[string]interface{}{ + "Message": message, + } + setVolumeSnapshotStatus(c, dynCli, volumeSnapshotGVR, snapshotName, namespace, status) +} + +func setVolumeSnapshotStatus( + c *C, + dynCli *dynfake.FakeDynamicClient, + volumeSnapshotGVR schema.GroupVersionResource, + snapshotName string, + namespace string, + status map[string]interface{}, +) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + us, err := dynCli.Resource(volumeSnapshotGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{}) + c.Assert(err, IsNil) + us.Object["status"] = status + _, err = dynCli.Resource(volumeSnapshotGVR).Namespace(namespace).UpdateStatus(ctx, us, metav1.UpdateOptions{}) + c.Assert(err, IsNil) +} + +// ---------------------------------------------------------------------------- + func (s *SnapshotTestSuite) TestVolumeSnapshotAlpha(c *C) { if s.snapshotClassAlpha == nil { c.Skip("No v1alpha1 Volumesnapshotclass in the cluster")