diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java index 8db0e0f3694c..45d1b200f7ab 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java @@ -349,7 +349,7 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) { context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName()); table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime, - table.isTableServiceAction(actionType))); + table.isTableServiceAction(actionType, instantTime))); } /** diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java index e3045fbece83..2f93b228f6e7 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java @@ -44,6 +44,7 @@ import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; @@ -58,6 +59,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.table.view.TableFileSystemView.SliceView; +import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.Functions; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; @@ -859,10 +861,23 @@ public final Option getMetadataWriter(String triggeri /** * Check if action type is a table service. - * @param actionType action type of interest. + * @param actionType action type of the instant + * @param instantTime instant time of the instant. * @return true if action represents a table service. false otherwise. */ - public abstract boolean isTableServiceAction(String actionType); + public boolean isTableServiceAction(String actionType, String instantTime) { + if (actionType.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) { + Option> instantPlan = ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(HoodieInstant.State.NIL, actionType, instantTime)); + // only clustering is table service with replace commit action + return instantPlan.isPresent(); + } else { + if (this.metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) { + return !actionType.equals(HoodieTimeline.COMMIT_ACTION); + } else { + return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); + } + } + } /** * Get Table metadata writer. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java index f893b4ccd5c4..d2b2cef2f604 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java @@ -58,7 +58,7 @@ public BaseActionExecutor(HoodieEngineContext context, HoodieWriteConfig config, */ protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) { table.getMetadataWriter(instantTime).ifPresent(w -> w.update( - metadata, instantTime, table.isTableServiceAction(actionType))); + metadata, instantTime, table.isTableServiceAction(actionType, instantTime))); } /** diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java index d3578a8cffa3..98f58db66cde 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java @@ -290,7 +290,7 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, String // the schema expects to be immutable for SQL jobs but may be not for non-SQL // jobs. this.metadataWriter.initTableMetadata(); - this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType)); + this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime)); } /** diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java index 2608161ab095..cb38cb876dae 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java @@ -38,7 +38,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieIOException; @@ -93,11 +92,6 @@ public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext super(config, context, metaClient); } - @Override - public boolean isTableServiceAction(String actionType) { - return !actionType.equals(HoodieTimeline.COMMIT_ACTION); - } - /** * Upsert a batch of new records into Hoodie table at the supplied instantTime. * diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java index d69534c0611e..b4595439cc51 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java @@ -28,7 +28,6 @@ import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.config.HoodieWriteConfig; @@ -59,11 +58,6 @@ public class HoodieFlinkMergeOnReadTable super(config, context, metaClient); } - @Override - public boolean isTableServiceAction(String actionType) { - return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); - } - @Override public HoodieWriteMetadata> upsert( HoodieEngineContext context, diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java index c98d30adb785..efa96e2eddba 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java @@ -38,7 +38,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieInstant; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieNotSupportedException; @@ -90,11 +89,6 @@ protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config, super(config, context, metaClient); } - @Override - public boolean isTableServiceAction(String actionType) { - return !actionType.equals(HoodieTimeline.COMMIT_ACTION); - } - @Override public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java index f5bc09b4d1cc..2f5f29108731 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java @@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; -import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.table.action.HoodieWriteMetadata; @@ -44,11 +43,6 @@ protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineConte super(config, context, metaClient); } - @Override - public boolean isTableServiceAction(String actionType) { - return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); - } - @Override public HoodieWriteMetadata> upsertPrepped(HoodieEngineContext context, String instantTime, diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index c200abee5e74..a73c92c383f3 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -483,7 +483,7 @@ private void validateClusteringCommit(HoodieWriteMetadata> private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata, HoodieInstant hoodieInstant) { - boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction()); + boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction(), hoodieInstant.getTimestamp()); // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition. table.getMetadataWriter(hoodieInstant.getTimestamp()) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java index 115aea06f2a2..df33c4458f52 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java @@ -103,11 +103,6 @@ public HoodieSparkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext super(config, context, metaClient); } - @Override - public boolean isTableServiceAction(String actionType) { - return !actionType.equals(HoodieTimeline.COMMIT_ACTION); - } - @Override public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, HoodieData> records) { return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java index bf88aa2690f2..67194aa06e0f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java @@ -84,11 +84,6 @@ public class HoodieSparkMergeOnReadTable extends super(config, context, metaClient); } - @Override - public boolean isTableServiceAction(String actionType) { - return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION); - } - @Override public HoodieWriteMetadata> upsert(HoodieEngineContext context, String instantTime, HoodieData> records) { return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute(); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java index 76b51e6970b6..72aba1f16381 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java @@ -281,6 +281,29 @@ public void testArchiveTableWithArchival(boolean enableMetadata) throws Exceptio } } + @Test + public void testArchiveTableWithReplaceCommits() throws Exception { + HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 4, 2); + for (int i = 1; i < 7; i++) { + if (i < 3) { + testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(), + Arrays.asList("p1", "p2"), 2); + } else { + testTable.doWriteOperation("0000000" + i, WriteOperationType.INSERT_OVERWRITE, Collections.emptyList(), Arrays.asList("p1", "p2"), 2); + } + // trigger archival + Pair, List> commitsList = archiveAndGetCommitsList(writeConfig); + List originalCommits = commitsList.getKey(); + List commitsAfterArchival = commitsList.getValue(); + + if (i == 6) { + // after all rounds, only 3 should be left in active timeline. 4,5,6 + assertEquals(originalCommits, commitsAfterArchival); + assertEquals(3, originalCommits.size()); + } + } + } + @ParameterizedTest @ValueSource(booleans = {true, false}) public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exception { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java index 0115742e07a0..94ae26254995 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java @@ -67,7 +67,7 @@ public enum State { // Committed instant COMPLETED, // Invalid instant - INVALID + NIL } private State state = State.COMPLETED; diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java index f9792d2c4b98..69b43d001585 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/timeline/TestHoodieActiveTimeline.java @@ -663,7 +663,7 @@ private List getAllInstants() { List allInstants = new ArrayList<>(); long instantTime = 1; for (State state : State.values()) { - if (state == State.INVALID) { + if (state == State.NIL) { continue; } for (String action : HoodieTimeline.VALID_ACTIONS_IN_TIMELINE) {