diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index f68caffe4e3a..b85393403fb1 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -50,6 +50,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.Serializable; import java.util.Arrays; import java.util.Collection; @@ -266,12 +267,9 @@ public void notifyCheckpointComplete(long checkpointId) { @Override public void notifyCheckpointAborted(long checkpointId) { if (checkpointId == this.checkpointId) { - // write task failed we do not reuse the instant - if (Arrays.stream(eventBuffer).anyMatch(s -> s == null)) { - executor.execute(() -> { - this.ckpMetadata.abortInstant(this.instant); - }, "abort instant %s", this.instant); - } + executor.execute(() -> { + this.ckpMetadata.abortInstant(this.instant); + }, "abort instant %s", this.instant); } } @@ -308,6 +306,13 @@ public void subtaskFailed(int i, @Nullable Throwable throwable) { // reset the event this.eventBuffer[i] = null; LOG.warn("Reset the event for task [" + i + "]", throwable); + if (Arrays.stream(this.eventBuffer).allMatch(event -> event == null)) { + try { + this.ckpMetadata.bootstrap(this.metaClient); + } catch (IOException e) { + throw new HoodieException("Bootstrap ckpMetadata exception", e); + } + } } @Override diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java index f059c7050ca5..05859964ceda 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java @@ -152,7 +152,9 @@ public void commitInstant(String instant) { public void abortInstant(String instant) { Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.ABORTED)); try { - fs.createNewFile(path); + if (fs.exists(fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT)))) { + fs.createNewFile(path); + } } catch (IOException e) { throw new HoodieException("Exception while adding checkpoint abort metadata for instant: " + instant); }