Retry on transient error when waiting for snapshot to be ready #2508

Merged: 5 commits, Dec 8, 2023
Changes from 2 commits
26 changes: 26 additions & 0 deletions pkg/kube/snapshot/snapshot.go
@@ -16,16 +16,20 @@ package snapshot

import (
	"context"
	"regexp"

	v1 "github.com/kubernetes-csi/external-snapshotter/client/v4/apis/volumesnapshot/v1"
	"github.com/pkg/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/kubernetes"

	"github.com/kanisterio/kanister/pkg/kube"
	"github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1alpha1"
	"github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1beta1"
	"github.com/kanisterio/kanister/pkg/poll"
)

// Snapshotter is an interface that describes snapshot operations
@@ -153,3 +157,25 @@ func NewSnapshotter(kubeCli kubernetes.Interface, dynCli dynamic.Interface) (Sna
	}
	return nil, errors.New("Snapshot resources not supported")
}

// We use regexp to match because errors written in vs.Status.Error.Message are strings
// and we don't have any status code or other metadata in there.
var transientErrorRegexp = regexp.MustCompile("the object has been modified; please apply your changes to the latest version and try again")

// Use regexp to detect resource conflict error
// If CSI snapshotter changes error reporting to use more structured errors,
// we can improve this function to parse and recognise error codes or types.
func isTransientError(err error) bool {
	return transientErrorRegexp.MatchString(err.Error())
}

func waitOnReadyToUse(ctx context.Context, dynCli dynamic.Interface, snapGVR schema.GroupVersionResource, snapshotName, namespace string, isReadyFunc func(*unstructured.Unstructured) (bool, error)) error {
	retries := 100
	return poll.WaitWithRetries(ctx, retries, isTransientError, func(context.Context) (bool, error) {
Contributor:

Question: Is this the right poll to use here? Why is 100 retries the correct value?

It appears to me that there is currently only 1 other file where Kanister code uses poll.WaitWithRetries.

It is more common to use poll.Wait, sometimes with the ctx set to have a timeout, or to call poll.WaitWithBackoff with a maximum time set.

Contributor Author:

We could do that. The difference here is that we only retry on this specific type of error, while poll.WaitWithBackoff would not distinguish that.
The 100 value is fairly arbitrary, just to avoid an infinite wait. We could use poll.WaitWithBackoffWithRetries to configure a maximum wait time as well, but I don't know what a good value would be in this context.

The current setting for the context timeout is effectively infinite because this is a long-running process. Inserting a new time limit here could create a new failure scenario, whereas setting a retry count would, at worst, result in behaviour similar to what we have right now.

Contributor:

As @pavannd1 indicated, we can check this in as is and iterate on the polling mechanism; it improves robustness either way.

We can retry only on a specific error when using poll.Wait or poll.WaitWithBackoff by checking for that error within the function called by poll. See, for example, how an apierrors.IsNotFound error is handled in RepoServiceHandler.createService.

See

err = poll.WaitWithBackoff(ctx, backoff.Backoff{

Yes, there are some subtle differences between terminating a poll with a timeout and terminating it by retry count; in particular, whether ctx expiration can affect a call made within the poll loop. But I don't think those differences matter when the call being made is just a Get.

Ultimately the retry count is going to be picked based on some knowledge of how long it is reasonable to wait for the snapshot controller to resolve the transient error. In my mind it is clearer to express that as a time value instead of trying to calculate the retry count from the desired time and the backoff parameters.

See the many instances of poll.Wait(timeoutCtx,... in pkg/app where an app-specific timeout is known and used. Or pkg/kube.WaitForPodReady.
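For illustration, a minimal sketch of that pattern, assuming it would live in the same package as snapshot.go so that isTransientError and the existing imports are in scope; the function name is hypothetical, and the caller would pass a ctx that already carries an appropriate timeout:

func waitOnReadyToUsePollWait(ctx context.Context, dynCli dynamic.Interface, snapGVR schema.GroupVersionResource, snapshotName, namespace string, isReadyFunc func(*unstructured.Unstructured) (bool, error)) error {
	return poll.Wait(ctx, func(ctx context.Context) (bool, error) {
		us, err := dynCli.Resource(snapGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		ready, err := isReadyFunc(us)
		if err != nil && isTransientError(err) {
			// Treat the resource-conflict message as "not ready yet" so only this
			// specific error keeps the poll going; any other error still ends the
			// wait immediately.
			return false, nil
		}
		return ready, err
	})
}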

Contributor:

Upon further reflection, I've realized that the code as implemented has the following behavior:

  • It will wait indefinitely for the controller to update the status to either complete or some form of error.
  • Once it encounters a transient error, it will poll a bounded number of times (with default backoff parameters).
  • If a non-transient error is encountered, it will return an error immediately.

Importantly, setting a timeout on the context at the beginning of all polling will affect the first wait, i.e. the wait for the controller to ready the snapshot before setting any error, transient or not.

While I still think it would be better to have an explicit time limit on retrying transients, and we could probably work out a way to build one into a retry function closure, at that point it is no longer simple and clear.

The best solution is probably to keep the retry count on transients and add a comment about the expected time that count corresponds to under the default backoff.
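For illustration only, a rough way to translate a retry count into an approximate wait time. The backoff parameters below (a 100ms initial delay doubling up to a 10s cap) are assumptions made for the example, not necessarily the actual pkg/poll defaults:

package main

import (
	"fmt"
	"time"
)

// estimateRetryDuration sums the per-attempt delays of a capped exponential
// backoff; every parameter value used below is an illustrative assumption.
func estimateRetryDuration(retries int, initial, max time.Duration, factor float64) time.Duration {
	var total time.Duration
	delay := initial
	for i := 0; i < retries; i++ {
		total += delay
		delay = time.Duration(float64(delay) * factor)
		if delay > max {
			delay = max
		}
	}
	return total
}

func main() {
	// With the assumed parameters, 100 retries adds up to roughly 15-16 minutes.
	fmt.Println(estimateRetryDuration(100, 100*time.Millisecond, 10*time.Second, 2))
}

Under those assumed parameters, retries = 100 would bound the transient-error retry phase to roughly a quarter of an hour, which is the kind of figure such a comment could record.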

		us, err := dynCli.Resource(snapGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		return isReadyFunc(us)
	})
}
30 changes: 14 additions & 16 deletions pkg/kube/snapshot/snapshot_alpha.go
@@ -33,7 +33,6 @@ import (

"github.com/kanisterio/kanister/pkg/blockstorage"
"github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1alpha1"
"github.com/kanisterio/kanister/pkg/poll"
)

const (
@@ -318,24 +317,23 @@ func (sna *SnapshotAlpha) CreateContentFromSource(ctx context.Context, source *S
	return nil
}

func isReadyToUseAlpha(us *unstructured.Unstructured) (bool, error) {
	vs := v1alpha1.VolumeSnapshot{}
	if err := TransformUnstructured(us, &vs); err != nil {
		return false, err
	}
	// Error can be set while waiting for creation
	if vs.Status.Error != nil {
Contributor:

Just wanted to confirm that the events are in the VolumeSnapshot resource, right, and not in the VolumeSnapshotContent resource?

Contributor Author:

Yes. I don't think we read the VolumeSnapshotContent status anywhere in our code. But, as I understand it, status errors are propagated by the snapshot_controller to the VolumeSnapshot if there is an error in the VolumeSnapshotContent.

		return false, errors.New(vs.Status.Error.Message)
	}
	return (vs.Status.ReadyToUse && vs.Status.CreationTime != nil), nil
}

// WaitOnReadyToUse will block until the Volumesnapshot in namespace 'namespace' with name 'snapshotName'
// has status 'ReadyToUse' or 'ctx.Done()' is signalled.
// FIXME: merge this with beta wait
func (sna *SnapshotAlpha) WaitOnReadyToUse(ctx context.Context, snapshotName, namespace string) error {
	return poll.Wait(ctx, func(context.Context) (bool, error) {
		us, err := sna.dynCli.Resource(v1alpha1.VolSnapGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		vs := v1alpha1.VolumeSnapshot{}
		if err := TransformUnstructured(us, &vs); err != nil {
			return false, err
		}
		// Error can be set while waiting for creation
		if vs.Status.Error != nil {
			return false, errors.New(vs.Status.Error.Message)
		}
		return (vs.Status.ReadyToUse && vs.Status.CreationTime != nil), nil
	})
	return waitOnReadyToUse(ctx, sna.dynCli, v1alpha1.VolSnapGVR, snapshotName, namespace, isReadyToUseAlpha)
}

func (sna *SnapshotAlpha) GroupVersion(ctx context.Context) schema.GroupVersion {
38 changes: 15 additions & 23 deletions pkg/kube/snapshot/snapshot_beta.go
@@ -32,7 +32,6 @@ import (

"github.com/kanisterio/kanister/pkg/blockstorage"
"github.com/kanisterio/kanister/pkg/kube/snapshot/apis/v1beta1"
"github.com/kanisterio/kanister/pkg/poll"
)

type SnapshotBeta struct {
@@ -100,7 +99,7 @@ func createSnapshot(ctx context.Context, dynCli dynamic.Interface, kubeCli kuber
	return nil
}

	if err := waitOnReadyToUse(ctx, dynCli, snapGVR, name, namespace); err != nil {
	if err := waitOnReadyToUse(ctx, dynCli, snapGVR, name, namespace, isReadyToUseBeta); err != nil {
		return err
	}

@@ -301,29 +300,22 @@ func (sna *SnapshotBeta) CreateContentFromSource(ctx context.Context, source *So
// WaitOnReadyToUse will block until the Volumesnapshot in 'namespace' with name 'snapshotName'
// has status 'ReadyToUse' or 'ctx.Done()' is signalled.
func (sna *SnapshotBeta) WaitOnReadyToUse(ctx context.Context, snapshotName, namespace string) error {
	return waitOnReadyToUse(ctx, sna.dynCli, v1beta1.VolSnapGVR, snapshotName, namespace)
	return waitOnReadyToUse(ctx, sna.dynCli, v1beta1.VolSnapGVR, snapshotName, namespace, isReadyToUseBeta)
}

func waitOnReadyToUse(ctx context.Context, dynCli dynamic.Interface, snapGVR schema.GroupVersionResource, snapshotName, namespace string) error {
	return poll.Wait(ctx, func(context.Context) (bool, error) {
		us, err := dynCli.Resource(snapGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		vs := v1beta1.VolumeSnapshot{}
		err = TransformUnstructured(us, &vs)
		if err != nil {
			return false, err
		}
		if vs.Status == nil {
			return false, nil
		}
		// Error can be set while waiting for creation
		if vs.Status.Error != nil {
			return false, errors.New(*vs.Status.Error.Message)
		}
		return (vs.Status.ReadyToUse != nil && *vs.Status.ReadyToUse && vs.Status.CreationTime != nil), nil
	})
func isReadyToUseBeta(us *unstructured.Unstructured) (bool, error) {
	vs := v1beta1.VolumeSnapshot{}
	if err := TransformUnstructured(us, &vs); err != nil {
		return false, err
	}
	if vs.Status == nil {
		return false, nil
	}
	// Error can be set while waiting for creation
	if vs.Status.Error != nil {
		return false, errors.New(*vs.Status.Error.Message)
	}
	return (vs.Status.ReadyToUse != nil && *vs.Status.ReadyToUse && vs.Status.CreationTime != nil), nil
}

func getSnapshotContent(ctx context.Context, dynCli dynamic.Interface, snapContentGVR schema.GroupVersionResource, contentName string) (*v1.VolumeSnapshotContent, error) {
2 changes: 1 addition & 1 deletion pkg/kube/snapshot/snapshot_stable.go
@@ -176,7 +176,7 @@ func (sna *SnapshotStable) CreateContentFromSource(ctx context.Context, source *
// WaitOnReadyToUse will block until the Volumesnapshot in 'namespace' with name 'snapshotName'
// has status 'ReadyToUse' or 'ctx.Done()' is signalled.
func (sna *SnapshotStable) WaitOnReadyToUse(ctx context.Context, snapshotName, namespace string) error {
	return waitOnReadyToUse(ctx, sna.dynCli, VolSnapGVR, snapshotName, namespace)
	return waitOnReadyToUse(ctx, sna.dynCli, VolSnapGVR, snapshotName, namespace, isReadyToUseBeta)
}

func (sna *SnapshotStable) GroupVersion(ctx context.Context) schema.GroupVersion {
151 changes: 151 additions & 0 deletions pkg/kube/snapshot/snapshot_test.go
@@ -364,6 +364,157 @@ func (s *SnapshotTestSuite) TestVolumeSnapshotCloneFake(c *C) {
	}
}

func (s *SnapshotTestSuite) TestWaitOnReadyToUse(c *C) {
	snapshotName := "snap-1-fake"
	volName := "pvc-1-fake"
	scheme := runtime.NewScheme()
	scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "snapshot.storage.k8s.io", Version: "v1alpha1", Kind: "VolumeSnapshotClassList"}, &unstructured.UnstructuredList{})
	scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "snapshot.storage.k8s.io", Version: "v1beta1", Kind: "VolumeSnapshotClassList"}, &unstructured.UnstructuredList{})
	scheme.AddKnownTypeWithName(schema.GroupVersionKind{Group: "snapshot.storage.k8s.io", Version: "v1", Kind: "VolumeSnapshotClassList"}, &unstructured.UnstructuredList{})

	fakeCli := fake.NewSimpleClientset()

	size, err := resource.ParseQuantity("1Gi")
	c.Assert(err, IsNil)

	pvc := &corev1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name: volName,
		},
		Spec: corev1.PersistentVolumeClaimSpec{
			Resources: corev1.ResourceRequirements{
				Requests: corev1.ResourceList{
					corev1.ResourceStorage: size,
				},
			},
		},
	}
	_, err = fakeCli.CoreV1().PersistentVolumeClaims(defaultNamespace).Create(context.TODO(), pvc, metav1.CreateOptions{})
	c.Assert(err, IsNil)

	dynCli := dynfake.NewSimpleDynamicClient(scheme)

	for _, fakeSs := range []snapshot.Snapshotter{
		snapshot.NewSnapshotAlpha(fakeCli, dynCli),
		snapshot.NewSnapshotBeta(fakeCli, dynCli),
		snapshot.NewSnapshotStable(fakeCli, dynCli),
	} {
		ctx := context.Background()

		var volumeSnapshotGVR schema.GroupVersionResource
		switch fakeSs.(type) {
		case *snapshot.SnapshotAlpha:
			volumeSnapshotGVR = v1alpha1.VolSnapGVR
			snapshotName = snapshotName + "-alpha"
		case *snapshot.SnapshotBeta:
			volumeSnapshotGVR = v1beta1.VolSnapGVR
			snapshotName = snapshotName + "-beta"
		case *snapshot.SnapshotStable:
			volumeSnapshotGVR = snapshot.VolSnapGVR
			snapshotName = snapshotName + "-stable"
		}

		err = fakeSs.Create(ctx, snapshotName, defaultNamespace, volName, &fakeClass, false, nil)
		c.Assert(err, IsNil)

		// This function should timeout
		timeout := 500 * time.Millisecond
		bgTimeout := 5 * time.Second
		// We don't have readyToUse and no error, waiting indefinitely
		err = waitOnReadyToUseWithTimeout(c, ctx, fakeSs, snapshotName, defaultNamespace, timeout)
		c.Assert(err, NotNil)
		c.Assert(err.Error(), Matches, ".*context deadline exceeded*")

		reply := waitOnReadyToUseInBackground(c, ctx, fakeSs, snapshotName, defaultNamespace, bgTimeout)
		setReadyStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace)
		select {
		case err = <-reply:
			c.Assert(err, IsNil)
		case <-time.After(2 * time.Second):
			c.Error("timeout waiting on ready to use")
		}

		setVolumeSnapshotStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace, nil)

		// Set non-transient error
		message := "some error"
		setErrorStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace, message)

		// If there is non-transient error, exit right away
		err = waitOnReadyToUseWithTimeout(c, ctx, fakeSs, snapshotName, defaultNamespace, timeout)
		c.Assert(err, NotNil)
		c.Assert(err.Error(), Matches, ".*some error.*")

		// Set transient error
		message = "the object has been modified; please apply your changes to the latest version and try again"
		setErrorStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace, message)

		// If there is a transient error, wait with exp backoff which is long
		err = waitOnReadyToUseWithTimeout(c, ctx, fakeSs, snapshotName, defaultNamespace, timeout)
		c.Assert(err, NotNil)
		c.Assert(err.Error(), Matches, ".*context deadline exceeded*")

		reply = waitOnReadyToUseInBackground(c, ctx, fakeSs, snapshotName, defaultNamespace, bgTimeout)
		setReadyStatus(c, dynCli, volumeSnapshotGVR, snapshotName, defaultNamespace)
		select {
		case err = <-reply:
			c.Assert(err, IsNil)
		case <-time.After(2 * time.Second):
			c.Error("timeout waiting on ready to use")
		}
	}
}

// Helpers to work with volume snapshot status used in TestWaitOnReadyToUse
// ----------------------------------------------------------------------------

func waitOnReadyToUseInBackground(c *C, ctx context.Context, fakeSs snapshot.Snapshotter, snapshotName string, namespace string, timeout time.Duration) chan error {
Contributor:

Suggested change
func waitOnReadyToUseInBackground(c *C, ctx context.Context, fakeSs snapshot.Snapshotter, snapshotName string, namespace string, timeout time.Duration) chan error {
func waitOnReadyToUseInBackground(
	c *C,
	ctx context.Context,
	fakeSs snapshot.Snapshotter,
	snapshotName,
	namespace string,
	timeout time.Duration,
) chan error {

[optional] formatting for readability

	reply := make(chan error)
	go func() {
		err := waitOnReadyToUseWithTimeout(c, ctx, fakeSs, snapshotName, namespace, timeout)
		reply <- err
	}()
	return reply
}

func waitOnReadyToUseWithTimeout(c *C, ctx context.Context, fakeSs snapshot.Snapshotter, snapshotName string, namespace string, timeout time.Duration) error {
Contributor:

Suggested change
func waitOnReadyToUseWithTimeout(c *C, ctx context.Context, fakeSs snapshot.Snapshotter, snapshotName string, namespace string, timeout time.Duration) error {
func waitOnReadyToUseWithTimeout(
	c *C,
	ctx context.Context,
	fakeSs snapshot.Snapshotter,
	snapshotName,
	namespace string,
	timeout time.Duration,
) error {

	deadline := time.Now().Add(timeout)
	deadlineCtx, cancel := context.WithDeadline(ctx, deadline)
	defer cancel()

	err := fakeSs.WaitOnReadyToUse(deadlineCtx, snapshotName, defaultNamespace)
	return err
}

func setReadyStatus(c *C, dynCli *dynfake.FakeDynamicClient, volumeSnapshotGVR schema.GroupVersionResource, snapshotName string, namespace string) {
Contributor:

Suggested change
func setReadyStatus(c *C, dynCli *dynfake.FakeDynamicClient, volumeSnapshotGVR schema.GroupVersionResource, snapshotName string, namespace string) {
func setReadyStatus(
	c *C,
	dynCli *dynfake.FakeDynamicClient,
	volumeSnapshotGVR schema.GroupVersionResource,
	snapshotName,
	namespace string,
) {

	status := make(map[string]interface{})
	status["readyToUse"] = true
	status["creationTime"] = time.Now().Format(time.RFC3339)

	setVolumeSnapshotStatus(c, dynCli, volumeSnapshotGVR, snapshotName, namespace, status)
}

func setErrorStatus(c *C, dynCli *dynfake.FakeDynamicClient, volumeSnapshotGVR schema.GroupVersionResource, snapshotName string, namespace string, message string) {
Contributor:

Suggested change
func setErrorStatus(c *C, dynCli *dynfake.FakeDynamicClient, volumeSnapshotGVR schema.GroupVersionResource, snapshotName string, namespace string, message string) {
func setErrorStatus(
	c *C,
	dynCli *dynfake.FakeDynamicClient,
	volumeSnapshotGVR schema.GroupVersionResource,
	snapshotName,
	namespace,
	message string,
) {

	status := make(map[string]interface{})
	status["Error"] = map[string]interface{}{
		"Message": message,
	}
	setVolumeSnapshotStatus(c, dynCli, volumeSnapshotGVR, snapshotName, namespace, status)
}

func setVolumeSnapshotStatus(c *C, dynCli *dynfake.FakeDynamicClient, volumeSnapshotGVR schema.GroupVersionResource, snapshotName string, namespace string, status map[string]interface{}) {
Contributor:

Suggested change
func setVolumeSnapshotStatus(c *C, dynCli *dynfake.FakeDynamicClient, volumeSnapshotGVR schema.GroupVersionResource, snapshotName string, namespace string, status map[string]interface{}) {
func setVolumeSnapshotStatus(
	c *C,
	dynCli *dynfake.FakeDynamicClient,
	volumeSnapshotGVR schema.GroupVersionResource,
	snapshotName,
	namespace string,
	status map[string]interface{},
) {

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	us, err := dynCli.Resource(volumeSnapshotGVR).Namespace(namespace).Get(ctx, snapshotName, metav1.GetOptions{})
	c.Assert(err, IsNil)
	us.Object["status"] = status
	_, err = dynCli.Resource(volumeSnapshotGVR).Namespace(namespace).UpdateStatus(ctx, us, metav1.UpdateOptions{})
	c.Assert(err, IsNil)
}

// ----------------------------------------------------------------------------

func (s *SnapshotTestSuite) TestVolumeSnapshotAlpha(c *C) {
	if s.snapshotClassAlpha == nil {
		c.Skip("No v1alpha1 Volumesnapshotclass in the cluster")