Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-4741] hotfix to avoid partial failover cause restored subtask f… #6796

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import static org.apache.hudi.util.StreamerUtil.initTableIfNotExists;
Expand Down Expand Up @@ -152,6 +153,12 @@ public class StreamWriteOperatorCoordinator
*/
private CkpMetadata ckpMetadata;

/**
* Counter for the failed tasks, a number within the range (0, task_num) means
* a partial failover.
*/
private transient AtomicInteger failedCnt;

/**
* Constructs a StreamingSinkOperatorCoordinator.
*
Expand Down Expand Up @@ -294,6 +301,17 @@ public void subtaskFailed(int i, @Nullable Throwable throwable) {
// reset the event
this.eventBuffer[i] = null;
LOG.warn("Reset the event for task [" + i + "]", throwable);

// based on the fact: #subtaskFailed is invoked before any of the failed tasks are rescheduled,
// so when a sub-task event is received, we can decide whether it recovers from a partial or complete failover,
// and then either reuse the current instant (PARTIAL) or start a new one (COMPLETE).

// reset the ckp metadata for either partial or complete failover
if (this.failedCnt.get() == 0) {
this.ckpMetadata.reset();
}
// inc the failed tasks counter
this.failedCnt.incrementAndGet();
}

@Override
Expand Down Expand Up @@ -347,6 +365,14 @@ private static CkpMetadata initCkpMetadata(HoodieTableMetaClient metaClient) thr

/**
 * Restores the coordinator bookkeeping to its initial state: the failed tasks
 * counter starts again from zero and the event buffer is replaced by an empty
 * array sized by the parallelism.
 */
private void reset() {
  final int slots = this.parallelism;
  this.failedCnt = new AtomicInteger();
  this.eventBuffer = new WriteMetadataEvent[slots];
}

/**
 * Checks whether it is a PARTIAL failover, i.e. at least one but not all of
 * the tasks have been counted as failed.
 *
 * @return true if the failed tasks counter lies strictly between 0 and the parallelism
 */
private boolean isPartialFailover() {
  // read the counter only once so that both bounds are checked against the
  // same value even if the counter is modified in between
  final int failed = this.failedCnt.get();
  return failed > 0 && failed < this.parallelism;
}

/**
Expand Down Expand Up @@ -410,6 +436,16 @@ private void handleBootstrapEvent(WriteMetadataEvent event) {
if (Arrays.stream(eventBuffer).allMatch(evt -> evt != null && evt.isBootstrap())) {
// start to initialize the instant.
initInstant(event.getInstantTime());
} else if (isPartialFailover()) {
// if the bootstrap event comes from a partial failover,
// decrement the failed tasks by one.

// if all the failed task bootstrap events are received, send a start instant
// to the ckp metadata and unblock the data flushing.
if (this.failedCnt.decrementAndGet() <= 0) {
this.ckpMetadata.startInstant(this.instant);
this.failedCnt.set(0);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -61,7 +62,7 @@ public class CkpMetadata implements Serializable {

private static final Logger LOG = LoggerFactory.getLogger(CkpMetadata.class);

protected static final int MAX_RETAIN_CKP_NUM = 3;
private static final int MAX_RETAIN_CKP_NUM = 3;

// the ckp metadata directory
private static final String CKP_META = "ckp_meta";
Expand Down Expand Up @@ -99,39 +100,67 @@ public void bootstrap() throws IOException {
fs.mkdirs(path);
}

/**
 * Resets the message bus, would clean all the messages.
 *
 * <p>This expects to be called by the driver.
 *
 * @throws HoodieException if a checkpoint meta file of a cached instant can not be deleted
 */
public void reset() {
  if (this.instantCache == null) {
    // the cache may not have been initialized yet, in which case there is nothing to clean
    return;
  }
  Iterator<String> itr = this.instantCache.iterator();
  while (itr.hasNext()) {
    // a failure here throws, leaving the instant in the cache
    cleanInstant(itr.next(), true);
    // forget the instant only after its files have been cleaned
    itr.remove();
  }
}

/**
 * Adds a checkpoint start message for the given instant: creates the INFLIGHT
 * message file, caches the instant and then triggers the cleaning.
 *
 * @param instant the instant time to start
 * @throws HoodieException if the message file can not be created
 */
public void startInstant(String instant) {
Path path = fullPath(CkpMessage.getFileName(instant, CkpMessage.State.INFLIGHT));
try {
fs.createNewFile(path);
} catch (IOException e) {
throw new HoodieException("Exception while adding checkpoint start metadata for instant: " + instant, e);
}
// cache the instant
cache(instant);
// cleaning
// NOTE(review): both clean(instant) and clean() appear here, presumably the old and
// the new side of the diff; confirm that only the no-arg clean() remains
clean(instant);
clean();
}

private void clean(String newInstant) {
/**
 * Appends the given instant to the instant cache, creating the cache first
 * if it does not exist yet.
 */
private void cache(String newInstant) {
  if (this.instantCache != null) {
    this.instantCache.add(newInstant);
    return;
  }
  // first instant to be cached: set up the cache before adding to it
  this.instantCache = new ArrayList<>();
  this.instantCache.add(newInstant);
}

private void clean() {
if (instantCache.size() > MAX_RETAIN_CKP_NUM) {
final String instant = instantCache.get(0);
boolean[] error = new boolean[1];
CkpMessage.getAllFileNames(instant).stream().map(this::fullPath).forEach(path -> {
try {
fs.delete(path, false);
} catch (IOException e) {
error[0] = true;
LOG.warn("Exception while cleaning the checkpoint meta file: " + path);
}
});
if (!error[0]) {
boolean success = cleanInstant(instantCache.get(0), false);
if (success) {
instantCache.remove(0);
}
}
}

/**
 * Deletes all the checkpoint meta files of the given instant.
 *
 * @param instant the instant whose meta files should be removed
 * @param throwsT whether a deletion failure is rethrown as a {@link HoodieException}
 *                instead of being logged as a warning before moving on to the next file
 * @return true if no deletion raised an exception
 */
private boolean cleanInstant(String instant, boolean throwsT) {
  boolean allDeleted = true;
  for (String name : CkpMessage.getAllFileNames(instant)) {
    final Path target = fullPath(name);
    try {
      fs.delete(target, false);
    } catch (IOException ioe) {
      final String message = "Exception while cleaning the checkpoint meta file: " + target;
      if (throwsT) {
        throw new HoodieException(message, ioe);
      }
      // best-effort mode: remember the failure and keep cleaning the remaining files
      LOG.warn(message, ioe);
      allDeleted = false;
    }
  }
  return allDeleted;
}

/**
* Add a checkpoint commit message.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.meta.CkpMetadata;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
import org.apache.hudi.sink.utils.NonThrownExecutor;
import org.apache.hudi.util.StreamerUtil;
Expand Down Expand Up @@ -164,6 +165,14 @@ public void testCheckpointCompleteWithPartialEvents() {
assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant));
}

@Test
public void testSubTaskFailed() {
  // report the first subtask as failed, without a cause
  coordinator.subtaskFailed(0, null);

  // the buffered event of the failed subtask must be gone
  final Object bufferedEvent = coordinator.getEventBuffer()[0];
  assertNull(bufferedEvent, "The write meta event should be cleaned");

  // a ckp metadata handle obtained for the same path must not see a pending instant
  final CkpMetadata ckpMetadata =
      CkpMetadata.getInstance(TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()));
  assertNull(ckpMetadata.lastPendingInstant(), "The pending instant should be cleaned");
}

@Test
public void testHiveSyncInvoked() throws Exception {
// reset
Expand Down