Fixing isTableService for replace commits
nsivabalan committed Nov 7, 2022
1 parent 7a1a683 commit 40a2546
Showing 12 changed files with 44 additions and 40 deletions.
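In short: a replacecommit instant can be produced either by clustering (a table service) or by a regular write such as insert_overwrite, so the action type alone cannot tell whether a commit is a table service. This commit changes HoodieTable.isTableServiceAction to take the full HoodieInstant and, for replace commits, to look up the timeline for a clustering plan; the per-engine COW/MOR overrides of the method become redundant and are removed.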
==== File 1/12 ====
@@ -337,7 +337,7 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met
   protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
     table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
-        table.isTableServiceAction(actionType)));
+        table.isTableServiceAction(new HoodieInstant(State.INFLIGHT, actionType, instantTime))));
   }

   /**
==== File 2/12: HoodieTable.java ====
@@ -44,6 +44,7 @@
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -58,6 +59,7 @@
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -858,10 +860,22 @@ public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeri

   /**
    * Check if action type is a table service.
-   * @param actionType action type of interest.
+   * @param instant instant of interest
    * @return true if action represents a table service. false otherwise.
    */
-  public abstract boolean isTableServiceAction(String actionType);
+  public boolean isTableServiceAction(HoodieInstant instant) {
+    if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(getMetaClient(), instant);
+      // only clustering is table service with replace commit action
+      return instantPlan.isPresent();
+    } else {
+      if (this.metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
+        return !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION);
+      } else {
+        return !instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION);
+      }
+    }
+  }

   /**
    * Get Table metadata writer.
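For illustration, a minimal sketch of the behavioral change, assuming a COPY_ON_WRITE table exposed as a HoodieTable named table and made-up instant timestamps (the HoodieInstant constructor and timeline constants are the ones used in this commit):

    // Two replacecommit instants sharing the same action type.
    HoodieInstant clusteringInstant = new HoodieInstant(
        HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, "00000001");
    HoodieInstant insertOverwriteInstant = new HoodieInstant(
        HoodieInstant.State.INFLIGHT, HoodieTimeline.REPLACE_COMMIT_ACTION, "00000002");

    // Old behavior: isTableServiceAction("replacecommit") returned true for both,
    // because on a COW table anything other than "commit" counted as a table service.
    // New behavior: the timeline is consulted for a clustering plan at that instant.
    table.isTableServiceAction(clusteringInstant);      // true: a clustering plan exists
    table.isTableServiceAction(insertOverwriteInstant); // false: no plan, regular write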
==== File 3/12: BaseActionExecutor.java ====
@@ -27,6 +27,7 @@
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
@@ -58,7 +59,7 @@ public BaseActionExecutor(HoodieEngineContext context, HoodieWriteConfig config,
    */
   protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) {
     table.getMetadataWriter(instantTime).ifPresent(w -> w.update(
-        metadata, instantTime, table.isTableServiceAction(actionType)));
+        metadata, instantTime, table.isTableServiceAction(new HoodieInstant(HoodieInstant.State.INFLIGHT, actionType, instantTime))));
   }

   /**
==== File 4/12 ====
@@ -287,7 +287,7 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, String
       // the schema expects to be immutable for SQL jobs but may be not for non-SQL
       // jobs.
       this.metadataWriter.initTableMetadata();
-      this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType));
+      this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(new HoodieInstant(HoodieInstant.State.INFLIGHT, actionType, instantTime)));
     } finally {
       this.txnManager.getLockManager().unlock();
     }
==== File 5/12: HoodieFlinkCopyOnWriteTable.java ====
@@ -38,7 +38,6 @@
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
@@ -94,11 +93,6 @@ public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext
     super(config, context, metaClient);
   }

-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
-  }
-
   /**
    * Upsert a batch of new records into Hoodie table at the supplied instantTime.
    *
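The same pattern repeats in the remaining table classes below: each engine-specific COW/MOR variant (Flink, Java, Spark) drops its isTableServiceAction override, along with the HoodieTimeline import where it becomes unused, since the base-class implementation in HoodieTable.java now covers both table types via metaClient.getTableType().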
==== File 6/12: HoodieFlinkMergeOnReadTable.java ====
@@ -28,7 +28,6 @@
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -59,11 +58,6 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
     super(config, context, metaClient);
   }

-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsert(
       HoodieEngineContext context,
==== File 7/12: HoodieJavaCopyOnWriteTable.java ====
@@ -38,7 +38,6 @@
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -90,11 +89,6 @@ protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config,
     super(config, context, metaClient);
   }

-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context,
                                                        String instantTime,
==== File 8/12: HoodieJavaMergeOnReadTable.java ====
@@ -26,7 +26,6 @@
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -44,11 +43,6 @@ protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineConte
     super(config, context, metaClient);
   }

-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
                                                               String instantTime,
==== File 9/12 ====
@@ -483,7 +483,7 @@ private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>>

   private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
                                    HoodieInstant hoodieInstant) {
-    boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
+    boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant);
     // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
     // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
     table.getMetadataWriter(hoodieInstant.getTimestamp())
==== File 10/12: HoodieSparkCopyOnWriteTable.java ====
@@ -103,11 +103,6 @@ public HoodieSparkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext
     super(config, context, metaClient);
   }

-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
     return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
==== File 11/12: HoodieSparkMergeOnReadTable.java ====
@@ -84,11 +84,6 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
     super(config, context, metaClient);
   }

-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
     return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
==== File 12/12 ====
@@ -281,6 +281,29 @@ public void testArchiveTableWithArchival(boolean enableMetadata) throws Exceptio
     }
   }

+  @Test
+  public void testArchiveTableWithReplaceCommits() throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 4, 2);
+    for (int i = 1; i < 7; i++) {
+      if (i < 3) {
+        testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(),
+            Arrays.asList("p1", "p2"), 2);
+      } else {
+        testTable.doWriteOperation("0000000" + i, WriteOperationType.INSERT_OVERWRITE, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      }
+      // trigger archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      if (i == 6) {
+        // after all rounds, only 3 should be left in active timeline. 4,5,6
+        assertEquals(originalCommits, commitsAfterArchival);
+        assertEquals(3, originalCommits.size());
+      }
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exception {
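The new test exercises the fix end to end: commits 1-2 are plain upserts, while commits 3-6 are insert_overwrite replace commits. Because those replace commits carry no clustering plan, the fixed check classifies them as regular writes rather than table services, and the test verifies that archival then trims the timeline as expected, leaving only the last three instants (00000004-00000006) active.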
