Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(wfr): retry creating pod on QuotaExceed error until timeout #1528

Merged
merged 3 commits into from
Nov 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions pkg/workflow/controller/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,31 @@ func (c *Controller) nextWork() bool {
}

defer c.queue.Done(key)
_, err := c.doWork(key.(string))
if err == nil {
c.queue.Forget(key)
} else if c.queue.NumRequeues(key) < 3 {
log.Errorf("process %s failed (will retry): %v", key, err)
c.queue.AddRateLimited(key)
res, err := c.doWork(key.(string))
if res.Requeue != nil {
requeue := *res.Requeue
if !requeue {
log.Warningf("process %s failed (requeue=false, gave up)", key)
c.queue.Forget(key)
} else if res.RequeueAfter > 0 {
log.Warningf("process %s failed (requeue=true, will retry after %s)", key, res.RequeueAfter)
c.queue.Forget(key)
c.queue.AddAfter(key, res.RequeueAfter)
} else {
log.Warningf("process %s failed (requeue=true, will retry)", key)
c.queue.AddRateLimited(key)
}
} else if err != nil {
if c.queue.NumRequeues(key) < 3 {
log.Errorf("process %s failed (will retry): %v", key, err)
c.queue.AddRateLimited(key)
} else {
log.Errorf("process %s failed (gave up): %v", key, err)
c.queue.Forget(key)
utilruntime.HandleError(err)
}
} else {
log.Errorf("process %s failed (gave up): %v", key, err)
c.queue.Forget(key)
utilruntime.HandleError(err)
}

return true
Expand Down
6 changes: 4 additions & 2 deletions pkg/workflow/controller/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (

// Result contains the result of a Reconciler invocation.
type Result struct {
// Requeue tells the Controller to requeue the reconcile key. Defaults to false.
Requeue bool
// Requeue tells the Controller to requeue the reconcile key.
// If it is set to false, the controller will not requeue even if error occurred.
// If it is nil, the controller will retry at most 3 times on errors.
Requeue *bool

// RequeueAfter if greater than 0, tells the Controller to requeue the reconcile key after the Duration.
// Implies that Requeue is true, there is no need to set Requeue to true at the same time as RequeueAfter.
Expand Down
44 changes: 35 additions & 9 deletions pkg/workflow/workflowrun/operator.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package workflowrun

import (
stderr "errors"
"fmt"
"os"
"reflect"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -359,6 +361,14 @@ func (o *operator) OverallStatus() (*v1alpha1.Status, error) {
}, nil
}

func isExceededQuotaError(err error) bool {
if status := errors.APIStatus(nil); stderr.As(err, &status) {
s := status.Status()
return s.Reason == metav1.StatusReasonForbidden && strings.Contains(s.Message, "exceeded quota:")
}
return false
}

// Reconcile finds next stages in the workflow to run and resolve WorkflowRun's overall status.
func (o *operator) Reconcile() (controller.Result, error) {
var res controller.Result
Expand All @@ -375,14 +385,6 @@ func (o *operator) Reconcile() (controller.Result, error) {
log.WithField("stg", nextStages).Info("Next stages to run")
}

for _, stage := range nextStages {
o.UpdateStageStatus(stage, &v1alpha1.Status{
Phase: v1alpha1.StatusRunning,
Reason: "StageInitialized",
LastTransitionTime: metav1.Time{Time: time.Now()},
StartTime: metav1.Time{Time: time.Now()},
})
}
overall, err := o.OverallStatus()
if err != nil {
return res, fmt.Errorf("resolve overall status error: %v", err)
Expand All @@ -399,6 +401,7 @@ func (o *operator) Reconcile() (controller.Result, error) {
return res, nil
}

var retryStageNames []string
// Create pod to run stages.
for _, stage := range nextStages {
log.WithField("stg", stage).Info("Start to run stage")
Expand All @@ -411,10 +414,33 @@ func (o *operator) Reconcile() (controller.Result, error) {

err = NewWorkloadProcessor(o.clusterClient, o.client, o.wf, o.wfr, stg, o).Process()
if err != nil {
log.WithField("stg", stage).Error("Process workload error: ", err)
if isExceededQuotaError(err) {
retryStageNames = append(retryStageNames, stage)
log.WithField("wfr", o.wfr.Name).WithField("stg", stage).Warning("Process workload error: ", err)
} else {
log.WithField("wfr", o.wfr.Name).WithField("stg", stage).Error("Process workload error: ", err)
}
continue
}
}
if len(retryStageNames) > 0 {
var requeue bool
if HasTimedOut(o.wfr) {
// timed-out. Update stage status and do not requeue
for _, stageName := range retryStageNames {
o.UpdateStageStatus(stageName, &v1alpha1.Status{
Phase: v1alpha1.StatusFailed,
Reason: "RetryOnExceededQuotaTimeout",
LastTransitionTime: metav1.Time{Time: time.Now()},
StartTime: metav1.Time{Time: time.Now()},
})
}
requeue = false
} else {
requeue = true
}
res.Requeue = &requeue
}

overall, err = o.OverallStatus()
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/workflow/workflowrun/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,9 @@ func ensureOwner(client clientset.Interface, wf *v1alpha1.Workflow, wfr *v1alpha
func GCPodName(wfr string) string {
return fmt.Sprintf("wfrgc--%s", wfr)
}

// HasTimedOut checks whether the WorkflowRun has timed out, i.e. whether
// more than the duration parsed from wfr.Spec.Timeout has elapsed since
// the WorkflowRun object's creation timestamp.
func HasTimedOut(wfr *v1alpha1.WorkflowRun) bool {
	// NOTE(review): the ParseTime error is silently discarded. If
	// wfr.Spec.Timeout is unparseable, timeout is presumably the zero
	// Duration, which would make every WorkflowRun report as timed out
	// immediately — confirm ParseTime's behavior on invalid/empty input.
	timeout, _ := ParseTime(wfr.Spec.Timeout)
	return wfr.CreationTimestamp.UTC().Add(timeout).Before(time.Now().UTC())
}
32 changes: 10 additions & 22 deletions pkg/workflow/workflowrun/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,11 @@ import (
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"

"github.com/caicloud/cyclone/pkg/apis/cyclone/v1alpha1"
"github.com/caicloud/cyclone/pkg/k8s/clientset"
"github.com/caicloud/cyclone/pkg/util"
"github.com/caicloud/cyclone/pkg/util/retry"
"github.com/caicloud/cyclone/pkg/workflow/workload/delegation"
"github.com/caicloud/cyclone/pkg/workflow/workload/pod"
)
Expand Down Expand Up @@ -68,43 +66,33 @@ func (p *WorkloadProcessor) processPod() error {
// Generate pod for this stage.
po, err := pod.NewBuilder(p.client, p.wf, p.wfr, p.stg).Build()
if err != nil {
log.WithField("wfr", p.wfr.Name).WithField("stg", p.stg.Name).Error("Create pod manifest for stage error: ", err)
p.wfrOper.GetRecorder().Eventf(p.wfr, corev1.EventTypeWarning, "GeneratePodSpecError", "Generate pod for stage '%s' error: %v", p.stg.Name, err)
p.wfrOper.UpdateStageStatus(p.stg.Name, &v1alpha1.Status{
Phase: v1alpha1.StatusFailed,
Reason: "GeneratePodError",
LastTransitionTime: metav1.Time{Time: time.Now()},
Message: fmt.Sprintf("Failed to generate pod: %v", err),
})
return err
return fmt.Errorf("create pod manifest: %w", err)
}
log.WithField("stg", p.stg.Name).Debug("Pod manifest created")

// Create the generated pod with retry on exceeded quota.
// This is a little tricky. Cyclone deletes a stage's pod to release its cpu/memory resources once the stage
// has finished, but pod deletion takes some time, so retrying on exceeded quota allows time for the previous
// stage's pod deletion to complete.
backoff := wait.Backoff{
Steps: 3,
Duration: 5 * time.Second,
Factor: 1.5,
Jitter: 0.1,
}
origin := po.DeepCopy()
err = retry.OnExceededQuota(backoff, func() error {
po, err = p.clusterClient.CoreV1().Pods(pod.GetExecutionContext(p.wfr).Namespace).Create(origin)
return err
})
po, err = p.clusterClient.CoreV1().Pods(pod.GetExecutionContext(p.wfr).Namespace).Create(po)
if err != nil {
log.WithField("wfr", p.wfr.Name).WithField("stg", p.stg.Name).Error("Create pod for stage error: ", err)
p.wfrOper.GetRecorder().Eventf(p.wfr, corev1.EventTypeWarning, "StagePodCreated", "Create pod for stage '%s' error: %v", p.stg.Name, err)
var phase v1alpha1.StatusPhase
if isExceededQuotaError(err) {
phase = v1alpha1.StatusPending
} else {
phase = v1alpha1.StatusFailed
}
p.wfrOper.UpdateStageStatus(p.stg.Name, &v1alpha1.Status{
Phase: v1alpha1.StatusFailed,
Phase: phase,
Reason: "CreatePodError",
LastTransitionTime: metav1.Time{Time: time.Now()},
Message: fmt.Sprintf("Failed to create pod: %v", err),
})
return err
return fmt.Errorf("create pod: %w", err)
}

log.WithField("wfr", p.wfr.Name).WithField("stg", p.stg.Name).Debug("Create pod for stage succeeded")
Expand Down