From 59a5e7c1565bd5624c95c7ff6680ec502d8be010 Mon Sep 17 00:00:00 2001
From: chenshizhi
Date: Fri, 9 Dec 2022 20:42:38 +0800
Subject: [PATCH] add clustering condition

---
 .../org/apache/hudi/metadata/CkpMetadata.java |  4 ++
 .../common/AbstractStreamWriteFunction.java   |  4 ++
 .../TestStreamWriteOperatorCoordinator.java   | 62 +++++++++++++++++++
 3 files changed, 70 insertions(+)

diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/CkpMetadata.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/CkpMetadata.java
index aa1318899e2d1..f707378890c78 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/CkpMetadata.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/CkpMetadata.java
@@ -108,7 +108,11 @@ public void close() {
   private void bootstrap(HoodieTableMetaClient metaClient) throws IOException {
     fs.delete(path, true);
     fs.mkdirs(path);
+    // Start from the last pending instant, excluding compaction and replacecommit,
+    // so that the last inflight write instant can be recommitted when its write metadata
+    // was checkpointed successfully but the instant was not committed (a rare case).
     metaClient.getActiveTimeline().reload().getCommitsTimeline().filterPendingExcludingCompaction()
+        .filter(instant -> !HoodieTimeline.REPLACE_COMMIT_ACTION.equals(instant.getAction()))
         .lastInstant()
         .ifPresent(instant -> startInstant(instant.getTimestamp()));
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
index ec2a2c465c5fa..d3490f812626d 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/common/AbstractStreamWriteFunction.java
@@ -195,6 +195,8 @@ public void setOperatorEventGateway(OperatorEventGateway operatorEventGateway) {
   private void restoreWriteMetadata() throws Exception {
     boolean eventSent = false;
     for (WriteMetadataEvent event : this.writeMetadataState.get()) {
+      LOG.info("restoreWriteMetadata sends event, instant {}, pending instant {}, task[{}].",
+          event.getInstantTime(), this.currentInstant, taskID);
       if (Objects.equals(this.currentInstant, event.getInstantTime())) {
         // Reset taskID for event
         event.setTaskID(taskID);
@@ -236,6 +238,8 @@ private void reloadWriteMetaState() throws Exception {
         .bootstrap(true)
         .build();
     this.writeMetadataState.add(event);
+    LOG.info("reloadWriteMetaState sends event, instant {}, task[{}].",
+        event.getInstantTime(), taskID);
     writeStatuses.clear();
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index f2a8a49b5ebcc..a4d922ed44448 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -28,6 +28,7 @@
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
+import org.apache.hudi.metadata.CkpMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
 import org.apache.hudi.sink.utils.MockCoordinatorExecutor;
@@ -165,6 +166,40 @@ public void testCheckpointCompleteWithPartialEvents() {
     assertThat("Commits the instant with partial events anyway", lastCompleted, is(instant));
   }
 
+  @Test
+  public void testRecommitWithCheckpointCompleteException() throws Exception {
+    // case: the meta events were checkpointed but the commit did not complete
+    final CompletableFuture<byte[]> future = new CompletableFuture<>();
+    final String instant = coordinator.getInstant();
+    coordinator.checkpointCoordinator(1, future);
+    // deliberately skip notifying checkpoint complete, to imitate a failed commit even though the checkpoint succeeded
+    OperatorEvent event1 = createOperatorEvent(0, instant, "par1", false, 0.2);
+    coordinator.handleEventFromOperator(0, event1);
+    OperatorEvent event2 = createOperatorEvent(1, instant, "par2", false, 0.2);
+    coordinator.handleEventFromOperator(1, event2);
+
+    // recover from the last successful checkpoint
+    OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 2);
+    coordinator = new StreamWriteOperatorCoordinator(
+        TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()), context);
+    coordinator.start();
+    coordinator.setExecutor(new MockCoordinatorExecutor(context));
+    // send bootstrap events based on CkpMetadata
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder()
+        .setConf(HadoopConfigurations.getHadoopConf(new Configuration())).setBasePath(tempFile.getAbsolutePath()).build();
+    CkpMetadata ckpMetadata = CkpMetadata.getInstance(metaClient.getFs(), metaClient.getBasePathV2().toString());
+    String lastPendingInstant = StreamerUtil.getLastPendingInstant(metaClient);
+    String lastPendingInstantCached = ckpMetadata.lastPendingInstant();
+    assertThat("Pending instant to be recommitted", instant.equals(lastPendingInstant) && instant.equals(lastPendingInstantCached));
+    OperatorEvent event3 = createBootstrapEvent(0, lastPendingInstantCached, "par1", false, 0.2);
+    OperatorEvent event4 = createBootstrapEvent(1, lastPendingInstantCached, "par2", false, 0.2);
+    coordinator.handleEventFromOperator(0, event3);
+    coordinator.handleEventFromOperator(1, event4);
+    metaClient.reloadActiveTimeline();
+    String lastCompleted = StreamerUtil.getLastCompletedInstant(metaClient);
+    assertThat("Recommits the instant with bootstrap events from checkpoint metadata", lastCompleted, is(instant));
+  }
+
   @Test
   public void testHiveSyncInvoked() throws Exception {
     // reset
@@ -413,6 +448,33 @@ private static WriteMetadataEvent createOperatorEvent(
         .build();
   }
 
+  private static WriteMetadataEvent createBootstrapEvent(
+      int taskId,
+      String instant,
+      String partitionPath,
+      boolean trackSuccessRecords,
+      double failureFraction) {
+    final WriteStatus writeStatus = new WriteStatus(trackSuccessRecords, failureFraction);
+    writeStatus.setPartitionPath(partitionPath);
+
+    HoodieWriteStat writeStat = new HoodieWriteStat();
+    writeStat.setPartitionPath(partitionPath);
+    writeStat.setFileId("fileId123");
+    writeStat.setPath("path123");
+    writeStat.setFileSizeInBytes(123);
+    writeStat.setTotalWriteBytes(123);
+    writeStat.setNumWrites(1);
+
+    writeStatus.setStat(writeStat);
+
+    return WriteMetadataEvent.builder()
+        .taskID(taskId)
+        .instantTime(instant)
+        .writeStatus(Collections.singletonList(writeStatus))
+        .bootstrap(true)
+        .build();
+  }
+
   private void reset() throws Exception {
     FileUtils.cleanDirectory(tempFile);
   }
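
Reviewer note: the gist of the fix is the selection rule in CkpMetadata#bootstrap. When the
checkpoint metadata is rebuilt, the last pending instant is now chosen with compaction AND
replacecommit (clustering) instants excluded, so a pending clustering plan can no longer shadow
an inflight write instant that still needs to be recommitted. Below is a minimal standalone
sketch of that rule in plain Java; the Instant record and the helper method are hypothetical
illustrations for this note, not Hudi APIs.

    import java.util.List;
    import java.util.Optional;

    // Hypothetical standalone model of the bootstrap selection rule: among pending
    // instants, skip compaction and replacecommit (clustering) so that only a regular
    // commit/deltacommit can be picked up for recommit.
    public class BootstrapInstantSelection {

      // Minimal stand-in for a timeline instant; not the Hudi HoodieInstant class.
      record Instant(String timestamp, String action, boolean completed) {}

      static Optional<String> lastPendingExcludingCompactionAndClustering(List<Instant> timeline) {
        return timeline.stream()
            .filter(i -> !i.completed())                       // pending only
            .filter(i -> !"compaction".equals(i.action()))     // exclude compaction plans
            .filter(i -> !"replacecommit".equals(i.action()))  // exclude clustering (the new condition)
            .reduce((first, second) -> second)                 // keep the last pending instant
            .map(Instant::timestamp);
      }

      public static void main(String[] args) {
        List<Instant> timeline = List.of(
            new Instant("001", "deltacommit", true),
            new Instant("002", "deltacommit", false),   // inflight write that must be recommitted
            new Instant("003", "replacecommit", false)  // pending clustering, must be skipped
        );
        System.out.println(lastPendingExcludingCompactionAndClustering(timeline)); // Optional[002]
      }
    }

Running the sketch prints Optional[002]: with the replacecommit filter, the bootstrap restarts
the inflight deltacommit "002" rather than the pending clustering instant "003", which is the
behavior the new testRecommitWithCheckpointCompleteException test exercises end to end.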