From 40a25463eef54b01538d00917b8af02d669c23c1 Mon Sep 17 00:00:00 2001
From: sivabalan
Date: Sat, 22 Oct 2022 13:10:01 -0700
Subject: [PATCH] Fixing isTableService for replace commits

---
 .../hudi/client/BaseHoodieWriteClient.java    |  2 +-
 .../org/apache/hudi/table/HoodieTable.java    | 18 +++++++++++++--
 .../hudi/table/action/BaseActionExecutor.java |  3 ++-
 .../hudi/client/HoodieFlinkWriteClient.java   |  2 +-
 .../table/HoodieFlinkCopyOnWriteTable.java    |  6 -----
 .../table/HoodieFlinkMergeOnReadTable.java    |  6 -----
 .../table/HoodieJavaCopyOnWriteTable.java     |  6 -----
 .../table/HoodieJavaMergeOnReadTable.java     |  6 -----
 .../hudi/client/SparkRDDWriteClient.java      |  2 +-
 .../table/HoodieSparkCopyOnWriteTable.java    |  5 ----
 .../table/HoodieSparkMergeOnReadTable.java    |  5 ----
 .../hudi/io/TestHoodieTimelineArchiver.java   | 23 +++++++++++++++++++
 12 files changed, 44 insertions(+), 40 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 4a3f6bd3112b2..041075fc16b10 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -337,7 +337,7 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met
   protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
     context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
     table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
-        table.isTableServiceAction(actionType)));
+        table.isTableServiceAction(new HoodieInstant(State.INFLIGHT, actionType, instantTime))));
   }
 
   /**
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index d94d229ff527d..c9269d1889b54 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -44,6 +44,7 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -58,6 +59,7 @@ import org.apache.hudi.common.table.view.TableFileSystemView;
 import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
 import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Functions;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -858,10 +860,22 @@ public final Option getMetadataWriter(String triggeri
 
   /**
    * Check if action type is a table service.
-   * @param actionType action type of interest.
+   * @param instant instant of interest
    * @return true if action represents a table service. false otherwise.
    */
-  public abstract boolean isTableServiceAction(String actionType);
+  public boolean isTableServiceAction(HoodieInstant instant) {
+    if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
+      Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(getMetaClient(), instant);
+      // only clustering is table service with replace commit action
+      return instantPlan.isPresent();
+    } else {
+      if (this.metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
+        return !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION);
+      } else {
+        return !instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION);
+      }
+    }
+  }
 
   /**
    * Get Table metadata writer.
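Note (illustration, not part of the patch): the new HoodieTable#isTableServiceAction above reports a replacecommit as a table service only when ClusteringUtils finds a clustering plan for that instant, so replace commits produced by operations such as INSERT_OVERWRITE are classified as regular writes, while any other non-regular action still comes back as a table service. The standalone sketch below restates that rule with plain strings; the class name TableServiceActionRule and the hasClusteringPlan flag are hypothetical stand-ins for the timeline lookup, not Hudi API.

// Illustration only: a self-contained restatement of the classification rule.
// "hasClusteringPlan" stands in for ClusteringUtils.getClusteringPlan(metaClient, instant).isPresent().
public class TableServiceActionRule {

  static boolean isTableServiceAction(String action, boolean copyOnWriteTable, boolean hasClusteringPlan) {
    if ("replacecommit".equals(action)) {
      // only clustering-backed replace commits count as a table service
      return hasClusteringPlan;
    }
    // otherwise, anything other than the table's regular write action is reported as a table service
    return copyOnWriteTable ? !"commit".equals(action) : !"deltacommit".equals(action);
  }

  public static void main(String[] args) {
    System.out.println(isTableServiceAction("replacecommit", true, false)); // INSERT_OVERWRITE style replace -> false (regular write)
    System.out.println(isTableServiceAction("replacecommit", true, true));  // clustering replace -> true (table service)
    System.out.println(isTableServiceAction("commit", false, false));       // compaction commit on a MOR table -> true
  }
}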
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index f893b4ccd5c4e..bfbd533a82aa3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
@@ -58,7 +59,7 @@ public BaseActionExecutor(HoodieEngineContext context, HoodieWriteConfig config,
    */
   protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) {
     table.getMetadataWriter(instantTime).ifPresent(w -> w.update(
-        metadata, instantTime, table.isTableServiceAction(actionType)));
+        metadata, instantTime, table.isTableServiceAction(new HoodieInstant(HoodieInstant.State.INFLIGHT, actionType, instantTime))));
   }
 
   /**
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index a00c361fde7e5..c7fda7615e769 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -287,7 +287,7 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, String
       // the schema expects to be immutable for SQL jobs but may be not for non-SQL
       // jobs.
       this.metadataWriter.initTableMetadata();
-      this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType));
+      this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(new HoodieInstant(HoodieInstant.State.INFLIGHT, actionType, instantTime)));
     } finally {
       this.txnManager.getLockManager().unlock();
     }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 38e3918bd4602..ce3fa03fad8e6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -38,7 +38,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIOException;
@@ -94,11 +93,6 @@ public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
-  }
-
   /**
    * Upsert a batch of new records into Hoodie table at the supplied instantTime.
    *
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
index d69534c0611ee..b4595439cc519 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkMergeOnReadTable.java
@@ -28,7 +28,6 @@ import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -59,11 +58,6 @@ public class HoodieFlinkMergeOnReadTable
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsert(
       HoodieEngineContext context,
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
index 8e72682725c3b..de908161be93d 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaCopyOnWriteTable.java
@@ -38,7 +38,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -90,11 +89,6 @@ protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config,
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context,
                                                        String instantTime,
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
index f5bc09b4d1cc7..2f5f291087310 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/HoodieJavaMergeOnReadTable.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -44,11 +43,6 @@ protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineConte
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
                                                               String instantTime,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index ef37dd18356b2..3c307b1a76bcb 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -483,7 +483,7 @@ private void validateClusteringCommit(HoodieWriteMetadata>
 
   private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata, HoodieInstant hoodieInstant) {
-    boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
+    boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant);
     // Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
     // single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
     table.getMetadataWriter(hoodieInstant.getTimestamp())
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
index 115aea06f2a2f..df33c4458f522 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkCopyOnWriteTable.java
@@ -103,11 +103,6 @@ public HoodieSparkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
     return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
index bf88aa2690f26..67194aa06e0f4 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkMergeOnReadTable.java
@@ -84,11 +84,6 @@ public class HoodieSparkMergeOnReadTable extends
     super(config, context, metaClient);
   }
 
-  @Override
-  public boolean isTableServiceAction(String actionType) {
-    return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
-  }
-
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
     return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index 0af05b2d6b0de..bc7de2f175a44 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -281,6 +281,29 @@ public void testArchiveTableWithArchival(boolean enableMetadata) throws Exceptio
     }
   }
 
+  @Test
+  public void testArchiveTableWithReplaceCommits() throws Exception {
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 4, 2);
+    for (int i = 1; i < 7; i++) {
+      if (i < 3) {
+        testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(),
+            Arrays.asList("p1", "p2"), 2);
+      } else {
+        testTable.doWriteOperation("0000000" + i, WriteOperationType.INSERT_OVERWRITE, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
+      }
+      // trigger archival
+      Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
+      List<HoodieInstant> originalCommits = commitsList.getKey();
+      List<HoodieInstant> commitsAfterArchival = commitsList.getValue();
+
+      if (i == 6) {
+        // after all rounds, only 3 should be left in active timeline. 4,5,6
+        assertEquals(originalCommits, commitsAfterArchival);
+        assertEquals(3, originalCommits.size());
+      }
+    }
+  }
+
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exception {
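Note (illustration, not part of the patch): at call sites that only have an action type and an instant time (BaseHoodieWriteClient, BaseActionExecutor, HoodieFlinkWriteClient above), the patch synthesizes an in-flight HoodieInstant so the table can consult the timeline before classifying the action, while SparkRDDWriteClient already holds the real instant and passes it through. A minimal caller-side sketch of that pattern, against a HoodieTable built with this patch; the class TableServiceCheckSketch is a hypothetical wrapper, not Hudi code.

import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.table.HoodieTable;

final class TableServiceCheckSketch {
  // Wrap the raw action type into an in-flight instant before asking the table,
  // mirroring the call sites changed in this patch.
  static boolean isTableService(HoodieTable<?, ?, ?, ?> table, String actionType, String instantTime) {
    return table.isTableServiceAction(new HoodieInstant(HoodieInstant.State.INFLIGHT, actionType, instantTime));
  }
}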