Skip to content

Commit

Permalink
Fix retries (#2241)
Browse files Browse the repository at this point in the history
* Attempt to requeue after correct period

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Syntactically correct

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* I think correct requeueing

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Same treatment for the other retries

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Tidy

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Requeue after deleting resources

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Try to fix submission status updates

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Tidy

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Correct usage of submitSparkApplication

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Fix error logging

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Bring back ExecutionAttempts increment that I forgot about

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Log after reconcile complete

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Fix setting submission ID

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Tidy logging

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Tidy

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Tidy

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Update comment

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Start a new test

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Working Fails submission and retries until retries are exhausted test

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Add Application fails and retries until retries are exhausted

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Tidy

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Comments

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Tidy

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Move fail configs out of the examples directory

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Fix lint

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Move TimeUntilNextRetryDue to `pkg/util/sparkapplication.go`

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Update internal/controller/sparkapplication/controller.go

Co-authored-by: Yi Chen <github@chenyicn.net>
Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* Update test/e2e/sparkapplication_test.go

Co-authored-by: Yi Chen <github@chenyicn.net>
Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* camelCase

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* make fo-fmt

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

* PR comments

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>

---------

Signed-off-by: Thomas Newton <thomas.w.newton@gmail.com>
Co-authored-by: Yi Chen <github@chenyicn.net>
  • Loading branch information
Tom-Newton and ChenYi015 authored Oct 23, 2024
1 parent d130b08 commit 735c7fc
Show file tree
Hide file tree
Showing 7 changed files with 307 additions and 62 deletions.
108 changes: 52 additions & 56 deletions internal/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return ctrl.Result{Requeue: true}, err
}
logger.Info("Reconciling SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
defer logger.Info("Finished reconciling SparkApplication", "name", app.Name, "namespace", app.Namespace)

// Check if the spark application is being deleted
if !app.DeletionTimestamp.IsZero() {
Expand Down Expand Up @@ -259,17 +260,7 @@ func (r *Reconciler) reconcileNewSparkApplication(ctx context.Context, req ctrl.
}
app := old.DeepCopy()

if err := r.submitSparkApplication(app); err != nil {
logger.Error(err, "Failed to submit SparkApplication", "name", app.Name, "namespace", app.Namespace)
app.Status = v1beta2.SparkApplicationStatus{
AppState: v1beta2.ApplicationState{
State: v1beta2.ApplicationStateFailedSubmission,
ErrorMessage: err.Error(),
},
SubmissionAttempts: app.Status.SubmissionAttempts + 1,
LastSubmissionAttemptTime: metav1.Now(),
}
}
_ = r.submitSparkApplication(app)
if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
return err
}
Expand Down Expand Up @@ -315,6 +306,9 @@ func (r *Reconciler) reconcileSubmittedSparkApplication(ctx context.Context, req

func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
key := req.NamespacedName

var result ctrl.Result

retryErr := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
Expand All @@ -328,15 +322,22 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte
app := old.DeepCopy()

if util.ShouldRetry(app) {
if isNextRetryDue(app) {
timeUntilNextRetryDue, err := util.TimeUntilNextRetryDue(app)
if err != nil {
return err
}
if timeUntilNextRetryDue <= 0 {
if r.validateSparkResourceDeletion(ctx, app) {
_ = r.submitSparkApplication(app)
} else {
if err := r.deleteSparkResources(ctx, app); err != nil {
logger.Error(err, "failed to delete resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace)
}
return err
return fmt.Errorf("resources associated with SparkApplication name: %s namespace: %s, needed to be deleted", app.Name, app.Namespace)
}
} else {
// If we're waiting before retrying then reconcile will not modify anything, so we need to requeue.
result.RequeueAfter = timeUntilNextRetryDue
}
} else {
app.Status.AppState.State = v1beta2.ApplicationStateFailed
Expand All @@ -352,9 +353,9 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte
)
if retryErr != nil {
logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace)
return ctrl.Result{}, retryErr
return result, retryErr
}
return ctrl.Result{}, nil
return result, nil
}

func (r *Reconciler) reconcileRunningSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -408,9 +409,7 @@ func (r *Reconciler) reconcilePendingRerunSparkApplication(ctx context.Context,
logger.Info("Successfully deleted resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
r.recordSparkApplicationEvent(app)
r.resetSparkApplicationStatus(app)
if err = r.submitSparkApplication(app); err != nil {
logger.Error(err, "Failed to run spark-submit", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
}
_ = r.submitSparkApplication(app)
}
if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
return err
Expand Down Expand Up @@ -497,6 +496,9 @@ func (r *Reconciler) reconcileSucceedingSparkApplication(ctx context.Context, re

func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
key := req.NamespacedName

var result ctrl.Result

retryErr := retry.RetryOnConflict(
retry.DefaultRetry,
func() error {
Expand All @@ -510,12 +512,19 @@ func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req c
app := old.DeepCopy()

if util.ShouldRetry(app) {
if isNextRetryDue(app) {
timeUntilNextRetryDue, err := util.TimeUntilNextRetryDue(app)
if err != nil {
return err
}
if timeUntilNextRetryDue <= 0 {
if err := r.deleteSparkResources(ctx, app); err != nil {
logger.Error(err, "failed to delete spark resources", "name", app.Name, "namespace", app.Namespace)
return err
}
app.Status.AppState.State = v1beta2.ApplicationStatePendingRerun
} else {
// If we're waiting before retrying then reconcile will not modify anything, so we need to requeue.
result.RequeueAfter = timeUntilNextRetryDue
}
} else {
app.Status.AppState.State = v1beta2.ApplicationStateFailed
Expand All @@ -528,9 +537,9 @@ func (r *Reconciler) reconcileFailingSparkApplication(ctx context.Context, req c
)
if retryErr != nil {
logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace)
return ctrl.Result{}, retryErr
return result, retryErr
}
return ctrl.Result{}, nil
return result, nil
}

func (r *Reconciler) reconcileCompletedSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
Expand Down Expand Up @@ -632,8 +641,28 @@ func (r *Reconciler) getSparkApplication(key types.NamespacedName) (*v1beta2.Spa
}

// submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit.
func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error {
func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (submitErr error) {
logger.Info("Submitting SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
// SubmissionID must be set before creating any resources to ensure all the resources are labeled.
app.Status.SubmissionID = uuid.New().String()
app.Status.LastSubmissionAttemptTime = metav1.Now()
app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1

defer func() {
if submitErr == nil {
app.Status.AppState = v1beta2.ApplicationState{
State: v1beta2.ApplicationStateSubmitted,
}
app.Status.ExecutionAttempts = app.Status.ExecutionAttempts + 1
} else {
logger.Info("Failed to submit SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State, "error", submitErr)
app.Status.AppState = v1beta2.ApplicationState{
State: v1beta2.ApplicationStateFailedSubmission,
ErrorMessage: submitErr.Error(),
}
}
r.recordSparkApplicationEvent(app)
}()

if util.PrometheusMonitoringEnabled(app) {
logger.Info("Configure Prometheus monitoring for SparkApplication", "name", app.Name, "namespace", app.Namespace)
Expand Down Expand Up @@ -709,52 +738,19 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) error

driverPodName := util.GetDriverPodName(app)
app.Status.DriverInfo.PodName = driverPodName
app.Status.SubmissionID = uuid.New().String()
sparkSubmitArgs, err := buildSparkSubmitArgs(app)
if err != nil {
return fmt.Errorf("failed to build spark-submit arguments: %v", err)
}

// Try submitting the application by running spark-submit.
logger.Info("Running spark-submit for SparkApplication", "name", app.Name, "namespace", app.Namespace, "arguments", sparkSubmitArgs)
submitted, err := runSparkSubmit(newSubmission(sparkSubmitArgs, app))
if err != nil {
r.recordSparkApplicationEvent(app)
if err := runSparkSubmit(newSubmission(sparkSubmitArgs, app)); err != nil {
return fmt.Errorf("failed to run spark-submit: %v", err)
}
if !submitted {
// The application may not have been submitted even if err == nil, e.g., when some
// state update caused an attempt to re-submit the application, in which case no
// error gets returned from runSparkSubmit. If this is the case, we simply return.
return nil
}

app.Status.AppState = v1beta2.ApplicationState{
State: v1beta2.ApplicationStateSubmitted,
}
app.Status.SubmissionAttempts = app.Status.SubmissionAttempts + 1
app.Status.ExecutionAttempts = app.Status.ExecutionAttempts + 1
app.Status.LastSubmissionAttemptTime = metav1.Now()
r.recordSparkApplicationEvent(app)
return nil
}

// isNextRetryDue reports whether enough time has elapsed since the last
// submission attempt for the SparkApplication's next retry to be attempted now.
// It uses a linear backoff: the required wait is attempts * OnFailureRetryInterval.
// Returns false when retries are not configured (nil interval), when no attempt
// has been recorded yet (zero last-attempt time), or when no attempts have been
// made (attemptsDone <= 0).
func isNextRetryDue(app *v1beta2.SparkApplication) bool {
	retryInterval := app.Spec.RestartPolicy.OnFailureRetryInterval
	attemptsDone := app.Status.SubmissionAttempts
	lastEventTime := app.Status.LastSubmissionAttemptTime
	if retryInterval == nil || lastEventTime.IsZero() || attemptsDone <= 0 {
		return false
	}

	// Retry if we have waited at-least equal to attempts*RetryInterval since we do a linear back-off.
	interval := time.Duration(*retryInterval) * time.Second * time.Duration(attemptsDone)
	currentTime := time.Now()
	logger.Info(fmt.Sprintf("currentTime is %v, interval is %v", currentTime, interval))
	return currentTime.After(lastEventTime.Add(interval))
}

// updateDriverState finds the driver pod of the application
// and updates the driver state based on the current phase of the pod.
func (r *Reconciler) updateDriverState(_ context.Context, app *v1beta2.SparkApplication) error {
Expand Down
12 changes: 6 additions & 6 deletions internal/controller/sparkapplication/submission.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@ func newSubmission(args []string, app *v1beta2.SparkApplication) *submission {
}
}

func runSparkSubmit(submission *submission) (bool, error) {
func runSparkSubmit(submission *submission) error {
sparkHome, present := os.LookupEnv(common.EnvSparkHome)
if !present {
return false, fmt.Errorf("env %s is not specified", common.EnvSparkHome)
return fmt.Errorf("env %s is not specified", common.EnvSparkHome)
}
command := filepath.Join(sparkHome, "bin", "spark-submit")
cmd := exec.Command(command, submission.args...)
Expand All @@ -58,14 +58,14 @@ func runSparkSubmit(submission *submission) (bool, error) {
}
// The driver pod of the application already exists.
if strings.Contains(errorMsg, common.ErrorCodePodAlreadyExists) {
return false, fmt.Errorf("driver pod already exist")
return fmt.Errorf("driver pod already exist")
}
if errorMsg != "" {
return false, fmt.Errorf("failed to run spark-submit: %s", errorMsg)
return fmt.Errorf("failed to run spark-submit: %s", errorMsg)
}
return false, fmt.Errorf("failed to run spark-submit: %v", err)
return fmt.Errorf("failed to run spark-submit: %v", err)
}
return true, nil
return nil
}

// buildSparkSubmitArgs builds the arguments for spark-submit.
Expand Down
21 changes: 21 additions & 0 deletions pkg/util/sparkapplication.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,27 @@ func ShouldRetry(app *v1beta2.SparkApplication) bool {
return false
}

// TimeUntilNextRetryDue returns how long to wait until the SparkApplication's
// next retry attempt is due. The wait is computed as a linear backoff,
// attempts * retry interval, measured from the last submission attempt time; a
// non-positive duration means the retry is already due. The interval used
// depends on the application's current state: OnSubmissionFailureRetryInterval
// for FailedSubmission, OnFailureRetryInterval for Failing. It returns an
// error (with a -1 duration) when no interval is configured for the current
// state, no attempt time has been recorded, or no attempts have been made.
func TimeUntilNextRetryDue(app *v1beta2.SparkApplication) (time.Duration, error) {
	// Select the interval matching the failure mode; any other state leaves
	// retryInterval nil and is reported as an error below.
	var retryInterval *int64
	switch app.Status.AppState.State {
	case v1beta2.ApplicationStateFailedSubmission:
		retryInterval = app.Spec.RestartPolicy.OnSubmissionFailureRetryInterval
	case v1beta2.ApplicationStateFailing:
		retryInterval = app.Spec.RestartPolicy.OnFailureRetryInterval
	}

	attemptsDone := app.Status.SubmissionAttempts
	lastAttemptTime := app.Status.LastSubmissionAttemptTime
	if retryInterval == nil || lastAttemptTime.IsZero() || attemptsDone <= 0 {
		return -1, fmt.Errorf("invalid retry interval (%v), last attempt time (%v) or attemptsDone (%v)", retryInterval, lastAttemptTime, attemptsDone)
	}

	// Retry wait time is attempts*RetryInterval to do a linear backoff.
	interval := time.Duration(*retryInterval) * time.Second * time.Duration(attemptsDone)
	currentTime := time.Now()
	return interval - currentTime.Sub(lastAttemptTime.Time), nil
}

func GetLocalVolumes(app *v1beta2.SparkApplication) map[string]corev1.Volume {
volumes := make(map[string]corev1.Volume)
for _, volume := range app.Spec.Volumes {
Expand Down
44 changes: 44 additions & 0 deletions test/e2e/bad_examples/fail-application.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Copyright 2024 The Kubeflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: fail-submission
namespace: default
spec:
type: Scala
mode: cluster
image: spark:3.5.2
imagePullPolicy: IfNotPresent
mainClass: non-existent
mainApplicationFile: local:///non-existent.jar
sparkVersion: 3.5.2
restartPolicy:
type: OnFailure
onFailureRetries: 3
onFailureRetryInterval: 1
driver:
labels:
version: 3.5.2
cores: 1
memory: 512m
serviceAccount: spark-operator-spark
executor:
labels:
version: 3.5.2
instances: 1
cores: 1
memory: 512m
44 changes: 44 additions & 0 deletions test/e2e/bad_examples/fail-submission.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#
# Copyright 2024 The Kubeflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: fail-submission
namespace: default
spec:
type: Scala
mode: cluster
image: spark:3.5.2
imagePullPolicy: IfNotPresent
mainClass: dummy
mainApplicationFile: local:///dummy.jar
sparkVersion: 3.5.2
restartPolicy:
type: OnFailure
onSubmissionFailureRetries: 3
onSubmissionFailureRetryInterval: 1
driver:
labels:
version: 3.5.2
cores: 1
memory: 512m
serviceAccount: non-existent # This is the important part that causes submission to fail.
executor:
labels:
version: 3.5.2
instances: 1
cores: 1
memory: 512m
Loading

0 comments on commit 735c7fc

Please sign in to comment.