Skip to content

Commit

Permalink
Merge pull request #17 from wongma7/createvolume-picks
Browse files Browse the repository at this point in the history
Cherry-pick #12, #13, #16 on release-1.13
  • Loading branch information
k8s-ci-robot authored Jan 18, 2019
2 parents 05ed9a3 + 0af807b commit eb02dcb
Showing 1 changed file with 120 additions and 51 deletions.
171 changes: 120 additions & 51 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,11 @@ type ProvisionController struct {

resyncPeriod time.Duration

rateLimiter workqueue.RateLimiter
exponentialBackOffOnError bool
threadiness int

createProvisionedPVBackoff *wait.Backoff
createProvisionedPVRetryCount int
createProvisionedPVInterval time.Duration

Expand Down Expand Up @@ -212,6 +214,18 @@ func Threadiness(threadiness int) func(*ProvisionController) error {
}
}

// RateLimiter is the workqueue.RateLimiter to use for the provisioning and
// deleting work queues. If set, ExponentialBackOffOnError is ignored.
func RateLimiter(rateLimiter workqueue.RateLimiter) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
}
c.rateLimiter = rateLimiter
return nil
}
}

// ExponentialBackOffOnError determines whether to exponentially back off from
// failures of Provision and Delete. Defaults to true.
func ExponentialBackOffOnError(exponentialBackOffOnError bool) func(*ProvisionController) error {
Expand All @@ -231,6 +245,9 @@ func CreateProvisionedPVRetryCount(createProvisionedPVRetryCount int) func(*Prov
if c.HasRun() {
return errRuntime
}
if c.createProvisionedPVBackoff != nil {
return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVRetryCount")
}
c.createProvisionedPVRetryCount = createProvisionedPVRetryCount
return nil
}
Expand All @@ -243,13 +260,36 @@ func CreateProvisionedPVInterval(createProvisionedPVInterval time.Duration) func
if c.HasRun() {
return errRuntime
}
if c.createProvisionedPVBackoff != nil {
return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVInterval")
}
c.createProvisionedPVInterval = createProvisionedPVInterval
return nil
}
}

// CreateProvisionedPVBackoff is the configuration of exponential backoff between retries when we create a
// PV object for a provisioned volume. Defaults to linear backoff, 10 seconds 5 times.
// Only one of CreateProvisionedPVInterval+CreateProvisionedPVRetryCount or CreateProvisionedPVBackoff
// can be used.
func CreateProvisionedPVBackoff(backoff wait.Backoff) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
return errRuntime
}
if c.createProvisionedPVRetryCount != 0 {
return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVRetryCount")
}
if c.createProvisionedPVInterval != 0 {
return fmt.Errorf("CreateProvisionedPVBackoff cannot be used together with CreateProvisionedPVInterval")
}
c.createProvisionedPVBackoff = &backoff
return nil
}
}

// FailedProvisionThreshold is the threshold for max number of retries on
// failures of Provision. Defaults to 15.
// failures of Provision. Set to 0 to retry indefinitely. Defaults to 15.
func FailedProvisionThreshold(failedProvisionThreshold int) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
Expand All @@ -261,7 +301,7 @@ func FailedProvisionThreshold(failedProvisionThreshold int) func(*ProvisionContr
}

// FailedDeleteThreshold is the threshold for max number of retries on failures
// of Delete. Defaults to 15.
// of Delete. Set to 0 to retry indefinitely. Defaults to 15.
func FailedDeleteThreshold(failedDeleteThreshold int) func(*ProvisionController) error {
return func(c *ProvisionController) error {
if c.HasRun() {
Expand Down Expand Up @@ -439,48 +479,69 @@ func NewProvisionController(
eventRecorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: component})

controller := &ProvisionController{
client: client,
provisionerName: provisionerName,
provisioner: provisioner,
kubeVersion: utilversion.MustParseSemantic(kubeVersion),
id: id,
component: component,
eventRecorder: eventRecorder,
resyncPeriod: DefaultResyncPeriod,
exponentialBackOffOnError: DefaultExponentialBackOffOnError,
threadiness: DefaultThreadiness,
createProvisionedPVRetryCount: DefaultCreateProvisionedPVRetryCount,
createProvisionedPVInterval: DefaultCreateProvisionedPVInterval,
failedProvisionThreshold: DefaultFailedProvisionThreshold,
failedDeleteThreshold: DefaultFailedDeleteThreshold,
leaderElection: DefaultLeaderElection,
leaderElectionNamespace: getInClusterNamespace(),
leaseDuration: DefaultLeaseDuration,
renewDeadline: DefaultRenewDeadline,
retryPeriod: DefaultRetryPeriod,
metricsPort: DefaultMetricsPort,
metricsAddress: DefaultMetricsAddress,
metricsPath: DefaultMetricsPath,
hasRun: false,
hasRunLock: &sync.Mutex{},
client: client,
provisionerName: provisionerName,
provisioner: provisioner,
kubeVersion: utilversion.MustParseSemantic(kubeVersion),
id: id,
component: component,
eventRecorder: eventRecorder,
resyncPeriod: DefaultResyncPeriod,
exponentialBackOffOnError: DefaultExponentialBackOffOnError,
threadiness: DefaultThreadiness,
failedProvisionThreshold: DefaultFailedProvisionThreshold,
failedDeleteThreshold: DefaultFailedDeleteThreshold,
leaderElection: DefaultLeaderElection,
leaderElectionNamespace: getInClusterNamespace(),
leaseDuration: DefaultLeaseDuration,
renewDeadline: DefaultRenewDeadline,
retryPeriod: DefaultRetryPeriod,
metricsPort: DefaultMetricsPort,
metricsAddress: DefaultMetricsAddress,
metricsPath: DefaultMetricsPath,
hasRun: false,
hasRunLock: &sync.Mutex{},
}

for _, option := range options {
option(controller)
err := option(controller)
if err != nil {
glog.Fatalf("Error processing controller options: %s", err)
}
}

ratelimiter := workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
if !controller.exponentialBackOffOnError {
ratelimiter = workqueue.NewMaxOfRateLimiter(
var rateLimiter workqueue.RateLimiter
if controller.rateLimiter != nil {
// rateLimiter set via parameter takes precedence
rateLimiter = controller.rateLimiter
} else if controller.exponentialBackOffOnError {
rateLimiter = workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 1000*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
} else {
rateLimiter = workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(15*time.Second, 15*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
)
}
controller.claimQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "claims")
controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(ratelimiter, "volumes")
controller.claimQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "claims")
controller.volumeQueue = workqueue.NewNamedRateLimitingQueue(rateLimiter, "volumes")

if controller.createProvisionedPVBackoff == nil {
// Use linear backoff with createProvisionedPVInterval and createProvisionedPVRetryCount by default.
if controller.createProvisionedPVInterval == 0 {
controller.createProvisionedPVInterval = DefaultCreateProvisionedPVInterval
}
if controller.createProvisionedPVRetryCount == 0 {
controller.createProvisionedPVRetryCount = DefaultCreateProvisionedPVRetryCount
}
controller.createProvisionedPVBackoff = &wait.Backoff{
Duration: controller.createProvisionedPVInterval,
Factor: 1, // linear backoff
Steps: controller.createProvisionedPVRetryCount,
}
}

informer := informers.NewSharedInformerFactory(client, controller.resyncPeriod)

Expand Down Expand Up @@ -684,7 +745,10 @@ func (ctrl *ProvisionController) processNextClaimWorkItem() bool {
}

if err := ctrl.syncClaimHandler(key); err != nil {
if ctrl.claimQueue.NumRequeues(obj) < ctrl.failedProvisionThreshold {
if ctrl.failedProvisionThreshold == 0 {
glog.Warningf("Retrying syncing claim %q, failure %v", key, ctrl.claimQueue.NumRequeues(obj))
ctrl.claimQueue.AddRateLimited(obj)
} else if ctrl.claimQueue.NumRequeues(obj) < ctrl.failedProvisionThreshold {
glog.Warningf("Retrying syncing claim %q because failures %v < threshold %v", key, ctrl.claimQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)
ctrl.claimQueue.AddRateLimited(obj)
} else {
Expand Down Expand Up @@ -725,11 +789,14 @@ func (ctrl *ProvisionController) processNextVolumeWorkItem() bool {
}

if err := ctrl.syncVolumeHandler(key); err != nil {
if ctrl.volumeQueue.NumRequeues(obj) < ctrl.failedDeleteThreshold {
glog.Warningf("Retrying syncing volume %q because failures %v < threshold %v", key, ctrl.volumeQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)
if ctrl.failedDeleteThreshold == 0 {
glog.Warningf("Retrying syncing volume %q, failure %v", key, ctrl.volumeQueue.NumRequeues(obj))
ctrl.volumeQueue.AddRateLimited(obj)
} else if ctrl.volumeQueue.NumRequeues(obj) < ctrl.failedDeleteThreshold {
glog.Warningf("Retrying syncing volume %q because failures %v < threshold %v", key, ctrl.volumeQueue.NumRequeues(obj), ctrl.failedDeleteThreshold)
ctrl.volumeQueue.AddRateLimited(obj)
} else {
glog.Errorf("Giving up syncing volume %q because failures %v >= threshold %v", key, ctrl.volumeQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)
glog.Errorf("Giving up syncing volume %q because failures %v >= threshold %v", key, ctrl.volumeQueue.NumRequeues(obj), ctrl.failedDeleteThreshold)
// Done but do not Forget: it will not be in the queue but NumRequeues
// will be saved until the obj is deleted from kubernetes
}
Expand Down Expand Up @@ -1059,47 +1126,49 @@ func (ctrl *ProvisionController) provisionClaimOperation(claim *v1.PersistentVol
}

// Try to create the PV object several times
for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
var lastSaveError error
err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) {
glog.Info(logOperation(operation, "trying to save persistentvolume %q", volume.Name))
if _, err = ctrl.client.CoreV1().PersistentVolumes().Create(volume); err == nil || apierrs.IsAlreadyExists(err) {
// Save succeeded.
if err != nil {
glog.Info(logOperation(operation, "persistentvolume %q already exists, reusing", volume.Name))
err = nil
} else {
glog.Info(logOperation(operation, "persistentvolume %q saved", volume.Name))
}
break
return true, nil
}
// Save failed, try again after a while.
glog.Info(logOperation(operation, "failed to save persistentvolume %q: %v", volume.Name, err))
time.Sleep(ctrl.createProvisionedPVInterval)
}
lastSaveError = err
return false, nil
})

if err != nil {
// Save failed. Now we have a storage asset outside of Kubernetes,
// but we don't have appropriate PV object for it.
// Emit some event here and try to delete the storage asset several
// times.
strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), err)
strerr := fmt.Sprintf("Error creating provisioned PV object for claim %s: %v. Deleting the volume.", claimToClaimKey(claim), lastSaveError)
glog.Error(logOperation(operation, strerr))
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningFailed", strerr)

for i := 0; i < ctrl.createProvisionedPVRetryCount; i++ {
var lastDeleteError error
err = wait.ExponentialBackoff(*ctrl.createProvisionedPVBackoff, func() (bool, error) {
if err = ctrl.provisioner.Delete(volume); err == nil {
// Delete succeeded
glog.Info(logOperation(operation, "cleaning volume %q succeeded", volume.Name))
break
return true, nil
}
// Delete failed, try again after a while.
glog.Info(logOperation(operation, "failed to clean volume %q: %v", volume.Name, err))
time.Sleep(ctrl.createProvisionedPVInterval)
}

lastDeleteError = err
return false, nil
})
if err != nil {
// Delete failed several times. There is an orphaned volume and there
// is nothing we can do about it.
strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), err)
strerr := fmt.Sprintf("Error cleaning provisioned volume for claim %s: %v. Please delete manually.", claimToClaimKey(claim), lastDeleteError)
glog.Error(logOperation(operation, strerr))
ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, "ProvisioningCleanupFailed", strerr)
}
Expand Down

0 comments on commit eb02dcb

Please sign in to comment.