From 645cc9a9dc16da6ae566c00adddbe1b6478baeeb Mon Sep 17 00:00:00 2001
From: Matthew Wong
Date: Wed, 16 Jan 2019 14:27:18 -0500
Subject: [PATCH 1/3] Add RateLimiter arg so that callers can pass their own
 workqueue.RateLimiter

---
 controller/controller.go | 34 ++++++++++++++++++++++++++--------
 1 file changed, 26 insertions(+), 8 deletions(-)

diff --git a/controller/controller.go b/controller/controller.go
index 0b5b866..ecbf129 100644
--- a/controller/controller.go
+++ b/controller/controller.go
@@ -127,6 +127,7 @@ type ProvisionController struct {
 
     resyncPeriod time.Duration
 
+    rateLimiter               workqueue.RateLimiter
     exponentialBackOffOnError bool
     threadiness               int
 
@@ -212,6 +213,18 @@ func Threadiness(threadiness int) func(*ProvisionController) error {
     }
 }
 
+// RateLimiter is the workqueue.RateLimiter to use for the provisioning and
+// deleting work queues. If set, ExponentialBackOffOnError is ignored.
+func RateLimiter(rateLimiter workqueue.RateLimiter) func(*ProvisionController) error {
+    return func(c *ProvisionController) error {
+        if c.HasRun() {
+            return errRuntime
+        }
+        c.rateLimiter = rateLimiter
+        return nil
+    }
+}
+
 // ExponentialBackOffOnError determines whether to exponentially back off from
 // failures of Provision and Delete. Defaults to true.
 func ExponentialBackOffOnError(exponentialBackOffOnError bool) func(*ProvisionController) error {
@@ -469,18 +482,23 @@ func NewProvisionController(
         option(controller)
     }
 
-    ratelimiter := workqueue.NewMaxOfRateLimiter(
-        workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second),
-        &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
-    )
-    if !controller.exponentialBackOffOnError {
-        ratelimiter = workqueue.NewMaxOfRateLimiter(
+    var rateLimiter workqueue.RateLimiter
+    if controller.rateLimiter != nil {
+        // rateLimiter set via parameter takes precedence
+        rateLimiter = controller.rateLimiter
+    } else if controller.exponentialBackOffOnError {
+        rateLimiter = workqueue.NewMaxOfRateLimiter(
+            workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second),
+            &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
+        )
+    } else {
+        rateLimiter = workqueue.NewMaxOfRateLimiter(
             workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 15*time.Second),
             &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
         )
     }
-    controller.claimQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "claims")
-    controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "volumes")
+    controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
+    controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")
 
     informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)
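
Note (not part of the patch): with the new RateLimiter option, callers can supply
their own workqueue.RateLimiter instead of relying on ExponentialBackOffOnError.
A minimal sketch of such a caller, assuming this library is imported as
"controller"; the provisioner name, limits, and surrounding setup are hypothetical:

    import (
        "time"

        "golang.org/x/time/rate"
        "k8s.io/client-go/util/workqueue"
    )

    // Per-item exponential backoff (1s base, 5m cap) combined with an overall
    // token bucket (20 qps, burst 200); the numbers are illustrative only.
    rateLimiter := workqueue.NewMaxOfRateLimiter(
        workqueue.NewItemExponentialFailureRateLimiter(time.Second, 5*time.Minute),
        &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(20), 200)},
    )
    pc := controller.NewProvisionController(
        client,                 // kubernetes.Interface
        "example.com/hostpath", // hypothetical provisioner name
        provisioner,            // your Provisioner implementation
        serverVersion.GitVersion,
        controller.RateLimiter(rateLimiter), // option added by this patch
    )
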
From c2c650d543020199b80be5b2d1aaad681f81e9f7 Mon Sep 17 00:00:00 2001
From: Matthew Wong
Date: Wed, 16 Jan 2019 14:32:53 -0500
Subject: [PATCH 2/3] Allow disabling of failure thresholds

---
 controller/controller.go | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/controller/controller.go b/controller/controller.go
index ecbf129..cb408be 100644
--- a/controller/controller.go
+++ b/controller/controller.go
@@ -262,7 +262,7 @@ func CreateProvisionedPVInterval(createProvisionedPVInterval time.Duration) func
 }
 
 // FailedProvisionThreshold is the threshold for max number of retries on
-// failures of Provision. Defaults to 15.
+// failures of Provision. Set to 0 to retry indefinitely. Defaults to 15.
 func FailedProvisionThreshold(failedProvisionThreshold int) func(*ProvisionController) error {
     return func(c *ProvisionController) error {
         if c.HasRun() {
@@ -274,7 +274,7 @@ func FailedProvisionThreshold(failedProvisionThreshold int) func(*ProvisionContr
 }
 
 // FailedDeleteThreshold is the threshold for max number of retries on failures
-// of Delete. Defaults to 15.
+// of Delete. Set to 0 to retry indefinitely. Defaults to 15.
 func FailedDeleteThreshold(failedDeleteThreshold int) func(*ProvisionController) error {
     return func(c *ProvisionController) error {
         if c.HasRun() {
@@ -702,7 +702,10 @@ func (ctrl *ProvisionController) processNextClaimWorkItem() bool {
         }
 
         if err := ctrl.syncClaimHandler(key); err != nil {
-            if ctrl.claimQueue.NumRequeues(obj) < ctrl.failedProvisionThreshold {
+            if ctrl.failedProvisionThreshold == 0 {
+                glog.Warningf("Retrying syncing claim %q, failure %v", key, ctrl.claimQueue.NumRequeues(obj))
+                ctrl.claimQueue.AddRateLimited(obj)
+            } else if ctrl.claimQueue.NumRequeues(obj) < ctrl.failedProvisionThreshold {
                 glog.Warningf("Retrying syncing claim %q because failures %v < threshold %v", key, ctrl.claimQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)
                 ctrl.claimQueue.AddRateLimited(obj)
             } else {
@@ -743,11 +746,14 @@ func (ctrl *ProvisionController) processNextVolumeWorkItem() bool {
         }
 
         if err := ctrl.syncVolumeHandler(key); err != nil {
-            if ctrl.volumeQueue.NumRequeues(obj) < ctrl.failedDeleteThreshold {
-                glog.Warningf("Retrying syncing volume %q because failures %v < threshold %v", key, ctrl.volumeQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)
+            if ctrl.failedDeleteThreshold == 0 {
+                glog.Warningf("Retrying syncing volume %q, failure %v", key, ctrl.volumeQueue.NumRequeues(obj))
+                ctrl.volumeQueue.AddRateLimited(obj)
+            } else if ctrl.volumeQueue.NumRequeues(obj) < ctrl.failedDeleteThreshold {
+                glog.Warningf("Retrying syncing volume %q because failures %v < threshold %v", key, ctrl.volumeQueue.NumRequeues(obj), ctrl.failedDeleteThreshold)
                 ctrl.volumeQueue.AddRateLimited(obj)
             } else {
-                glog.Errorf("Giving up syncing volume %q because failures %v >= threshold %v", key, ctrl.volumeQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)
+                glog.Errorf("Giving up syncing volume %q because failures %v >= threshold %v", key, ctrl.volumeQueue.NumRequeues(obj), ctrl.failedDeleteThreshold)
                 // Done but do not Forget: it will not be in the queue but NumRequeues
                 // will be saved until the obj is deleted from kubernetes
             }
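
Note (not part of the patch): with a threshold of 0 the controller requeues a
failing claim or volume indefinitely instead of giving up after 15 failures, so
retry pacing comes entirely from the work queue's rate limiter. A sketch of a
caller opting in (same hypothetical setup as above):

    // Disable both give-up thresholds: failed provisions and deletes are
    // requeued forever, rate-limited rather than hot-looped.
    pc := controller.NewProvisionController(
        client,
        "example.com/hostpath", // hypothetical provisioner name
        provisioner,
        serverVersion.GitVersion,
        controller.FailedProvisionThreshold(0), // 0 = never give up on claims
        controller.FailedDeleteThreshold(0),    // 0 = never give up on volumes
    )
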
From 0af807b1c7f2b5069c6b7131686cc5e11bc67769 Mon Sep 17 00:00:00 2001
From: Jan Safranek
Date: Thu, 17 Jan 2019 11:15:21 +0100
Subject: [PATCH 3/3] Allow exponential backoff to save PVs to API server

Exponential backoff must be explicitly requested by the provisioner.
---
 controller/controller.go | 119 +++++++++++++++++++++++++++------------
 1 file changed, 82 insertions(+), 37 deletions(-)

diff --git a/controller/controller.go b/controller/controller.go
index cb408be..19e66c6 100644
--- a/controller/controller.go
+++ b/controller/controller.go
@@ -131,6 +131,7 @@ type ProvisionController struct {
     exponentialBackOffOnError bool
     threadiness               int
 
+    createProvisionedPVBackoff    *wait.Backoff
     createProvisionedPVRetryCount int
     createProvisionedPVInterval   time.Duration
 
@@ -244,6 +245,9 @@ func CreateProvisionedPVRetryCount(createProvisionedPVRetryCount int) func(*Prov
         if c.HasRun() {
             return errRuntime
         }
+        if c.createProvisionedPVBackoff != nil {
+            return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVRetryCount")
+        }
         c.createProvisionedPVRetryCount = createProvisionedPVRetryCount
         return nil
     }
@@ -256,11 +260,34 @@ func CreateProvisionedPVInterval(createProvisionedPVInterval time.Duration) func
         if c.HasRun() {
             return errRuntime
         }
+        if c.createProvisionedPVBackoff != nil {
+            return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVInterval")
+        }
         c.createProvisionedPVInterval = createProvisionedPVInterval
         return nil
     }
 }
 
+// CreateProvisionedPVBackoff is the configuration of exponential backoff between retries when we create a
+// PV object for a provisioned volume. Defaults to linear backoff: 10 seconds, 5 times.
+// Only one of CreateProvisionedPVInterval+CreateProvisionedPVRetryCount or CreateProvisionedPVBackoff
+// can be used.
+func CreateProvisionedPVBackoff(backoff wait.Backoff) func(*ProvisionController) error {
+    return func(c *ProvisionController) error {
+        if c.HasRun() {
+            return errRuntime
+        }
+        if c.createProvisionedPVRetryCount != 0 {
+            return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVRetryCount")
+        }
+        if c.createProvisionedPVInterval != 0 {
+            return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVInterval")
+        }
+        c.createProvisionedPVBackoff = &backoff
+        return nil
+    }
+}
+
 // FailedProvisionThreshold is the threshold for max number of retries on
 // failures of Provision. Set to 0 to retry indefinitely. Defaults to 15.
 func FailedProvisionThreshold(failedProvisionThreshold int) func(*ProvisionController) error {
@@ -452,34 +479,35 @@ func NewProvisionController(
     eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: component})
 
     controller := &ProvisionController{
-        client:                        client,
-        provisionerName:               provisionerName,
-        provisioner:                   provisioner,
-        kubeVersion:                   utilversion.MustParseSemantic(kubeVersion),
-        id:                            id,
-        component:                     component,
-        eventRecorder:                 eventRecorder,
-        resyncPeriod:                  DefaultResyncPeriod,
-        exponentialBackOffOnError:     DefaultExponentialBackOffOnError,
-        threadiness:                   DefaultThreadiness,
-        createProvisionedPVRetryCount: DefaultCreateProvisionedPVRetryCount,
-        createProvisionedPVInterval:   DefaultCreateProvisionedPVInterval,
-        failedProvisionThreshold:      DefaultFailedProvisionThreshold,
-        failedDeleteThreshold:         DefaultFailedDeleteThreshold,
-        leaderElection:                DefaultLeaderElection,
-        leaderElectionNamespace:       getInClusterNamespace(),
-        leaseDuration:                 DefaultLeaseDuration,
-        renewDeadline:                 DefaultRenewDeadline,
-        retryPeriod:                   DefaultRetryPeriod,
-        metricsPort:                   DefaultMetricsPort,
-        metricsAddress:                DefaultMetricsAddress,
-        metricsPath:                   DefaultMetricsPath,
-        hasRun:                        false,
-        hasRunLock:                    &sync.Mutex{},
+        client:                    client,
+        provisionerName:           provisionerName,
+        provisioner:               provisioner,
+        kubeVersion:               utilversion.MustParseSemantic(kubeVersion),
+        id:                        id,
+        component:                 component,
+        eventRecorder:             eventRecorder,
+        resyncPeriod:              DefaultResyncPeriod,
+        exponentialBackOffOnError: DefaultExponentialBackOffOnError,
+        threadiness:               DefaultThreadiness,
+        failedProvisionThreshold:  DefaultFailedProvisionThreshold,
+        failedDeleteThreshold:     DefaultFailedDeleteThreshold,
+        leaderElection:            DefaultLeaderElection,
+        leaderElectionNamespace:   getInClusterNamespace(),
+        leaseDuration:             DefaultLeaseDuration,
+        renewDeadline:             DefaultRenewDeadline,
+        retryPeriod:               DefaultRetryPeriod,
+        metricsPort:               DefaultMetricsPort,
+        metricsAddress:            DefaultMetricsAddress,
+        metricsPath:               DefaultMetricsPath,
+        hasRun:                    false,
+        hasRunLock:                &sync.Mutex{},
     }
 
     for _, option := range options {
-        option(controller)
+        err := option(controller)
+        if err != nil {
+            glog.Fatalf("Error processing controller options: %s", err)
+        }
     }
 
     var rateLimiter workqueue.RateLimiter
@@ -500,6 +528,21 @@ func NewProvisionController(
     controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
     controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")
 
+    if controller.createProvisionedPVBackoff == nil {
+        // Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.
+        if controller.createProvisionedPVInterval == 0 {
+            controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval
+        }
+        if controller.createProvisionedPVRetryCount == 0 {
+            controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount
+        }
+        controller.createProvisionedPVBackoff = &wait.Backoff{
+            Duration: controller.createProvisionedPVInterval,
+            Factor:   1, // linear backoff
+            Steps:    controller.createProvisionedPVRetryCount,
+        }
+    }
+
     informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)
 
     // ----------------------
@@ -1083,47 +1126,49 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVol
     }
 
     // Try to create the PV object several times
-    for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
+    var lastSaveError error
+    err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) {
         glog.Info(logOperation(operation, "trying to save persistentvolume %q", volume.Name))
         if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
             // Save succeeded.
             if err != nil {
                 glog.Info(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name))
-                err = nil
             } else {
                 glog.Info(logOperation(operation, "persistentvolume %q saved", volume.Name))
             }
-            break
+            return true, nil
         }
         // Save failed, try again after a while.
         glog.Info(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err))
-        time.Sleep(ctrl.createProvisionedPVInterval)
-    }
+        lastSaveError = err
+        return false, nil
+    })
 
     if err != nil {
         // Save failed. Now we have a storage asset outside of Kubernetes,
         // but we don't have appropriate PV object for it.
         // Emit some event here and try to delete the storage asset several
         // times.
-        strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
+        strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), lastSaveError)
         glog.Error(logOperation(operation, strerr))
         ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", strerr)
 
-        for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
+        var lastDeleteError error
+        err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) {
             if err = ctrl.provisioner.Delete(volume); err == nil {
                 // Delete succeeded
                 glog.Info(logOperation(operation, "cleaning volume %q succeeded", volume.Name))
-                break
+                return true, nil
             }
             // Delete failed, try again after a while.
             glog.Info(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err))
-            time.Sleep(ctrl.createProvisionedPVInterval)
-        }
-
+            lastDeleteError = err
+            return false, nil
+        })
         if err != nil {
             // Delete failed several times. There is an orphaned volume and there
             // is nothing we can do about it.
-            strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err)
+            strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), lastDeleteError)
             glog.Error(logOperation(operation, strerr))
             ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
         }
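
Note (not part of the patch series): after PATCH 3/3 the PV save and cleanup
loops run under wait.ExponentialBackoff (k8s.io/apimachinery/pkg/util/wait), but
the default configuration is still equivalent to the old behaviour: linear, with
Factor 1, a 10-second interval, and 5 steps. A provisioner that wants genuinely
exponential backoff opts in through the new option; a sketch with hypothetical
numbers, same assumed setup as the earlier examples:

    // Request exponential backoff for saving the PV object (and for cleanup
    // if the save ultimately fails). With these values the waits between
    // attempts are roughly 2s, 4s, 8s, 16s before the fifth and final try.
    backoff := wait.Backoff{
        Duration: 2 * time.Second, // initial interval
        Factor:   2,               // multiply the interval after each attempt
        Steps:    5,               // at most 5 attempts
    }
    pc := controller.NewProvisionController(
        client,
        "example.com/hostpath", // hypothetical provisioner name
        provisioner,
        serverVersion.GitVersion,
        controller.CreateProvisionedPVBackoff(backoff), // option added by this patch
    )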