From fafb3641d2925cf80347177dacbc4fcc5ef3a0e3 Mon Sep 17 00:00:00 2001 From: xiaofu Date: Sat, 8 Jul 2023 22:53:25 +0800 Subject: [PATCH 1/7] fix:hdfs Checkpoint Storage management fails to delete historical files --- .../seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java index 9dcc94f8058..739274bb6ab 100644 --- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java +++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java @@ -283,7 +283,7 @@ public synchronized void deleteCheckpoint(String jobId, String pipelineId, Strin if (pipelineId.equals(getPipelineIdByFileName(fileName)) && checkpointId.equals(getCheckpointIdByFileName(fileName))) { try { - fs.delete(new Path(fileName), false); + fs.delete(new Path(path+fileName), false); } catch (Exception e) { log.error( "Failed to delete checkpoint {} for job {}, pipeline {}", @@ -311,7 +311,7 @@ public void deleteCheckpoint(String jobId, String pipelineId, List check if (pipelineId.equals(getPipelineIdByFileName(fileName)) && checkpointIdList.contains(checkpointIdByFileName)) { try { - fs.delete(new Path(fileName), false); + fs.delete(new Path(path+fileName), false); } catch (Exception e) { log.error( "Failed to delete checkpoint {} for job {}, pipeline {}", From 5e4139a2b27bcdf5f34b2c7ba8aafe6fdb5031d6 Mon Sep 17 00:00:00 2001 From: xiaofu Date: Sat, 8 Jul 2023 23:39:38 +0800 Subject: [PATCH 2/7] fix:hdfs Checkpoint Storage management fails to delete historical files --- .../seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java index 739274bb6ab..adfcfaa76d6 100644 --- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java +++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/HdfsStorage.java @@ -283,7 +283,7 @@ public synchronized void deleteCheckpoint(String jobId, String pipelineId, Strin if (pipelineId.equals(getPipelineIdByFileName(fileName)) && checkpointId.equals(getCheckpointIdByFileName(fileName))) { try { - fs.delete(new Path(path+fileName), false); + fs.delete(new Path(path + fileName), false); } catch (Exception e) { log.error( "Failed to delete checkpoint {} for job {}, pipeline {}", @@ -311,7 +311,7 @@ public void deleteCheckpoint(String jobId, String pipelineId, List check if (pipelineId.equals(getPipelineIdByFileName(fileName)) && checkpointIdList.contains(checkpointIdByFileName)) { try { - fs.delete(new Path(path+fileName), false); + fs.delete(new Path(path + fileName), false); } catch (Exception e) { log.error( "Failed to delete checkpoint {} for job {}, pipeline {}", From 9dc3a24c7cf2fd38d496761c34e6885ed081a461 Mon Sep 17 00:00:00 2001 From: wu-a-ge Date: Sat, 8 Jul 2023 23:49:21 +0800 Subject: [PATCH 3/7] fix after the savepoint job is restored, the checkpoint file cannot be generated --- .../server/checkpoint/CheckpointCoordinator.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 3ae2658509e..14c14800c64 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -183,8 +183,19 @@ public CheckpointCoordinator( this.checkpointIdCounter = checkpointIdCounter; this.readyToCloseStartingTask = new CopyOnWriteArraySet<>(); if (pipelineState != null) { - this.latestCompletedCheckpoint = + // fix after the savepoint job is restored, the checkpoint file cannot be generated + CompletedCheckpoint tmpCheckpoint = serializer.deserialize(pipelineState.getStates(), CompletedCheckpoint.class); + this.latestCompletedCheckpoint = + new CheckpointCoordinator( + tmpCheckpoint.getJobId(), + tmpCheckpoint.getPipelineId(), + tmpCheckpoint.getCheckpointId(), + tmpCheckpoint.getCheckpointTimestamp(), + CheckpointType.CHECKPOINT_TYPE, + tmpCheckpoint.getCompletedTimestamp(), + tmpCheckpoint.getTaskStates(), + tmpCheckpoint.getTaskStatistics()); } this.checkpointCoordinatorFuture = new CompletableFuture(); From 68f978395544150499bc65c735c2a4edfa133e81 Mon Sep 17 00:00:00 2001 From: wu-a-ge Date: Sat, 29 Jul 2023 21:18:09 +0800 Subject: [PATCH 4/7] [Feature][Zeta] The expiration time of a historical Job can be configured --- config/seatunnel.yaml | 1 + .../engine/common/config/EngineConfig.java | 10 ++++++++++ .../config/YamlSeaTunnelDomConfigProcessor.java | 5 +++++ .../common/config/server/ServerConfigOptions.java | 5 +++++ .../seatunnel/engine/server/CoordinatorService.java | 3 ++- .../engine/server/master/JobHistoryService.java | 13 ++++++++----- 6 files changed, 31 insertions(+), 6 deletions(-) diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml index 7e496ca39ad..7c689a328d3 100644 --- a/config/seatunnel.yaml +++ b/config/seatunnel.yaml @@ -17,6 +17,7 @@ seatunnel: engine: + history-job-expire-minutes: 1440 backup-count: 1 queue-type: blockingqueue print-execution-info-interval: 60 diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java index edc18a0b15e..e162b428bb4 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java @@ -32,6 +32,7 @@ @Data @SuppressWarnings("checkstyle:MagicNumber") public class EngineConfig { + private int backupCount = ServerConfigOptions.BACKUP_COUNT.defaultValue(); private int printExecutionInfoInterval = ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.defaultValue(); @@ -50,6 +51,8 @@ public class EngineConfig { private CheckpointConfig checkpointConfig = ServerConfigOptions.CHECKPOINT.defaultValue(); private QueueType queueType = ServerConfigOptions.QUEUE_TYPE.defaultValue(); + private int historyJobExpireMinutes = + ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.defaultValue(); public void setBackupCount(int newBackupCount) { checkBackupCount(newBackupCount, 0); @@ -82,6 +85,13 @@ public void setTaskExecutionThreadShareMode(ThreadShareMode taskExecutionThreadS this.taskExecutionThreadShareMode = taskExecutionThreadShareMode; } + public void setHistoryJobExpireMinutes(int historyJobExpireMinutes) { + checkPositive( + historyJobExpireMinutes, + ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES + " must be > 0"); + this.historyJobExpireMinutes = historyJobExpireMinutes; + } + public EngineConfig setQueueType(QueueType queueType) { checkNotNull(queueType); this.queueType = queueType; diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java index 718e915a0c3..f96da8b4c4d 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java @@ -131,6 +131,11 @@ private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) { engineConfig.setSlotServiceConfig(parseSlotServiceConfig(node)); } else if (ServerConfigOptions.CHECKPOINT.key().equals(name)) { engineConfig.setCheckpointConfig(parseCheckpointConfig(node)); + } else if (ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.key().equals(name)) { + engineConfig.setHistoryJobExpireMinutes( + getIntegerValue( + ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.key(), + getTextContent(node))); } else { LOGGER.warning("Unrecognized element: " + name); } diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java index b5d02c03443..dfa53f0f4c2 100644 --- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java +++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java @@ -138,4 +138,9 @@ public class ServerConfigOptions { .type(new TypeReference>() {}) .noDefaultValue() .withDescription("The checkpoint storage instance configuration."); + public static final Option HISTORY_JOB_EXPIRE_MINUTES = + Options.key("history-job-expire-minutes") + .intType() + .defaultValue(1440) + .withDescription("The expire time of history jobs.time unit minute"); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index d2931d0c37e..84df6c9ace6 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -220,7 +220,8 @@ private void initCoordinatorService() { .getMap(Constant.IMAP_FINISHED_JOB_METRICS), nodeEngine .getHazelcastInstance() - .getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO)); + .getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO), + engineConfig.getHistoryJobExpireMinutes()); List> collect = runningJobInfoIMap.entrySet().stream() diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java index 12dcae40ca7..164ebb51cf1 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java @@ -74,20 +74,22 @@ public class JobHistoryService { * finishedJobStateImap key is jobId and value is jobState(json) JobStateData Indicates the * status of the job, pipeline, and task */ - // TODO need to limit the amount of storage private final IMap finishedJobStateImap; private final IMap finishedJobMetricsImap; private final ObjectMapper objectMapper; + private final int finishedJobExpireTime; + public JobHistoryService( IMap runningJobStateIMap, ILogger logger, Map runningJobMasterMap, IMap finishedJobStateImap, IMap finishedJobMetricsImap, - IMap finishedJobVertexInfoImap) { + IMap finishedJobVertexInfoImap, + int finishedJobExpireTime) { this.runningJobStateIMap = runningJobStateIMap; this.logger = logger; this.runningJobMasterMap = runningJobMasterMap; @@ -96,6 +98,7 @@ public JobHistoryService( this.finishedJobDAGInfoImap = finishedJobVertexInfoImap; this.objectMapper = new ObjectMapper(); this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false); + this.finishedJobExpireTime = finishedJobExpireTime; } // Gets the status of a running and completed job @@ -161,14 +164,14 @@ public String getJobDetailStateAsString(Long jobId) { public void storeFinishedJobState(JobMaster jobMaster) { JobState jobState = toJobStateMapper(jobMaster, false); jobState.setFinishTime(System.currentTimeMillis()); - finishedJobStateImap.put(jobState.jobId, jobState, 14, TimeUnit.DAYS); + finishedJobStateImap.put(jobState.jobId, jobState, finishedJobExpireTime, TimeUnit.MINUTES); } @SuppressWarnings("checkstyle:MagicNumber") public void storeFinishedPipelineMetrics(long jobId, JobMetrics metrics) { finishedJobMetricsImap.computeIfAbsent(jobId, key -> JobMetrics.of(new HashMap<>())); JobMetrics newMetrics = finishedJobMetricsImap.get(jobId).merge(metrics); - finishedJobMetricsImap.put(jobId, newMetrics, 14, TimeUnit.DAYS); + finishedJobMetricsImap.put(jobId, newMetrics, finishedJobExpireTime, TimeUnit.MINUTES); } private JobState toJobStateMapper(JobMaster jobMaster, boolean simple) { @@ -227,7 +230,7 @@ private JobState toJobStateMapper(JobMaster jobMaster, boolean simple) { } public void storeJobInfo(long jobId, JobDAGInfo jobInfo) { - finishedJobDAGInfoImap.put(jobId, jobInfo); + finishedJobDAGInfoImap.put(jobId, jobInfo, finishedJobExpireTime, TimeUnit.MINUTES); } @AllArgsConstructor From 7542853bca52812ab029f8add6ce9ae21a78168b Mon Sep 17 00:00:00 2001 From: wu-a-ge Date: Sat, 29 Jul 2023 21:59:10 +0800 Subject: [PATCH 5/7] add e2e test case and code style --- docs/en/connector-v2/sink/Doris.md | 4 +-- docs/en/connector-v2/source/Hudi.md | 2 +- .../seatunnel/engine/e2e/JobExecutionIT.java | 27 +++++++++++++++++++ .../src/test/resources/seatunnel.yaml | 1 + 4 files changed, 31 insertions(+), 3 deletions(-) diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md index 506cb7f2485..6bf8dc5369c 100644 --- a/docs/en/connector-v2/sink/Doris.md +++ b/docs/en/connector-v2/sink/Doris.md @@ -32,7 +32,7 @@ Version Supported ## Sink Options -| Name | Type | Required | Default | Description | +| Name | Type | Required | Default | Description | |---------------------|--------|----------|------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | fenodes | String | Yes | - | `Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, ..."` | | username | String | Yes | - | `Doris` user username | @@ -49,7 +49,7 @@ Version Supported ## Data Type Mapping -| Doris Data type | SeaTunnel Data type | +| Doris Data type | SeaTunnel Data type | |-----------------|-----------------------------------------| | BOOLEAN | BOOLEAN | | TINYINT | TINYINT | diff --git a/docs/en/connector-v2/source/Hudi.md b/docs/en/connector-v2/source/Hudi.md index b70d34608ea..ffe17f7de71 100644 --- a/docs/en/connector-v2/source/Hudi.md +++ b/docs/en/connector-v2/source/Hudi.md @@ -39,7 +39,7 @@ In order to use this connector, You must ensure your spark/flink cluster already ## Source Options -| Name | Type | Required | Default | Description | +| Name | Type | Required | Default | Description | |-------------------------|--------|------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | table.path | String | Yes | - | The hdfs root path of hudi table,such as 'hdfs://nameserivce/data/hudi/hudi_table/'. | | table.type | String | Yes | - | The type of hudi table. Now we only support 'cow', 'mor' is not support yet. | diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java index 4609a10dc4c..f0f30cbc7ce 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment; import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.JobConfig; +import org.apache.seatunnel.engine.core.job.JobResult; import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.server.SeaTunnelServerStarter; @@ -129,6 +130,32 @@ public void cancelJobTest() throws Exception { objectCompletableFuture.get()))); } + @Test + public void testExpiredJobWasDeleted() throws Exception { + Common.setDeployMode(DeployMode.CLIENT); + String filePath = TestUtils.getResource("batch_fakesource_to_file.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName("job_expire"); + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT")); + SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig); + JobExecutionEnvironment jobExecutionEnv = + engineClient.createExecutionContext(filePath, jobConfig); + + final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + + JobResult result = clientJobProxy.doWaitForJobComplete().get(); + Assertions.assertEquals(result.getStatus(), JobStatus.FINISHED); + Awaitility.await() + .atMost(65, TimeUnit.SECONDS) + .untilAsserted( + () -> + Assertions.assertThrowsExactly( + NullPointerException.class, + () -> clientJobProxy.getJobStatus())); + } + @AfterAll static void afterClass() { if (hazelcastInstance != null) { diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml index 16b9f55c30d..3897ae95031 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml @@ -17,6 +17,7 @@ seatunnel: engine: + history-job-expire-minutes: 1 backup-count: 2 queue-type: blockingqueue print-execution-info-interval: 10 From df3e01621a7b115d13296d6eb7887e01908fc26e Mon Sep 17 00:00:00 2001 From: wu-a-ge Date: Mon, 7 Aug 2023 23:34:20 +0800 Subject: [PATCH 6/7] [Docs] Historical Job expiration Config --- docs/en/seatunnel-engine/deployment.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/docs/en/seatunnel-engine/deployment.md b/docs/en/seatunnel-engine/deployment.md index c07cd45d6b1..1f8692530cd 100644 --- a/docs/en/seatunnel-engine/deployment.md +++ b/docs/en/seatunnel-engine/deployment.md @@ -103,6 +103,18 @@ seatunnel: About the checkpoint storage, you can see [checkpoint storage](checkpoint-storage.md) +### 4.4 Historical Job expiration Config + +The information about each completed Job, such as status, counters, and error logs, is stored in the IMap object. As the number of running jobs increases, the memory increases and eventually the memory will overflow. Therefore, you can adjust the history-job-expire-minutes parameter to solve this problem. The time unit of this parameter is minute. The default value is 1440 minutes, that is, one day. + +Example + +``` +seatunnel: + engine: + history-job-expire-minutes: 1440 +``` + ## 5. Config SeaTunnel Engine Server All SeaTunnel Engine Server config in `hazelcast.yaml` file. From 6a8482062a00d3ef23f9508a35366abd05cc9e88 Mon Sep 17 00:00:00 2001 From: hailin0 Date: Wed, 9 Aug 2023 22:09:05 +0800 Subject: [PATCH 7/7] fix codetyle --- .../java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java index 8c614f3a9c5..b645a85b4ef 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java @@ -153,7 +153,7 @@ public void testGetErrorInfo() throws ExecutionException, InterruptedException { Assertions.assertEquals(result.getStatus(), JobStatus.FAILED); Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException")); } - + @Test public void testExpiredJobWasDeleted() throws Exception { Common.setDeployMode(DeployMode.CLIENT);