Skip to content

Commit

Permalink
[Bug] [Seatunnel-web] When job execution initialization fails, the jo…
Browse files Browse the repository at this point in the history
…b execution status remains unchanged.
  • Loading branch information
arshadmohammad committed Aug 24, 2024
1 parent 39db793 commit c6e3cbb
Show file tree
Hide file tree
Showing 6 changed files with 206 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobExecutionEnvironment;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
Expand All @@ -38,6 +37,7 @@
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;

import org.springframework.stereotype.Service;

Expand All @@ -51,6 +51,7 @@
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -72,9 +73,18 @@ public Result<Long> jobExecute(Integer userId, Long jobDefineId) {

String configFile = writeJobConfigIntoConfFile(jobConfig, jobDefineId);

Long jobInstanceId =
executeJobBySeaTunnel(userId, configFile, executeResource.getJobInstanceId());
return Result.success(jobInstanceId);
try {
executeJobBySeaTunnel(userId, configFile, executeResource.getJobInstanceId());
return Result.success(executeResource.getJobInstanceId());
} catch (RuntimeException e) {
Result<Long> failure =
Result.failure(SeatunnelErrorEnum.JUB_EXEC_SUBMISSION_ERROR, e.getMessage());
// Even though job execution submission failed, we still need to return the
// jobInstanceId to the user
// as the job instance has been created in the database.
failure.setData(executeResource.getJobInstanceId());
return failure;
}
}

public String writeJobConfigIntoConfFile(String jobConfig, Long jobDefineId) {
Expand All @@ -100,35 +110,38 @@ public String writeJobConfigIntoConfFile(String jobConfig, Long jobDefineId) {
return filePath;
}

public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {
private void executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {
Common.setDeployMode(DeployMode.CLIENT);
JobConfig jobConfig = new JobConfig();
jobConfig.setName(jobInstanceId + "_job");
SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
SeaTunnelClient seaTunnelClient;
ClientJobProxy clientJobProxy;
try {
seaTunnelClient = createSeaTunnelClient();
SeaTunnelConfig seaTunnelConfig = new YamlSeaTunnelConfigBuilder().build();
ClientJobExecutionEnvironment jobExecutionEnv =
seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);
final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
clientJobProxy = jobExecutionEnv.execute();
} catch (Throwable e) {
log.error("Job execution submission failed.", e);
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
jobInstance.setJobStatus(JobStatus.FAILED.name());
jobInstance.setEndTime(new Date());
jobInstanceDao.update(jobInstance);

CompletableFuture.runAsync(
() -> {
waitJobFinish(
clientJobProxy,
userId,
jobInstanceId,
Long.toString(clientJobProxy.getJobId()),
seaTunnelClient);
});

} catch (ExecutionException | InterruptedException e) {
ExceptionUtils.getMessage(e);
throw new RuntimeException(e);
throw new RuntimeException(e.getMessage(), e);
}
return jobInstanceId;
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));
jobInstanceDao.update(jobInstance);
CompletableFuture.runAsync(
() -> {
waitJobFinish(
clientJobProxy,
userId,
jobInstanceId,
Long.toString(clientJobProxy.getJobId()),
seaTunnelClient);
});
}

public void waitJobFinish(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public enum SeatunnelErrorEnum {
"load job state from engine error",
"load job statue from engine [%s] error, error msg is [%s]"),
UNSUPPORTED_ENGINE(40003, "unsupported engine", "unsupported engine [%s] version [%s]"),
JUB_EXEC_SUBMISSION_ERROR(40004, "Job execution submission error.", "%s"),

JOB_RUN_GENERATE_UUID_ERROR(50001, "generate uuid error", "generate uuid error"),
/* datasource and virtual table */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ public String createConsoleDatasource(String datasourceName) {
return result.getData();
}

public String createMysqlDatasource(String datasourceName) {
DatasourceReq req = getMysqlDatasource(datasourceName);
Result<String> result = createDatasource(req);
assertTrue(result.isSuccess());
return result.getData();
}

public DatasourceReq getFakeSourceDatasourceReq(String datasourceName) {
DatasourceReq req = new DatasourceReq();
req.setDatasourceName(datasourceName);
Expand Down Expand Up @@ -104,4 +111,14 @@ public Result<PageInfo<DatasourceRes>> getDatasourceList(
return JSONTestUtils.parseObject(
response, new TypeReference<Result<PageInfo<DatasourceRes>>>() {});
}

public DatasourceReq getMysqlDatasource(String datasourceName) {
DatasourceReq req = new DatasourceReq();
req.setDatasourceName(datasourceName);
req.setPluginName("JDBC-Mysql");
req.setDescription(datasourceName + " description");
req.setDatasourceConfig(
"{\"url\":\"jdbc:mysql://localhost:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&allowPublicKeyRetrieval=true\",\"driver\":\"com.mysql.cj.jdbc.Driver\",\"user\":\"someUser\",\"password\":\"somePassword\"}");
return req;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.app.common.SeaTunnelWebCluster;
import org.apache.seatunnel.app.controller.JobControllerWrapper;
import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
import org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
import org.apache.seatunnel.app.domain.request.job.PluginConfig;
import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;
import org.apache.seatunnel.app.utils.JobUtils;

Expand All @@ -29,17 +33,22 @@
import java.util.List;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class JobExecutorControllerTest {
private static final SeaTunnelWebCluster seaTunnelWebCluster = new SeaTunnelWebCluster();
private static JobExecutorControllerWrapper jobExecutorControllerWrapper;
private static SeatunnelDatasourceControllerWrapper seatunnelDatasourceControllerWrapper;
private static JobControllerWrapper jobControllerWrapper;
private static final String uniqueId = "_" + System.currentTimeMillis();

@BeforeAll
public static void setUp() {
seaTunnelWebCluster.start();
jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
seatunnelDatasourceControllerWrapper = new SeatunnelDatasourceControllerWrapper();
jobControllerWrapper = new JobControllerWrapper();
}

@Test
Expand All @@ -57,6 +66,28 @@ public void executeJob_shouldReturnSuccess_whenValidRequest() {
assertEquals(5, listResult.getData().get(0).getWriteRowCount());
}

@Test
public void executeJob_JobStatusUpdate_WhenSubmissionFailed() {
String jobName = "execJobStatus" + uniqueId;
JobCreateReq jobCreateReq = JobUtils.populateMySQLJobCreateReqFromFile();
jobCreateReq.getJobConfig().setName(jobName);
jobCreateReq.getJobConfig().setDescription(jobName + " description");
String datasourceName = "execJobStatus_db_1" + uniqueId;
String mysqlDatasourceId =
seatunnelDatasourceControllerWrapper.createMysqlDatasource(datasourceName);
for (PluginConfig pluginConfig : jobCreateReq.getPluginConfigs()) {
pluginConfig.setDataSourceId(Long.parseLong(mysqlDatasourceId));
}
Result<Long> job = jobControllerWrapper.createJob(jobCreateReq);
assertTrue(job.isSuccess());
Long jobVersionId = job.getData();
Result<Long> result = jobExecutorControllerWrapper.jobExecutor(jobVersionId);
// Fails because of the wrong database credentials.
assertFalse(result.isSuccess());
// Even though job failed but job instance is created into the database.
assertTrue(result.getData() > 0);
}

@Test
public void restoreJob_shouldReturnSuccess_whenValidRequest() {
String jobName = "jobRestore" + uniqueId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
import org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
import org.apache.seatunnel.app.domain.request.job.Edge;
import org.apache.seatunnel.app.domain.request.job.JobConfig;
import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
import org.apache.seatunnel.app.domain.request.job.JobDAG;
import org.apache.seatunnel.app.domain.response.job.JobTaskCheckRes;
import org.apache.seatunnel.app.domain.response.metrics.JobPipelineDetailMetricsRes;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -138,4 +142,15 @@ public static Long createJob(String jobName) {
assertTrue(jobTaskCheckResResult.isSuccess());
return jobVersionId;
}

public static JobCreateReq populateMySQLJobCreateReqFromFile() {
String filePath = "src/test/resources/jobs/mysql_source_mysql_sink.json";
String jsonContent;
try {
jsonContent = new String(Files.readAllBytes(Paths.get(filePath)));
} catch (IOException e) {
throw new RuntimeException(e);
}
return JSONTestUtils.parseObject(jsonContent, JobCreateReq.class);
}
}
106 changes: 106 additions & 0 deletions seatunnel-web-it/src/test/resources/jobs/mysql_source_mysql_sink.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
{
"jobConfig": {
"name": "mysql_source_mysql_sink",
"description": "mysql_source_mysql_sink description",
"engine": "SeaTunnel",
"env": {
"job.mode": "BATCH",
"job.name": "SeaTunnel_Job",
"jars": "",
"checkpoint.interval": "",
"checkpoint.timeout": "",
"read_limit.rows_per_second": "",
"read_limit.bytes_per_second": "",
"custom_parameters": ""
}
},
"pluginConfigs": [
{
"pluginId": "1724412762429155",
"name": "mysql_source_1",
"type": "SOURCE",
"connectorType": null,
"tableOption": {
"databases": [
"test"
],
"tables": [
"test_table"
]
},
"selectTableFields": {
"tableFields": [
"name",
"age"
],
"all": true
},
"dataSourceId": 14717667385504,
"sceneMode": "SINGLE_TABLE",
"config": "{\"query\":\"\",\"connection_check_timeout_sec\":30,\"fetch_size\":\"\",\"partition_column\":\"\",\"partition_upper_bound\":\"\",\"partition_lower_bound\":\"\",\"partition_num\":\"\",\"compatible_mode\":\"\",\"properties\":\"\",\"table_path\":\"\",\"where_condition\":\"\",\"table_list\":\"\",\"split.size\":8096,\"split.even-distribution.factor.upper-bound\":100,\"split.even-distribution.factor.lower-bound\":0.05,\"split.sample-sharding.threshold\":1000,\"split.inverse-sampling.rate\":1000,\"parallelism\":1}",
"outputSchema": [
{
"fields": [
{
"type": "LONGTEXT",
"name": "name",
"comment": "",
"primaryKey": false,
"defaultValue": null,
"nullable": false,
"properties": null,
"unSupport": false,
"outputDataType": "STRING"
},
{
"type": "INT",
"name": "age",
"comment": "",
"primaryKey": false,
"defaultValue": null,
"nullable": false,
"properties": null,
"unSupport": false,
"outputDataType": "INT"
}
],
"tableName": "test_table",
"database": "test"
}
],
"transformOptions": {}
},
{
"pluginId": "17244128298414uc",
"name": "mysql_sink_1",
"type": "SINK",
"connectorType": null,
"tableOption": {
"databases": [
"test"
],
"tables": [
"test_table"
]
},
"selectTableFields": {
"tableFields": [
"name",
"age"
],
"all": true
},
"dataSourceId": 14717667385504,
"config": "{\"query\":\"\",\"schema_save_mode\":\"CREATE_SCHEMA_WHEN_NOT_EXIST\",\"data_save_mode\":\"APPEND_DATA\",\"custom_sql\":\"\",\"connection_check_timeout_sec\":30,\"batch_size\":1000,\"is_exactly_once\":\"false\",\"xa_data_source_class_name\":\"\",\"max_commit_attempts\":3,\"transaction_timeout_sec\":-1,\"max_retries\":\"1\",\"auto_commit\":\"true\",\"support_upsert_by_query_primary_key_exist\":\"false\",\"primary_keys\":\"\",\"compatible_mode\":\"\",\"multi_table_sink_replica\":1}",
"transformOptions": {}
}
],
"jobDAG": {
"edges": [
{
"inputPluginId": "mysql_source_1",
"targetPluginId": "mysql_sink_1"
}
]
}
}

0 comments on commit c6e3cbb

Please sign in to comment.