Allow disabling Savepoint during updates #184

Merged: 9 commits, merged Mar 17, 2020

Changes from 4 commits
10 changes: 7 additions & 3 deletions docs/state_machine.mmd
@@ -14,9 +14,11 @@ Running --> Updating
Updating --> ClusterStarting
DeployFailed --> Updating

ClusterStarting --> Savepointing
ClusterStarting -- savepoint disabled --> Cancelling
ClusterStarting -- savepoint enabled --> Savepointing
ClusterStarting -- Create fails --> DeployFailed

Cancelling --> SubmittingJob
Savepointing --> SubmittingJob
Savepointing -- Savepoint fails --> Recovering

@@ -28,7 +30,9 @@ SubmittingJob -- job start fails --> RollingBackJob
RollingBackJob --> DeployFailed
end

linkStyle 5 stroke:#FF0000
linkStyle 7 stroke:#FF0000
linkStyle 4 stroke:#303030
linkStyle 5 stroke:#303030
linkStyle 6 stroke:#FF0000
linkStyle 9 stroke:#FF0000
linkStyle 11 stroke:#FF0000
linkStyle 13 stroke:#FF0000
Binary file modified docs/state_machine.png
3 changes: 3 additions & 0 deletions pkg/apis/app/v1beta1/types.go
@@ -44,6 +44,7 @@ type FlinkApplicationSpec struct {
// Deprecated: use SavepointPath instead
SavepointInfo SavepointInfo `json:"savepointInfo,omitempty"`
SavepointPath string `json:"savepointPath,omitempty"`
SavepointDisabled bool `json:"savepointDisabled"`
DeploymentMode DeploymentMode `json:"deploymentMode,omitempty"`
RPCPort *int32 `json:"rpcPort,omitempty"`
BlobPort *int32 `json:"blobPort,omitempty"`
@@ -220,6 +221,7 @@ const (
FlinkApplicationSubmittingJob FlinkApplicationPhase = "SubmittingJob"
FlinkApplicationRunning FlinkApplicationPhase = "Running"
FlinkApplicationSavepointing FlinkApplicationPhase = "Savepointing"
FlinkApplicationCancelling FlinkApplicationPhase = "Cancelling"
Contributor: Can you also update state_machine.md with the details of this new state?

Contributor Author: done!

FlinkApplicationDeleting FlinkApplicationPhase = "Deleting"
FlinkApplicationRecovering FlinkApplicationPhase = "Recovering"
FlinkApplicationRollingBackJob FlinkApplicationPhase = "RollingBackJob"
@@ -233,6 +235,7 @@ var FlinkApplicationPhases = []FlinkApplicationPhase{
FlinkApplicationSubmittingJob,
FlinkApplicationRunning,
FlinkApplicationSavepointing,
FlinkApplicationCancelling,
FlinkApplicationDeleting,
FlinkApplicationRecovering,
FlinkApplicationDeployFailed,
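
Taken together, the hunks above add a spec-level switch (serialized as savepointDisabled in the custom resource JSON) and a new Cancelling phase for the state machine to route through. A minimal sketch of setting the flag from Go follows; the import path is assumed from the repository layout shown in this diff, and everything beyond the field and type names visible above is illustrative:

package main

import (
	"fmt"

	// Assumed import path, matching the pkg/apis/app/v1beta1 directory in this diff.
	"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
)

func main() {
	// An application that opts out of savepointing during updates: the state
	// machine takes ClusterStarting -> Cancelling -> SubmittingJob instead of
	// ClusterStarting -> Savepointing -> SubmittingJob.
	app := v1beta1.FlinkApplication{
		Spec: v1beta1.FlinkApplicationSpec{
			SavepointDisabled: true, // appears as savepointDisabled: true in the custom resource
		},
	}

	fmt.Println("savepoint disabled:", app.Spec.SavepointDisabled)
}
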
2 changes: 1 addition & 1 deletion pkg/controller/flink/client/api.go
@@ -220,7 +220,7 @@ func (c *FlinkJobManagerClient) ForceCancelJob(ctx context.Context, url string,
return GetRetryableError(err, v1beta1.ForceCancelJob, response.Status(), DefaultRetries)
}

c.metrics.forceCancelJobFailureCounter.Inc(ctx)
c.metrics.forceCancelJobSuccessCounter.Inc(ctx)
return nil
}

36 changes: 35 additions & 1 deletion pkg/controller/flinkapplication/flink_state_machine.go
@@ -174,6 +174,8 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli
updateApplication, appErr = s.handleSubmittingJob(ctx, application)
case v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed:
updateApplication, appErr = s.handleApplicationRunning(ctx, application)
case v1beta1.FlinkApplicationCancelling:
updateApplication, appErr = s.handleApplicationCancelling(ctx, application)
case v1beta1.FlinkApplicationSavepointing:
updateApplication, appErr = s.handleApplicationSavepointing(ctx, application)
case v1beta1.FlinkApplicationRecovering:
@@ -283,7 +285,11 @@ func (s *FlinkStateMachine) handleClusterStarting(ctx context.Context, applicati

logger.Infof(ctx, "Flink cluster has started successfully")
// TODO: in single mode move to submitting job
s.updateApplicationPhase(application, v1beta1.FlinkApplicationSavepointing)
if application.Spec.SavepointDisabled {
s.updateApplicationPhase(application, v1beta1.FlinkApplicationCancelling)
} else {
s.updateApplicationPhase(application, v1beta1.FlinkApplicationSavepointing)
}
return statusChanged, nil
}

@@ -346,6 +352,34 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a
return statusUnchanged, nil
}

func (s *FlinkStateMachine) handleApplicationCancelling(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) {

// this is the first deploy
if application.Status.DeployHash == "" {
s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob)
return statusChanged, nil
}

if rollback, reason := s.shouldRollback(ctx, application); rollback {
// it's rare to come here despite the retries. let's head to submitting.
s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "CancelFailed",
fmt.Sprintf("Could not cancel existing job: %s", reason))
application.Status.RetryCount = 0
application.Status.JobStatus.JobID = ""
s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob)
Contributor (@anandswaminathan, Mar 10, 2020): This should be FlinkApplicationRollingBackJob. We can't move ahead and submit a job in the second cluster if the first cluster is still running the job.

Contributor Author: Good point! Definitely better to err on the side of caution.

Contributor Author: done!

return statusChanged, nil
}
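
Following the exchange above, the rollback branch reportedly ends up handing off to RollingBackJob rather than jumping straight to SubmittingJob. A rough sketch of how that branch might read after the change, modeled on the existing Savepointing handler's rollback branch; the actual merged commit is not part of this 4-commit view and may differ:

if rollback, reason := s.shouldRollback(ctx, application); rollback {
	// Cancellation could not be completed; record the event and defer to the
	// rollback flow instead of submitting a new job while the old cluster may
	// still be running one.
	s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "CancelFailed",
		fmt.Sprintf("Could not cancel existing job: %s", reason))
	application.Status.RetryCount = 0
	s.updateApplicationPhase(application, v1beta1.FlinkApplicationRollingBackJob)
	return statusChanged, nil
}
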

err := s.flinkController.ForceCancel(ctx, application, application.Status.DeployHash)
Contributor: I think there's a potential case here where we get an error back from this call (e.g., a timeout), but the job does end up cancelled. We can handle that by first checking whether the job is running, and if not, moving on to SubmittingJob.

Contributor Author: In my testing, I saw two kinds of errors:

  1. status code 404 when the job is already in Cancelled status
  2. status code 400 for a bad request, e.g. when a job with that ID doesn't exist

It definitely doesn't hurt to make that extra call to check whether the job is running or not.

Contributor: Yeah, in general for operators the pattern is (1) query the current state of the world; (2) make calls to update the world to the desired state. On each iteration of the reconciliation loop you don't really know what the state is until you query it (even the status might be out of date or missing updates).

if err != nil {
return statusUnchanged, err
}

application.Status.JobStatus.JobID = ""
s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob)
return statusChanged, nil
}
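
The "query first, then act" reconciliation pattern described in the thread above could be folded into handleApplicationCancelling roughly as follows. This is only a sketch: GetJobForApplication and the Running state constant are assumptions, not part of this diff, and the merged code may resolve the timed-out-cancel case differently:

// Check the actual job state before force-cancelling, so a cancel that
// already succeeded on the JobManager (but timed out on our side) does not
// stall the deploy. GetJobForApplication and client.Running are assumed here.
job, err := s.flinkController.GetJobForApplication(ctx, application, application.Status.DeployHash)
if err != nil {
	return statusUnchanged, err
}

if job == nil || job.State != client.Running {
	// The old job is no longer running, so there is nothing left to cancel;
	// move straight on to submitting the new job.
	application.Status.JobStatus.JobID = ""
	s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob)
	return statusChanged, nil
}

if err := s.flinkController.ForceCancel(ctx, application, application.Status.DeployHash); err != nil {
	return statusUnchanged, err
}
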

func (s *FlinkStateMachine) handleApplicationRecovering(ctx context.Context, app *v1beta1.FlinkApplication) (bool, error) {
// we're in the middle of a deploy, and savepointing has failed in some way... we're going to try to recover
// and push through if possible
128 changes: 127 additions & 1 deletion pkg/controller/flinkapplication/flink_state_machine_test.go
@@ -81,6 +81,132 @@ func TestHandleStartingClusterStarting(t *testing.T) {
assert.Nil(t, err)
}

func TestHandleNewOrCreateWithSavepointDisabled(t *testing.T) {
Contributor: This is great, but can you also add an integration test? Given this is a completely different flow, it would be good to fully test out the successful and failure paths.

Contributor Author: done!

updateInvoked := false
app := v1beta1.FlinkApplication{
Spec: v1beta1.FlinkApplicationSpec{
SavepointDisabled: true,
},
Status: v1beta1.FlinkApplicationStatus{
Phase: v1beta1.FlinkApplicationClusterStarting,
DeployHash: "old-hash",
},
}

stateMachineForTest := getTestStateMachine()

mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
mockFlinkController.IsClusterReadyFunc = func(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) {
return true, nil
}
mockFlinkController.IsServiceReadyFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (b bool, e error) {
return true, nil
}
mockFlinkController.GetCurrentDeploymentsForAppFunc = func(ctx context.Context, application *v1beta1.FlinkApplication) (*common.FlinkDeployment, error) {
fd := testFlinkDeployment(application)
fd.Taskmanager.Status.AvailableReplicas = 2
fd.Jobmanager.Status.AvailableReplicas = 1
return &fd, nil
}

mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster)
mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error {
return nil
}

mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error {
application := object.(*v1beta1.FlinkApplication)
assert.Equal(t, v1beta1.FlinkApplicationCancelling, application.Status.Phase)
updateInvoked = true
return nil
}

err := stateMachineForTest.Handle(context.Background(), &app)
assert.Nil(t, err)
assert.True(t, updateInvoked)
}

func TestHandleApplicationCancel(t *testing.T) {
app := v1beta1.FlinkApplication{
Spec: v1beta1.FlinkApplicationSpec{
SavepointDisabled: true,
},
Status: v1beta1.FlinkApplicationStatus{
Phase: v1beta1.FlinkApplicationCancelling,
DeployHash: "old-hash",
},
}

cancelInvoked := false
stateMachineForTest := getTestStateMachine()
mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)

mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (e error) {
assert.Equal(t, "old-hash", hash)
cancelInvoked = true

return nil
}

mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster)
mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error {
application := object.(*v1beta1.FlinkApplication)
assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, application.Status.Phase)
return nil
}

err := stateMachineForTest.Handle(context.Background(), &app)
assert.Nil(t, err)

assert.True(t, cancelInvoked)
}

func TestHandleApplicationCancelFailedWithMaxRetries(t *testing.T) {

retryableErr := client.GetRetryableError(errors.New("blah"), "ForceCancelJob", "FAILED", 5)
app := v1beta1.FlinkApplication{
Spec: v1beta1.FlinkApplicationSpec{
SavepointDisabled: true,
},
Status: v1beta1.FlinkApplicationStatus{
Phase: v1beta1.FlinkApplicationCancelling,
DeployHash: "old-hash",
LastSeenError: retryableErr.(*v1beta1.FlinkApplicationError),
},
}

app.Status.LastSeenError.LastErrorUpdateTime = nil
updateInvoked := false
stateMachineForTest := getTestStateMachine()
mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error {
// given we maxed out on retries, we should never have come here
assert.False(t, true)
return nil
}

mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster)
mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error {
updateInvoked = true
application := object.(*v1beta1.FlinkApplication)
assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, application.Status.Phase)
return nil
}

mockRetryHandler := stateMachineForTest.retryHandler.(*mock.RetryHandler)
mockRetryHandler.IsErrorRetryableFunc = func(err error) bool {
return true
}
mockRetryHandler.IsRetryRemainingFunc = func(err error, retryCount int32) bool {
return false
}

err := stateMachineForTest.Handle(context.Background(), &app)
assert.Nil(t, err)

assert.True(t, updateInvoked)
}

func TestHandleStartingDual(t *testing.T) {
updateInvoked := false
stateMachineForTest := getTestStateMachine()
@@ -313,7 +439,7 @@ func TestSubmittingToRunning(t *testing.T) {
assert.Equal(t, app.Spec.EntryClass, entryClass)
assert.Equal(t, app.Spec.ProgramArgs, programArgs)
assert.Equal(t, app.Spec.AllowNonRestoredState, allowNonRestoredState)
assert.Equal(t, app.Spec.SavepointPath, savepointPath)
assert.Equal(t, app.Status.SavepointPath, savepointPath)

startCount++
return jobID, nil