Skip to content

Commit

Permalink
[Bug] [Seatunnel-web] When job execution initialization fails, the jo…
Browse files Browse the repository at this point in the history
…b execution status remains unchanged.
  • Loading branch information
arshadmohammad committed Aug 25, 2024
1 parent 5bb86f7 commit 7aa0d7b
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
Expand All @@ -39,6 +38,7 @@
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;

import org.springframework.stereotype.Service;

Expand All @@ -52,6 +52,7 @@
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -73,9 +74,18 @@ public Result<Long> jobExecute(Integer userId, Long jobDefineId, JobExecParam ex

String configFile = writeJobConfigIntoConfFile(jobConfig, jobDefineId);

Long jobInstanceId =
executeJobBySeaTunnel(userId, configFile, executeResource.getJobInstanceId());
return Result.success(jobInstanceId);
try {
executeJobBySeaTunnel(userId, configFile, executeResource.getJobInstanceId());
return Result.success(executeResource.getJobInstanceId());
} catch (RuntimeException e) {
Result<Long> failure =
Result.failure(SeatunnelErrorEnum.JUB_EXEC_SUBMISSION_ERROR, e.getMessage());
// Even though job execution submission failed, we still need to return the
// jobInstanceId to the user
// as the job instance has been created in the database.
failure.setData(executeResource.getJobInstanceId());
return failure;
}
}

public String writeJobConfigIntoConfFile(String jobConfig, Long jobDefineId) {
Expand All @@ -101,35 +111,38 @@ public String writeJobConfigIntoConfFile(String jobConfig, Long jobDefineId) {
return filePath;
}

public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {
private void executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {
Common.setDeployMode(DeployMode.CLIENT);
JobConfig jobConfig = new JobConfig();
jobConfig.setName(jobInstanceId + "_job");
SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
SeaTunnelClient seaTunnelClient;
ClientJobProxy clientJobProxy;
try {
seaTunnelClient = createSeaTunnelClient();
SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
ClientJobExecutionEnvironment jobExecutionEnv =
seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
clientJobProxy = jobExecutionEnv.execute();
} catch (Throwable e) {
log.error("Job execution submission failed.", e);
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
jobInstance.setJobStatus(JobStatus.FAILED.name());
jobInstance.setEndTime(new Date());
jobInstanceDao.update(jobInstance);

CompletableFuture.runAsync(
() -> {
waitJobFinish(
clientJobProxy,
userId,
jobInstanceId,
Long.toString(clientJobProxy.getJobId()),
seaTunnelClient);
});

} catch (ExecutionException | InterruptedException e) {
ExceptionUtils.getMessage(e);
throw new RuntimeException(e);
throw new RuntimeException(e.getMessage(), e);
}
return jobInstanceId;
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
jobInstanceDao.update(jobInstance);
CompletableFuture.runAsync(
() -> {
waitJobFinish(
clientJobProxy,
userId,
jobInstanceId,
Long.toString(clientJobProxy.getJobId()),
seaTunnelClient);
});
}

public void waitJobFinish(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public enum SeatunnelErrorEnum {
"load job state from engine error",
"load job statue from engine [%s] error, error msg is [%s]"),
UNSUPPORTED_ENGINE(40003, "unsupported engine", "unsupported engine [%s] version [%s]"),
JUB_EXEC_SUBMISSION_ERROR(40004, "Job execution submission error.", "%s"),

JOB_RUN_GENERATE_UUID_ERROR(50001, "generate uuid error", "generate uuid error"),
/* datasource and virtual table */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,28 @@ public void restoreJob_shouldReturnSuccess_whenValidRequest() {
assertTrue(result.isSuccess());
}

@Test
public void executeJob_JobStatusUpdate_WhenSubmissionFailed() {
String jobName = "execJobStatus" + uniqueId;
JobCreateReq jobCreateReq = JobUtils.populateMySQLJobCreateReqFromFile();
jobCreateReq.getJobConfig().setName(jobName);
jobCreateReq.getJobConfig().setDescription(jobName + " description");
String datasourceName = "execJobStatus_db_1" + uniqueId;
String mysqlDatasourceId =
seatunnelDatasourceControllerWrapper.createMysqlDatasource(datasourceName);
for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) {
pluginConfig.setDataSourceId(Long.parseLong(mysqlDatasourceId));
}
Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
assertTrue(job.isSuccess());
Long jobVersionId = job.getData();
Result<Long> result = jobExecutorControllerWrapper.jobExecutor(jobVersionId);
// Fails because of the wrong database credentials.
assertFalse(result.isSuccess());
// Even though job failed but job instance is created into the database.
assertTrue(result.getData() > 0);
}

@AfterAll
public static void tearDown() {
seaTunnelWebCluster.stop();
Expand Down

0 comments on commit 7aa0d7b

Please sign in to comment.