diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index c87d5b2443c4..a7b3994357d2 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -63,6 +63,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists; @@ -152,6 +153,12 @@ public class StreamWriteOperatorCoordinator */ private CkpMetadata ckpMetadata; + /** + * Counter for the failed tasks; a value within the open range (0, task_num) means + * a partial failover. + */ + private transient AtomicInteger failedCnt; + /** * Constructs a StreamingSinkOperatorCoordinator. * @@ -294,6 +301,17 @@ public void subtaskFailed(int i, @Nullable Throwable throwable) { // reset the event this.eventBuffer[i] = null; LOG.warn("Reset the event for task [" + i + "]", throwable); + + // based on the fact: #subtaskFailed is invoked before all the failed tasks are scheduled, + // when a sub-task event is received, we can decide whether it recovers from a partial or complete failover, + // and then reuse the current instant (PARTIAL) or start a new one (COMPLETE). 
+ + // reset the ckp metadata for either partial or complete failover, only when no failed task has been counted yet + if (this.failedCnt.get() == 0) { + this.ckpMetadata.reset(); + } + // increment the failed tasks counter + this.failedCnt.incrementAndGet(); } @Override @@ -347,6 +365,14 @@ private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) thr private void reset() { this.eventBuffer = new WriteMetadataEvent[this.parallelism]; + this.failedCnt = new AtomicInteger(0); + } + + /** + * Checks whether it is a PARTIAL failover, i.e. the failed tasks counter is within (0, parallelism). + */ + private boolean isPartialFailover() { + return this.failedCnt.get() > 0 && this.failedCnt.get() < this.parallelism; } /** @@ -410,6 +436,16 @@ private void handleBootstrapEvent(WriteMetadataEvent event) { if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null && evt.isBootstrap())) { // start to initialize the instant. initInstant(event.getInstantTime()); + } else if (isPartialFailover()) { + // if the bootstrap event comes from a partial failover, + // decrement the failed tasks counter by one. + + // once all the failed tasks' bootstrap events are received, send a start instant + // to the ckp metadata and unblock the data flushing. 
+ if (this.failedCnt.decrementAndGet() <= 0) { + this.ckpMetadata.startInstant(this.instant); + this.failedCnt.set(0); + } } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java index 6895b2a0c63d..d0f26740d6e2 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java @@ -37,6 +37,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; @@ -61,7 +62,7 @@ public class CkpMetadata implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class); - protected static final int MAX_RETAIN_CKP_NUM = 3; + private static final int MAX_RETAIN_CKP_NUM = 3; // the ckp metadata directory private static final String CKP_META = "ckp_meta"; @@ -99,6 +100,19 @@ public void bootstrap() throws IOException { fs.mkdirs(path); } + /** + * Resets the message bus, would clean all the messages. + * + *
<p>This is expected to be called by the driver.
+ */
+ public void reset() {
+ Iterator