diff --git a/deploy/crd.yaml b/deploy/crd.yaml
index 8f165fda..4be01739 100644
--- a/deploy/crd.yaml
+++ b/deploy/crd.yaml
@@ -114,6 +114,8 @@ spec:
               type: string
             savepointPath:
               type: string
+            savepointDisabled:
+              type: boolean
             maxCheckpointRestoreAgeSeconds:
               type: integer
               minimum: 1
diff --git a/docs/crd.md b/docs/crd.md
index c3a2ac74..71db6720 100644
--- a/docs/crd.md
+++ b/docs/crd.md
@@ -90,6 +90,9 @@ Below is the list of fields in the custom resource and their description
   * **savepointPath** `type:string`
     If specified, the application state will be restored from this savepoint
 
+  * **savepointDisabled** `type:boolean`
+    If set to true, the current application (if one exists) is cancelled without taking a savepoint during an update.
+
   * **allowNonRestoredState** `type:boolean`
     Skips savepoint operator state that cannot be mapped to the new program version
 
diff --git a/docs/state_machine.md b/docs/state_machine.md
index 23261c8f..ca92e269 100644
--- a/docs/state_machine.md
+++ b/docs/state_machine.md
@@ -19,10 +19,15 @@ labelled and annotated as indicated in the custom resource. The operator also se
 variables and arguments for the containers to start up the Flink application from the image.
 
 ### ClusterStarting
-In this state, the operator monitors the Flink cluster created in the New state. Once it successfully starts, we
-transition to the `Savepointing` state. Otherwise, if we are unable to start the cluster for some reason (an invalid
+In this state, the operator monitors the Flink cluster created in the New state. Once it successfully starts, we check
+whether the spec has the `savepointDisabled` field set to true. If so, we transition to the `Cancelling` state; otherwise, to `Savepointing`.
+If we are unable to start the cluster for some reason (an invalid
 image, bad configuration, not enough Kubernetes resources, etc.), we transition to the `DeployFailed` state.
 
+### Cancelling
+In this state, the operator attempts to cancel the running job (if one exists) and transitions to the `SubmittingJob` state.
+If the cancellation fails, we transition to `RollingBackJob`.
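As a quick reviewer's aid for the docs changes above, here is a minimal sketch of the new field set through the Go API types; the application name, namespace, image, and jar below are placeholders, not values from this change:

```go
package main

import (
	"fmt"

	"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// A FlinkApplication whose updates cancel the running job outright
	// instead of savepointing first (ClusterStarting -> Cancelling).
	app := v1beta1.FlinkApplication{
		ObjectMeta: metav1.ObjectMeta{Name: "wordcount", Namespace: "default"}, // placeholder names
		Spec: v1beta1.FlinkApplicationSpec{
			Image:             "example/wordcount:v2", // placeholder image
			JarName:           "wordcount.jar",        // placeholder jar
			SavepointDisabled: true,
		},
	}
	fmt.Printf("savepointDisabled=%v\n", app.Spec.SavepointDisabled)
}
```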
+
 ### Savepointing
 In the `Savepointing` state, the operator attempts to cancel the existing job with a
 [savepoint](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html) (if this is the first
diff --git a/docs/state_machine.mmd b/docs/state_machine.mmd
index 17e9c11d..fd042a46 100644
--- a/docs/state_machine.mmd
+++ b/docs/state_machine.mmd
@@ -14,9 +14,12 @@
 Running --> Updating
 Updating --> ClusterStarting
 DeployFailed --> Updating
-ClusterStarting --> Savepointing
+ClusterStarting -- savepoint disabled --> Cancelling
+ClusterStarting -- savepoint enabled --> Savepointing
 ClusterStarting -- Create fails --> DeployFailed
 
+Cancelling --> SubmittingJob
+Cancelling -- cancel fails --> RollingBackJob
 Savepointing --> SubmittingJob
 Savepointing -- Savepoint fails --> Recovering
 
@@ -28,7 +31,10 @@ SubmittingJob -- job start fails --> RollingBackJob
 RollingBackJob --> DeployFailed
 end
 
-linkStyle 5 stroke:#FF0000
-linkStyle 7 stroke:#FF0000
-linkStyle 9 stroke:#FF0000
-linkStyle 11 stroke:#FF0000
+linkStyle 4 stroke:#303030
+linkStyle 5 stroke:#303030
+linkStyle 6 stroke:#FF0000
+linkStyle 8 stroke:#FF0000
+linkStyle 10 stroke:#FF0000
+linkStyle 12 stroke:#FF0000
+linkStyle 14 stroke:#FF0000
\ No newline at end of file
diff --git a/docs/state_machine.png b/docs/state_machine.png
index 7a5ce661..99967b21 100644
Binary files a/docs/state_machine.png and b/docs/state_machine.png differ
diff --git a/integ/checkpoint_failure_test.go b/integ/checkpoint_failure_test.go
index 7cc85537..9f7a596c 100644
--- a/integ/checkpoint_failure_test.go
+++ b/integ/checkpoint_failure_test.go
@@ -17,7 +17,7 @@ func failingJobTest(s *IntegSuite, c *C, testName string, causeFailure func()) {
 	config, err := s.Util.ReadFlinkApplication("test_app.yaml")
 	c.Assert(err, IsNil, Commentf("Failed to read test app yaml"))
 	config.Name = testName + "job"
-	config.Spec.DeleteMode = "ForceCancel"
+	config.Spec.DeleteMode = v1beta1.DeleteModeForceCancel
 	config.ObjectMeta.Labels["integTest"] = testName
 
+ List(v1.ListOptions{LabelSelector: "flink-app-hash=" + app.Status.DeployHash}) + c.Assert(err, IsNil) + if len(pods.Items) == 0 { + break + } + time.Sleep(100 * time.Millisecond) + } + return newApp +} + +// tests the workflow of job cancellation without savepoint +func (s *IntegSuite) TestJobCancellationWithoutSavepoint(c *C) { + + testName := "cancelsuccess" + const finalizer = "simple.finalizers.test.com" + + // start a simple app + config, err := s.Util.ReadFlinkApplication("test_app.yaml") + c.Assert(err, IsNil, Commentf("Failed to read test app yaml")) + + config.Name = testName + "job" + config.Spec.DeleteMode = v1beta1.DeleteModeForceCancel + config.Spec.SavepointDisabled = true + config.ObjectMeta.Labels["integTest"] = testName + config.Finalizers = append(config.Finalizers, finalizer) + + c.Assert(s.Util.CreateFlinkApplication(config), IsNil, + Commentf("Failed to create flink application")) + + c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil) + c.Assert(s.Util.WaitForAllTasksRunning(config.Name), IsNil) + + pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). + List(v1.ListOptions{LabelSelector: "integTest=" + testName}) + c.Assert(err, IsNil) + c.Assert(len(pods.Items), Equals, 3) + for _, pod := range pods.Items { + c.Assert(pod.Spec.Containers[0].Image, Equals, config.Spec.Image) + } + + // test updating the app with a new image + newApp := WaitUpdateAndValidate(c, s, config.Name, func(app *v1beta1.FlinkApplication) { + app.Spec.Image = NewImage + }, v1beta1.FlinkApplicationDeployFailed) + + c.Assert(newApp.Spec.Image, Equals, NewImage) + c.Assert(newApp.Status.SavepointPath, Equals, "") + + pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). + List(v1.ListOptions{LabelSelector: "integTest=" + testName}) + c.Assert(err, IsNil) + c.Assert(len(pods.Items), Equals, 3) + for _, pod := range pods.Items { + c.Assert(pod.Spec.Containers[0].Image, Equals, NewImage) + } + + // cleanup + c.Assert(s.Util.FlinkApps().Delete(newApp.Name, &v1.DeleteOptions{}), IsNil) + var app *v1beta1.FlinkApplication + for { + app, err = s.Util.GetFlinkApplication(config.Name) + c.Assert(err, IsNil) + if len(app.Finalizers) == 1 && app.Finalizers[0] == finalizer { + break + } + time.Sleep(100 * time.Millisecond) + } + + job := s.Util.GetJobOverview(app) + c.Assert(job["status"], Equals, "CANCELED") + + // delete our finalizer + app.Finalizers = []string{} + _, err = s.Util.FlinkApps().Update(app) + c.Assert(err, IsNil) + + for { + pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name). + List(v1.ListOptions{LabelSelector: "integTest=" + testName}) + c.Assert(err, IsNil) + if len(pods.Items) == 0 { + break + } + } + log.Info("All pods torn down") +} + +// tests a job update with the existing job already in cancelled state. +// here, the new submitted job starts without a savepoint. 
+func (s *IntegSuite) TestCancelledJobWithoutSavepoint(c *C) {
+
+	testName := "invalidcancel"
+	config, err := s.Util.ReadFlinkApplication("test_app.yaml")
+	c.Assert(err, IsNil, Commentf("Failed to read test app yaml"))
+
+	config.Name = testName + "job"
+	config.Spec.DeleteMode = v1beta1.DeleteModeForceCancel
+	config.Spec.SavepointDisabled = true
+	config.ObjectMeta.Labels["integTest"] = testName
+
+	c.Assert(s.Util.CreateFlinkApplication(config), IsNil,
+		Commentf("Failed to create flink application"))
+
+	c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)
+	c.Assert(s.Util.WaitForAllTasksRunning(config.Name), IsNil)
+
+	currApp, _ := s.Util.GetFlinkApplication(config.Name)
+	c.Assert(currApp.Status.SavepointPath, Equals, "")
+	job := s.Util.GetJobOverview(currApp)
+	c.Assert(job["status"], Equals, "RUNNING")
+
+	// trigger a cancel on the existing job
+	endpoint := fmt.Sprintf("jobs/%s?mode=cancel", currApp.Status.JobStatus.JobID)
+	_, err = s.Util.FlinkAPIPatch(currApp, endpoint)
+	c.Assert(err, IsNil)
+
+	// give the cancellation a moment to take effect
+	time.Sleep(1 * time.Second)
+
+	job = s.Util.GetJobOverview(currApp)
+	c.Assert(job["status"], Equals, "CANCELED")
+
+	newApp, err := s.Util.Update(config.Name, func(app *v1beta1.FlinkApplication) {
+		app.Spec.Image = NewImage
+	})
+	c.Assert(err, IsNil)
+
+	for {
+		// wait until the new job is launched
+		newApp, err := s.Util.GetFlinkApplication(config.Name)
+		c.Assert(err, IsNil)
+		if newApp.Status.JobStatus.JobID != "" &&
+			newApp.Status.JobStatus.JobID != currApp.Status.JobStatus.JobID {
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+
+	// we should end up in the Running phase of the new job
+	c.Assert(s.Util.WaitForPhase(newApp.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)
+
+	newApp, err = s.Util.GetFlinkApplication(newApp.Name)
+	c.Assert(err, IsNil)
+
+	job = s.Util.GetJobOverview(newApp)
+	c.Assert(job["status"], Equals, "RUNNING")
+	c.Assert(newApp.Status.SavepointPath, Equals, "")
+
+	// start deleting
+	c.Assert(s.Util.FlinkApps().Delete(config.Name, &v1.DeleteOptions{}), IsNil)
+	for {
+		pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
+			List(v1.ListOptions{LabelSelector: "integTest=" + testName})
+		c.Assert(err, IsNil)
+		if len(pods.Items) == 0 {
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	log.Info("All pods torn down")
+}
+
+// tests the recovery workflow of the job when savepointing is disabled
+func (s *IntegSuite) TestJobRecoveryWithoutSavepoint(c *C) {
+
+	const finalizer = "simple.finalizers.test.com"
+	const testName = "cancelrecovery"
+
+	config, err := s.Util.ReadFlinkApplication("test_app.yaml")
+	c.Assert(err, IsNil, Commentf("Failed to read test app yaml"))
+
+	config.Name = testName
+	config.ObjectMeta.Labels["integTest"] = testName
+	config.Finalizers = append(config.Finalizers, finalizer)
+	config.Spec.DeleteMode = v1beta1.DeleteModeForceCancel
+	config.Spec.SavepointDisabled = true
+
+	c.Assert(s.Util.CreateFlinkApplication(config), IsNil,
+		Commentf("Failed to create flink application"))
+
+	c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationSavepointing), IsNil)
+
+	c.Assert(s.Util.WaitForAllTasksRunning(config.Name), IsNil)
+	currApp, _ := s.Util.GetFlinkApplication(config.Name)
+	c.Assert(currApp.Status.SavepointPath, Equals, "")
+
+	// test updating the app with a bad jar name -- this should cause a failed deploy and roll back
+	_, err = s.Util.Update(config.Name, func(app *v1beta1.FlinkApplication) {
+		app.Spec.JarName = "nonexistent.jar"
+		app.Spec.RestartNonce = "rollback"
+	})
+	c.Assert(err, IsNil)
+	c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationDeployFailed, ""), IsNil)
+	c.Assert(s.Util.WaitForAllTasksRunning(config.Name), IsNil)
+
+	// assert that the job restarted with a new job ID under the old deploy hash
+	newApp, err := s.Util.GetFlinkApplication(config.Name)
+	c.Assert(err, IsNil)
+	c.Assert(newApp.Status.JobStatus.JobID, Not(Equals), currApp.Status.JobStatus.JobID)
+	c.Assert(newApp.Status.SavepointPath, Equals, "")
+	c.Assert(newApp.Status.SavepointTriggerID, Equals, "")
+	c.Assert(newApp.Status.DeployHash, Equals, currApp.Status.DeployHash)
+
+	// assert that the restarted job wasn't restored from a savepoint
+	endpoint := fmt.Sprintf("jobs/%s/checkpoints", newApp.Status.JobStatus.JobID)
+	res, err := s.Util.FlinkAPIGet(newApp, endpoint)
+	c.Assert(err, IsNil)
+	body := res.(map[string]interface{})
+	restored := (body["latest"].(map[string]interface{}))["restored"]
+	c.Assert(restored, IsNil)
+
+	// roll forward with the right config
+	_ = WaitUpdateAndValidate(c, s, config.Name, func(app *v1beta1.FlinkApplication) {
+		app.Spec.JarName = config.Spec.JarName
+		app.Spec.RestartNonce = "rollback2"
+		app.Spec.Image = NewImage
+	}, "")
+
+	// assert the pods have the new image
+	pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
+		List(v1.ListOptions{LabelSelector: "integTest=" + testName})
+	c.Assert(err, IsNil)
+	for _, pod := range pods.Items {
+		c.Assert(pod.Spec.Containers[0].Image, Equals, NewImage)
+	}
+
+	// delete the application and ensure everything is cleaned up successfully
+	c.Assert(s.Util.FlinkApps().Delete(config.Name, &v1.DeleteOptions{}), IsNil)
+	var app *v1beta1.FlinkApplication
+	for {
+		app, err = s.Util.GetFlinkApplication(config.Name)
+		c.Assert(err, IsNil)
+		if len(app.Finalizers) == 1 && app.Finalizers[0] == finalizer {
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	c.Assert(app.Status.SavepointPath, Equals, "")
+	c.Assert(app.Status.SavepointTriggerID, Equals, "")
+
+	app.Finalizers = []string{}
+	_, err = s.Util.FlinkApps().Update(app)
+	c.Assert(err, IsNil)
+
+	// wait until all pods are deleted
+	for {
+		pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
+			List(v1.ListOptions{LabelSelector: "integTest=" + testName})
+		c.Assert(err, IsNil)
+		if len(pods.Items) == 0 {
+			break
+		}
+		time.Sleep(100 * time.Millisecond)
+	}
+	log.Info("All pods torn down")
+}
diff --git a/integ/utils/utils.go b/integ/utils/utils.go
index ed9b9a88..fb632179 100644
--- a/integ/utils/utils.go
+++ b/integ/utils/utils.go
@@ -405,6 +405,30 @@ func (f *TestUtil) FlinkAPIGet(app *flinkapp.FlinkApplication, endpoint string)
 	return result, nil
 }
 
+func (f *TestUtil) FlinkAPIPatch(app *flinkapp.FlinkApplication, endpoint string) (interface{}, error) {
+
+	url := fmt.Sprintf("http://localhost:8001/api/v1/namespaces/%s/"+
+		"services/%s:8081/proxy/%s",
+		f.Namespace.Name, app.Name, endpoint)
+
+	resp, err := resty.SetRedirectPolicy(resty.FlexibleRedirectPolicy(5)).R().Patch(url)
+	if err != nil {
+		return nil, err
+	}
+
+	if !resp.IsSuccess() {
+		return nil, fmt.Errorf("request failed with code %d", resp.StatusCode())
+	}
+
+	var result interface{}
+	err = json.Unmarshal(resp.Body(), &result)
+	if err != nil {
+		return nil, err
+	}
+
+	return result, nil
+}
+
 func vertexRunning(vertex map[string]interface{}) bool {
 	if vertex["status"] != "RUNNING" {
 		return false
@@ -482,3 +506,17 @@ func (f *TestUtil) Update(name string, updateFn func(app *flinkapp.FlinkApplicat
 		time.Sleep(500 * time.Millisecond)
 	}
 }
+
+func (f *TestUtil) GetJobOverview(app *flinkapp.FlinkApplication) map[string]interface{} {
+
+	jobs, _ := f.FlinkAPIGet(app, "/jobs")
+	jobMap := jobs.(map[string]interface{})
+	jobList := jobMap["jobs"].([]interface{})
+	for _, j := range jobList {
+		job := j.(map[string]interface{})
+		if job["id"] == app.Status.JobStatus.JobID {
+			return job
+		}
+	}
+	return nil
+}
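The two helpers added to `utils.go` above are used together by the new cancellation tests; a condensed sketch of that pattern, assuming the `integ/utils` import path and a hypothetical `cancelAndInspect` helper name:

```go
package integ_test

import (
	"fmt"
	"testing"

	"github.com/lyft/flinkk8soperator/integ/utils"
	flinkapp "github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
)

// cancelAndInspect is a hypothetical helper, not part of this PR: it PATCHes
// the JobManager cancel endpoint via FlinkAPIPatch, then reads the job's
// status back through GetJobOverview.
func cancelAndInspect(t *testing.T, util *utils.TestUtil, app *flinkapp.FlinkApplication) {
	endpoint := fmt.Sprintf("jobs/%s?mode=cancel", app.Status.JobStatus.JobID)
	if _, err := util.FlinkAPIPatch(app, endpoint); err != nil {
		t.Fatalf("cancel request failed: %v", err)
	}
	if job := util.GetJobOverview(app); job != nil {
		fmt.Println("job status:", job["status"]) // "CANCELED" once the cancel lands
	}
}
```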
diff --git a/pkg/apis/app/v1beta1/types.go b/pkg/apis/app/v1beta1/types.go
index a0715003..35a20dd1 100644
--- a/pkg/apis/app/v1beta1/types.go
+++ b/pkg/apis/app/v1beta1/types.go
@@ -44,6 +44,7 @@ type FlinkApplicationSpec struct {
 	// Deprecated: use SavepointPath instead
 	SavepointInfo     SavepointInfo  `json:"savepointInfo,omitempty"`
 	SavepointPath     string         `json:"savepointPath,omitempty"`
+	SavepointDisabled bool           `json:"savepointDisabled"`
 	DeploymentMode    DeploymentMode `json:"deploymentMode,omitempty"`
 	RPCPort           *int32         `json:"rpcPort,omitempty"`
 	BlobPort          *int32         `json:"blobPort,omitempty"`
@@ -220,6 +221,7 @@
 	FlinkApplicationSubmittingJob  FlinkApplicationPhase = "SubmittingJob"
 	FlinkApplicationRunning        FlinkApplicationPhase = "Running"
 	FlinkApplicationSavepointing   FlinkApplicationPhase = "Savepointing"
+	FlinkApplicationCancelling     FlinkApplicationPhase = "Cancelling"
 	FlinkApplicationDeleting       FlinkApplicationPhase = "Deleting"
 	FlinkApplicationRecovering     FlinkApplicationPhase = "Recovering"
 	FlinkApplicationRollingBackJob FlinkApplicationPhase = "RollingBackJob"
@@ -233,6 +235,7 @@ var FlinkApplicationPhases = []FlinkApplicationPhase{
 	FlinkApplicationSubmittingJob,
 	FlinkApplicationRunning,
 	FlinkApplicationSavepointing,
+	FlinkApplicationCancelling,
 	FlinkApplicationDeleting,
 	FlinkApplicationRecovering,
 	FlinkApplicationDeployFailed,
diff --git a/pkg/controller/flink/client/api.go b/pkg/controller/flink/client/api.go
index 68c802fe..76048d0b 100644
--- a/pkg/controller/flink/client/api.go
+++ b/pkg/controller/flink/client/api.go
@@ -220,7 +220,7 @@ func (c *FlinkJobManagerClient) ForceCancelJob(ctx context.Context, url string,
 		return GetRetryableError(err, v1beta1.ForceCancelJob, response.Status(), DefaultRetries)
 	}
 
-	c.metrics.forceCancelJobFailureCounter.Inc(ctx)
+	c.metrics.forceCancelJobSuccessCounter.Inc(ctx)
 	return nil
 }
 
diff --git a/pkg/controller/flinkapplication/flink_state_machine.go b/pkg/controller/flinkapplication/flink_state_machine.go
index 51682d3c..668927eb 100644
--- a/pkg/controller/flinkapplication/flink_state_machine.go
+++ b/pkg/controller/flinkapplication/flink_state_machine.go
@@ -174,6 +174,8 @@ func (s *FlinkStateMachine) handle(ctx context.Context, application *v1beta1.Fli
 		updateApplication, appErr = s.handleSubmittingJob(ctx, application)
 	case v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed:
 		updateApplication, appErr = s.handleApplicationRunning(ctx, application)
+	case v1beta1.FlinkApplicationCancelling:
+		updateApplication, appErr = s.handleApplicationCancelling(ctx, application)
 	case v1beta1.FlinkApplicationSavepointing:
 		updateApplication, appErr = s.handleApplicationSavepointing(ctx, application)
 	case v1beta1.FlinkApplicationRecovering:
@@ -283,7 +285,11 @@ func (s *FlinkStateMachine) handleClusterStarting(ctx context.Context, applicati
 	logger.Infof(ctx, "Flink cluster has started successfully")
 
 	// TODO: in single mode move to submitting job
-	s.updateApplicationPhase(application, v1beta1.FlinkApplicationSavepointing)
+	if application.Spec.SavepointDisabled {
+		s.updateApplicationPhase(application, v1beta1.FlinkApplicationCancelling)
+	} else {
+		s.updateApplicationPhase(application, v1beta1.FlinkApplicationSavepointing)
+	}
 	return statusChanged, nil
 }
 
@@ -346,6 +352,41 @@ func (s *FlinkStateMachine) handleApplicationSavepointing(ctx context.Context, a
 	return statusUnchanged, nil
 }
 
+func (s *FlinkStateMachine) handleApplicationCancelling(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) {
+
+	// this is the first deploy, so there is no existing job to cancel
+	if application.Status.DeployHash == "" {
+		s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob)
+		return statusChanged, nil
+	}
+
+	if rollback, reason := s.shouldRollback(ctx, application); rollback {
+		s.flinkController.LogEvent(ctx, application, corev1.EventTypeWarning, "CancelFailed",
+			fmt.Sprintf("Could not cancel existing job: %s", reason))
+		application.Status.RetryCount = 0
+		application.Status.JobStatus.JobID = ""
+		s.updateApplicationPhase(application, v1beta1.FlinkApplicationRollingBackJob)
+		return statusChanged, nil
+	}
+
+	job, err := s.flinkController.GetJobForApplication(ctx, application, application.Status.DeployHash)
+	if err != nil {
+		return statusUnchanged, err
+	}
+
+	if job != nil && job.State != client.Canceled &&
+		job.State != client.Failed {
+		err := s.flinkController.ForceCancel(ctx, application, application.Status.DeployHash)
+		if err != nil {
+			return statusUnchanged, err
+		}
+	}
+
+	application.Status.JobStatus.JobID = ""
+	s.updateApplicationPhase(application, v1beta1.FlinkApplicationSubmittingJob)
+	return statusChanged, nil
+}
+
 func (s *FlinkStateMachine) handleApplicationRecovering(ctx context.Context, app *v1beta1.FlinkApplication) (bool, error) {
 	// we're in the middle of a deploy, and savepointing has failed in some way... we're going to try to recover
 	// and push through if possible
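One detail of `handleApplicationCancelling` worth highlighting: the force-cancel is skipped when the old job is already gone or already terminal. A standalone restatement of that predicate (the `needsForceCancel` name is illustrative, not part of this change):

```go
package flinkapplication

import "github.com/lyft/flinkk8soperator/pkg/controller/flink/client"

// needsForceCancel condenses the condition used above: only a job that still
// exists and has not already reached a terminal Canceled/Failed state needs
// an explicit ForceCancel before the update proceeds to SubmittingJob.
func needsForceCancel(job *client.FlinkJobOverview) bool {
	return job != nil && job.State != client.Canceled && job.State != client.Failed
}
```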
diff --git a/pkg/controller/flinkapplication/flink_state_machine_test.go b/pkg/controller/flinkapplication/flink_state_machine_test.go
index 3114f8f9..a8534233 100644
--- a/pkg/controller/flinkapplication/flink_state_machine_test.go
+++ b/pkg/controller/flinkapplication/flink_state_machine_test.go
@@ -81,6 +81,140 @@ func TestHandleStartingClusterStarting(t *testing.T) {
 	assert.Nil(t, err)
 }
 
+func TestHandleNewOrCreateWithSavepointDisabled(t *testing.T) {
+	updateInvoked := false
+	app := v1beta1.FlinkApplication{
+		Spec: v1beta1.FlinkApplicationSpec{
+			SavepointDisabled: true,
+		},
+		Status: v1beta1.FlinkApplicationStatus{
+			Phase:      v1beta1.FlinkApplicationClusterStarting,
+			DeployHash: "old-hash",
+		},
+	}
+
+	stateMachineForTest := getTestStateMachine()
+
+	mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
+	mockFlinkController.IsClusterReadyFunc = func(ctx context.Context, application *v1beta1.FlinkApplication) (bool, error) {
+		return true, nil
+	}
+	mockFlinkController.IsServiceReadyFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (b bool, e error) {
+		return true, nil
+	}
+	mockFlinkController.GetCurrentDeploymentsForAppFunc = func(ctx context.Context, application *v1beta1.FlinkApplication) (*common.FlinkDeployment, error) {
+		fd := testFlinkDeployment(application)
+		fd.Taskmanager.Status.AvailableReplicas = 2
+		fd.Jobmanager.Status.AvailableReplicas = 1
+		return &fd, nil
+	}
+
+	mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster)
+	mockK8Cluster.UpdateK8ObjectFunc = func(ctx context.Context, object runtime.Object) error {
+		return nil
+	}
+
+	mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error {
+		application := object.(*v1beta1.FlinkApplication)
+		assert.Equal(t, v1beta1.FlinkApplicationCancelling, application.Status.Phase)
+		updateInvoked = true
+		return nil
+	}
+
+	err := stateMachineForTest.Handle(context.Background(), &app)
+	assert.Nil(t, err)
+	assert.True(t, updateInvoked)
+}
+
+func TestHandleApplicationCancel(t *testing.T) {
+	jobID := "j1"
+	app := v1beta1.FlinkApplication{
+		Spec: v1beta1.FlinkApplicationSpec{
+			SavepointDisabled: true,
+		},
+		Status: v1beta1.FlinkApplicationStatus{
+			Phase:      v1beta1.FlinkApplicationCancelling,
+			DeployHash: "old-hash",
+		},
+	}
+
+	cancelInvoked := false
+	stateMachineForTest := getTestStateMachine()
+	mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
+	mockFlinkController.GetJobForApplicationFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (*client.FlinkJobOverview, error) {
+		assert.Equal(t, "old-hash", hash)
+		return &client.FlinkJobOverview{
+			JobID: jobID,
+			State: client.Running,
+		}, nil
+	}
+
+	mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) (e error) {
+		assert.Equal(t, "old-hash", hash)
+		cancelInvoked = true
+
+		return nil
+	}
+
+	mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster)
+	mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error {
+		application := object.(*v1beta1.FlinkApplication)
+		assert.Equal(t, v1beta1.FlinkApplicationSubmittingJob, application.Status.Phase)
+		return nil
+	}
+
+	err := stateMachineForTest.Handle(context.Background(), &app)
+	assert.Nil(t, err)
+
+	assert.True(t, cancelInvoked)
+}
+
+func TestHandleApplicationCancelFailedWithMaxRetries(t *testing.T) {
+
+	retryableErr := client.GetRetryableError(errors.New("blah"), "ForceCancelJob", "FAILED", 5)
+	app := v1beta1.FlinkApplication{
+		Spec: v1beta1.FlinkApplicationSpec{
+			SavepointDisabled: true,
+		},
+		Status: v1beta1.FlinkApplicationStatus{
+			Phase:         v1beta1.FlinkApplicationCancelling,
+			DeployHash:    "old-hash",
+			LastSeenError: retryableErr.(*v1beta1.FlinkApplicationError),
+		},
+	}
+
+	app.Status.LastSeenError.LastErrorUpdateTime = nil
+	updateInvoked := false
+	stateMachineForTest := getTestStateMachine()
+	mockFlinkController := stateMachineForTest.flinkController.(*mock.FlinkController)
+	mockFlinkController.ForceCancelFunc = func(ctx context.Context, application *v1beta1.FlinkApplication, hash string) error {
+		// given we maxed out on retries, we should never get here
+		assert.False(t, true)
+		return nil
+	}
+
+	mockK8Cluster := stateMachineForTest.k8Cluster.(*k8mock.K8Cluster)
+	mockK8Cluster.UpdateStatusFunc = func(ctx context.Context, object runtime.Object) error {
+		updateInvoked = true
+		application := object.(*v1beta1.FlinkApplication)
+		assert.Equal(t, v1beta1.FlinkApplicationRollingBackJob, application.Status.Phase)
+		return nil
+	}
+
+	mockRetryHandler := stateMachineForTest.retryHandler.(*mock.RetryHandler)
+	mockRetryHandler.IsErrorRetryableFunc = func(err error) bool {
+		return true
+	}
+	mockRetryHandler.IsRetryRemainingFunc = func(err error, retryCount int32) bool {
+		return false
+	}
+
+	err := stateMachineForTest.Handle(context.Background(), &app)
+	assert.Nil(t, err)
+
+	assert.True(t, updateInvoked)
+}
+
 func TestHandleStartingDual(t *testing.T) {
 	updateInvoked := false
 	stateMachineForTest := getTestStateMachine()
@@ -313,7 +447,7 @@ func TestSubmittingToRunning(t *testing.T) {
 		assert.Equal(t, app.Spec.EntryClass, entryClass)
 		assert.Equal(t, app.Spec.ProgramArgs, programArgs)
 		assert.Equal(t, app.Spec.AllowNonRestoredState, allowNonRestoredState)
-		assert.Equal(t, app.Spec.SavepointPath, savepointPath)
+		assert.Equal(t, app.Status.SavepointPath, savepointPath)
 		startCount++
 		return jobID, nil
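Finally, a hypothetical test-style snippet showing how the new phase could be observed from outside during an update; `observeCancelling`, its 30-second bound, and the polling interval are assumptions, while the phase constant and the test utilities come from this change and the existing suite:

```go
package integ_test

import (
	"fmt"
	"time"

	"github.com/lyft/flinkk8soperator/integ/utils"
	"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
)

// observeCancelling polls until the application passes through the new
// Cancelling phase during an update, or gives up after the deadline.
func observeCancelling(util *utils.TestUtil, name string) error {
	deadline := time.Now().Add(30 * time.Second)
	for time.Now().Before(deadline) {
		app, err := util.GetFlinkApplication(name)
		if err != nil {
			return err
		}
		if app.Status.Phase == v1beta1.FlinkApplicationCancelling {
			fmt.Println("update is cancelling the old job without a savepoint")
			return nil
		}
		time.Sleep(100 * time.Millisecond)
	}
	return fmt.Errorf("application %s never entered Cancelling", name)
}
```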