From 8272803ca23decb2485d7fdd67c1b2d679da3ec8 Mon Sep 17 00:00:00 2001 From: Eric Date: Fri, 12 Jan 2024 14:03:25 +0800 Subject: [PATCH 1/9] Fix job can not restore when last checkpoint failed --- .../server/checkpoint/CheckpointCoordinator.java | 14 ++++++++++++++ .../server/checkpoint/CheckpointManager.java | 2 +- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 6a88f169fc5..aa8fd623fc8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -827,6 +827,20 @@ public boolean isCompleted() { && !latestCompletedCheckpoint.isRestored(); } + public boolean isNoErrorCompleted() { + if (latestCompletedCheckpoint == null) { + return false; + } + return latestCompletedCheckpoint.getCheckpointType().isFinalCheckpoint() + && (runningJobStateIMap + .get(checkpointStateImapKey) + .equals(CheckpointCoordinatorStatus.FINISHED) + || runningJobStateIMap + .get(checkpointStateImapKey) + .equals(CheckpointCoordinatorStatus.SUSPEND)) + && !latestCompletedCheckpoint.isRestored(); + } + public boolean isEndOfSavePoint() { if (latestCompletedCheckpoint == null) { return false; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java index d915cb5a79c..e2522ad5ae3 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java @@ -251,7 +251,7 @@ public CompletableFuture shutdown(JobStatus jobStatus) { * the pipeline has been completed; */ public boolean isCompletedPipeline(int pipelineId) { - return getCheckpointCoordinator(pipelineId).isCompleted(); + return getCheckpointCoordinator(pipelineId).isNoErrorCompleted(); } /** From 23468f57c838d8db30b40a5ecd3c581e59c1bbce Mon Sep 17 00:00:00 2001 From: Eric Date: Sat, 13 Jan 2024 12:47:18 +0800 Subject: [PATCH 2/9] add test case --- .../seatunnel/engine/e2e/JobExecutionIT.java | 27 ++++++ .../batch_last_checkpoint_error.conf | 91 +++++++++++++++++++ .../checkpoint/CheckpointCoordinator.java | 10 +- 3 files changed, 122 insertions(+), 6 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java index 26d021992a9..0dc4d7ba126 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java @@ -201,4 +201,31 @@ void afterClass() { hazelcastInstance.shutdown(); } } + + @Test + public void testLastCheckpointErrorJob() throws Exception { + Common.setDeployMode(DeployMode.CLIENT); + String filePath = TestUtils.getResource("batch_last_checkpoint_error.conf"); + JobConfig jobConfig = new JobConfig(); + jobConfig.setName("batch_last_checkpoint_error"); + + ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); + clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT")); + SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig); + ClientJobExecutionEnvironment jobExecutionEnv = + engineClient.createExecutionContext(filePath, jobConfig, SEATUNNEL_CONFIG); + + final ClientJobProxy clientJobProxy = jobExecutionEnv.execute(); + + CompletableFuture objectCompletableFuture = + CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete); + + await().atMost(600000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertTrue( + objectCompletableFuture.isDone() + && JobStatus.FAILED.equals( + objectCompletableFuture.get()))); + } } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf new file mode 100644 index 00000000000..b05abe2d5a3 --- /dev/null +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf @@ -0,0 +1,91 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + row.num = 10 + map.size = 10 + array.size = 10 + bytes.length = 10 + string.length = 10 + parallelism = 1 + result_table_name = "fake" + schema = { + fields { + c_map = "map>" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_null = "null" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + c_row = { + c_map = "map>" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_null = "null" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + } + } +} + +transform { +} + +sink { + LocalFile { + path = "/tmp/hive/warehouse/test1" + field_delimiter = "\t" + row_delimiter = "\n" + partition_by = ["c_string"] + partition_dir_expression = "${k0}=${v0}/${k1}=${v1}" + is_partition_field_write_in_file = true + file_name_expression = "${transactionId}_${now}" + file_format_type = "text" + filename_time_format = "yyyy.MM.dd" + is_enable_transaction = true + save_mode = "error" + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index aa8fd623fc8..569de7f0da6 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -831,13 +831,11 @@ public boolean isNoErrorCompleted() { if (latestCompletedCheckpoint == null) { return false; } + CheckpointCoordinatorStatus status = + (CheckpointCoordinatorStatus) runningJobStateIMap.get(checkpointStateImapKey); return latestCompletedCheckpoint.getCheckpointType().isFinalCheckpoint() - && (runningJobStateIMap - .get(checkpointStateImapKey) - .equals(CheckpointCoordinatorStatus.FINISHED) - || runningJobStateIMap - .get(checkpointStateImapKey) - .equals(CheckpointCoordinatorStatus.SUSPEND)) + && (status.equals(CheckpointCoordinatorStatus.FINISHED) + || status.equals(CheckpointCoordinatorStatus.SUSPEND)) && !latestCompletedCheckpoint.isRestored(); } From 48641b5224de5438d9f03748a44ea94c8abd8386 Mon Sep 17 00:00:00 2001 From: Eric Date: Sat, 13 Jan 2024 15:21:40 +0800 Subject: [PATCH 3/9] Add test case --- .../MultiTableSinkAggregatedCommitter.java | 15 +++++++++++++-- .../resources/batch_last_checkpoint_error.conf | 4 ++-- .../src/test/resources/log4j2-test.properties | 2 +- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java index 86fe23adc9f..22453efd272 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java @@ -75,6 +75,7 @@ private void initResourceManager() { @Override public List commit( List aggregatedCommitInfo) throws IOException { + List errorList = new ArrayList<>(); for (String sinkIdentifier : aggCommitters.keySet()) { SinkAggregatedCommitter sinkCommitter = aggCommitters.get(sinkIdentifier); if (sinkCommitter != null) { @@ -87,10 +88,20 @@ public List commit( .get(sinkIdentifier)) .filter(Objects::nonNull) .collect(Collectors.toList()); - sinkCommitter.commit(commitInfo); + List errCommitList = sinkCommitter.commit(commitInfo); + if (errCommitList.size() == 0) { + continue; + } + + for (int i = 0; i > errCommitList.size(); i++) { + if (errorList.size() < i + 1) { + errorList.add(i, new MultiTableAggregatedCommitInfo(new HashMap<>())); + } + errorList.get(i).getCommitInfo().put(sinkIdentifier, errCommitList.get(i)); + } } } - return new ArrayList<>(); + return errorList; } @Override diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf index b05abe2d5a3..84356210ea3 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_last_checkpoint_error.conf @@ -76,11 +76,11 @@ transform { sink { LocalFile { - path = "/tmp/hive/warehouse/test1" + path = "/hive/warehouse/test1" field_delimiter = "\t" row_delimiter = "\n" partition_by = ["c_string"] - partition_dir_expression = "${k0}=${v0}/${k1}=${v1}" + partition_dir_expression = "${k0}=${v0}" is_partition_field_write_in_file = true file_name_expression = "${transactionId}_${now}" file_format_type = "text" diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties index f0090af0248..e0706a7367c 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties @@ -22,7 +22,7 @@ rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender logger.zeta.name=org.apache.seatunnel.engine -logger.zeta.level=WARN +logger.zeta.level=INFO appender.consoleStdout.name = consoleStdoutAppender appender.consoleStdout.type = CONSOLE From 0fe913f076150d65012d1edf231bf4ba8b18e332 Mon Sep 17 00:00:00 2001 From: Eric Date: Sat, 13 Jan 2024 19:44:37 +0800 Subject: [PATCH 4/9] Add test case --- .../multitablesink/MultiTableSinkAggregatedCommitter.java | 2 +- .../org/apache/seatunnel/engine/e2e/JobExecutionIT.java | 7 +++++++ .../seatunnel/engine/server/dag/physical/SubPlan.java | 4 +++- 3 files changed, 11 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java index 22453efd272..625bfd6eb48 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java @@ -93,7 +93,7 @@ public List commit( continue; } - for (int i = 0; i > errCommitList.size(); i++) { + for (int i = 0; i < errCommitList.size(); i++) { if (errorList.size() < i + 1) { errorList.add(i, new MultiTableAggregatedCommitInfo(new HashMap<>())); } diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java index 0dc4d7ba126..d59952e58c8 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java @@ -202,6 +202,13 @@ void afterClass() { } } + @Test + public void testFor() throws Exception { + for (int i = 0; i < 100; i++) { + testLastCheckpointErrorJob(); + } + } + @Test public void testLastCheckpointErrorJob() throws Exception { Common.setDeployMode(DeployMode.CLIENT); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java index c1e7f975c41..a2623d5b2ce 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java @@ -560,7 +560,9 @@ public void handleCheckpointError() { log.warn( String.format( "%s checkpoint have error, cancel the pipeline", getPipelineFullName())); - this.cancelPipeline(); + if (!getPipelineState().isEndState()) { + updatePipelineState(PipelineStatus.CANCELING); + } } public void startSubPlanStateProcess() { From c1b6ae0aac1146ddd35b10a20268b5bf23b18946 Mon Sep 17 00:00:00 2001 From: Eric Date: Sat, 13 Jan 2024 19:44:37 +0800 Subject: [PATCH 5/9] Add test case --- .../MultiTableSinkAggregatedCommitter.java | 2 +- .../engine/server/CoordinatorService.java | 2 + .../engine/server/TaskExecutionService.java | 80 ++++++++++++++----- .../checkpoint/CheckpointCoordinator.java | 2 +- .../engine/server/dag/physical/SubPlan.java | 5 +- .../server/task/flow/SinkFlowLifeCycle.java | 3 + .../engine/server/master/JobMetricsTest.java | 29 ++++++- .../src/test/resources/log4j2-test.properties | 2 +- 8 files changed, 101 insertions(+), 24 deletions(-) diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java index 22453efd272..625bfd6eb48 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/multitablesink/MultiTableSinkAggregatedCommitter.java @@ -93,7 +93,7 @@ public List commit( continue; } - for (int i = 0; i > errCommitList.size(); i++) { + for (int i = 0; i < errCommitList.size(); i++) { if (errorList.size() < i + 1) { errorList.add(i, new MultiTableAggregatedCommitInfo(new HashMap<>())); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index a5028f3597a..35b8585875f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -532,6 +532,8 @@ public JobMetrics getJobMetrics(long jobId) { } JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(runningJobMaster.getCurrJobMetrics()); JobMetrics jobMetricsImap = jobHistoryService.getJobMetrics(jobId); + boolean a = jobMetricsImap != null; + logger.warning("===============jobMetricsImap is null" + a); return jobMetricsImap != null ? jobMetricsImap.merge(jobMetrics) : jobMetrics; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index 482c4d6712a..31207260969 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -401,6 +401,11 @@ public PassiveCompletableFuture deployLocalTask( String.format( "Task %s complete with state %s", r.getTaskGroupLocation(), r.getExecutionState())); + + // update metrics + updateDoneTaskMetricsContextInImap( + getExecutionContext(taskGroup.getTaskGroupLocation())); + notifyTaskStatusToMaster(taskGroup.getTaskGroupLocation(), r); }), executorService); @@ -569,28 +574,67 @@ private void updateMetricsContextInImap() { }); }); if (localMap.size() > 0) { - try { - if (!metricsImap.tryLock( - Constant.IMAP_RUNNING_JOB_METRICS_KEY, 2, TimeUnit.SECONDS)) { - logger.info("try lock failed in update metrics"); - return; - } - HashMap centralMap = - metricsImap.computeIfAbsent( - Constant.IMAP_RUNNING_JOB_METRICS_KEY, k -> new HashMap<>()); - centralMap.putAll(localMap); - metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap); - } catch (Exception e) { - logger.warning( - "The Imap acquisition failed due to the hazelcast node being offline or restarted, and will be retried next time", - e); - } finally { - metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY); - } + updateMetrics(metricsImap, localMap); } this.printTaskExecutionRuntimeInfo(); } + private void updateDoneTaskMetricsContextInImap(TaskGroupContext taskGroupContext) { + if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) { + logger.warning( + String.format( + "The Node is not ready yet, Node state %s,looking forward to the next " + + "scheduling", + nodeEngine.getNode().getState())); + return; + } + IMap> metricsImap = + nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS); + HashMap localMap = new HashMap<>(); + taskGroupContext + .getTaskGroup() + .getTasks() + .forEach( + task -> { + // MetricsContext only exists in SeaTunnelTask + if (task instanceof SeaTunnelTask) { + SeaTunnelTask seaTunnelTask = (SeaTunnelTask) task; + if (null != seaTunnelTask.getMetricsContext()) { + localMap.put( + seaTunnelTask.getTaskLocation(), + seaTunnelTask.getMetricsContext()); + } + } + }); + if (localMap.size() > 0) { + updateMetrics(metricsImap, localMap); + } + this.printTaskExecutionRuntimeInfo(); + } + + private void updateMetrics( + IMap> metricsImap, + HashMap localMap) { + try { + if (!metricsImap.tryLock(Constant.IMAP_RUNNING_JOB_METRICS_KEY, 2, TimeUnit.SECONDS)) { + logger.info("try lock failed in update metrics"); + return; + } + HashMap centralMap = + metricsImap.computeIfAbsent( + Constant.IMAP_RUNNING_JOB_METRICS_KEY, k -> new HashMap<>()); + centralMap.putAll(localMap); + metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap); + logger.warning("Update metrics success"); + } catch (Exception e) { + logger.warning( + "The Imap acquisition failed due to the hazelcast node being offline or restarted, and will be retried next time", + e); + } finally { + metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY); + } + } + public void printTaskExecutionRuntimeInfo() { if (logger.isFineEnabled()) { ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService; diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 569de7f0da6..95d3495ae5b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -770,7 +770,7 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed LOG.error("store checkpoint states failed.", e); sneakyThrow(e); } - LOG.info( + LOG.warn( "pending checkpoint({}/{}@{}) notify finished!", completedCheckpoint.getCheckpointId(), completedCheckpoint.getPipelineId(), diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java index c1e7f975c41..17db2097f41 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java @@ -286,6 +286,7 @@ private void subPlanDone(PipelineStatus pipelineStatus) { jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus); notifyCheckpointManagerPipelineEnd(pipelineStatus); jobMaster.releasePipelineResource(this); + log.warn("-----------------------subPlanDone--------------------"); return null; }, new RetryUtils.RetryMaterial( @@ -560,7 +561,9 @@ public void handleCheckpointError() { log.warn( String.format( "%s checkpoint have error, cancel the pipeline", getPipelineFullName())); - this.cancelPipeline(); + if (!getPipelineState().isEndState()) { + updatePipelineState(PipelineStatus.CANCELING); + } } public void startSubPlanStateProcess() { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index 202e0c2e8b6..cde88b21f6a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -113,6 +113,9 @@ public SinkFlowLifeCycle( this.containAggCommitter = containAggCommitter; this.metricsContext = metricsContext; sinkWriteCount = metricsContext.counter(SINK_WRITE_COUNT); + if (sinkWriteCount.getCount() == 0) { + System.out.println("ho"); + } sinkWriteQPS = metricsContext.meter(SINK_WRITE_QPS); sinkWriteBytes = metricsContext.counter(SINK_WRITE_BYTES); sinkWriteBytesPerSeconds = metricsContext.meter(SINK_WRITE_BYTES_PER_SECONDS); diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java index 20de2932a10..e194a1badbf 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java @@ -97,6 +97,13 @@ public void testGetJobMetrics() throws Exception { assertTrue((Double) jobMetrics.get(SINK_WRITE_QPS).get(0).value() > 0); } + @Test + public void testFor() throws InterruptedException { + for (int i = 0; i < 100; i++) { + testMetricsOnJobRestart(); + } + } + @Test public void testMetricsOnJobRestart() throws InterruptedException { @@ -114,7 +121,7 @@ public void testMetricsOnJobRestart() throws InterruptedException { Thread.sleep(10000); - log.info(coordinatorService.getJobMetrics(jobId3).toJsonString()); + log.warn(coordinatorService.getJobMetrics(jobId3).toJsonString()); // start savePoint coordinatorService.savePoint(jobId3); @@ -127,6 +134,10 @@ public void testMetricsOnJobRestart() throws InterruptedException { JobStatus.SAVEPOINT_DONE, server.getCoordinatorService().getJobStatus(jobId3))); + log.warn( + "================after save point=======" + + coordinatorService.getJobMetrics(jobId3).toJsonString()); + // restore job startJob(jobId3, "stream_fake_to_console.conf", true); await().atMost(120000, TimeUnit.MILLISECONDS) @@ -140,8 +151,22 @@ public void testMetricsOnJobRestart() throws InterruptedException { await().atMost(60000, TimeUnit.MILLISECONDS) .untilAsserted( () -> { + Thread.sleep(2000); JobMetrics jobMetrics = coordinatorService.getJobMetrics(jobId3); - log.info(jobMetrics.toJsonString()); + log.error( + "=============================================" + + jobMetrics + .get(SINK_WRITE_COUNT) + .get(0) + .value() + .toString()); + log.error( + "=============================================" + + jobMetrics + .get(SINK_WRITE_COUNT) + .get(1) + .value() + .toString()); assertTrue(40 < (Long) jobMetrics.get(SINK_WRITE_COUNT).get(0).value()); assertTrue(40 < (Long) jobMetrics.get(SINK_WRITE_COUNT).get(1).value()); assertTrue( diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/log4j2-test.properties b/seatunnel-engine/seatunnel-engine-server/src/test/resources/log4j2-test.properties index 153d4d97c68..b6427facbcc 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/resources/log4j2-test.properties +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/log4j2-test.properties @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ -rootLogger.level = INFO +rootLogger.level = WARN rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender From 2b0a6ae67f48b4e65972028737a63c222c16e6ba Mon Sep 17 00:00:00 2001 From: Eric Date: Tue, 16 Jan 2024 17:08:00 +0800 Subject: [PATCH 6/9] close the zeta engine client is ci --- .../java/org/apache/seatunnel/engine/e2e/RestApiIT.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java index c3e9c558490..e64243be79d 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java @@ -62,6 +62,8 @@ public class RestApiIT { private static HazelcastInstanceImpl node2; + private static SeaTunnelClient engineClient; + private static final String jobName = "test测试"; private static final String paramJobName = "param_test测试"; @@ -80,7 +82,7 @@ void beforeClass() throws Exception { ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig(); clientConfig.setClusterName(testClusterName); - SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig); + engineClient = new SeaTunnelClient(clientConfig); ClientJobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig); @@ -456,6 +458,10 @@ public void testEncryptConfig() { @AfterEach void afterClass() { + if (engineClient != null) { + engineClient.close(); + } + if (node1 != null) { node1.shutdown(); } From da5dfe71a375b225bd77ff595914268db5d8a1d8 Mon Sep 17 00:00:00 2001 From: Eric Date: Wed, 17 Jan 2024 15:53:41 +0800 Subject: [PATCH 7/9] format code --- .../engine/server/CoordinatorService.java | 2 - .../engine/server/TaskExecutionService.java | 80 +++++-------------- .../checkpoint/CheckpointCoordinator.java | 14 +++- .../server/checkpoint/CheckpointManager.java | 8 -- .../engine/server/master/JobMetricsTest.java | 29 +------ 5 files changed, 30 insertions(+), 103 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java index 35b8585875f..a5028f3597a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java @@ -532,8 +532,6 @@ public JobMetrics getJobMetrics(long jobId) { } JobMetrics jobMetrics = JobMetricsUtil.toJobMetrics(runningJobMaster.getCurrJobMetrics()); JobMetrics jobMetricsImap = jobHistoryService.getJobMetrics(jobId); - boolean a = jobMetricsImap != null; - logger.warning("===============jobMetricsImap is null" + a); return jobMetricsImap != null ? jobMetricsImap.merge(jobMetrics) : jobMetrics; } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java index 31207260969..482c4d6712a 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java @@ -401,11 +401,6 @@ public PassiveCompletableFuture deployLocalTask( String.format( "Task %s complete with state %s", r.getTaskGroupLocation(), r.getExecutionState())); - - // update metrics - updateDoneTaskMetricsContextInImap( - getExecutionContext(taskGroup.getTaskGroupLocation())); - notifyTaskStatusToMaster(taskGroup.getTaskGroupLocation(), r); }), executorService); @@ -574,65 +569,26 @@ private void updateMetricsContextInImap() { }); }); if (localMap.size() > 0) { - updateMetrics(metricsImap, localMap); - } - this.printTaskExecutionRuntimeInfo(); - } - - private void updateDoneTaskMetricsContextInImap(TaskGroupContext taskGroupContext) { - if (!nodeEngine.getNode().getState().equals(NodeState.ACTIVE)) { - logger.warning( - String.format( - "The Node is not ready yet, Node state %s,looking forward to the next " - + "scheduling", - nodeEngine.getNode().getState())); - return; - } - IMap> metricsImap = - nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_METRICS); - HashMap localMap = new HashMap<>(); - taskGroupContext - .getTaskGroup() - .getTasks() - .forEach( - task -> { - // MetricsContext only exists in SeaTunnelTask - if (task instanceof SeaTunnelTask) { - SeaTunnelTask seaTunnelTask = (SeaTunnelTask) task; - if (null != seaTunnelTask.getMetricsContext()) { - localMap.put( - seaTunnelTask.getTaskLocation(), - seaTunnelTask.getMetricsContext()); - } - } - }); - if (localMap.size() > 0) { - updateMetrics(metricsImap, localMap); - } - this.printTaskExecutionRuntimeInfo(); - } - - private void updateMetrics( - IMap> metricsImap, - HashMap localMap) { - try { - if (!metricsImap.tryLock(Constant.IMAP_RUNNING_JOB_METRICS_KEY, 2, TimeUnit.SECONDS)) { - logger.info("try lock failed in update metrics"); - return; + try { + if (!metricsImap.tryLock( + Constant.IMAP_RUNNING_JOB_METRICS_KEY, 2, TimeUnit.SECONDS)) { + logger.info("try lock failed in update metrics"); + return; + } + HashMap centralMap = + metricsImap.computeIfAbsent( + Constant.IMAP_RUNNING_JOB_METRICS_KEY, k -> new HashMap<>()); + centralMap.putAll(localMap); + metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap); + } catch (Exception e) { + logger.warning( + "The Imap acquisition failed due to the hazelcast node being offline or restarted, and will be retried next time", + e); + } finally { + metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY); } - HashMap centralMap = - metricsImap.computeIfAbsent( - Constant.IMAP_RUNNING_JOB_METRICS_KEY, k -> new HashMap<>()); - centralMap.putAll(localMap); - metricsImap.put(Constant.IMAP_RUNNING_JOB_METRICS_KEY, centralMap); - logger.warning("Update metrics success"); - } catch (Exception e) { - logger.warning( - "The Imap acquisition failed due to the hazelcast node being offline or restarted, and will be retried next time", - e); - } finally { - metricsImap.unlock(Constant.IMAP_RUNNING_JOB_METRICS_KEY); } + this.printTaskExecutionRuntimeInfo(); } public void printTaskExecutionRuntimeInfo() { diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 95d3495ae5b..917d88480be 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -470,18 +470,24 @@ public static Map getPipelineTasks(Set pipelineSubt .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().size())); } + @SneakyThrows public PassiveCompletableFuture startSavepoint() { LOG.info(String.format("Start save point for Job (%s)", jobId)); if (!isAllTaskReady) { - CompletableFuture savepointFuture = new CompletableFuture(); + CompletableFuture savepointFuture = new CompletableFuture<>(); savepointFuture.completeExceptionally( new CheckpointException( CheckpointCloseReason.TASK_NOT_ALL_READY_WHEN_SAVEPOINT)); return new PassiveCompletableFuture<>(savepointFuture); } - CompletableFuture savepoint = - createPendingCheckpoint(Instant.now().toEpochMilli(), SAVEPOINT_TYPE); - startTriggerPendingCheckpoint(savepoint); + CompletableFuture savepoint; + synchronized (lock) { + while (pendingCounter.get() > 0) { + Thread.sleep(500); + } + savepoint = createPendingCheckpoint(Instant.now().toEpochMilli(), SAVEPOINT_TYPE); + startTriggerPendingCheckpoint(savepoint); + } PendingCheckpoint savepointPendingCheckpoint = savepoint.join(); LOG.info( String.format( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java index e2522ad5ae3..0622a99bd9d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java @@ -162,14 +162,6 @@ public PassiveCompletableFuture[] triggerSavePoints() { .toArray(PassiveCompletableFuture[]::new); } - /** - * Called by the JobMaster, actually triggered by the user.
- * After the savepoint is triggered, it will cause the pipeline to stop automatically. - */ - public PassiveCompletableFuture triggerSavepoint(int pipelineId) { - return getCheckpointCoordinator(pipelineId).startSavepoint(); - } - public void reportedPipelineRunning(int pipelineId, boolean alreadyStarted) { log.info( "reported pipeline running stack: " diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java index e194a1badbf..20de2932a10 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java @@ -97,13 +97,6 @@ public void testGetJobMetrics() throws Exception { assertTrue((Double) jobMetrics.get(SINK_WRITE_QPS).get(0).value() > 0); } - @Test - public void testFor() throws InterruptedException { - for (int i = 0; i < 100; i++) { - testMetricsOnJobRestart(); - } - } - @Test public void testMetricsOnJobRestart() throws InterruptedException { @@ -121,7 +114,7 @@ public void testMetricsOnJobRestart() throws InterruptedException { Thread.sleep(10000); - log.warn(coordinatorService.getJobMetrics(jobId3).toJsonString()); + log.info(coordinatorService.getJobMetrics(jobId3).toJsonString()); // start savePoint coordinatorService.savePoint(jobId3); @@ -134,10 +127,6 @@ public void testMetricsOnJobRestart() throws InterruptedException { JobStatus.SAVEPOINT_DONE, server.getCoordinatorService().getJobStatus(jobId3))); - log.warn( - "================after save point=======" - + coordinatorService.getJobMetrics(jobId3).toJsonString()); - // restore job startJob(jobId3, "stream_fake_to_console.conf", true); await().atMost(120000, TimeUnit.MILLISECONDS) @@ -151,22 +140,8 @@ public void testMetricsOnJobRestart() throws InterruptedException { await().atMost(60000, TimeUnit.MILLISECONDS) .untilAsserted( () -> { - Thread.sleep(2000); JobMetrics jobMetrics = coordinatorService.getJobMetrics(jobId3); - log.error( - "=============================================" - + jobMetrics - .get(SINK_WRITE_COUNT) - .get(0) - .value() - .toString()); - log.error( - "=============================================" - + jobMetrics - .get(SINK_WRITE_COUNT) - .get(1) - .value() - .toString()); + log.info(jobMetrics.toJsonString()); assertTrue(40 < (Long) jobMetrics.get(SINK_WRITE_COUNT).get(0).value()); assertTrue(40 < (Long) jobMetrics.get(SINK_WRITE_COUNT).get(1).value()); assertTrue( From 472fab4f3dfc77589717534f8d81e96bb4b5fa38 Mon Sep 17 00:00:00 2001 From: Eric Date: Wed, 17 Jan 2024 15:55:43 +0800 Subject: [PATCH 8/9] remove test case --- .../org/apache/seatunnel/engine/e2e/JobExecutionIT.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java index d59952e58c8..0dc4d7ba126 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java @@ -202,13 +202,6 @@ void afterClass() { } } - @Test - public void testFor() throws Exception { - for (int i = 0; i < 100; i++) { - testLastCheckpointErrorJob(); - } - } - @Test public void testLastCheckpointErrorJob() throws Exception { Common.setDeployMode(DeployMode.CLIENT); From cc135ab139c2ff964397c449d4dfdaa386aa8fc5 Mon Sep 17 00:00:00 2001 From: Eric Date: Wed, 17 Jan 2024 16:00:39 +0800 Subject: [PATCH 9/9] remove test code --- .../src/test/resources/log4j2-test.properties | 2 +- .../engine/server/checkpoint/CheckpointCoordinator.java | 2 +- .../apache/seatunnel/engine/server/dag/physical/SubPlan.java | 1 - .../seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java | 3 --- .../src/test/resources/log4j2-test.properties | 2 +- 5 files changed, 3 insertions(+), 7 deletions(-) diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties index e0706a7367c..f0090af0248 100644 --- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties +++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2-test.properties @@ -22,7 +22,7 @@ rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender logger.zeta.name=org.apache.seatunnel.engine -logger.zeta.level=INFO +logger.zeta.level=WARN appender.consoleStdout.name = consoleStdoutAppender appender.consoleStdout.type = CONSOLE diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 917d88480be..a27bed8102d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -776,7 +776,7 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed LOG.error("store checkpoint states failed.", e); sneakyThrow(e); } - LOG.warn( + LOG.info( "pending checkpoint({}/{}@{}) notify finished!", completedCheckpoint.getCheckpointId(), completedCheckpoint.getPipelineId(), diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java index 17db2097f41..a2623d5b2ce 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java @@ -286,7 +286,6 @@ private void subPlanDone(PipelineStatus pipelineStatus) { jobMaster.removeMetricsContext(getPipelineLocation(), pipelineStatus); notifyCheckpointManagerPipelineEnd(pipelineStatus); jobMaster.releasePipelineResource(this); - log.warn("-----------------------subPlanDone--------------------"); return null; }, new RetryUtils.RetryMaterial( diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java index cde88b21f6a..202e0c2e8b6 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java @@ -113,9 +113,6 @@ public SinkFlowLifeCycle( this.containAggCommitter = containAggCommitter; this.metricsContext = metricsContext; sinkWriteCount = metricsContext.counter(SINK_WRITE_COUNT); - if (sinkWriteCount.getCount() == 0) { - System.out.println("ho"); - } sinkWriteQPS = metricsContext.meter(SINK_WRITE_QPS); sinkWriteBytes = metricsContext.counter(SINK_WRITE_BYTES); sinkWriteBytesPerSeconds = metricsContext.meter(SINK_WRITE_BYTES_PER_SECONDS); diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/log4j2-test.properties b/seatunnel-engine/seatunnel-engine-server/src/test/resources/log4j2-test.properties index b6427facbcc..153d4d97c68 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/resources/log4j2-test.properties +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/log4j2-test.properties @@ -16,7 +16,7 @@ # limitations under the License. ################################################################################ -rootLogger.level = WARN +rootLogger.level = INFO rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender