diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 075c1038a9a0a..b4958f5692db4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -348,7 +348,7 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met
   protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
     table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
-        table.isTableServiceAction(actionType)));
+        table.isTableServiceAction(actionType, instantTime)));
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index ffd8e0c79710c..35eb0edfbfc61 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -44,6 +44,7 @@
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -58,6 +59,7 @@
 import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -823,10 +825,23 @@ public final Option getMetadataWriter(String triggeri
 
   /**
    * Check if action type is a table service.
-   * @param actionType action type of interest.
+   * @param actionType action type of the instant.
+   * @param instantTime instant time of the instant.
    * @return true if action represents a table service. false otherwise.
    */
-  public abstract boolean isTableServiceAction(String actionType);
+  public boolean isTableServiceAction(String actionType, String instantTime) {
+    if (actionType.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(HoodieInstant.State.NIL, actionType, instantTime));
+      // only clustering is a table service with the replace commit action
+      return instantPlan.isPresent();
+    } else {
+      if (this.metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
+        return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
+      } else {
+        return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
+      }
+    }
+  }
 
   /**
    * Get Table metadata writer.
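The consolidated default above turns on one distinction: a replace commit is a table service only when a clustering plan exists for that instant, which is what separates clustering from insert_overwrite and insert_overwrite_table writes. The standalone sketch below mirrors that decision flow; it is not the patched class, and the TableType enum, the literal action strings, and the isClustering predicate (standing in for ClusteringUtils.getClusteringPlan(...).isPresent()) are illustrative stand-ins.

import java.util.function.Predicate;

// Illustrative sketch only: mirrors the decision flow of the new default isTableServiceAction.
// Assumptions: action strings follow the HoodieTimeline constants ("commit", "deltacommit",
// "replacecommit"); isClustering stands in for ClusteringUtils.getClusteringPlan(...).isPresent().
public class TableServiceActionSketch {

  enum TableType { COPY_ON_WRITE, MERGE_ON_READ }

  static boolean isTableServiceAction(TableType tableType, String actionType, String instantTime,
                                      Predicate<String> isClustering) {
    if ("replacecommit".equals(actionType)) {
      // Only a clustering replace commit is a table service; insert_overwrite also writes
      // a replace commit but is a regular write.
      return isClustering.test(instantTime);
    }
    // Otherwise, anything that is not the table's regular write action is a table service
    // (clean, compaction, etc.).
    return tableType == TableType.COPY_ON_WRITE
        ? !"commit".equals(actionType)
        : !"deltacommit".equals(actionType);
  }

  public static void main(String[] args) {
    Predicate<String> noPlan = instant -> false;
    Predicate<String> hasPlan = instant -> true;
    // insert_overwrite replace commit, no clustering plan -> regular write
    System.out.println(isTableServiceAction(TableType.COPY_ON_WRITE, "replacecommit", "001", noPlan));   // false
    // clustering replace commit -> table service
    System.out.println(isTableServiceAction(TableType.COPY_ON_WRITE, "replacecommit", "002", hasPlan));  // true
    // compaction completes as a "commit" on a MOR table -> table service
    System.out.println(isTableServiceAction(TableType.MERGE_ON_READ, "commit", "003", noPlan));          // true
  }
}

This is also why every call site in the rest of the patch now passes the instant time along with the action type: without it, the base class has no way to look up a clustering plan for a replace commit.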
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index f893b4ccd5c4e..d2b2cef2f604b 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -58,7 +58,7 @@ public BaseActionExecutor(HoodieEngineContext context, HoodieWriteConfig config,
    */
   protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) {
     table.getMetadataWriter(instantTime).ifPresent(w -> w.update(
-        metadata, instantTime, table.isTableServiceAction(actionType)));
+        metadata, instantTime, table.isTableServiceAction(actionType, instantTime)));
   }
 
   /**
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 12081e6bf1caa..c26368d14b3cf 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -293,7 +293,7 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, String
       // the schema expects to be immutable for SQL jobs but may be not for non-SQL
       // jobs.
       this.metadataWriter.initTableMetadata();
-      this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType));
+      this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
     } finally {
       this.txnManager.getLockManager().unlock();
     }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 2c8a3c4e49c3a..543751a041078 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -38,7 +38,6 @@
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
@@ -93,11 +92,6 @@ public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
-  }
-
   /**
    * Upsert a batch of new records into Hoodie table at the supplied instantTime.
    *
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
index aa8adde7353c9..9b7d3447177eb 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
@@ -27,7 +27,6 @@
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -58,11 +57,6 @@ public class HoodieFlinkMergeOnReadTable
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsert(
       HoodieEngineContext context,
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 88921334980ed..342c018e5a269 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -38,7 +38,6 @@
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -90,11 +89,6 @@ protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config,
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context,
                                                        String instantTime,
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
index 32d30f704ecbb..5af29502a95cd 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
@@ -25,7 +25,6 @@
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -43,11 +42,6 @@ protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineConte
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
                                                               String instantTime,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 7110e26bb068f..68a9ce32b5a90 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -425,7 +425,7 @@ private void validateClusteringCommit(HoodieWriteMetadata>
 
   private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
                                    HoodieInstant hoodieInstant) {
-    boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
+    boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction(), hoodieInstant.getTimestamp());
     // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
     // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
     table.getMetadataWriter(hoodieInstant.getTimestamp())
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index a88ca65c35a94..743aff51a1254 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -103,11 +103,6 @@ public HoodieSparkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
     return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index efc667af297be..c9d7424631916 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -79,11 +79,6 @@ public class HoodieSparkMergeOnReadTable extends
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
     return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 0af05b2d6b0de..bc7de2f175a44 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -281,6 +281,29 @@ public void testArchiveTableWithArchival(boolean enableMetadata) throws Exceptio
     }
   }
 
+  @Test
+  public void testArchiveTableWithReplaceCommits() throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 4, 2);
+    for (int i = 1; i < 7; i++) {
+      if (i < 3) {
+        testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(),
+            Arrays.asList("p1", "p2"), 2);
+      } else {
+        testTable.doWriteOperation("0000000" + i, WriteOperationType.INSERT_OVERWRITE, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      }
+      // trigger archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      if (i == 6) {
+        // after all rounds, only 3 instants should be left in the active timeline: 4, 5 and 6
+        assertEquals(originalCommits, commitsAfterArchival);
+        assertEquals(3, originalCommits.size());
+      }
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exception {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
index 8b1cb875c09f6..bd29e2d6a2f94 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
@@ -68,7 +68,7 @@ public enum State {
     // Committed instant
     COMPLETED,
     // Invalid instant
-    INVALID
+    NIL
   }
 
   private State state = State.COMPLETED;
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
index 1c8d5ece242da..182dd086789d0 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java
@@ -657,7 +657,7 @@ private List getAllInstants() {
     List<HoodieInstant> allInstants = new ArrayList<>();
     long instantTime = 1;
     for (State state : State.values()) {
-      if (state == State.INVALID) {
+      if (state == State.NIL) {
        continue;
      }
      for (String action : HoodieTimeline.VALID_ACTIONS_IN_TIMELINE) {
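The State.INVALID to State.NIL rename ties into the same clustering lookup: the new base-class check builds a throwaway instant whose state is irrelevant, since only its action and timestamp matter for finding a plan. The helper below is a hedged sketch of that probe pattern, not part of the patch; it assumes a HoodieTableMetaClient for an existing table and the ClusteringUtils, Pair, and HoodieClusteringPlan types from hudi-common, and the class and method names here are illustrative.

import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

// Illustrative helper (not part of the patch): classifies a replace commit the same way the
// new HoodieTable#isTableServiceAction does, by probing the timeline for a clustering plan.
public class ClusteringProbeSketch {

  public static boolean isClusteringCommit(HoodieTableMetaClient metaClient, String instantTime) {
    // The probe instant only carries the action and the timestamp; its state does not affect
    // the lookup, hence the neutral NIL state introduced by the rename.
    HoodieInstant probe = new HoodieInstant(HoodieInstant.State.NIL, HoodieTimeline.REPLACE_COMMIT_ACTION, instantTime);
    Option<Pair<HoodieInstant, HoodieClusteringPlan>> plan = ClusteringUtils.getClusteringPlan(metaClient, probe);
    return plan.isPresent();
  }
}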