From 6943d3305c51f5876247d6c2fd40c40d5342f2e6 Mon Sep 17 00:00:00 2001 From: Mohammad Arshad Date: Tue, 27 Aug 2024 21:47:22 +0530 Subject: [PATCH] [Improvement] [Seatunnel-web] Add support to provide reason for job failure --- README.md | 6 ++ .../seatunnel/app/dal/entity/JobInstance.java | 3 + .../app/service/IJobInstanceService.java | 6 +- .../service/impl/JobExecutorServiceImpl.java | 18 ++-- .../service/impl/JobInstanceServiceImpl.java | 33 ++---- .../seatunnel/app/utils/JobExecParamUtil.java | 12 +++ .../app/dal/mapper/JobInstanceMapper.xml | 3 +- .../resources/script/seatunnel_server_h2.sql | 1 + .../script/seatunnel_server_mysql.sql | 1 + .../controller/JobTaskControllerWrapper.java | 16 ++- .../TaskInstanceControllerWrapper.java | 100 ++++++++++++++++++ .../app/test/JobExecutorControllerTest.java | 23 ++++ .../app/test/TaskInstanceControllerTest.java | 73 +++++++++++++ .../apache/seatunnel/app/utils/JobUtils.java | 19 +++- .../src/test/resources/hazelcast.yaml | 10 +- 15 files changed, 277 insertions(+), 47 deletions(-) create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/controller/TaskInstanceControllerWrapper.java create mode 100644 seatunnel-web-it/src/test/java/org/apache/seatunnel/app/test/TaskInstanceControllerTest.java diff --git a/README.md b/README.md index 0eae9fc6b..8f5165237 100644 --- a/README.md +++ b/README.md @@ -206,3 +206,9 @@ Now ,let me show you how to use it. #### Virtual Tables manage ![img.png](docs/images/VirtualImage.png) + +### Upgrades +#### 1. Upgrade from 1.0.1 or before to 1.0.2 or after. +Execute the following SQL to upgrade the database: + +```ALTER TABLE `t_st_job_instance` ADD COLUMN `error_message` varchar(4096) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NULL DEFAULT NULL;``` diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java index edcb2f7ef..db5cbeba2 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/dal/entity/JobInstance.java @@ -73,4 +73,7 @@ public class JobInstance { @TableField("job_type") private String jobType; + + @TableField("error_message") + private String errorMessage; } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java index e6db1d844..e971452ea 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/IJobInstanceService.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.app.dal.entity.JobTask; import org.apache.seatunnel.app.domain.request.job.JobExecParam; import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes; +import org.apache.seatunnel.engine.core.job.JobResult; import lombok.NonNull; @@ -40,5 +41,8 @@ String generateJobConfig( JobExecutorRes getExecuteResource(@NonNull Long jobEngineId); void complete( - @NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId); + @NonNull Integer userId, + @NonNull Long jobInstanceId, + @NonNull String jobEngineId, + JobResult jobResult); } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java index b8ca731ea..e7490347b 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobExecutorServiceImpl.java @@ -28,6 +28,7 @@ import org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy; import org.apache.seatunnel.app.thirdparty.metrics.EngineMetricsExtractorFactory; import org.apache.seatunnel.app.thirdparty.metrics.IEngineMetricsExtractor; +import org.apache.seatunnel.app.utils.JobExecParamUtil; import org.apache.seatunnel.common.config.Common; import org.apache.seatunnel.common.config.DeployMode; import org.apache.seatunnel.engine.client.SeaTunnelClient; @@ -37,6 +38,7 @@ import org.apache.seatunnel.engine.common.config.JobConfig; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; import org.apache.seatunnel.engine.common.config.YamlSeaTunnelConfigBuilder; +import org.apache.seatunnel.engine.core.job.JobResult; import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.server.common.SeatunnelErrorEnum; @@ -128,6 +130,9 @@ private void executeJobBySeaTunnel(Integer userId, String filePath, Long jobInst JobInstance jobInstance = jobInstanceDao.getJobInstance(jobInstanceId); jobInstance.setJobStatus(JobStatus.FAILED.name()); jobInstance.setEndTime(new Date()); + String jobInstanceErrorMessage = + JobExecParamUtil.getJobInstanceErrorMessage(e.getMessage()); + jobInstance.setErrorMessage(jobInstanceErrorMessage); jobInstanceDao.update(jobInstance); throw new RuntimeException(e.getMessage(), e); } @@ -152,21 +157,22 @@ public void waitJobFinish( String jobEngineId, SeaTunnelClient seaTunnelClient) { ExecutorService executor = Executors.newFixedThreadPool(1); - CompletableFuture future = - CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete, executor); + CompletableFuture future = + CompletableFuture.supplyAsync(clientJobProxy::waitForJobCompleteV2, executor); + JobResult jobResult = new JobResult(JobStatus.FAILED, ""); try { - log.info("future.get before"); - JobStatus jobStatus = future.get(); - + jobResult = future.get(); executor.shutdown(); } catch (InterruptedException e) { + jobResult.setError(e.getMessage()); throw new RuntimeException(e); } catch (ExecutionException e) { + jobResult.setError(e.getMessage()); throw new RuntimeException(e); } finally { seaTunnelClient.close(); log.info("and jobInstanceService.complete begin"); - jobInstanceService.complete(userId, jobInstanceId, jobEngineId); + jobInstanceService.complete(userId, jobInstanceId, jobEngineId, jobResult); } } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java index 7a05da7ef..68f7ee00f 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobInstanceServiceImpl.java @@ -52,7 +52,6 @@ import org.apache.seatunnel.app.domain.request.job.transform.TransformOptions; import org.apache.seatunnel.app.domain.response.datasource.VirtualTableDetailRes; import org.apache.seatunnel.app.domain.response.executor.JobExecutorRes; -import org.apache.seatunnel.app.domain.response.metrics.JobPipelineSummaryMetricsRes; import org.apache.seatunnel.app.permission.constants.SeatunnelFuncPermissionKeyConstant; import org.apache.seatunnel.app.service.IDatasourceService; import org.apache.seatunnel.app.service.IJobInstanceService; @@ -64,7 +63,7 @@ import org.apache.seatunnel.app.utils.SeaTunnelConfigUtil; import org.apache.seatunnel.common.constants.PluginType; import org.apache.seatunnel.common.utils.ExceptionUtils; -import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.core.job.JobResult; import org.apache.seatunnel.server.common.CodeGenerateUtils; import org.apache.seatunnel.server.common.SeatunnelErrorEnum; import org.apache.seatunnel.server.common.SeatunnelException; @@ -361,34 +360,18 @@ public JobExecutorRes getExecuteResource(@NonNull Long jobEngineId) { @Override public void complete( - @NonNull Integer userId, @NonNull Long jobInstanceId, @NonNull String jobEngineId) { + @NonNull Integer userId, + @NonNull Long jobInstanceId, + @NonNull String jobEngineId, + JobResult jobResult) { funcPermissionCheck(SeatunnelFuncPermissionKeyConstant.JOB_EXECUTOR_COMPLETE, userId); JobInstance jobInstance = jobInstanceDao.getJobInstanceMapper().selectById(jobInstanceId); jobMetricsService.syncJobDataToDb(jobInstance, userId, jobEngineId); - - List status = - jobMetricsService.getJobPipelineSummaryMetrics(userId, jobInstanceId); - - String jobStatus; - Set statusList = - status.stream() - .map(JobPipelineSummaryMetricsRes::getStatus) - .map(String::toUpperCase) - .collect(Collectors.toSet()); - if (statusList.size() == 1 && statusList.contains("FINISHED")) { - jobStatus = JobStatus.FINISHED.name(); - } else if (statusList.contains("FAILED")) { - jobStatus = JobStatus.FAILED.name(); - } else if (statusList.contains("CANCELED")) { - jobStatus = JobStatus.CANCELED.name(); - } else if (statusList.contains("CANCELLING")) { - jobStatus = JobStatus.CANCELING.name(); - } else { - jobStatus = JobStatus.RUNNING.name(); - } - jobInstance.setJobStatus(jobStatus); + jobInstance.setJobStatus(jobResult.getStatus().name()); jobInstance.setJobEngineId(jobEngineId); jobInstance.setUpdateUserId(userId); + jobInstance.setErrorMessage( + JobExecParamUtil.getJobInstanceErrorMessage(jobResult.getError())); jobInstanceDao.update(jobInstance); } diff --git a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java index 5e2368492..497524df9 100644 --- a/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java +++ b/seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/utils/JobExecParamUtil.java @@ -27,6 +27,18 @@ public class JobExecParamUtil { + // The maximum length of the job execution error message, 4KB + private static final int ERROR_MESSAGE_MAX_LENGTH = 4096; + + public static String getJobInstanceErrorMessage(String message) { + if (message == null) { + return null; + } + return message.length() > ERROR_MESSAGE_MAX_LENGTH + ? message.substring(0, ERROR_MESSAGE_MAX_LENGTH) + : message; + } + public static Config updateEnvConfig(JobExecParam jobExecParam, Config envConfig) { if (jobExecParam == null || jobExecParam.getEnv() == null) { return envConfig; diff --git a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml index 60c2ae11d..ee61b70c6 100644 --- a/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml +++ b/seatunnel-server/seatunnel-app/src/main/resources/org/apache/seatunnel/app/dal/mapper/JobInstanceMapper.xml @@ -24,10 +24,11 @@ + id - , `job_define_id`, `job_status`, `job_config`, `engine_name`, `engine_version`, `job_engine_id` + , `job_define_id`, `job_status`, `job_config`, `engine_name`, `engine_version`, `job_engine_id`,`error_message`