[Improve][CDC] Stop snapshot readers in increment phase
hailin0 committed Apr 22, 2024
1 parent e310353 commit 3135bf0
Showing 25 changed files with 326 additions and 42 deletions.
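
At a high level, this change stops CDC snapshot readers from sitting idle once the job has moved to the incremental (log-reading) phase. The hunks below appear to wire that through several layers: the combined snapshot/incremental split assigner reports when it has nothing left to hand out, the enumerator then signals NoMoreSplits to readers that are still waiting, an idle reader answers with a NoMoreElement event, the SourceSplitEnumeratorTask asks the CheckpointCoordinator (readyToCloseIdleTask) to close the idle task group through a checkpoint barrier that now carries prepareCloseTasks/closedTasks sets, and the finished task group's slot is released. A rough sketch of that hand-off, using hypothetical stage names rather than types from the codebase:

    /** Hypothetical stage names; each comment points at the method in the diff that implements the step. */
    enum IdleSnapshotReaderShutdownSketch {
        SPLITS_EXHAUSTED,            // waitingForCompletedSplits() / waitingForAssignedSplits() both report done
        NO_MORE_SPLITS_SIGNALLED,    // enumerator calls context.signalNoMoreSplits(subtask)
        NO_MORE_ELEMENT_SIGNALLED,   // idle reader calls context.signalNoMoreElement()
        IDLE_TASK_PREPARED_TO_CLOSE, // CheckpointCoordinator.readyToCloseIdleTask(taskLocation)
        CLOSED_AT_BARRIER,           // barrier ACK with prepareClose(task) == true triggers completedCloseIdleTask
        RESOURCES_RELEASED           // releaseTaskGroupResource(...) once the task group reaches an end state
    }
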
@@ -114,7 +114,8 @@ public Optional<SourceSplitBase> getNext() {
 
     @Override
     public boolean waitingForCompletedSplits() {
-        return snapshotSplitAssigner.waitingForCompletedSplits();
+        return snapshotSplitAssigner.waitingForCompletedSplits()
+                || incrementalSplitAssigner.waitingForAssignedSplits();
     }
 
     @Override
@@ -177,8 +177,16 @@ private void assignSplits() {
                 awaitingReader.remove();
                 LOG.debug("Assign split {} to subtask {}", sourceSplit, nextAwaiting);
             } else {
-                // there is no available splits by now, skip assigning
-                break;
+                if (splitAssigner.waitingForCompletedSplits()) {
+                    // there is no available splits by now, skip assigning
+                    break;
+                } else {
+                    LOG.info(
+                            "No more splits available(snapshot/increment), signal no more splits to subtask {}",
+                            nextAwaiting);
+                    context.signalNoMoreSplits(nextAwaiting);
+                    awaitingReader.remove();
+                }
             }
         }
     }
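
Read together with the first hunk, the enumerator now distinguishes "no split available yet" from "no split will ever come": while the assigner is still waiting for snapshot splits to complete (or the incremental split to be handed out), the reader stays parked; otherwise it is told there are no more splits. A minimal, self-contained sketch of that decision follows; the SplitAssigner/EnumeratorContext stubs mirror the names in the diff but are assumptions, not the real SeaTunnel interfaces.

    import java.util.Optional;
    import java.util.Queue;

    // Stub of the enumerator-side assignment loop; stand-in types, not the real APIs.
    interface SplitAssigner {
        Optional<String> getNext();          // next split to hand out, if any
        boolean waitingForCompletedSplits(); // true while more splits may still become available
    }

    interface EnumeratorContext {
        void assignSplit(int subtask, String split);
        void signalNoMoreSplits(int subtask);
    }

    class AssignLoopSketch {
        void assignSplits(Queue<Integer> awaitingReaders, SplitAssigner assigner, EnumeratorContext context) {
            while (!awaitingReaders.isEmpty()) {
                int nextAwaiting = awaitingReaders.peek();
                Optional<String> split = assigner.getNext();
                if (split.isPresent()) {
                    context.assignSplit(nextAwaiting, split.get());
                    awaitingReaders.remove();
                } else if (assigner.waitingForCompletedSplits()) {
                    break; // a split may still show up later, keep this reader waiting
                } else {
                    context.signalNoMoreSplits(nextAwaiting); // both phases exhausted for this reader
                    awaitingReaders.remove();
                }
            }
        }
    }
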
@@ -278,4 +278,8 @@ public boolean completedSnapshotPhase(List<TableId> tableIds) {
         return context.getAssignedSnapshotSplit().isEmpty()
                 && context.getSplitCompletedOffsets().isEmpty();
     }
+
+    public boolean waitingForAssignedSplits() {
+        return !(splitAssigned && noMoreSplits());
+    }
 }
@@ -108,7 +108,13 @@ public void pollNext(Collector<T> output) throws Exception {
             context.sendSplitRequest();
             needSendSplitRequest.compareAndSet(true, false);
         }
-        super.pollNext(output);
+
+        if (isNoMoreSplitsAssignment() && isNoMoreRecords()) {
+            log.info("Reader {} send NoMoreElement event", context.getIndexOfSubtask());
+            context.signalNoMoreElement();
+        } else {
+            super.pollNext(output);
+        }
     }
 
     @Override
@@ -65,7 +65,7 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt
     private RecordsWithSplitIds<E> currentFetch;
     protected SplitContext<T, SplitStateT> currentSplitContext;
     private Collector<T> currentSplitOutput;
-    private boolean noMoreSplitsAssignment;
+    @Getter private volatile boolean noMoreSplitsAssignment;
 
     public SourceReaderBase(
             BlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
@@ -94,10 +94,11 @@ public void pollNext(Collector<T> output) throws Exception {
         if (recordsWithSplitId == null) {
             if (Boundedness.BOUNDED.equals(context.getBoundedness())
                     && noMoreSplitsAssignment
-                    && splitFetcherManager.maybeShutdownFinishedFetchers()
-                    && elementsQueue.isEmpty()) {
+                    && isNoMoreRecords()) {
                 context.signalNoMoreElement();
-                log.info("Send NoMoreElement event");
+                log.info(
+                        "Reader {} into idle state, send NoMoreElement event",
+                        context.getIndexOfSubtask());
             }
             return;
         }
@@ -137,7 +138,7 @@ public void addSplits(List<SplitT> splits) {
 
     @Override
     public void handleNoMoreSplits() {
-        log.info("Reader received NoMoreSplits event.");
+        log.info("Reader {} received NoMoreSplits event.", context.getIndexOfSubtask());
         noMoreSplitsAssignment = true;
     }
 
@@ -146,9 +147,15 @@ public void handleSourceEvent(SourceEvent sourceEvent) {
         log.info("Received unhandled source event: {}", sourceEvent);
     }
 
+    protected boolean isNoMoreRecords() {
+        return splitFetcherManager.maybeShutdownFinishedFetchers()
+                && elementsQueue.isEmpty()
+                && currentFetch == null;
+    }
+
     @Override
     public void close() {
-        log.info("Closing Source Reader.");
+        log.info("Closing Source Reader {}.", context.getIndexOfSubtask());
         try {
             splitFetcherManager.close(options.getSourceReaderCloseTimeout());
         } catch (Exception e) {
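
On the reader side, the hunks above cooperate: handleNoMoreSplits() flips the (now volatile) noMoreSplitsAssignment flag, and the new isNoMoreRecords() reports whether the fetchers are drained, the queue is empty, and no batch is mid-emit. The CDC reader's pollNext() then emits NoMoreElement without the Boundedness.BOUNDED check that SourceReaderBase applies. A compact, stand-alone sketch of that idle test (stand-in fields, not the actual SourceReaderBase members):

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    /** Stand-in for the reader state visible above; not the real SourceReaderBase. */
    class IdleReaderSketch {
        private volatile boolean noMoreSplitsAssignment;  // set by handleNoMoreSplits()
        private final Queue<Object> elementsQueue = new ConcurrentLinkedQueue<>();
        private Object currentFetch;                      // batch currently being emitted, if any
        private boolean fetchersFinished;                 // maybeShutdownFinishedFetchers() in the real class

        boolean isNoMoreRecords() {
            // Mirrors the new isNoMoreRecords(): fetchers drained, queue empty, no batch in flight.
            return fetchersFinished && elementsQueue.isEmpty() && currentFetch == null;
        }

        /** True when pollNext() should report NoMoreElement instead of polling again. */
        boolean shouldSignalNoMoreElement() {
            return noMoreSplitsAssignment && isNoMoreRecords();
        }
    }
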
@@ -18,27 +18,46 @@
 package org.apache.seatunnel.engine.server.checkpoint;
 
 import org.apache.seatunnel.engine.core.checkpoint.CheckpointType;
+import org.apache.seatunnel.engine.server.execution.TaskLocation;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
 import com.google.common.base.Objects;
+import lombok.Getter;
 
 import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+@Getter
 public class CheckpointBarrier implements Barrier, Serializable {
     private final long id;
     private final long timestamp;
     private final CheckpointType checkpointType;
+    private final Set<TaskLocation> prepareCloseTasks;
+    private final Set<TaskLocation> closedTasks;
 
     public CheckpointBarrier(long id, long timestamp, CheckpointType checkpointType) {
+        this(id, timestamp, checkpointType, Collections.emptySet(), Collections.emptySet());
+    }
+
+    public CheckpointBarrier(
+            long id,
+            long timestamp,
+            CheckpointType checkpointType,
+            Set<TaskLocation> prepareCloseTasks,
+            Set<TaskLocation> closedTasks) {
         this.id = id;
         this.timestamp = timestamp;
         this.checkpointType = checkNotNull(checkpointType);
-    }
-
-    public long getId() {
-        return id;
+        this.prepareCloseTasks = prepareCloseTasks;
+        this.closedTasks = closedTasks;
+        if (new HashSet(prepareCloseTasks).removeAll(closedTasks)) {
+            throw new IllegalArgumentException(
+                    "The prepareCloseTasks collection should not contain elements of the closedTasks collection");
+        }
     }
 
     @Override
@@ -51,12 +70,17 @@ public boolean prepareClose() {
         return checkpointType.isFinalCheckpoint();
     }
 
-    public long getTimestamp() {
-        return timestamp;
+    @Override
+    public boolean prepareClose(TaskLocation task) {
+        if (prepareClose()) {
+            return true;
+        }
+        return prepareCloseTasks.contains(task);
     }
 
-    public CheckpointType getCheckpointType() {
-        return checkpointType;
+    @Override
+    public Set<TaskLocation> closedTasks() {
+        return Collections.unmodifiableSet(closedTasks);
     }
 
     @Override
@@ -81,7 +105,8 @@ public boolean equals(Object other) {
     @Override
     public String toString() {
         return String.format(
-                "CheckpointBarrier %d @ %d Options: %s", id, timestamp, checkpointType);
+                "CheckpointBarrier %d @ %d type: %s, prepareCloseTasks: %s, closedTasks: %s",
+                id, timestamp, checkpointType, prepareCloseTasks, closedTasks);
     }
 
     public boolean isAuto() {
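
To make the extended barrier contract concrete, here is a hedged usage sketch. TaskLocation and CheckpointType are replaced by placeholder types so the example stays self-contained; only the constructor check and the prepareClose(task) behaviour are taken from the diff. Note that Set.removeAll returns true when the receiving set changed, so the guard rejects any overlap between prepareCloseTasks and closedTasks.

    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    /** Placeholder stand-in for the engine's TaskLocation. */
    record Task(String id) {}

    class BarrierSketch {
        final long id;
        final long timestamp;
        final Set<Task> prepareCloseTasks;
        final Set<Task> closedTasks;

        BarrierSketch(long id, long timestamp, Set<Task> prepareCloseTasks, Set<Task> closedTasks) {
            this.id = id;
            this.timestamp = timestamp;
            this.prepareCloseTasks = prepareCloseTasks;
            this.closedTasks = closedTasks;
            // removeAll(...) returns true if the copy changed, i.e. the two sets overlap.
            if (new HashSet<>(prepareCloseTasks).removeAll(closedTasks)) {
                throw new IllegalArgumentException("prepareCloseTasks must not overlap closedTasks");
            }
        }

        boolean prepareClose(Task task) {
            return prepareCloseTasks.contains(task); // the real class also returns true for final checkpoints
        }

        public static void main(String[] args) {
            Task idleReader = new Task("reader-1");
            BarrierSketch barrier = new BarrierSketch(
                    1L, System.currentTimeMillis(),
                    Collections.singleton(idleReader), Collections.emptySet());
            System.out.println(barrier.prepareClose(idleReader)); // true: close this task at the barrier
        }
    }
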
@@ -55,6 +55,7 @@
 import java.time.Instant;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -111,6 +112,8 @@ public class CheckpointCoordinator {
     private final CheckpointPlan plan;
 
     private final Set<TaskLocation> readyToCloseStartingTask;
+    private final Set<TaskLocation> readyToCloseIdleTask;
+    private final Set<TaskLocation> closedIdleTask;
     private final ConcurrentHashMap<Long, PendingCheckpoint> pendingCheckpoints;
 
     private final ArrayDeque<String> completedCheckpointIds;
@@ -189,6 +192,8 @@ public CheckpointCoordinator(
         this.pipelineTaskStatus = new ConcurrentHashMap<>();
         this.checkpointIdCounter = checkpointIdCounter;
         this.readyToCloseStartingTask = new CopyOnWriteArraySet<>();
+        this.readyToCloseIdleTask = new CopyOnWriteArraySet<>();
+        this.closedIdleTask = new CopyOnWriteArraySet<>();
 
         LOG.info(
                 "Create CheckpointCoordinator for job({}@{}) with plan({})",
@@ -309,7 +314,11 @@ private void restoreTaskState(TaskLocation taskLocation) {
                             for (int i = tuple.f1();
                                     i < actionState.getParallelism();
                                     i += currentParallelism) {
-                                states.add(actionState.getSubtaskStates().get(i));
+                                ActionSubtaskState subtaskState =
+                                        actionState.getSubtaskStates().get(i);
+                                if (subtaskState != null) {
+                                    states.add(subtaskState);
+                                }
                             }
                         });
             }
@@ -397,6 +406,46 @@ protected void readyToClose(TaskLocation taskLocation) {
         }
     }
 
+    protected void readyToCloseIdleTask(TaskLocation taskLocation) {
+        if (plan.getStartingSubtasks().contains(taskLocation)) {
+            throw new UnsupportedOperationException("Unsupported close starting task");
+        }
+
+        LOG.info("Received close idle task: {}", taskLocation);
+        synchronized (readyToCloseIdleTask) {
+            if (readyToCloseIdleTask.contains(taskLocation)
+                    || closedIdleTask.contains(taskLocation)) {
+                LOG.warn("Task {} already in readyToCloseIdleTask or closedIdleTask", taskLocation);
+                return;
+            }
+
+            List<TaskLocation> subTaskList = new ArrayList<>();
+            for (TaskLocation subTask : plan.getPipelineSubtasks()) {
+                if (subTask.getTaskGroupLocation().equals(taskLocation.getTaskGroupLocation())) {
+                    // close all subtask in the same task group
+                    subTaskList.add(subTask);
+                    LOG.info("Add task {} to prepare close list", subTask.getTaskID());
+                }
+            }
+            if (subTaskList.size() != 2) {
+                throw new UnsupportedOperationException(
+                        "Unsupported close not reader/writer task group: " + subTaskList);
+            }
+            readyToCloseIdleTask.addAll(subTaskList);
+            tryTriggerPendingCheckpoint(CheckpointType.CHECKPOINT_TYPE);
+        }
+    }
+
+    protected void completedCloseIdleTask(TaskLocation taskLocation) {
+        synchronized (readyToCloseIdleTask) {
+            if (readyToCloseIdleTask.contains(taskLocation)) {
+                readyToCloseIdleTask.remove(taskLocation);
+                closedIdleTask.add(taskLocation);
+                LOG.info("Completed close idle task {}", taskLocation.getTaskID());
+            }
+        }
+    }
+
     protected void restoreCoordinator(boolean alreadyStarted) {
         LOG.info("received restore CheckpointCoordinator with alreadyStarted= " + alreadyStarted);
         errorByPhysicalVertex = new AtomicReference<>();
@@ -546,14 +595,17 @@ private void startTriggerPendingCheckpoint(
 
         // Trigger the barrier and wait for all tasks to ACK
         LOG.debug("trigger checkpoint barrier {}", pendingCheckpoint.getInfo());
+
         CompletableFuture<InvocationFuture<?>[]> completableFutureArray =
                 CompletableFuture.supplyAsync(
                                 () ->
                                         new CheckpointBarrier(
                                                 pendingCheckpoint.getCheckpointId(),
                                                 pendingCheckpoint
                                                         .getCheckpointTimestamp(),
-                                                pendingCheckpoint.getCheckpointType()),
+                                                pendingCheckpoint.getCheckpointType(),
+                                                new HashSet<>(readyToCloseIdleTask),
+                                                new HashSet<>(closedIdleTask)),
                                 executorService)
                         .thenApplyAsync(this::triggerCheckpoint, executorService);
 
@@ -666,6 +718,7 @@ private CompletableFuture<PendingCheckpoint> triggerPendingCheckpoint(
     private Set<Long> getNotYetAcknowledgedTasks() {
         // TODO: some tasks have completed and don't need to be ack
        return plan.getPipelineSubtasks().stream()
+                .filter(e -> !closedIdleTask.contains(e))
                 .map(TaskLocation::getTaskID)
                 .collect(Collectors.toCollection(CopyOnWriteArraySet::new));
     }
@@ -715,6 +768,8 @@ protected void cleanPendingCheckpoint(CheckpointCloseReason closedReason) {
         }
         pipelineTaskStatus.clear();
         readyToCloseStartingTask.clear();
+        readyToCloseIdleTask.clear();
+        closedIdleTask.clear();
         pendingCounter.set(0);
         schemaChanging.set(false);
         scheduler.shutdownNow();
@@ -752,6 +807,11 @@ protected void acknowledgeTask(TaskAcknowledgeOperation ackOperation) {
                 pendingCheckpoint.getCheckpointType().isSavepoint()
                         ? SubtaskStatus.SAVEPOINT_PREPARE_CLOSE
                         : SubtaskStatus.RUNNING);
+
+        if (ackOperation.getBarrier().getCheckpointType().isGeneralCheckpoint()
+                && ackOperation.getBarrier().prepareClose(location)) {
+            completedCloseIdleTask(location);
+        }
     }
 
     public synchronized void completePendingCheckpoint(CompletedCheckpoint completedCheckpoint) {
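
The coordinator-side bookkeeping reads as a small two-set state machine: an idle task group is first recorded in readyToCloseIdleTask, the next barrier carries it in prepareCloseTasks, and when the task acknowledges that barrier it moves to closedIdleTask and is dropped from the not-yet-acknowledged set of later checkpoints. A reduced sketch of just that bookkeeping (stub task type, none of the Hazelcast or engine wiring):

    import java.util.Set;
    import java.util.concurrent.CopyOnWriteArraySet;

    /** Reduced model of readyToCloseIdleTask/closedIdleTask; "Task" is a stand-in type. */
    class IdleTaskBookkeepingSketch {
        record Task(String id) {}

        private final Set<Task> readyToCloseIdleTask = new CopyOnWriteArraySet<>();
        private final Set<Task> closedIdleTask = new CopyOnWriteArraySet<>();

        /** A reader reported itself idle: remember it so the next barrier can carry it as prepare-close. */
        void readyToCloseIdleTask(Task task) {
            synchronized (readyToCloseIdleTask) {
                if (readyToCloseIdleTask.contains(task) || closedIdleTask.contains(task)) {
                    return; // already scheduled or already closed
                }
                readyToCloseIdleTask.add(task);
                // the real coordinator also calls tryTriggerPendingCheckpoint(...) so a barrier goes out promptly
            }
        }

        /** The task ACKed a barrier whose prepareCloseTasks contained it: it is now closed. */
        void completedCloseIdleTask(Task task) {
            synchronized (readyToCloseIdleTask) {
                if (readyToCloseIdleTask.remove(task)) {
                    closedIdleTask.add(task);
                }
            }
        }

        /** Closed tasks no longer have to acknowledge later checkpoints. */
        boolean stillNeedsAck(Task task) {
            return !closedIdleTask.contains(task);
        }
    }
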
@@ -216,6 +216,15 @@ public void readyToClose(TaskLocation taskLocation) {
         getCheckpointCoordinator(taskLocation).readyToClose(taskLocation);
     }
 
+    /**
+     * Called by the {@link SourceSplitEnumeratorTask}. <br>
+     * used by SourceSplitEnumeratorTask to tell CheckpointCoordinator pipeline will trigger close
+     * barrier of idle task by SourceSplitEnumeratorTask.
+     */
+    public void readyToCloseIdleTask(TaskLocation taskLocation) {
+        getCheckpointCoordinator(taskLocation).readyToCloseIdleTask(taskLocation);
+    }
+
     /**
      * Called by the JobMaster. <br>
      * Listen to the {@link PipelineStatus} of the {@link Pipeline}, which is used to shut down the
@@ -389,6 +389,45 @@ public JobDAGInfo getJobDAGInfo() {
         return jobDAGInfo;
     }
 
+    public void releaseTaskGroupResource(
+            PipelineLocation pipelineLocation, TaskGroupLocation taskGroupLocation) {
+        Map<TaskGroupLocation, SlotProfile> taskGroupLocationSlotProfileMap =
+                ownedSlotProfilesIMap.get(pipelineLocation);
+        if (taskGroupLocationSlotProfileMap == null) {
+            return;
+        }
+        SlotProfile taskGroupSlotProfile = taskGroupLocationSlotProfileMap.get(taskGroupLocation);
+        if (taskGroupSlotProfile == null) {
+            return;
+        }
+
+        try {
+            RetryUtils.retryWithException(
+                    () -> {
+                        LOGGER.info(
+                                String.format("release the task %s resource", pipelineLocation));
+
+                        resourceManager
+                                .releaseResources(
+                                        jobImmutableInformation.getJobId(),
+                                        Collections.singletonList(taskGroupSlotProfile))
+                                .join();
+
+                        return null;
+                    },
+                    new RetryUtils.RetryMaterial(
+                            Constant.OPERATION_RETRY_TIME,
+                            true,
+                            exception -> ExceptionUtil.isOperationNeedRetryException(exception),
+                            Constant.OPERATION_RETRY_SLEEP));
+        } catch (Exception e) {
+            LOGGER.warning(
+                    String.format(
+                            "release the task %s resource failed, with exception: %s ",
+                            pipelineLocation, ExceptionUtils.getMessage(e)));
+        }
+    }
+
     public void releasePipelineResource(SubPlan subPlan) {
         try {
             Map<TaskGroupLocation, SlotProfile> taskGroupLocationSlotProfileMap =
@@ -663,6 +702,13 @@ public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
 
                             task.updateStateByExecutionService(
                                     taskExecutionState);
+                            if (taskExecutionState
+                                    .getExecutionState()
+                                    .isEndState()) {
+                                releaseTaskGroupResource(
+                                        pipeline.getPipelineLocation(),
+                                        task.getTaskGroupLocation());
+                            }
                         });
             });
     }
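
Once a task group reaches an end state its slot is handed back, retried a few times before giving up with a warning. The sketch below keeps only that shape: the RetryUtils and ResourceManager calls are replaced by a hand-rolled loop and a small functional interface, so none of the helper names here come from the codebase.

    /** Minimal release-with-retry shape, standing in for the RetryUtils-based code above. */
    class ReleaseWithRetrySketch {

        interface SlotReleaser {
            void release(String taskGroup) throws Exception; // stand-in for releaseResources(...).join()
        }

        static void releaseTaskGroupResource(
                String taskGroup, SlotReleaser releaser, int maxAttempts, long sleepMillis) {
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    releaser.release(taskGroup);
                    return; // released successfully
                } catch (Exception e) {
                    if (attempt == maxAttempts) {
                        // like the diff: log and give up rather than failing the pipeline
                        System.err.printf("release the task %s resource failed: %s%n", taskGroup, e.getMessage());
                        return;
                    }
                    try {
                        Thread.sleep(sleepMillis);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }

        public static void main(String[] args) {
            releaseTaskGroupResource("task-group-1", tg -> System.out.println("released " + tg), 3, 100L);
        }
    }
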