Skip to content

Commit

Permalink
[HUDI-5078] Fixing isTableService for replace commits (apache#7037)
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan authored and Alexey Kudinkin committed Dec 14, 2022
1 parent 978f9ae commit 1c1843d
Show file tree
Hide file tree
Showing 14 changed files with 46 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
table.isTableServiceAction(actionType)));
table.isTableServiceAction(actionType, instantTime)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
Expand All @@ -58,6 +59,7 @@
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
Expand Down Expand Up @@ -823,10 +825,23 @@ public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeri

/**
* Check if action type is a table service.
* @param actionType action type of interest.
* @param actionType action type of the instant
* @param instantTime instant time of the instant.
* @return true if action represents a table service. false otherwise.
*/
public abstract boolean isTableServiceAction(String actionType);
public boolean isTableServiceAction(String actionType, String instantTime) {
if (actionType.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(HoodieInstant.State.NIL, actionType, instantTime));
// only clustering is table service with replace commit action
return instantPlan.isPresent();
} else {
if (this.metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
} else {
return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
}
}
}

/**
* Get Table metadata writer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public BaseActionExecutor(HoodieEngineContext context, HoodieWriteConfig config,
*/
protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) {
table.getMetadataWriter(instantTime).ifPresent(w -> w.update(
metadata, instantTime, table.isTableServiceAction(actionType)));
metadata, instantTime, table.isTableServiceAction(actionType, instantTime)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, String
// the schema expects to be immutable for SQL jobs but may be not for non-SQL
// jobs.
this.metadataWriter.initTableMetadata();
this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType));
this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
} finally {
this.txnManager.getLockManager().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
Expand Down Expand Up @@ -93,11 +92,6 @@ public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext
super(config, context, metaClient);
}

@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
}

/**
* Upsert a batch of new records into Hoodie table at the supplied instantTime.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
Expand Down Expand Up @@ -58,11 +57,6 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
super(config, context, metaClient);
}

@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> upsert(
HoodieEngineContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
Expand Down Expand Up @@ -90,11 +89,6 @@ protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config,
super(config, context, metaClient);
}

@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context,
String instantTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.HoodieWriteMetadata;
Expand All @@ -43,11 +42,6 @@ protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineConte
super(config, context, metaClient);
}

@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
String instantTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>>

private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
HoodieInstant hoodieInstant) {
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction(), hoodieInstant.getTimestamp());
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
table.getMetadataWriter(hoodieInstant.getTimestamp())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,6 @@ public HoodieSparkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext
super(config, context, metaClient);
}

@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
}

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,6 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
super(config, context, metaClient);
}

@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
}

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,29 @@ public void testArchiveTableWithArchival(boolean enableMetadata) throws Exceptio
}
}

@Test
public void testArchiveTableWithReplaceCommits() throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 4, 2);
for (int i = 1; i < 7; i++) {
if (i < 3) {
testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(),
Arrays.asList("p1", "p2"), 2);
} else {
testTable.doWriteOperation("0000000" + i, WriteOperationType.INSERT_OVERWRITE, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
}
// trigger archival
Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
List<HoodieInstant> originalCommits = commitsList.getKey();
List<HoodieInstant> commitsAfterArchival = commitsList.getValue();

if (i == 6) {
// after all rounds, only 3 should be left in active timeline. 4,5,6
assertEquals(originalCommits, commitsAfterArchival);
assertEquals(3, originalCommits.size());
}
}
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public enum State {
// Committed instant
COMPLETED,
// Invalid instant
INVALID
NIL
}

private State state = State.COMPLETED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ private List<HoodieInstant> getAllInstants() {
List<HoodieInstant> allInstants = new ArrayList<>();
long instantTime = 1;
for (State state : State.values()) {
if (state == State.INVALID) {
if (state == State.NIL) {
continue;
}
for (String action : HoodieTimeline.VALID_ACTIONS_IN_TIMELINE) {
Expand Down

0 comments on commit 1c1843d

Please sign in to comment.