diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index ee1f7ae5c7e..62208d759a8 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -18,10 +18,13 @@ package org.apache.seatunnel.engine.server.checkpoint; import org.apache.seatunnel.common.utils.ExceptionUtils; +import org.apache.seatunnel.common.utils.RetryUtils; import org.apache.seatunnel.common.utils.SeaTunnelException; import org.apache.seatunnel.engine.checkpoint.storage.PipelineState; import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage; +import org.apache.seatunnel.engine.common.Constant; import org.apache.seatunnel.engine.common.config.server.CheckpointConfig; +import org.apache.seatunnel.engine.common.utils.ExceptionUtil; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; import org.apache.seatunnel.engine.core.checkpoint.Checkpoint; import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter; @@ -41,8 +44,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.hazelcast.map.IMap; import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture; import lombok.Getter; +import lombok.NonNull; import lombok.SneakyThrows; import java.time.Instant; @@ -131,6 +136,10 @@ public class CheckpointCoordinator { private AtomicReference errorByPhysicalVertex = new AtomicReference<>(); + private final IMap runningJobStateIMap; + + private final String checkpointStateImapKey; + @SneakyThrows public CheckpointCoordinator( CheckpointManager manager, @@ -140,13 +149,17 @@ public CheckpointCoordinator( CheckpointPlan plan, CheckpointIDCounter checkpointIdCounter, PipelineState pipelineState, - ExecutorService executorService) { + ExecutorService executorService, + IMap runningJobStateIMap, + boolean isStartWithSavePoint) { this.executorService = executorService; this.checkpointManager = manager; this.checkpointStorage = checkpointStorage; this.jobId = jobId; this.pipelineId = plan.getPipelineId(); + this.checkpointStateImapKey = "checkpoint_state_" + jobId + "_" + pipelineId; + this.runningJobStateIMap = runningJobStateIMap; this.plan = plan; this.coordinatorConfig = checkpointConfig; this.tolerableFailureCheckpoints = coordinatorConfig.getTolerableFailureCheckpoints(); @@ -174,6 +187,27 @@ public CheckpointCoordinator( serializer.deserialize(pipelineState.getStates(), CompletedCheckpoint.class); } this.checkpointCoordinatorFuture = new CompletableFuture(); + + // For job restore from master node active switch + CheckpointCoordinatorStatus checkpointCoordinatorStatus = + (CheckpointCoordinatorStatus) runningJobStateIMap.get(checkpointStateImapKey); + + // This is not a new job + if (isStartWithSavePoint) { + updateStatus(CheckpointCoordinatorStatus.RUNNING); + return; + } + + // If checkpointCoordinatorStatus is not null it means this CheckpointCoordinator is created + // by job restore from master node active switch + if (checkpointCoordinatorStatus != null) { + if (checkpointCoordinatorStatus.isEndState()) { + this.checkpointCoordinatorFuture.complete( + new CheckpointCoordinatorState(checkpointCoordinatorStatus, null)); + } else { + updateStatus(CheckpointCoordinatorStatus.RUNNING); + } + } } public int getPipelineId() { @@ -223,6 +257,7 @@ private void handleCoordinatorError(CheckpointCloseReason reason, Throwable e) { return; } cleanPendingCheckpoint(reason); + updateStatus(CheckpointCoordinatorStatus.FAILED); checkpointCoordinatorFuture.complete( new CheckpointCoordinatorState( CheckpointCoordinatorStatus.FAILED, errorByPhysicalVertex.get())); @@ -307,6 +342,7 @@ protected void restoreCoordinator(boolean alreadyStarted) { LOG.info("received restore CheckpointCoordinator with alreadyStarted= " + alreadyStarted); errorByPhysicalVertex = new AtomicReference<>(); checkpointCoordinatorFuture = new CompletableFuture<>(); + updateStatus(CheckpointCoordinatorStatus.RUNNING); cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_RESET); shutdown = false; if (alreadyStarted) { @@ -676,8 +712,15 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed latestCompletedCheckpoint = completedCheckpoint; if (isCompleted()) { cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED); - checkpointCoordinatorFuture.complete( - new CheckpointCoordinatorState(CheckpointCoordinatorStatus.FINISHED, null)); + if (latestCompletedCheckpoint.getCheckpointType().equals(SAVEPOINT_TYPE)) { + updateStatus(CheckpointCoordinatorStatus.SUSPEND); + checkpointCoordinatorFuture.complete( + new CheckpointCoordinatorState(CheckpointCoordinatorStatus.SUSPEND, null)); + } else { + updateStatus(CheckpointCoordinatorStatus.FINISHED); + checkpointCoordinatorFuture.complete( + new CheckpointCoordinatorState(CheckpointCoordinatorStatus.FINISHED, null)); + } } } @@ -716,9 +759,36 @@ public PassiveCompletableFuture cancelCheckpoint() { return new PassiveCompletableFuture<>(checkpointCoordinatorFuture); } cleanPendingCheckpoint(CheckpointCloseReason.PIPELINE_END); + updateStatus(CheckpointCoordinatorStatus.CANCELED); CheckpointCoordinatorState checkpointCoordinatorState = new CheckpointCoordinatorState(CheckpointCoordinatorStatus.CANCELED, null); checkpointCoordinatorFuture.complete(checkpointCoordinatorState); return new PassiveCompletableFuture<>(checkpointCoordinatorFuture); } + + private synchronized void updateStatus(@NonNull CheckpointCoordinatorStatus targetStatus) { + try { + RetryUtils.retryWithException( + () -> { + LOG.info( + String.format( + "Turn %s state from %s to %s", + checkpointStateImapKey, + runningJobStateIMap.get(checkpointStateImapKey), + targetStatus)); + runningJobStateIMap.set(checkpointStateImapKey, targetStatus); + return null; + }, + new RetryUtils.RetryMaterial( + Constant.OPERATION_RETRY_TIME, + true, + exception -> ExceptionUtil.isOperationNeedRetryException(exception), + Constant.OPERATION_RETRY_SLEEP)); + } catch (Exception e) { + LOG.warn( + String.format( + "Set %s state %s to IMap failed, skip do it", + checkpointStateImapKey, targetStatus)); + } + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorStatus.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorStatus.java index 86abfea9c18..4c65e7f8800 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorStatus.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinatorStatus.java @@ -18,9 +18,18 @@ package org.apache.seatunnel.engine.server.checkpoint; public enum CheckpointCoordinatorStatus { + RUNNING, + FINISHED, CANCELED, - FAILED; + FAILED, + + /** for savepoint job */ + SUSPEND; + + public boolean isEndState() { + return this == FINISHED || this == CANCELED || this == FAILED || this == SUSPEND; + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java index 39d25636cce..9f9649f03a1 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java @@ -42,6 +42,7 @@ import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState; import org.apache.seatunnel.engine.server.utils.NodeEngineUtil; +import com.hazelcast.map.IMap; import com.hazelcast.spi.impl.NodeEngine; import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture; import lombok.extern.slf4j.Slf4j; @@ -84,7 +85,8 @@ public CheckpointManager( JobMaster jobMaster, Map checkpointPlanMap, CheckpointConfig checkpointConfig, - ExecutorService executorService) + ExecutorService executorService, + IMap runningJobStateIMap) throws CheckpointStorageException { this.executorService = executorService; this.jobId = jobId; @@ -130,7 +132,9 @@ public CheckpointManager( plan, idCounter, pipelineState, - executorService); + executorService, + runningJobStateIMap, + isStartWithSavePoint); } catch (Exception e) { ExceptionUtil.sneakyThrow(e); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java index 20923cc2bdf..65666413548 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalVertex.java @@ -265,7 +265,7 @@ private TaskDeployState deployOnLocal(@NonNull SlotProfile slotProfile) throws E }); } - private TaskDeployState deployOnRemote(@NonNull SlotProfile slotProfile) throws Exception { + private TaskDeployState deployOnRemote(@NonNull SlotProfile slotProfile) { return deployInternal( taskGroupImmutableInformation -> { try { @@ -281,7 +281,16 @@ private TaskDeployState deployOnRemote(@NonNull SlotProfile slotProfile) throws slotProfile.getWorker()) .get(); } catch (Exception e) { - throw new RuntimeException(e); + if (getExecutionState().isEndState()) { + LOGGER.warning(ExceptionUtils.getMessage(e)); + LOGGER.warning( + String.format( + "%s deploy error, but the state is already in end state %s, skip this error", + getTaskFullName(), currExecutionState)); + return TaskDeployState.success(); + } else { + return TaskDeployState.failed(e); + } } }); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index e63a19f97fa..876ac80cc6f 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -242,7 +242,8 @@ public void initCheckPointManager() throws CheckpointStorageException { this, checkpointPlanMap, jobCheckpointConfig, - executorService); + executorService, + runningJobStateIMap); } // TODO replace it after ReadableConfig Support parse yaml format, then use only one config to diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java index ab5ad8f826f..177f17203ca 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/scheduler/PipelineBaseScheduler.java @@ -259,10 +259,6 @@ private CompletableFuture deployTask(PhysicalVertex task, SlotProfile slot task.getTaskGroupLocation(), ExecutionState.FAILED, state.getThrowableMsg())); - throw new SeaTunnelEngineException( - String.format( - "deploy task %s failed, error msg: \n%s", - task.getTaskFullName(), state.getThrowableMsg())); } } catch (Exception e) { throw new SeaTunnelEngineException(e); diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java index 5859407389c..9666bdb3588 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java @@ -341,7 +341,8 @@ public void ack(Barrier barrier) { new TaskAcknowledgeOperation( this.taskLocation, (CheckpointBarrier) barrier, - checkpointStates.get(barrier.getId()))); + checkpointStates.get(barrier.getId()))) + .join(); } } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java index e831a926975..797033f8d2e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SinkAggregatedCommitterTask.java @@ -224,7 +224,8 @@ public void triggerBarrier(Barrier barrier) throws Exception { (CheckpointBarrier) barrier, Collections.singletonList( new ActionSubtaskState( - ActionStateKey.of(sink), -1, states)))); + ActionStateKey.of(sink), -1, states)))) + .join(); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java index 18ba0a78e69..b39c282f3c2 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSplitEnumeratorTask.java @@ -160,7 +160,8 @@ public void triggerBarrier(Barrier barrier) throws Exception { new ActionSubtaskState( ActionStateKey.of(source), -1, - Collections.singletonList(serialize))))); + Collections.singletonList(serialize))))) + .join(); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java index a953a118fd2..c4b7b8af7d3 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java @@ -44,6 +44,7 @@ import java.util.concurrent.CompletableFuture; import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID; +import static org.apache.seatunnel.engine.common.Constant.IMAP_RUNNING_JOB_STATE; @DisabledOnOs(OS.WINDOWS) @Disabled @@ -88,7 +89,8 @@ public void testHAByIMapCheckpointIDCounter() throws CheckpointStorageException null, planMap, new CheckpointConfig(), - instance.getExecutorService("test")); + instance.getExecutorService("test"), + nodeEngine.getHazelcastInstance().getMap(IMAP_RUNNING_JOB_STATE)); Assertions.assertTrue(checkpointManager.isCompletedPipeline(1)); checkpointManager.listenPipeline(1, PipelineStatus.FINISHED); Assertions.assertNull(checkpointIdMap.get(1));