Skip to content

Commit

Permalink
addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan committed Dec 2, 2022
1 parent 6b24d08 commit c3cf14a
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ protected void preCommit(HoodieInstant inflightInstant, HoodieCommitMetadata met
protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {
context.setJobStatus(this.getClass().getSimpleName(), "Committing to metadata table: " + config.getTableName());
table.getMetadataWriter(instantTime).ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime,
table.isTableServiceAction(new HoodieInstant(State.INFLIGHT, actionType, instantTime))));
table.isTableServiceAction(actionType, instantTime)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -860,19 +860,20 @@ public final Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeri

/**
* Check if action type is a table service.
* @param instant instant of interest
* @param actionType action type of the instant
* @param instantTime instant time of the instant.
* @return true if action represents a table service. false otherwise.
*/
public boolean isTableServiceAction(HoodieInstant instant) {
if (instant.getAction().equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(metaClient, instant);
public boolean isTableServiceAction(String actionType, String instantTime) {
if (actionType.equals(HoodieTimeline.REPLACE_COMMIT_ACTION)) {
Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan = ClusteringUtils.getClusteringPlan(metaClient, new HoodieInstant(HoodieInstant.State.NIL, actionType, instantTime));
// only clustering is table service with replace commit action
return instantPlan.isPresent();
} else {
if (this.metaClient.getTableType() == HoodieTableType.COPY_ON_WRITE) {
return !instant.getAction().equals(HoodieTimeline.COMMIT_ACTION);
return !actionType.equals(HoodieTimeline.COMMIT_ACTION);
} else {
return !instant.getAction().equals(HoodieTimeline.DELTA_COMMIT_ACTION);
return !actionType.equals(HoodieTimeline.DELTA_COMMIT_ACTION);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieTable;
Expand Down Expand Up @@ -59,7 +58,7 @@ public BaseActionExecutor(HoodieEngineContext context, HoodieWriteConfig config,
*/
protected final void writeTableMetadata(HoodieCommitMetadata metadata, String actionType) {
table.getMetadataWriter(instantTime).ifPresent(w -> w.update(
metadata, instantTime, table.isTableServiceAction(new HoodieInstant(HoodieInstant.State.INFLIGHT, actionType, instantTime))));
metadata, instantTime, table.isTableServiceAction(actionType, instantTime)));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ protected void writeTableMetadata(HoodieTable table, String instantTime, String
// the schema expects to be immutable for SQL jobs but may be not for non-SQL
// jobs.
this.metadataWriter.initTableMetadata();
this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(new HoodieInstant(HoodieInstant.State.INFLIGHT, actionType, instantTime)));
this.metadataWriter.update(metadata, instantTime, getHoodieTable().isTableServiceAction(actionType, instantTime));
} finally {
this.txnManager.getLockManager().unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ private void validateClusteringCommit(HoodieWriteMetadata<JavaRDD<WriteStatus>>

private void updateTableMetadata(HoodieTable table, HoodieCommitMetadata commitMetadata,
HoodieInstant hoodieInstant) {
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant);
boolean isTableServiceAction = table.isTableServiceAction(hoodieInstant.getAction(), hoodieInstant.getTimestamp());
// Do not do any conflict resolution here as we do with regular writes. We take the lock here to ensure all writes to metadata table happens within a
// single lock (single writer). Because more than one write to metadata table will result in conflicts since all of them updates the same partition.
table.getMetadataWriter(hoodieInstant.getTimestamp())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public enum State {
// Committed instant
COMPLETED,
// Invalid instant
INVALID
NIL
}

private State state = State.COMPLETED;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ private List<HoodieInstant> getAllInstants() {
List<HoodieInstant> allInstants = new ArrayList<>();
long instantTime = 1;
for (State state : State.values()) {
if (state == State.INVALID) {
if (state == State.NIL) {
continue;
}
for (String action : HoodieTimeline.VALID_ACTIONS_IN_TIMELINE) {
Expand Down

0 comments on commit c3cf14a

Please sign in to comment.