Skip to content

Commit

Permalink
do not abort instant when some write task was failed
Browse files Browse the repository at this point in the history
  • Loading branch information
wxplovecc committed Jun 24, 2022
1 parent 6099161 commit 1b1334c
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -266,12 +267,9 @@ public void notifyCheckpointComplete(long checkpointId) {
@Override
public void notifyCheckpointAborted(long checkpointId) {
if (checkpointId == this.checkpointId) {
// write task failed we do not reuse the instant
if (Arrays.stream(eventBuffer).anyMatch(s -> s == null)) {
executor.execute(() -> {
this.ckpMetadata.abortInstant(this.instant);
}, "abort instant %s", this.instant);
}
executor.execute(() -> {
this.ckpMetadata.abortInstant(this.instant);
}, "abort instant %s", this.instant);
}
}

Expand Down Expand Up @@ -308,6 +306,13 @@ public void subtaskFailed(int i, @Nullable Throwable throwable) {
// reset the event
this.eventBuffer[i] = null;
LOG.warn("Reset the event for task [" + i + "]", throwable);
if (Arrays.stream(this.eventBuffer).allMatch(event -> event == null)) {
try {
this.ckpMetadata.bootstrap(this.metaClient);
} catch (IOException e) {
throw new HoodieException("Bootstrap ckpMetadata exception", e);
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ public void commitInstant(String instant) {
public void abortInstant(String instant) {
Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.ABORTED));
try {
fs.createNewFile(path);
if (fs.exists(fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT)))) {
fs.createNewFile(path);
}
} catch (IOException e) {
throw new HoodieException("Exception while adding checkpoint abort metadata for instant: " + instant);
}
Expand Down

0 comments on commit 1b1334c

Please sign in to comment.