From 12459d1998f7c6c8717bb9564a806b0f3d4a5de3 Mon Sep 17 00:00:00 2001 From: Mohammad Arshad Date: Sun, 25 Aug 2024 14:49:39 +0530 Subject: [PATCH] [Bug] [Seatunnel-web] When job execution initialization fails, the job execution status remains unchanged. (#194) --- .../service/impl/JobExecutorServiceImpl.java | 59 +++++++++++-------- .../server/common/SeatunnelErrorEnum.java | 1 + .../app/test/JobExecutorControllerTest.java | 22 +++++++ 3 files changed, 59 insertions(+), 23 deletions(-) diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java index fbcf0190a..b8ca731ea 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java @@ -30,7 +30,6 @@ import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; -import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.engine.client.SeaTunnelClient; import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment; import org.apache.seatunnel.engine.client.job.ClientJobProxy; @@ -39,6 +38,7 @@ import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder; import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.server.common.SeatunnelErrorEnum; import org.springframework.stereotype.Service; @@ -52,6 +52,7 @@ import java.io.File; import java.io.FileWriter; import java.io.IOException; +import java.util.Date; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -73,9 +74,18 @@ public Result jobExecute(Integer userId, Long jobDefineId, JobExecParam ex String configFile = writeJobConfigIntoConfFile(jobConfig, jobDefineId); - Long jobInstanceId = - executeJobBySeaTunnel(userId, configFile, executeResource.getJobInstanceId()); - return Result.success(jobInstanceId); + try { + executeJobBySeaTunnel(userId, configFile, executeResource.getJobInstanceId()); + return Result.success(executeResource.getJobInstanceId()); + } catch (RuntimeException e) { + Result failure = + Result.failure(SeatunnelErrorEnum.JUB_EXEC_SUBMISSION_ERROR, e.getMessage()); + // Even though job execution submission failed, we still need to return the + // jobInstanceId to the user + // as the job instance has been created in the database. + failure.setData(executeResource.getJobInstanceId()); + return failure; + } } public String writeJobConfigIntoConfFile(String jobConfig, Long jobDefineId) { @@ -101,35 +111,38 @@ public String writeJobConfigIntoConfFile(String jobConfig, Long jobDefineId) { return filePath; } - public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) { + private void executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) { Common.setDeployMode(DeployMode.CLIENT); JobConfig jobConfig = new JobConfig(); jobConfig.setName(jobInstanceId + "_job"); - SeaTunnelClient seaTunnelClient = createSeaTunnelClient(); + SeaTunnelClient seaTunnelClient; + ClientJobProxy clientJobProxy; try { + seaTunnelClient = createSeaTunnelClient(); SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build(); ClientJobExecutionEnvironment jobExecutionEnv = seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig); - final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + clientJobProxy = jobExecutionEnv.execute(); + } catch (Throwable e) { + log.error("Job execution submission failed.", e); JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); - jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId())); + jobInstance.setJobStatus(JobStatus.FAILED.name()); + jobInstance.setEndTime(new Date()); jobInstanceDao.update(jobInstance); - - CompletableFuture.runAsync( - () -> { - waitJobFinish( - clientJobProxy, - userId, - jobInstanceId, - Long.toString(clientJobProxy.getJobId()), - seaTunnelClient); - }); - - } catch (ExecutionException | InterruptedException e) { - ExceptionUtils.getMessage(e); - throw new RuntimeException(e); + throw new RuntimeException(e.getMessage(), e); } - return jobInstanceId; + JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); + jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId())); + jobInstanceDao.update(jobInstance); + CompletableFuture.runAsync( + () -> { + waitJobFinish( + clientJobProxy, + userId, + jobInstanceId, + Long.toString(clientJobProxy.getJobId()), + seaTunnelClient); + }); } public void waitJobFinish( diff --git a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java index b81cec095..5f4b218e6 100644 --- a/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java +++ b/seatunnel-server/seatunnel-server-common/src/main/java/org/apache/seatunnel/server/common/SeatunnelErrorEnum.java @@ -82,6 +82,7 @@ public enum SeatunnelErrorEnum { "load job state from engine error", "load job statue from engine [%s] error, error msg is [%s]"), UNSUPPORTED_ENGINE(40003, "unsupported engine", "unsupported engine [%s] version [%s]"), + JUB_EXEC_SUBMISSION_ERROR(40004, "Job execution submission error.", "%s"), JOB_RUN_GENERATE_UUID_ERROR(50001, "generate uuid error", "generate uuid error"), /* datasource and virtual table */ diff --git a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java index 7c942e1d0..965ad200c 100644 --- a/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java +++ b/seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/JobExecutorControllerTest.java @@ -243,6 +243,28 @@ public void restoreJob_shouldReturnSuccess_whenValidRequest() { assertTrue(result.isSuccess()); } + @Test + public void executeJob_JobStatusUpdate_WhenSubmissionFailed() { + String jobName = "execJobStatus" + uniqueId; + JobCreateReq jobCreateReq = JobUtils.populateMySQLJobCreateReqFromFile(); + jobCreateReq.getJobConfig().setName(jobName); + jobCreateReq.getJobConfig().setDescription(jobName + " description"); + String datasourceName = "execJobStatus_db_1" + uniqueId; + String mysqlDatasourceId = + seatunnelDatasourceControllerWrapper.createMysqlDatasource(datasourceName); + for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) { + pluginConfig.setDataSourceId(Long.parseLong(mysqlDatasourceId)); + } + Result job = jobControllerWrapper.createJob(jobCreateReq); + assertTrue(job.isSuccess()); + Long jobVersionId = job.getData(); + Result result = jobExecutorControllerWrapper.jobExecutor(jobVersionId); + // Fails because of the wrong database credentials. + assertFalse(result.isSuccess()); + // Even though job failed but job instance is created into the database. + assertTrue(result.getData() > 0); + } + @AfterAll public static void tearDown() { seaTunnelWebCluster.stop();