Allow disabling Savepoint during updates #184

Merged: 9 commits, Mar 17, 2020
2 changes: 2 additions & 0 deletions deploy/crd.yaml
@@ -114,6 +114,8 @@ spec:
           type: string
         savepointPath:
           type: string
+        savepointDisabled:
+          type: boolean
         maxCheckpointRestoreAgeSeconds:
           type: integer
           minimum: 1
3 changes: 3 additions & 0 deletions docs/crd.md
@@ -90,6 +90,9 @@ Below is the list of fields in the custom resource and their description
 * **savepointPath** `type:string`
 If specified, the application state will be restored from this savepoint
 
+* **savepointDisabled** `type:boolean`
+If set to true, the currently running application (if any) is cancelled during an update without taking a savepoint (see the example below)
+
 * **allowNonRestoredState** `type:boolean`
 Skips savepoint operator state that cannot be mapped to the new program version
 
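For illustration, a minimal sketch of a FlinkApplication manifest using the new field, assuming the operator's `flink.k8s.io/v1beta1` API group; the name, image, and jar values are hypothetical placeholders, not part of this PR:

apiVersion: flink.k8s.io/v1beta1
kind: FlinkApplication
metadata:
  name: wordcount                  # hypothetical name
spec:
  image: example/wordcount:v2      # hypothetical image
  jarName: wordcount.jar           # hypothetical jar
  deleteMode: ForceCancel
  savepointDisabled: true          # cancel the old job without a savepoint on update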
9 changes: 7 additions & 2 deletions docs/state_machine.md
@@ -19,10 +19,15 @@ labelled and annotated as indicated in the custom resource. The operator also se
 variables and arguments for the containers to start up the Flink application from the image.
 
 ### ClusterStarting
-In this state, the operator monitors the Flink cluster created in the New state. Once it successfully starts, we
-transition to the `Savepointing` state. Otherwise, if we are unable to start the cluster for some reason (an invalid
+In this state, the operator monitors the Flink cluster created in the New state. Once it successfully starts, we check
+whether the spec has the `savepointDisabled` field set to true. If so, we transition to the `Cancelling` state, otherwise to `Savepointing`.
+If we are unable to start the cluster for some reason (an invalid
 image, bad configuration, not enough Kubernetes resources, etc.), we transition to the `DeployFailed` state.
 
+### Cancelling
+In this state, the operator attempts to cancel the running job (if one exists) and transitions to the `SubmittingJob` state.
+If cancellation fails, we transition to `RollingBackJob`.
+
 ### Savepointing
 In the `Savepointing` state, the operator attempts to cancel the existing job with a
 [savepoint](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/state/savepoints.html) (if this is the first
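To make the new transition concrete, a rough sketch in Go of the decision described above; this is not the operator's actual reconciliation code, and the `FlinkApplicationCancelling` constant is an assumption mirroring the state names in this document:

// Sketch: once the new cluster is healthy in ClusterStarting,
// pick the next phase based on the new spec field.
if app.Spec.SavepointDisabled {
	// skip savepointing: cancel the old job, then submit the new one
	app.Status.Phase = v1beta1.FlinkApplicationCancelling
} else {
	// default path: cancel with a savepoint and restore from it later
	app.Status.Phase = v1beta1.FlinkApplicationSavepointing
}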
16 changes: 11 additions & 5 deletions docs/state_machine.mmd
@@ -14,9 +14,12 @@ Running --> Updating
 Updating --> ClusterStarting
 DeployFailed --> Updating
 
-ClusterStarting --> Savepointing
+ClusterStarting -- savepoint disabled --> Cancelling
+ClusterStarting -- savepoint enabled --> Savepointing
 ClusterStarting -- Create fails --> DeployFailed
 
+Cancelling --> SubmittingJob
+Cancelling -- cancel fails --> RollingBackJob
 Savepointing --> SubmittingJob
 Savepointing -- Savepoint fails --> Recovering
 
@@ -28,7 +31,10 @@ SubmittingJob -- job start fails --> RollingBackJob
 RollingBackJob --> DeployFailed
 end
 
-linkStyle 5 stroke:#FF0000
-linkStyle 7 stroke:#FF0000
-linkStyle 9 stroke:#FF0000
-linkStyle 11 stroke:#FF0000
+linkStyle 4 stroke:#303030
+linkStyle 5 stroke:#303030
+linkStyle 6 stroke:#FF0000
+linkStyle 8 stroke:#FF0000
+linkStyle 10 stroke:#FF0000
+linkStyle 12 stroke:#FF0000
+linkStyle 14 stroke:#FF0000
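A note on the `linkStyle` renumbering above: mermaid's `linkStyle N` styles the Nth edge (zero-indexed) in declaration order, so inserting the two `Cancelling` edges shifts the index of every edge declared after them; the red failure edges therefore move from indices 5/7/9/11 to 6/8/10/12/14. A minimal illustration (hypothetical graph, not from this file):

graph LR
A --> B
A -- added edge --> C
B --> D
%% "A --> B" is index 0; the added edge takes index 1,
%% pushing "B --> D" from index 1 to index 2
linkStyle 2 stroke:#FF0000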
Binary file modified docs/state_machine.png
2 changes: 1 addition & 1 deletion integ/checkpoint_failure_test.go
@@ -17,7 +17,7 @@ func failingJobTest(s *IntegSuite, c *C, testName string, causeFailure func()) {
 	config, err := s.Util.ReadFlinkApplication("test_app.yaml")
 	c.Assert(err, IsNil, Commentf("Failed to read test app yaml"))
 	config.Name = testName + "job"
-	config.Spec.DeleteMode = "ForceCancel"
+	config.Spec.DeleteMode = v1beta1.DeleteModeForceCancel
 
 	config.ObjectMeta.Labels["integTest"] = testName
 
295 changes: 295 additions & 0 deletions integ/job_cancellation_test.go
@@ -0,0 +1,295 @@
package integ

import (
	"fmt"
	"time"

	"github.com/lyft/flinkk8soperator/pkg/apis/app/v1beta1"
	"github.com/prometheus/common/log"
	. "gopkg.in/check.v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func WaitUpdateAndValidate(c *C, s *IntegSuite, name string, updateFn func(app *v1beta1.FlinkApplication), failurePhase v1beta1.FlinkApplicationPhase) *v1beta1.FlinkApplication {
	// update the application with the provided update function
	app, err := s.Util.Update(name, updateFn)
	c.Assert(err, IsNil)

	for {
		// keep trying until the new job is launched
		newApp, err := s.Util.GetFlinkApplication(name)
		c.Assert(err, IsNil)
		if newApp.Status.JobStatus.JobID != "" &&
			newApp.Status.JobStatus.JobID != app.Status.JobStatus.JobID {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}

	c.Assert(s.Util.WaitForPhase(name, v1beta1.FlinkApplicationRunning, failurePhase), IsNil)
	c.Assert(s.Util.WaitForAllTasksRunning(name), IsNil)

	// check that the new job started without restoring from a savepoint
	newApp, _ := s.Util.GetFlinkApplication(name)
	c.Assert(newApp.Status.JobStatus.JobID, Not(Equals), app.Status.JobStatus.JobID)
	c.Assert(newApp.Status.SavepointPath, Equals, "")

	// wait for the old cluster to be cleaned up
	for {
		pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
			List(v1.ListOptions{LabelSelector: "flink-app-hash=" + app.Status.DeployHash})
		c.Assert(err, IsNil)
		if len(pods.Items) == 0 {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	return newApp
}

// tests the workflow of job cancellation without savepoint
func (s *IntegSuite) TestJobCancellationWithoutSavepoint(c *C) {
	testName := "cancelsuccess"
	const finalizer = "simple.finalizers.test.com"

	// start a simple app
	config, err := s.Util.ReadFlinkApplication("test_app.yaml")
	c.Assert(err, IsNil, Commentf("Failed to read test app yaml"))

	config.Name = testName + "job"
	config.Spec.DeleteMode = v1beta1.DeleteModeForceCancel
	config.Spec.SavepointDisabled = true
	config.ObjectMeta.Labels["integTest"] = testName
	config.Finalizers = append(config.Finalizers, finalizer)

	c.Assert(s.Util.CreateFlinkApplication(config), IsNil,
		Commentf("Failed to create flink application"))

	c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)
	c.Assert(s.Util.WaitForAllTasksRunning(config.Name), IsNil)

	pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
		List(v1.ListOptions{LabelSelector: "integTest=" + testName})
	c.Assert(err, IsNil)
	c.Assert(len(pods.Items), Equals, 3)
	for _, pod := range pods.Items {
		c.Assert(pod.Spec.Containers[0].Image, Equals, config.Spec.Image)
	}

	// test updating the app with a new image
	newApp := WaitUpdateAndValidate(c, s, config.Name, func(app *v1beta1.FlinkApplication) {
		app.Spec.Image = NewImage
	}, v1beta1.FlinkApplicationDeployFailed)

	c.Assert(newApp.Spec.Image, Equals, NewImage)
	c.Assert(newApp.Status.SavepointPath, Equals, "")

	pods, err = s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
		List(v1.ListOptions{LabelSelector: "integTest=" + testName})
	c.Assert(err, IsNil)
	c.Assert(len(pods.Items), Equals, 3)
	for _, pod := range pods.Items {
		c.Assert(pod.Spec.Containers[0].Image, Equals, NewImage)
	}

	// cleanup
	c.Assert(s.Util.FlinkApps().Delete(newApp.Name, &v1.DeleteOptions{}), IsNil)
	var app *v1beta1.FlinkApplication
	for {
		app, err = s.Util.GetFlinkApplication(config.Name)
		c.Assert(err, IsNil)
		if len(app.Finalizers) == 1 && app.Finalizers[0] == finalizer {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}

	job := s.Util.GetJobOverview(app)
	c.Assert(job["status"], Equals, "CANCELED")

	// delete our finalizer
	app.Finalizers = []string{}
	_, err = s.Util.FlinkApps().Update(app)
	c.Assert(err, IsNil)

	// wait until all pods of the deleted application are gone
	for {
		pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
			List(v1.ListOptions{LabelSelector: "integTest=" + testName})
		c.Assert(err, IsNil)
		if len(pods.Items) == 0 {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	log.Info("All pods torn down")
}

// tests a job update with the existing job already in cancelled state.
// here, the newly submitted job starts without a savepoint.
func (s *IntegSuite) TestCancelledJobWithoutSavepoint(c *C) {
	testName := "invalidcancel"
	config, err := s.Util.ReadFlinkApplication("test_app.yaml")
	c.Assert(err, IsNil, Commentf("Failed to read test app yaml"))

	config.Name = testName + "job"
	config.Spec.DeleteMode = v1beta1.DeleteModeForceCancel
	config.Spec.SavepointDisabled = true
	config.ObjectMeta.Labels["integTest"] = testName

	c.Assert(s.Util.CreateFlinkApplication(config), IsNil,
		Commentf("Failed to create flink application"))

	c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)
	c.Assert(s.Util.WaitForAllTasksRunning(config.Name), IsNil)

	currApp, _ := s.Util.GetFlinkApplication(config.Name)
	c.Assert(currApp.Status.SavepointPath, Equals, "")
	job := s.Util.GetJobOverview(currApp)
	c.Assert(job["status"], Equals, "RUNNING")

	// trigger a cancel on the existing job
	endpoint := fmt.Sprintf("jobs/%s?mode=cancel", currApp.Status.JobStatus.JobID)
	_, err = s.Util.FlinkAPIPatch(currApp, endpoint)
	c.Assert(err, IsNil)

	// give the cancellation a moment to take effect
	time.Sleep(1 * time.Second)

	job = s.Util.GetJobOverview(currApp)
	c.Assert(job["status"], Equals, "CANCELED")

	newApp, err := s.Util.Update(config.Name, func(app *v1beta1.FlinkApplication) {
		app.Spec.Image = NewImage
	})
	c.Assert(err, IsNil)

	for {
		// wait until the new job is launched
		newApp, err := s.Util.GetFlinkApplication(config.Name)
		c.Assert(err, IsNil)
		if newApp.Status.JobStatus.JobID != "" &&
			newApp.Status.JobStatus.JobID != currApp.Status.JobStatus.JobID {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}

	// we should end up in the Running phase of the new job
	c.Assert(s.Util.WaitForPhase(newApp.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationDeployFailed), IsNil)

	newApp, err = s.Util.GetFlinkApplication(newApp.Name)
	c.Assert(err, IsNil)

	job = s.Util.GetJobOverview(newApp)
	c.Assert(job["status"], Equals, "RUNNING")
	c.Assert(newApp.Status.SavepointPath, Equals, "")

	// start deleting
	c.Assert(s.Util.FlinkApps().Delete(config.Name, &v1.DeleteOptions{}), IsNil)
	for {
		pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
			List(v1.ListOptions{LabelSelector: "integTest=" + testName})
		c.Assert(err, IsNil)
		if len(pods.Items) == 0 {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	log.Info("All pods torn down")
}

// tests the recovery workflow of the job when savepoint is disabled.
func (s *IntegSuite) TestJobRecoveryWithoutSavepoint(c *C) {
	const finalizer = "simple.finalizers.test.com"
	const testName = "cancelrecovery"

	config, err := s.Util.ReadFlinkApplication("test_app.yaml")
	c.Assert(err, IsNil, Commentf("Failed to read test app yaml"))

	config.Name = testName
	config.ObjectMeta.Labels["integTest"] = testName
	config.Finalizers = append(config.Finalizers, finalizer)
	config.Spec.DeleteMode = v1beta1.DeleteModeForceCancel
	config.Spec.SavepointDisabled = true

	c.Assert(s.Util.CreateFlinkApplication(config), IsNil,
		Commentf("Failed to create flink application"))

	c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationRunning, v1beta1.FlinkApplicationSavepointing), IsNil)

	c.Assert(s.Util.WaitForAllTasksRunning(config.Name), IsNil)
	currApp, _ := s.Util.GetFlinkApplication(config.Name)
	c.Assert(currApp.Status.SavepointPath, Equals, "")

	// Test updating the app with a bad jar name -- this should cause a failed deploy and roll back
	_, err = s.Util.Update(config.Name, func(app *v1beta1.FlinkApplication) {
		app.Spec.JarName = "nonexistent.jar"
		app.Spec.RestartNonce = "rollback"
	})
	c.Assert(err, IsNil)
	c.Assert(s.Util.WaitForPhase(config.Name, v1beta1.FlinkApplicationDeployFailed, ""), IsNil)
	c.Assert(s.Util.WaitForAllTasksRunning(config.Name), IsNil)

	// assert the restart of the job with a new job id and old deploy hash.
	newApp, err := s.Util.GetFlinkApplication(config.Name)
	c.Assert(err, IsNil)
	c.Assert(newApp.Status.JobStatus.JobID, Not(Equals), currApp.Status.JobStatus.JobID)
	c.Assert(newApp.Status.SavepointPath, Equals, "")
	c.Assert(newApp.Status.SavepointTriggerID, Equals, "")
	c.Assert(newApp.Status.DeployHash, Equals, currApp.Status.DeployHash)

	// assert that the restarted job wasn't restored from a savepoint.
	endpoint := fmt.Sprintf("jobs/%s/checkpoints", newApp.Status.JobStatus.JobID)
	res, err := s.Util.FlinkAPIGet(newApp, endpoint)
	c.Assert(err, IsNil)
	body := res.(map[string]interface{})
	restored := (body["latest"].(map[string]interface{}))["restored"]
	c.Assert(restored, IsNil)

	// roll forward with the right config.
	_ = WaitUpdateAndValidate(c, s, config.Name, func(app *v1beta1.FlinkApplication) {
		app.Spec.JarName = config.Spec.JarName
		app.Spec.RestartNonce = "rollback2"
		app.Spec.Image = NewImage
	}, "")

	// assert the pods have the new image
	pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
		List(v1.ListOptions{LabelSelector: "integTest=" + testName})
	c.Assert(err, IsNil)
	for _, pod := range pods.Items {
		c.Assert(pod.Spec.Containers[0].Image, Equals, NewImage)
	}

	// delete the application and ensure everything is cleaned up successfully
	c.Assert(s.Util.FlinkApps().Delete(config.Name, &v1.DeleteOptions{}), IsNil)
	var app *v1beta1.FlinkApplication
	for {
		app, err = s.Util.GetFlinkApplication(config.Name)
		c.Assert(err, IsNil)
		if len(app.Finalizers) == 1 && app.Finalizers[0] == finalizer {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	c.Assert(app.Status.SavepointPath, Equals, "")
	c.Assert(app.Status.SavepointTriggerID, Equals, "")

	app.Finalizers = []string{}
	_, err = s.Util.FlinkApps().Update(app)
	c.Assert(err, IsNil)

	// wait until all pods are deleted
	for {
		pods, err := s.Util.KubeClient.CoreV1().Pods(s.Util.Namespace.Name).
			List(v1.ListOptions{LabelSelector: "integTest=" + testName})
		c.Assert(err, IsNil)
		if len(pods.Items) == 0 {
			break
		}
		time.Sleep(100 * time.Millisecond)
	}
	log.Info("All pods torn down")
}