diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 76774e9618d79..222ff78edc9fe 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -138,6 +138,11 @@ protected void commit(String instantTime, Map statuses = preppedRecordList.size() > 0 diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java index 6f8f2261d8a0f..42d69da392843 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java @@ -22,6 +22,8 @@ import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.metadata.HoodieTableMetadata; @@ -252,6 +254,49 @@ void testSyncMetadataTable() throws Exception { assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.COMMIT_ACTION)); } + @Test + void testSyncMetadataTableWithReusedInstant() throws Exception { + // reset + reset(); + // override the default configuration + Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath()); + conf.setBoolean(FlinkOptions.METADATA_ENABLED, true); + OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1); + coordinator = new StreamWriteOperatorCoordinator(conf, context); + coordinator.start(); + coordinator.setExecutor(new MockCoordinatorExecutor(context)); + + final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0); + + coordinator.handleEventFromOperator(0, event0); + + String instant = coordinator.getInstant(); + assertNotEquals("", instant); + + final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath()); + HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath, HadoopConfigurations.getHadoopConf(conf)); + HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); + assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(1L)); + assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP)); + + // writes a normal commit + mockWriteWithMetadata(); + instant = coordinator.getInstant(); + // creates an inflight commit on the metadata timeline + metadataTableMetaClient.getActiveTimeline() + .createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieActiveTimeline.DELTA_COMMIT_ACTION, instant)); + metadataTableMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instant); + metadataTableMetaClient.reloadActiveTimeline(); + + // write another commit with existing instant on the metadata timeline + instant = mockWriteWithMetadata(); + metadataTableMetaClient.reloadActiveTimeline(); + + completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants(); + assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(3L)); + assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant)); + } + // ------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------