Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[STRMCMP-639] Trigger immediate rollback when job submission fails #117

Merged
merged 2 commits into from
Oct 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pkg/controller/flink/client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ const retryCount = 3
const httpGetTimeOut = 5 * time.Second
const defaultTimeOut = 1 * time.Minute

// ProgramInvocationException is thrown when the entry class doesn't exist or throws an exception
const programInvocationException = "org.apache.flink.client.program.ProgramInvocationException"

// JobSubmissionException is thrown when there is an error submitting the job (e.g., the savepoint is
// incompatible, classes for parts of the jobgraph cannot be found, jobgraph is invalid, etc.)
const jobSubmissionException = "org.apache.flink.runtime.client.JobSubmissionException"

type FlinkAPIInterface interface {
CancelJobWithSavepoint(ctx context.Context, url string, jobID string) (string, error)
ForceCancelJob(ctx context.Context, url string, jobID string) error
Expand Down Expand Up @@ -231,7 +238,7 @@ func (c *FlinkJobManagerClient) SubmitJob(ctx context.Context, url string, jarID
// Flink returns a 500 when the entry class doesn't exist or crashes on start, but we want to fail fast
// in those cases
body := response.String()
if strings.Contains(body, "org.apache.flink.client.program.ProgramInvocationException") {
if strings.Contains(body, programInvocationException) || strings.Contains(body, jobSubmissionException) {
return nil, GetNonRetryableErrorWithMessage(err, v1beta1.SubmitJob, response.Status(), body)
}
return nil, GetRetryableErrorWithMessage(err, v1beta1.SubmitJob, response.Status(), DefaultRetries, string(response.Body()))
Expand Down
23 changes: 21 additions & 2 deletions pkg/controller/flink/client/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
const testURL = "http://abc.com"
const invalidTestResponse = "invalid response"
const wrongEntryClassResponse = `{"errors":["Internal server error.","<Exception on server side:\norg.apache.flink.client.program.ProgramInvocationException: The program's entry point class 'com.lyft.streamingplatform.OperatorTestAppX' was not found in the jar file.\n\tat org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:617)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:199)\n\tat org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:149)\n\tat org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:125)\n\tat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: java.lang.ClassNotFoundException: com.lyft.streamingplatform.OperatorTestAppX\n\tat java.net.URLClassLoader.findClass(URLClassLoader.java:382)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:424)\n\tat java.lang.ClassLoader.loadClass(ClassLoader.java:357)\n\tat java.lang.Class.forName0(Native Method)\n\tat java.lang.Class.forName(Class.java:348)\n\tat org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:614)\n\t... 8 more\n\nEnd of exception on server side>"]}`
const incompatibleSavepointResponse = `{"errors":["Internal server error.","<Exception on server side:\norg.apache.flink.runtime.client.JobSubmissionException: Failed to submit job.\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.lambda$internalSubmitJob$2(Dispatcher.java:309)\n\tat java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)\n\tat java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)\n\tat java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)\n\tat akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)\n\tat akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)\n\tat scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\nCaused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager\n\tat org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)\n\tat java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)\n\t... 6 more\nCaused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager\n\tat org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:152)\n\tat org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:76)\n\tat org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:351)\n\tat org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)\n\t... 7 more\nCaused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint file:/checkpoints/flink/savepoints/savepoint-fca98f-4c1a4baeebec. Cannot map checkpoint/savepoint state for operator f3dcb3ca563a8ba134b6239a5c78c939 to the new program, because the operator is not available in the new program. If you want to allow to skip this, you can set the --allowNonRestoredState option on the CLI.\n\tat org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)\n\tat org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1103)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1266)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1190)\n\tat org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:287)\n\tat org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:83)\n\tat org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:37)\n\tat org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:146)\n\t... 10 more\n\nEnd of exception on server side>"]}`
const fakeJobsURL = "http://abc.com/jobs"
const fakeOverviewURL = "http://abc.com/overview"
const fakeJobConfigURL = "http://abc.com/jobs/1/config"
Expand Down Expand Up @@ -350,8 +351,6 @@ func TestSubmitStartupFail(t *testing.T) {
defer httpmock.DeactivateAndReset()
ctx := context.Background()

//var message json.RawMessage
//err := json.Unmarshal([]byte(wrongEntryClassResponse), message)
responder := httpmock.NewStringResponder(500, wrongEntryClassResponse)
httpmock.RegisterResponder("POST", fakeSubmitURL, responder)

Expand All @@ -367,6 +366,26 @@ func TestSubmitStartupFail(t *testing.T) {
wrongEntryClassResponse+"'")
}

func TestIncompatibleSavepointFail(t *testing.T) {
httpmock.Activate()
defer httpmock.DeactivateAndReset()
ctx := context.Background()

responder := httpmock.NewStringResponder(500, incompatibleSavepointResponse)
httpmock.RegisterResponder("POST", fakeSubmitURL, responder)

client := getTestJobManagerClient()
resp, err := client.SubmitJob(ctx, testURL, "1", SubmitJobRequest{
Parallelism: 10,
})
assert.Nil(t, resp)
flinkAppError, _ := err.(*v1beta1.FlinkApplicationError)
assert.True(t, flinkAppError.IsFailFast)

assert.EqualError(t, err, "SubmitJob call failed with status 500 and message '"+
incompatibleSavepointResponse+"'")
}

func TestSubmitJobError(t *testing.T) {
httpmock.Activate()
defer httpmock.DeactivateAndReset()
Expand Down