diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java index a7b3994357d2..c87d5b2443c4 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java @@ -63,7 +63,6 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists; @@ -153,12 +152,6 @@ public class StreamWriteOperatorCoordinator */ private CkpMetadata ckpMetadata; - /** - * Counter for the failed tasks, a number within the range (0, task_num) means - * a partial failover. - */ - private transient AtomicInteger failedCnt; - /** * Constructs a StreamingSinkOperatorCoordinator. * @@ -301,17 +294,6 @@ public void subtaskFailed(int i, @Nullable Throwable throwable) { // reset the event this.eventBuffer[i] = null; LOG.warn("Reset the event for task [" + i + "]", throwable); - - // based on the fact: the #subtaskFailed in invoked before all the failed tasks scheduling, - // when a sub-task event is received, we can decide whether it recovers from a partial or complete failover, - // then to reuse the current instant(PARTIAL) or start a new one(COMPLETE). 
- - // reset the ckp metadata for either partial or complete failover - if (this.failedCnt.get() == 0) { - this.ckpMetadata.reset(); - } - // inc the failed tasks counter - this.failedCnt.incrementAndGet(); } @Override @@ -365,14 +347,6 @@ private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) thr private void reset() { this.eventBuffer = new WriteMetadataEvent[this.parallelism]; - this.failedCnt = new AtomicInteger(0); - } - - /** - * Checks whether it is a PARTIAL failover. - */ - private boolean isPartialFailover() { - return this.failedCnt.get() > 0 && this.failedCnt.get() < this.parallelism; } /** @@ -436,16 +410,6 @@ private void handleBootstrapEvent(WriteMetadataEvent event) { if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null && evt.isBootstrap())) { // start to initialize the instant. initInstant(event.getInstantTime()); - } else if (isPartialFailover()) { - // if the bootstrap event comes from a partial failover, - // decrement the failed tasks by one. - - // if all the failed task bootstrap events are received, send a start instant - // to the ckp metadata and unblock the data flushing. 
- if (this.failedCnt.decrementAndGet() <= 0) { - this.ckpMetadata.startInstant(this.instant); - this.failedCnt.set(0); - } } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java index d0f26740d6e2..6895b2a0c63d 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/meta/CkpMetadata.java @@ -37,7 +37,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.stream.Collectors; @@ -62,7 +61,7 @@ public class CkpMetadata implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class); - private static final int MAX_RETAIN_CKP_NUM = 3; + protected static final int MAX_RETAIN_CKP_NUM = 3; // the ckp metadata directory private static final String CKP_META = "ckp_meta"; @@ -100,19 +99,6 @@ public void bootstrap() throws IOException { fs.mkdirs(path); } - /** - * Resets the message bus, would clean all the messages. - * - *

This expects to be called by the driver. - */ - public void reset() { - Iterator itr = this.instantCache.iterator(); - while (itr.hasNext()) { - cleanInstant(itr.next(), true); - itr.remove(); - } - } - public void startInstant(String instant) { Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT)); try { @@ -120,45 +106,30 @@ public void startInstant(String instant) { } catch (IOException e) { throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant, e); } - // cache the instant - cache(instant); // cleaning - clean(); + clean(instant); } - private void cache(String newInstant) { + private void clean(String newInstant) { if (this.instantCache == null) { this.instantCache = new ArrayList<>(); } this.instantCache.add(newInstant); - } - - private void clean() { if (instantCache.size() > MAX_RETAIN_CKP_NUM) { - boolean success = cleanInstant(instantCache.get(0), false); - if (success) { - instantCache.remove(0); - } - } - } - - private boolean cleanInstant(String instant, boolean throwsT) { - boolean success = true; - for (String fileName : CkpMessage.getAllFileNames(instant)) { - Path path = fullPath(fileName); - try { - fs.delete(path, false); - } catch (IOException ex) { - success = false; - final String errMsg = "Exception while cleaning the checkpoint meta file: " + path; - if (throwsT) { - throw new HoodieException(errMsg, ex); - } else { - LOG.warn(errMsg, ex); + final String instant = instantCache.get(0); + boolean[] error = new boolean[1]; + CkpMessage.getAllFileNames(instant).stream().map(this::fullPath).forEach(path -> { + try { + fs.delete(path, false); + } catch (IOException e) { + error[0] = true; + LOG.warn("Exception while cleaning the checkpoint meta file: " + path, e); } + }); + if (!error[0]) { + instantCache.remove(0); } } - return success; } /** diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index 64bf8e278865..d5d35f7494f4 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -29,7 +29,6 @@ import org.apache.hudi.configuration.HadoopConfigurations; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.sink.event.WriteMetadataEvent; -import org.apache.hudi.sink.meta.CkpMetadata; import org.apache.hudi.sink.utils.MockCoordinatorExecutor; import org.apache.hudi.sink.utils.NonThrownExecutor; import org.apache.hudi.util.StreamerUtil; @@ -165,14 +164,6 @@ public void testCheckpointCompleteWithPartialEvents() { assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant)); } - @Test - public void testSubTaskFailed() { - coordinator.subtaskFailed(0, null); - assertNull(coordinator.getEventBuffer()[0], "The write meta event should be cleaned"); - CkpMetadata ckpMetadata = CkpMetadata.getInstance(TestConfigurations.getDefaultConf(tempFile.getAbsolutePath())); - assertNull(ckpMetadata.lastPendingInstant(), "The pending instant should be cleaned"); - } - @Test public void testHiveSyncInvoked() throws Exception { // reset