Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement] [Seatunnel-web] Add support to provide reason for job failure #196

Merged
merged 1 commit into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,3 +206,9 @@ Now ,let me show you how to use it.

#### Virtual Tables manage
![img.png](docs/images/VirtualImage.png)

### Upgrades
#### 1. Upgrade from 1.0.1 or before to 1.0.2 or after.
Execute the following SQL to upgrade the database:

```ALTER TABLE `t_st_job_instance` ADD COLUMN `error_message` varchar(4096) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL;```
Original file line number Diff line number Diff line change
Expand Up @@ -73,4 +73,7 @@ public class JobInstance {

@TableField("job_type")
private String jobType;

@TableField("error_message")
private String errorMessage;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.seatunnel.app.dal.entity.JobTask;
import org.apache.seatunnel.app.domain.request.job.JobExecParam;
import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
import org.apache.seatunnel.engine.core.job.JobResult;

import lombok.NonNull;

Expand All @@ -40,5 +41,8 @@ String generateJobConfig(
JobExecutorRes getExecuteResource(@NonNull Long jobEngineId);

void complete(
@NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId);
@NonNull Integer userId,
@NonNull Long jobInstanceId,
@NonNull String jobEngineId,
JobResult jobResult);
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy;
import org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory;
import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor;
import org.apache.seatunnel.app.utils.JobExecParamUtil;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
Expand All @@ -37,6 +38,7 @@
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;

Expand Down Expand Up @@ -128,6 +130,9 @@ private void executeJobBySeaTunnel(Integer userId, String filePath, Long jobInst
JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId);
jobInstance.setJobStatus(JobStatus.FAILED.name());
jobInstance.setEndTime(new Date());
String jobInstanceErrorMessage =
JobExecParamUtil.getJobInstanceErrorMessage(e.getMessage());
jobInstance.setErrorMessage(jobInstanceErrorMessage);
jobInstanceDao.update(jobInstance);
throw new RuntimeException(e.getMessage(), e);
}
Expand All @@ -152,21 +157,22 @@ public void waitJobFinish(
String jobEngineId,
SeaTunnelClient seaTunnelClient) {
ExecutorService executor = Executors.newFixedThreadPool(1);
CompletableFuture<JobStatus> future =
CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete, executor);
CompletableFuture<JobResult> future =
CompletableFuture.supplyAsync(clientJobProxy::waitForJobCompleteV2, executor);
JobResult jobResult = new JobResult(JobStatus.FAILED, "");
try {
log.info("future.get before");
JobStatus jobStatus = future.get();

jobResult = future.get();
executor.shutdown();
} catch (InterruptedException e) {
jobResult.setError(e.getMessage());
throw new RuntimeException(e);
} catch (ExecutionException e) {
jobResult.setError(e.getMessage());
throw new RuntimeException(e);
} finally {
seaTunnelClient.close();
log.info("and jobInstanceService.complete begin");
jobInstanceService.complete(userId, jobInstanceId, jobEngineId);
jobInstanceService.complete(userId, jobInstanceId, jobEngineId, jobResult);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.seatunnel.app.domain.request.job.transform.TransformOptions;
import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes;
import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes;
import org.apache.seatunnel.app.domain.response.metrics.JobPipelineSummaryMetricsRes;
import org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant;
import org.apache.seatunnel.app.service.IDatasourceService;
import org.apache.seatunnel.app.service.IJobInstanceService;
Expand All @@ -64,7 +63,7 @@
import org.apache.seatunnel.app.utils.SeaTunnelConfigUtil;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.core.job.JobResult;
import org.apache.seatunnel.server.common.CodeGenerateUtils;
import org.apache.seatunnel.server.common.SeatunnelErrorEnum;
import org.apache.seatunnel.server.common.SeatunnelException;
Expand Down Expand Up @@ -361,34 +360,18 @@ public JobExecutorRes getExecuteResource(@NonNull Long jobEngineId) {

@Override
public void complete(
@NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId) {
@NonNull Integer userId,
@NonNull Long jobInstanceId,
@NonNull String jobEngineId,
JobResult jobResult) {
funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_COMPLETE, userId);
JobInstance jobInstance = jobInstanceDao.getJobInstanceMapper().selectById(jobInstanceId);
jobMetricsService.syncJobDataToDb(jobInstance, userId, jobEngineId);

List<JobPipelineSummaryMetricsRes> status =
jobMetricsService.getJobPipelineSummaryMetrics(userId, jobInstanceId);

String jobStatus;
Set<String> statusList =
status.stream()
.map(JobPipelineSummaryMetricsRes::getStatus)
.map(String::toUpperCase)
.collect(Collectors.toSet());
if (statusList.size() == 1 && statusList.contains("FINISHED")) {
jobStatus = JobStatus.FINISHED.name();
} else if (statusList.contains("FAILED")) {
jobStatus = JobStatus.FAILED.name();
} else if (statusList.contains("CANCELED")) {
jobStatus = JobStatus.CANCELED.name();
} else if (statusList.contains("CANCELLING")) {
jobStatus = JobStatus.CANCELING.name();
} else {
jobStatus = JobStatus.RUNNING.name();
}
jobInstance.setJobStatus(jobStatus);
jobInstance.setJobStatus(jobResult.getStatus().name());
jobInstance.setJobEngineId(jobEngineId);
jobInstance.setUpdateUserId(userId);
jobInstance.setErrorMessage(
JobExecParamUtil.getJobInstanceErrorMessage(jobResult.getError()));
jobInstanceDao.update(jobInstance);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,18 @@

public class JobExecParamUtil {

// The maximum length of the job execution error message, 4KB
private static final int ERROR_MESSAGE_MAX_LENGTH = 4096;

public static String getJobInstanceErrorMessage(String message) {
if (message == null) {
return null;
}
return message.length() > ERROR_MESSAGE_MAX_LENGTH
? message.substring(0, ERROR_MESSAGE_MAX_LENGTH)
: message;
}

public static Config updateEnvConfig(JobExecParam jobExecParam, Config envConfig) {
if (jobExecParam == null || jobExecParam.getEnv() == null) {
return envConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
<result column="engine_name" jdbcType="VARCHAR" property="engineName"/>
<result column="engine_version" jdbcType="VARCHAR" property="engineVersion"/>
<result column="job_engine_id" jdbcType="VARCHAR" property="jobEngineId"/>
<result column="error_message" jdbcType="VARCHAR" property="errorMessage"/>
</resultMap>
<sql id="Base_Column_List">
id
, `job_define_id`, `job_status`, `job_config`, `engine_name`, `engine_version`, `job_engine_id`
, `job_define_id`, `job_status`, `job_config`, `engine_name`, `engine_version`, `job_engine_id`,`error_message`
</sql>

<select id="queryJobInstanceListPaging" resultType="org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ CREATE TABLE t_st_job_instance (
update_time TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3),
end_time TIMESTAMP(3) DEFAULT NULL,
job_type VARCHAR(50) NOT NULL,
error_message VARCHAR(4096) DEFAULT NULL,
PRIMARY KEY (id)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ CREATE TABLE `t_st_job_instance` (
`update_time` timestamp(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),
`end_time` timestamp(3) NULL DEFAULT NULL,
`job_type` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
`error_message` varchar(4096) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_bin ROW_FORMAT = Dynamic;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public Result<Void> deleteSingleTask(long jobVersionId, String pluginId) {
return JSONTestUtils.parseObject(response, Result.class);
}

public String createFakeSourcePlugin(String datasourceId, long jobVersionId) {
public String createFakeSourcePlugin(String datasourceId, long jobVersionId, String rows) {
DataSourceOption tableOption = new DataSourceOption();
tableOption.setDatabases(Arrays.asList("fake_database"));
tableOption.setTables(Arrays.asList("fake_table"));
Expand All @@ -88,14 +88,26 @@ public String createFakeSourcePlugin(String datasourceId, long jobVersionId) {
.dataSourceId(Long.parseLong(datasourceId))
.sceneMode(SceneMode.SINGLE_TABLE)
.config(
"{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields {\\n name = \\\"string\\\"\\n age = \\\"int\\\"\\n }\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.template\":\"\",\"double.fake.mode\":\"RANGE\",\"double.template\":\"\",\"rows\":\"\",\"row.num\":5,\"split.num\":1,\"split.read-interval\":1,\"map.size\":5,\"array.size\":5,\"bytes.length\":5,\"date.year.template\":\"\",\"date.month.template\":\"\",\"date.day.template\":\"\",\"time.hour.template\":\"\",\"time.minute.template\":\"\",\"time.second.template\":\"\",\"parallelism\":1}")
"{\"query\":\"\",\"tables_configs\":\"\",\"schema\":\"fields {\\n name = \\\"string\\\"\\n age = \\\"int\\\"\\n }\",\"string.fake.mode\":\"RANGE\",\"string.template\":\"\",\"tinyint.fake.mode\":\"RANGE\",\"tinyint.template\":\"\",\"smallint.fake.mode\":\"RANGE\",\"smallint.template\":\"\",\"int.fake.mode\":\"RANGE\",\"int.template\":\"\",\"bigint.fake.mode\":\"RANGE\",\"bigint.template\":\"\",\"float.fake.mode\":\"RANGE\",\"float.template\":\"\",\"double.fake.mode\":\"RANGE\",\"double.template\":\"\",\"rows\":\""
+ rows
+ "\",\"row.num\":5,\"split.num\":1,\"split.read-interval\":1,\"map.size\":5,\"array.size\":5,\"bytes.length\":5,\"date.year.template\":\"\",\"date.month.template\":\"\",\"date.day.template\":\"\",\"time.hour.template\":\"\",\"time.minute.template\":\"\",\"time.second.template\":\"\",\"parallelism\":1}")
.build();

Result<Void> srcResult = saveSingleTask(jobVersionId, sourcePluginConfig);
assertTrue(srcResult.isSuccess());
return sourcePluginId;
}

public String createFakeSourcePlugin(String datasourceId, long jobVersionId) {
return createFakeSourcePlugin(datasourceId, jobVersionId, "");
}

public String createFakeSourcePluginThatFails(String datasourceId, long jobVersionId) {
String rows =
"[{kind=INSERT, fields=[\"org\", 100]}, {kind=INSERT, fields=[\"apache\", 50]}, {kind=INSERT, fields=[\"seatunnel\", 25]}, {kind=INSERT, fields=[\"seatunnel-web\", 12]}, {kind=INSERT, fields=[\"etl\", 6_age_invalid_number]}]";
return createFakeSourcePlugin(datasourceId, jobVersionId, rows);
}

public String createConsoleSinkPlugin(String datasourceId, long jobVersionId) {
DataSourceOption sinkTableOption = new DataSourceOption();
sinkTableOption.setDatabases(Arrays.asList("console_fake_database"));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.app.controller;

import org.apache.seatunnel.app.common.Result;
import org.apache.seatunnel.app.common.SeatunnelWebTestingBase;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.app.utils.JSONTestUtils;
import org.apache.seatunnel.app.utils.PageInfo;

import com.fasterxml.jackson.core.type.TypeReference;

import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.Date;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class TaskInstanceControllerWrapper extends SeatunnelWebTestingBase {

private static SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public Result<PageInfo<SeaTunnelJobInstanceDto>> getTaskInstanceList(
String jobDefineName,
String executorName,
String stateType,
String startTime,
String endTime,
String syncTaskType,
Integer pageNo,
Integer pageSize) {
String response =
sendRequest(
urlWithParam(
"task/jobMetrics?jobDefineName="
+ jobDefineName
+ "&executorName="
+ executorName
+ "&stateType="
+ stateType
+ "&startDate="
+ startTime
+ "&endDate="
+ endTime
+ "&syncTaskType="
+ syncTaskType
+ "&pageNo="
+ pageNo
+ "&pageSize="
+ pageSize));
return JSONTestUtils.parseObject(
response, new TypeReference<Result<PageInfo<SeaTunnelJobInstanceDto>>>() {});
}

public SeaTunnelJobInstanceDto getTaskInstanceList(String jobDefineName) {
String startTime =
URLEncoder.encode(
dateFormat.format(
new Date(System.currentTimeMillis() - 1000 * 60 * 60 * 24)));
String endTime =
URLEncoder.encode(
dateFormat.format(
new Date(System.currentTimeMillis() + 1000 * 60 * 60 * 24)));
String syncTaskType = "BATCH";
Integer pageNo = 1;
Integer pageSize = 10;
Result<PageInfo<SeaTunnelJobInstanceDto>> result =
getTaskInstanceList(
jobDefineName,
null,
null,
startTime,
endTime,
syncTaskType,
pageNo,
pageSize);
assertTrue(result.isSuccess());
if (result.getData().getTotalList().isEmpty()) {
return null;
}
assertEquals(1, result.getData().getTotalList().size());
return result.getData().getTotalList().get(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import org.apache.seatunnel.app.controller.JobControllerWrapper;
import org.apache.seatunnel.app.controller.JobExecutorControllerWrapper;
import org.apache.seatunnel.app.controller.SeatunnelDatasourceControllerWrapper;
import org.apache.seatunnel.app.controller.TaskInstanceControllerWrapper;
import org.apache.seatunnel.app.domain.dto.job.SeaTunnelJobInstanceDto;
import org.apache.seatunnel.app.domain.request.datasource.DatasourceReq;
import org.apache.seatunnel.app.domain.request.job.JobCreateReq;
import org.apache.seatunnel.app.domain.request.job.JobExecParam;
Expand Down Expand Up @@ -51,13 +53,15 @@ public class JobExecutorControllerTest {
private static final String uniqueId = "_" + System.currentTimeMillis();
private static SeatunnelDatasourceControllerWrapper seatunnelDatasourceControllerWrapper;
private static JobControllerWrapper jobControllerWrapper;
private static TaskInstanceControllerWrapper taskInstanceControllerWrapper;

@BeforeAll
public static void setUp() {
seaTunnelWebCluster.start();
jobExecutorControllerWrapper = new JobExecutorControllerWrapper();
seatunnelDatasourceControllerWrapper = new SeatunnelDatasourceControllerWrapper();
jobControllerWrapper = new JobControllerWrapper();
taskInstanceControllerWrapper = new TaskInstanceControllerWrapper();
}

@Test
Expand Down Expand Up @@ -274,6 +278,25 @@ public void executeJob_JobStatusUpdate_WhenSubmissionFailed() {
assertFalse(result.isSuccess());
// Even though job failed but job instance is created into the database.
assertTrue(result.getData() > 0);
SeaTunnelJobInstanceDto taskInstanceList =
taskInstanceControllerWrapper.getTaskInstanceList(jobName);
assertNotNull(taskInstanceList.getErrorMessage());
}

@Test
public void storeErrorMessageWhenJobFailed() throws InterruptedException {
String jobName = "failureCause" + uniqueId;
long jobVersionId = JobUtils.createJob(jobName, true);
Result<Long> result = jobExecutorControllerWrapper.jobExecutor(jobVersionId);
// job submitted successfully but it will fail during execution
assertTrue(result.isSuccess());
assertTrue(result.getData() > 0);
JobUtils.waitForJobCompletion(result.getData());
// extra second to let the data get updated in the database
Thread.sleep(2000);
SeaTunnelJobInstanceDto taskInstanceList =
taskInstanceControllerWrapper.getTaskInstanceList(jobName);
assertNotNull(taskInstanceList.getErrorMessage());
}

@AfterAll
Expand Down
Loading
Loading