Skip to content

Commit

Permalink
Retry on transient error when waiting for snapshot to be ready (#2508)
Browse files Browse the repository at this point in the history
* Retry on transient error when waiting for snapshot to be ready

CSI snapshot controller might add errors to the snapshot status which it will
recover from.

Make snapshotter.WaitOnReadyToUse retry (up to 100 times) on those errors.
Backoff mechanism makes it so 100 retries is minutes, hopefuly should be enough for
most cases.

Unfortunately CSI snapshotter uses strings to inform of error reason and
does not provide error code or type when reporting, hence for now we use regexp to
match on transient error. If CSI snapshotter uses better error format in the future,
we can also change that.

* Refactor snapshotter.WaitOnReadyToUse to deduplicate retries

Move retry logic to snapshot.go

* Formatting and code cleanup

* Formatting

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
hairyhum and mergify[bot] committed Dec 8, 2023
1 parent 4a70b36 commit 4aefd25
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 40 deletions.
33 changes: 33 additions & 0 deletions pkg/kube/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,20 @@ package snapshot

import (
"context"
"regexp"

v1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"

"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1alpha1"
"github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1beta1"
"github.com/kanisterio/kanister/pkg/poll"
)

// Snapshotter is an interface that describes snapshot operations
Expand Down Expand Up @@ -153,3 +157,32 @@ func NewSnapshotter(kubeCli kubernetes.Interface, dynCli dynamic.Interface) (Sna
}
return nil, errors.New("Snapshot resources not supported")
}

// We use regexp to match because errors written in vs.Status.Error.Message are strings
// and we don't have any status code or other metadata in there.
var transientErrorRegexp = regexp.MustCompile("the object has been modified; please apply your changes to the latest version and try again")

// Use regexp to detect resource conflict error
// If CSI snapshotter changes error reporting to use more structured errors,
// we can improve this function to parse and recognise error codes or types.
func isTransientError(err error) bool {
return transientErrorRegexp.MatchString(err.Error())
}

func waitOnReadyToUse(
ctx context.Context,
dynCli dynamic.Interface,
snapGVR schema.GroupVersionResource,
snapshotName,
namespace string,
isReadyFunc func(*unstructured.Unstructured) (bool, error),
) error {
retries := 100
return poll.WaitWithRetries(ctx, retries, isTransientError, func(context.Context) (bool, error) {
us, err := dynCli.Resource(snapGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{})
if err != nil {
return false, err
}
return isReadyFunc(us)
})
}
29 changes: 13 additions & 16 deletions pkg/kube/snapshot/snapshot_alpha.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (

"github.com/kanisterio/kanister/pkg/blockstorage"
"github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1alpha1"
"github.com/kanisterio/kanister/pkg/poll"
)

const (
Expand Down Expand Up @@ -318,24 +317,22 @@ func (sna *SnapshotAlpha) CreateContentFromSource(ctx context.Context, source *S
return nil
}

func isReadyToUseAlpha(us *unstructured.Unstructured) (bool, error) {
vs := v1alpha1.VolumeSnapshot{}
if err := TransformUnstructured(us, &vs); err != nil {
return false, err
}
// Error can be set while waiting for creation
if vs.Status.Error != nil {
return false, errors.New(vs.Status.Error.Message)
}
return (vs.Status.ReadyToUse && vs.Status.CreationTime != nil), nil
}

// WaitOnReadyToUse will block until the Volumesnapshot in namespace 'namespace' with name 'snapshotName'
// has status 'ReadyToUse' or 'ctx.Done()' is signalled.
func (sna *SnapshotAlpha) WaitOnReadyToUse(ctx context.Context, snapshotName, namespace string) error {
return poll.Wait(ctx, func(context.Context) (bool, error) {
us, err := sna.dynCli.Resource(v1alpha1.VolSnapGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{})
if err != nil {
return false, err
}
vs := v1alpha1.VolumeSnapshot{}
if err := TransformUnstructured(us, &vs); err != nil {
return false, err
}
// Error can be set while waiting for creation
if vs.Status.Error != nil {
return false, errors.New(vs.Status.Error.Message)
}
return (vs.Status.ReadyToUse && vs.Status.CreationTime != nil), nil
})
return waitOnReadyToUse(ctx, sna.dynCli, v1alpha1.VolSnapGVR, snapshotName, namespace, isReadyToUseAlpha)
}

func (sna *SnapshotAlpha) GroupVersion(ctx context.Context) schema.GroupVersion {
Expand Down
38 changes: 15 additions & 23 deletions pkg/kube/snapshot/snapshot_beta.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (

"github.com/kanisterio/kanister/pkg/blockstorage"
"github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1beta1"
"github.com/kanisterio/kanister/pkg/poll"
)

type SnapshotBeta struct {
Expand Down Expand Up @@ -100,7 +99,7 @@ func createSnapshot(ctx context.Context, dynCli dynamic.Interface, kubeCli kuber
return nil
}

if err := waitOnReadyToUse(ctx, dynCli, snapGVR, name, namespace); err != nil {
if err := waitOnReadyToUse(ctx, dynCli, snapGVR, name, namespace, isReadyToUseBeta); err != nil {
return err
}

Expand Down Expand Up @@ -301,29 +300,22 @@ func (sna *SnapshotBeta) CreateContentFromSource(ctx context.Context, source *So
// WaitOnReadyToUse will block until the Volumesnapshot in 'namespace' with name 'snapshotName'
// has status 'ReadyToUse' or 'ctx.Done()' is signalled.
func (sna *SnapshotBeta) WaitOnReadyToUse(ctx context.Context, snapshotName, namespace string) error {
return waitOnReadyToUse(ctx, sna.dynCli, v1beta1.VolSnapGVR, snapshotName, namespace)
return waitOnReadyToUse(ctx, sna.dynCli, v1beta1.VolSnapGVR, snapshotName, namespace, isReadyToUseBeta)
}

func waitOnReadyToUse(ctx context.Context, dynCli dynamic.Interface, snapGVR schema.GroupVersionResource, snapshotName, namespace string) error {
return poll.Wait(ctx, func(context.Context) (bool, error) {
us, err := dynCli.Resource(snapGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{})
if err != nil {
return false, err
}
vs := v1beta1.VolumeSnapshot{}
err = TransformUnstructured(us, &vs)
if err != nil {
return false, err
}
if vs.Status == nil {
return false, nil
}
// Error can be set while waiting for creation
if vs.Status.Error != nil {
return false, errors.New(*vs.Status.Error.Message)
}
return (vs.Status.ReadyToUse != nil && *vs.Status.ReadyToUse && vs.Status.CreationTime != nil), nil
})
func isReadyToUseBeta(us *unstructured.Unstructured) (bool, error) {
vs := v1beta1.VolumeSnapshot{}
if err := TransformUnstructured(us, &vs); err != nil {
return false, err
}
if vs.Status == nil {
return false, nil
}
// Error can be set while waiting for creation
if vs.Status.Error != nil {
return false, errors.New(*vs.Status.Error.Message)
}
return (vs.Status.ReadyToUse != nil && *vs.Status.ReadyToUse && vs.Status.CreationTime != nil), nil
}

func getSnapshotContent(ctx context.Context, dynCli dynamic.Interface, snapContentGVR schema.GroupVersionResource, contentName string) (*v1.VolumeSnapshotContent, error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/kube/snapshot/snapshot_stable.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (sna *SnapshotStable) CreateContentFromSource(ctx context.Context, source *
// WaitOnReadyToUse will block until the Volumesnapshot in 'namespace' with name 'snapshotName'
// has status 'ReadyToUse' or 'ctx.Done()' is signalled.
func (sna *SnapshotStable) WaitOnReadyToUse(ctx context.Context, snapshotName, namespace string) error {
return waitOnReadyToUse(ctx, sna.dynCli, VolSnapGVR, snapshotName, namespace)
return waitOnReadyToUse(ctx, sna.dynCli, VolSnapGVR, snapshotName, namespace, isReadyToUseBeta)
}

func (sna *SnapshotStable) GroupVersion(ctx context.Context) schema.GroupVersion {
Expand Down
186 changes: 186 additions & 0 deletions pkg/kube/snapshot/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,192 @@ func (s *SnapshotTestSuite) TestVolumeSnapshotCloneFake(c *C) {
}
}

func (s *SnapshotTestSuite) TestWaitOnReadyToUse(c *C) {
snapshotNameBase := "snap-1-fake"
volName := "pvc-1-fake"
scheme := runtime.NewScheme()
scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "snapshot.storage.k8s.io", Version: "v1alpha1", Kind: "VolumeSnapshotClassList"}, &unstructured.UnstructuredList{})
scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "snapshot.storage.k8s.io", Version: "v1beta1", Kind: "VolumeSnapshotClassList"}, &unstructured.UnstructuredList{})
scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "snapshot.storage.k8s.io", Version: "v1", Kind: "VolumeSnapshotClassList"}, &unstructured.UnstructuredList{})

fakeCli := fake.NewSimpleClientset()

size, err := resource.ParseQuantity("1Gi")
c.Assert(err, IsNil)

pvc := &corev1.PersistentVolumeClaim{
ObjectMeta: metav1.ObjectMeta{
Name: volName,
},
Spec: corev1.PersistentVolumeClaimSpec{
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{
corev1.ResourceStorage: size,
},
},
},
}
_, err = fakeCli.CoreV1().PersistentVolumeClaims(defaultNamespace).Create(context.TODO(), pvc, metav1.CreateOptions{})
c.Assert(err, IsNil)

dynCli := dynfake.NewSimpleDynamicClient(scheme)

for _, fakeSs := range []snapshot.Snapshotter{
snapshot.NewSnapshotAlpha(fakeCli, dynCli),
snapshot.NewSnapshotBeta(fakeCli, dynCli),
snapshot.NewSnapshotStable(fakeCli, dynCli),
} {
ctx := context.Background()

var volumeSnapshotGVR schema.GroupVersionResource
var snapshotName string
switch fakeSs.(type) {
case *snapshot.SnapshotAlpha:
volumeSnapshotGVR = v1alpha1.VolSnapGVR
snapshotName = snapshotNameBase + "-alpha"
case *snapshot.SnapshotBeta:
volumeSnapshotGVR = v1beta1.VolSnapGVR
snapshotName = snapshotNameBase + "-beta"
case *snapshot.SnapshotStable:
volumeSnapshotGVR = snapshot.VolSnapGVR
snapshotName = snapshotNameBase + "-stable"
}

err = fakeSs.Create(ctx, snapshotName, defaultNamespace, volName, &fakeClass, false, nil)
c.Assert(err, IsNil)

// This function should timeout
timeout := 500 * time.Millisecond
bgTimeout := 5 * time.Second
// We don't have readyToUse and no error, waiting indefinitely
err = waitOnReadyToUseWithTimeout(c, ctx, fakeSs, snapshotName, defaultNamespace, timeout)
c.Assert(err, NotNil)
c.Assert(err.Error(), Matches, ".*context deadline exceeded*")

reply := waitOnReadyToUseInBackground(c, ctx, fakeSs, snapshotName, defaultNamespace, bgTimeout)
setReadyStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace)
select {
case err = <-reply:
c.Assert(err, IsNil)
case <-time.After(2 * time.Second):
c.Error("timeout waiting on ready to use")
}

setVolumeSnapshotStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace, nil)

// Set non-transient error
message := "some error"
setErrorStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace, message)

// If there is non-transient error, exit right away
err = waitOnReadyToUseWithTimeout(c, ctx, fakeSs, snapshotName, defaultNamespace, timeout)
c.Assert(err, NotNil)
c.Assert(err.Error(), Matches, ".*some error.*")

// Set transient error
message = "the object has been modified; please apply your changes to the latest version and try again"
setErrorStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace, message)

// If there is a transient error, wait with exp backoff which is long
err = waitOnReadyToUseWithTimeout(c, ctx, fakeSs, snapshotName, defaultNamespace, timeout)
c.Assert(err, NotNil)
c.Assert(err.Error(), Matches, ".*context deadline exceeded*")

reply = waitOnReadyToUseInBackground(c, ctx, fakeSs, snapshotName, defaultNamespace, bgTimeout)
setReadyStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace)
select {
case err = <-reply:
c.Assert(err, IsNil)
case <-time.After(2 * time.Second):
c.Error("timeout waiting on ready to use")
}
}
}

// Helpers to work with volume snapshot status used in TestWaitOnReadyToUse
// ----------------------------------------------------------------------------

func waitOnReadyToUseInBackground(
c *C,
ctx context.Context,
fakeSs snapshot.Snapshotter,
snapshotName string,
namespace string,
timeout time.Duration,
) chan error {
reply := make(chan error)
go func() {
err := waitOnReadyToUseWithTimeout(c, ctx, fakeSs, snapshotName, namespace, timeout)
reply <- err
}()
return reply
}

func waitOnReadyToUseWithTimeout(
c *C,
ctx context.Context,
fakeSs snapshot.Snapshotter,
snapshotName string,
namespace string,
timeout time.Duration,
) error {
deadline := time.Now().Add(timeout)
deadlineCtx, cancel := context.WithDeadline(ctx, deadline)
defer cancel()

err := fakeSs.WaitOnReadyToUse(deadlineCtx, snapshotName, defaultNamespace)
return err
}

func setReadyStatus(
c *C,
dynCli *dynfake.FakeDynamicClient,
volumeSnapshotGVR schema.GroupVersionResource,
snapshotName string,
namespace string,
) {
status := make(map[string]interface{})
status["readyToUse"] = true
status["creationTime"] = time.Now().Format(time.RFC3339)

setVolumeSnapshotStatus(c, dynCli, volumeSnapshotGVR, snapshotName, namespace, status)
}

func setErrorStatus(
c *C,
dynCli *dynfake.FakeDynamicClient,
volumeSnapshotGVR schema.GroupVersionResource,
snapshotName string,
namespace string,
message string,
) {
status := make(map[string]interface{})
status["Error"] = map[string]interface{}{
"Message": message,
}
setVolumeSnapshotStatus(c, dynCli, volumeSnapshotGVR, snapshotName, namespace, status)
}

func setVolumeSnapshotStatus(
c *C,
dynCli *dynfake.FakeDynamicClient,
volumeSnapshotGVR schema.GroupVersionResource,
snapshotName string,
namespace string,
status map[string]interface{},
) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

us, err := dynCli.Resource(volumeSnapshotGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{})
c.Assert(err, IsNil)
us.Object["status"] = status
_, err = dynCli.Resource(volumeSnapshotGVR).Namespace(namespace).UpdateStatus(ctx, us, metav1.UpdateOptions{})
c.Assert(err, IsNil)
}

// ----------------------------------------------------------------------------

func (s *SnapshotTestSuite) TestVolumeSnapshotAlpha(c *C) {
if s.snapshotClassAlpha == nil {
c.Skip("No v1alpha1 Volumesnapshotclass in the cluster")
Expand Down

0 comments on commit 4aefd25

Please sign in to comment.