Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-5078] Fixing determination of table service for metadata calls #7037

Merged
merged 1 commit into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
table.isTableServiceAction(actionType)));
table.isTableServiceAction(actionType, instantTime)));
}
nsivabalan marked this conversation as resolved.
Show resolved Hide resolved

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
Expand All @@ -58,6 +59,7 @@
import org.apache.hudi.common.table.view.TableFileSystemView;
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.Functions;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
Expand Down Expand Up @@ -859,10 +861,23 @@ public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeri

/**
* Check if action type is a table service.
* @param actionType action type of interest.
* @param actionType action type of the instant
* @param instantTime instant time of the instant.
* @return true if action represents a table service. false otherwise.
*/
public abstract boolean isTableServiceAction(String actionType);
public boolean isTableServiceAction(String actionType, String instantTime) {
if (actionType.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(HoodieInstant.State.NIL, actionType, instantTime));
// only clustering is table service with replace commit action
return instantPlan.isPresent();
} else {
if (this.metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
} else {
return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
}
}
}

/**
* Get Table metadata writer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public BaseActionExecutor(HoodieEngineContext context, HoodieWriteConfig config,
*/
protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) {
table.getMetadataWriter(instantTime).ifPresent(w -> w.update(
metadata, instantTime, table.isTableServiceAction(actionType)));
metadata, instantTime, table.isTableServiceAction(actionType, instantTime)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, String
// the schema expects to be immutable for SQL jobs but may be not for non-SQL
// jobs.
this.metadataWriter.initTableMetadata();
this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType));
this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
Expand Down Expand Up @@ -93,11 +92,6 @@ public HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext
super(config, context, metaClient);
}

@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
}

/**
* Upsert a batch of new records into Hoodie table at the supplied instantTime.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.config.HoodieWriteConfig;
Expand Down Expand Up @@ -59,11 +58,6 @@ public class HoodieFlinkMergeOnReadTable<T extends HoodieRecordPayload>
super(config, context, metaClient);
}

@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> upsert(
HoodieEngineContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
Expand Down Expand Up @@ -90,11 +89,6 @@ protected HoodieJavaCopyOnWriteTable(HoodieWriteConfig config,
super(config, context, metaClient);
}

@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> upsert(HoodieEngineContext context,
String instantTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.HoodieWriteMetadata;
Expand All @@ -44,11 +43,6 @@ protected HoodieJavaMergeOnReadTable(HoodieWriteConfig config, HoodieEngineConte
super(config, context, metaClient);
}

@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
}

@Override
public HoodieWriteMetadata<List<WriteStatus>> upsertPrepped(HoodieEngineContext context,
String instantTime,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>>

private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
HoodieInstant hoodieInstant) {
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction());
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction(), hoodieInstant.getTimestamp());
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
table.getMetadataWriter(hoodieInstant.getTimestamp())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,6 @@ public HoodieSparkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext
super(config, context, metaClient);
}

@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
}

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
return new SparkUpsertCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,6 @@ public class HoodieSparkMergeOnReadTable<T extends HoodieRecordPayload> extends
super(config, context, metaClient);
}

@Override
public boolean isTableServiceAction(String actionType) {
return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
}

@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> upsert(HoodieEngineContext context, String instantTime, HoodieData<HoodieRecord<T>> records) {
return new SparkUpsertDeltaCommitActionExecutor<>((HoodieSparkEngineContext) context, config, this, instantTime, records).execute();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,29 @@ public void testArchiveTableWithArchival(boolean enableMetadata) throws Exceptio
}
}

@Test
public void testArchiveTableWithReplaceCommits() throws Exception {
HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(true, 2, 4, 2);
for (int i = 1; i < 7; i++) {
if (i < 3) {
testTable.doWriteOperation("0000000" + i, WriteOperationType.UPSERT, i == 1 ? Arrays.asList("p1", "p2") : Collections.emptyList(),
Arrays.asList("p1", "p2"), 2);
} else {
testTable.doWriteOperation("0000000" + i, WriteOperationType.INSERT_OVERWRITE, Collections.emptyList(), Arrays.asList("p1", "p2"), 2);
}
// trigger archival
Pair<List<HoodieInstant>, List<HoodieInstant>> commitsList = archiveAndGetCommitsList(writeConfig);
List<HoodieInstant> originalCommits = commitsList.getKey();
List<HoodieInstant> commitsAfterArchival = commitsList.getValue();

if (i == 6) {
// after all rounds, only 3 should be left in active timeline. 4,5,6
assertEquals(originalCommits, commitsAfterArchival);
assertEquals(3, originalCommits.size());
}
}
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testSavepointWithArchival(boolean archiveBeyondSavepoint) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public enum State {
// Committed instant
COMPLETED,
// Invalid instant
INVALID
NIL
xushiyan marked this conversation as resolved.
Show resolved Hide resolved
}

private State state = State.COMPLETED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ private List<HoodieInstant> getAllInstants() {
List<HoodieInstant> allInstants = new ArrayList<>();
long instantTime = 1;
for (State state : State.values()) {
if (state == State.INVALID) {
if (state == State.NIL) {
continue;
}
for (String action : HoodieTimeline.VALID_ACTIONS_IN_TIMELINE) {
Expand Down