Skip to content

Commit

Permalink
add clustering condition
Browse files Browse the repository at this point in the history
  • Loading branch information
chenshzh committed Dec 9, 2022
1 parent bf9931b commit 59a5e7c
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,11 @@ public void close() {
private void bootstrap(HoodieTableMetaClient metaClient) throws IOException {
fs.delete(path, true);
fs.mkdirs(path);
// The last pending instant excluding compaction and replacecommit should start
// for recommits of the last inflight instant if the write metadata checkpoint successfully
// but was not committed due to some rare cases.
metaClient.getActiveTimeline().reload().getCommitsTimeline().filterPendingExcludingCompaction()
.filter(instant -> !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction()))
.lastInstant()
.ifPresent(instant -> startInstant(instant.getTimestamp()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,8 @@ public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
private void restoreWriteMetadata() throws Exception {
boolean eventSent = false;
for (WriteMetadataEvent event : this.writeMetadataState.get()) {
LOG.info("restoreWriteMetadata send event, instant {}, pending instant {}, task[{}].",
event.getInstantTime(), this.currentInstant, taskID);
if (Objects.equals(this.currentInstant, event.getInstantTime())) {
// Reset taskID for event
event.setTaskID(taskID);
Expand Down Expand Up @@ -236,6 +238,8 @@ private void reloadWriteMetaState() throws Exception {
.bootstrap(true)
.build();
this.writeMetadataState.add(event);
LOG.info("reloadWriteMetaState send event, instant {}, task[{}].",
event.getInstantTime(), taskID);
writeStatuses.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.metadata.CkpMetadata;
import org.apache.hudi.metadata.HoodieTableMetadata;
import org.apache.hudi.sink.event.WriteMetadataEvent;
import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
Expand Down Expand Up @@ -165,6 +166,40 @@ public void testCheckpointCompleteWithPartialEvents() {
assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant));
}

@Test
public void testRecommitWithCheckpointCompleteException() throws Exception {
// uncompleted meta events case
final CompletableFuture<byte[]> future = new CompletableFuture<>();
final String instant = coordinator.getInstant();
coordinator.checkpointCoordinator(1, future);
// not execute notify checkpoint complete to imitate failed commit even though it checkpoints successfully
OperatorEvent event1 = createOperatorEvent(0, instant, "par1", false, 0.2);
coordinator.handleEventFromOperator(0, event1);
OperatorEvent event2 = createOperatorEvent(1, instant, "par2", false, 0.2);
coordinator.handleEventFromOperator(1, event2);

// recover from last successful checkpoint
OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 2);
coordinator = new StreamWriteOperatorCoordinator(
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), context);
coordinator.start();
coordinator.setExecutor(new MockCoordinatorExecutor(context));
// send bootstrap event based on CkpMetadata
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
.setConf(HadoopConfigurations.getHadoopConf(new Configuration())).setBasePath(tempFile.getAbsolutePath()).build();
CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient.getFs(), metaClient.getBasePathV2().toString());
String lastPendingInstant = StreamerUtil.getLastPendingInstant(metaClient);
String lastPendingInstantCached = ckpMetadata.lastPendingInstant();
assertThat("Pending instant to be recommitted", instant.equals(lastPendingInstant) && instant.equals(lastPendingInstantCached));
OperatorEvent event3 = createBootstrapEvent(0, lastPendingInstantCached, "par1", false, 0.2);
OperatorEvent event4 = createBootstrapEvent(1, lastPendingInstantCached, "par2", false, 0.2);
coordinator.handleEventFromOperator(0, event3);
coordinator.handleEventFromOperator(1, event4);
metaClient.reloadActiveTimeline();
String lastCompleted = StreamerUtil.getLastCompletedInstant(metaClient);
assertThat("Recommits the instant with bootstrap events from checkpoint metadata", lastCompleted, is(instant));
}

@Test
public void testHiveSyncInvoked() throws Exception {
// reset
Expand Down Expand Up @@ -413,6 +448,33 @@ private static WriteMetadataEvent createOperatorEvent(
.build();
}

private static WriteMetadataEvent createBootstrapEvent(
int taskId,
String instant,
String partitionPath,
boolean trackSuccessRecords,
double failureFraction) {
final WriteStatus writeStatus = new WriteStatus(trackSuccessRecords, failureFraction);
writeStatus.setPartitionPath(partitionPath);

HoodieWriteStat writeStat = new HoodieWriteStat();
writeStat.setPartitionPath(partitionPath);
writeStat.setFileId("fileId123");
writeStat.setPath("path123");
writeStat.setFileSizeInBytes(123);
writeStat.setTotalWriteBytes(123);
writeStat.setNumWrites(1);

writeStatus.setStat(writeStat);

return WriteMetadataEvent.builder()
.taskID(taskId)
.instantTime(instant)
.writeStatus(Collections.singletonList(writeStatus))
.bootstrap(true)
.build();
}

private void reset() throws Exception {
FileUtils.cleanDirectory(tempFile);
}
Expand Down

0 comments on commit 59a5e7c

Please sign in to comment.