Skip to content

Commit

Permalink
[HUDI-4098] Metadata table heartbeat for instant has expired, last he…
Browse files Browse the repository at this point in the history
…artbeat 0
  • Loading branch information
danny0405 committed May 14, 2022
1 parent 701f8c0 commit 9192e47
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,11 @@ protected void commit(String instantTime, Map<MetadataPartitionType, HoodieData<
// reuses the same instant time without rollback first. It is a no-op here as the
// clean plan is the same, so we don't need to delete the requested and inflight instant
// files in the active timeline.

// The metadata writer uses LAZY cleaning strategy without auto commit,
// write client then checks the heartbeat expiration when committing the instant,
// sets up the heartbeat explicitly to make the check pass.
writeClient.getHeartbeatClient().start(instantTime);
}

List<WriteStatus> statuses = preppedRecordList.size() > 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
Expand Down Expand Up @@ -253,6 +255,49 @@ void testSyncMetadataTable() throws Exception {
assertThat(completedTimeline.nthFromLastInstant(1).get().getAction(), is(HoodieTimeline.COMMIT_ACTION));
}

@Test
void testSyncMetadataTableWithReusedInstant() throws Exception {
// reset
reset();
// override the default configuration
Configuration conf = TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
conf.setBoolean(FlinkOptions.METADATA_ENABLED, true);
OperatorCoordinator.Context context = new MockOperatorCoordinatorContext(new OperatorID(), 1);
coordinator = new StreamWriteOperatorCoordinator(conf, context);
coordinator.start();
coordinator.setExecutor(new MockCoordinatorExecutor(context));

final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);

coordinator.handleEventFromOperator(0, event0);

String instant = coordinator.getInstant();
assertNotEquals("", instant);

final String metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(tempFile.getAbsolutePath());
HoodieTableMetaClient metadataTableMetaClient = StreamerUtil.createMetaClient(metadataTableBasePath, HadoopConfigurations.getHadoopConf(conf));
HoodieTimeline completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(1L));
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP));

// writes a normal commit
mockWriteWithMetadata();
instant = coordinator.getInstant();
// creates an inflight commit on the metadata timeline
metadataTableMetaClient.getActiveTimeline()
.createNewInstant(new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieActiveTimeline.DELTA_COMMIT_ACTION, instant));
metadataTableMetaClient.getActiveTimeline().transitionRequestedToInflight(HoodieActiveTimeline.DELTA_COMMIT_ACTION, instant);
metadataTableMetaClient.reloadActiveTimeline();

// write another commit with existing instant on the metadata timeline
instant = mockWriteWithMetadata();
metadataTableMetaClient.reloadActiveTimeline();

completedTimeline = metadataTableMetaClient.getActiveTimeline().filterCompletedInstants();
assertThat("One instant need to sync to metadata table", completedTimeline.getInstants().count(), is(3L));
assertThat(completedTimeline.lastInstant().get().getTimestamp(), is(instant));
}

// -------------------------------------------------------------------------
// Utilities
// -------------------------------------------------------------------------
Expand Down

0 comments on commit 9192e47

Please sign in to comment.