diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 057517780007..8e42b7c6eaef 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -625,7 +625,11 @@ private static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataC
     checkArgument(Objects.equals(prevColumnStats.getFileName(), newColumnStats.getFileName()));
     checkArgument(Objects.equals(prevColumnStats.getColumnName(), newColumnStats.getColumnName()));
 
-    if (newColumnStats.getIsDeleted()) {
+    // We're handling 2 cases here:
+    //   - The new record is a tombstone: it simply overwrites the previous state
+    //   - The previous record is a tombstone: the new (proper) record likewise
+    //     simply overwrites the previous state
+    if (newColumnStats.getIsDeleted() || prevColumnStats.getIsDeleted()) {
       return newColumnStats;
     }
 
@@ -639,8 +643,8 @@ private static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataC
 
     Comparable maxValue =
         (Comparable) Stream.of(
-                (Comparable) unwrapStatisticValueWrapper(prevColumnStats.getMinValue()),
-                (Comparable) unwrapStatisticValueWrapper(newColumnStats.getMinValue()))
+                (Comparable) unwrapStatisticValueWrapper(prevColumnStats.getMaxValue()),
+                (Comparable) unwrapStatisticValueWrapper(newColumnStats.getMaxValue()))
             .filter(Objects::nonNull)
             .max(Comparator.naturalOrder())
             .orElse(null);
@@ -658,6 +662,28 @@ private static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataC
         .build();
   }
 
+  @Override
+  public boolean equals(Object other) {
+    if (other == this) {
+      return true;
+    } else if (!(other instanceof HoodieMetadataPayload)) {
+      return false;
+    }
+
+    HoodieMetadataPayload otherMetadataPayload = (HoodieMetadataPayload) other;
+
+    return this.type == otherMetadataPayload.type
+        && Objects.equals(this.key, otherMetadataPayload.key)
+        && Objects.equals(this.filesystemMetadata, otherMetadataPayload.filesystemMetadata)
+        && Objects.equals(this.bloomFilterMetadata, otherMetadataPayload.bloomFilterMetadata)
+        && Objects.equals(this.columnStatMetadata, otherMetadataPayload.columnStatMetadata);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(key, type, filesystemMetadata, bloomFilterMetadata, columnStatMetadata);
+  }
+
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("HoodieMetadataPayload {");
diff --git a/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
new file mode 100644
index 000000000000..7b4d432b3f80
--- /dev/null
+++ b/hudi-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataPayload.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metadata;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.common.util.CollectionUtils.createImmutableMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieMetadataPayload extends HoodieCommonTestHarness {
+
+  @Test
+  public void testFileSystemMetadataPayloadMerging() {
+    String partitionName = "2022/10/01";
+
+    Map<String, Long> firstCommitAddedFiles = createImmutableMap(
+        Pair.of("file1.parquet", 1000L),
+        Pair.of("file2.parquet", 2000L),
+        Pair.of("file3.parquet", 3000L)
+    );
+
+    HoodieRecord<HoodieMetadataPayload> firstPartitionFilesRecord =
+        HoodieMetadataPayload.createPartitionFilesRecord(partitionName, Option.of(firstCommitAddedFiles), Option.empty());
+
+    Map<String, Long> secondCommitAddedFiles = createImmutableMap(
+        // NOTE: This is an append (file3.parquet already exists and its size grows)
+        Pair.of("file3.parquet", 3333L),
+        Pair.of("file4.parquet", 4000L),
+        Pair.of("file5.parquet", 5000L)
+    );
+
+    List<String> secondCommitDeletedFiles = Collections.singletonList("file1.parquet");
+
+    HoodieRecord<HoodieMetadataPayload> secondPartitionFilesRecord =
+        HoodieMetadataPayload.createPartitionFilesRecord(partitionName, Option.of(secondCommitAddedFiles), Option.of(secondCommitDeletedFiles));
+
+    HoodieMetadataPayload combinedPartitionFilesRecordPayload =
+        secondPartitionFilesRecord.getData().preCombine(firstPartitionFilesRecord.getData());
+
+    HoodieMetadataPayload expectedCombinedPartitionedFilesRecordPayload =
+        HoodieMetadataPayload.createPartitionFilesRecord(partitionName,
+            Option.of(
+                createImmutableMap(
+                    Pair.of("file2.parquet", 2000L),
+                    Pair.of("file3.parquet", 3333L),
+                    Pair.of("file4.parquet", 4000L),
+                    Pair.of("file5.parquet", 5000L)
+                )
+            ),
+            Option.empty()
+        ).getData();
+
+    assertEquals(expectedCombinedPartitionedFilesRecordPayload, combinedPartitionFilesRecordPayload);
+  }
+
+  @Test
+  public void testColumnStatsPayloadMerging() throws IOException {
+    String partitionPath = "2022/10/01";
+    String fileName = "file.parquet";
+    String targetColName = "c1";
+
+    HoodieColumnRangeMetadata<Integer> c1Metadata =
+        HoodieColumnRangeMetadata.create(fileName, targetColName, 100, 1000, 5, 1000, 123456, 123456);
+
+    HoodieRecord<HoodieMetadataPayload> columnStatsRecord =
+        HoodieMetadataPayload.createColumnStatsRecords(partitionPath, Collections.singletonList(c1Metadata), false)
+            .findFirst().get();
+
+    ////////////////////////////////////////////////////////////////////////
+    // Case 1: Combining proper (non-deleted) records
+    ////////////////////////////////////////////////////////////////////////
+
+    // NOTE: A Column Stats record will only be merged when an existing file is
+    //       modified, which can only happen on storage schemes supporting appends
+    HoodieColumnRangeMetadata<Integer> c1AppendedBlockMetadata =
+        HoodieColumnRangeMetadata.create(fileName, targetColName, 0, 500, 0, 100, 12345, 12345);
+
+    HoodieRecord<HoodieMetadataPayload> updatedColumnStatsRecord =
+        HoodieMetadataPayload.createColumnStatsRecords(partitionPath, Collections.singletonList(c1AppendedBlockMetadata), false)
+            .findFirst().get();
+
+    HoodieMetadataPayload combinedMetadataPayload =
+        columnStatsRecord.getData().preCombine(updatedColumnStatsRecord.getData());
+
+    HoodieColumnRangeMetadata<Integer> expectedColumnRangeMetadata =
+        HoodieColumnRangeMetadata.create(fileName, targetColName, 0, 1000, 5, 1100, 135801, 135801);
+
+    HoodieRecord<HoodieMetadataPayload> expectedColumnStatsRecord =
+        HoodieMetadataPayload.createColumnStatsRecords(partitionPath, Collections.singletonList(expectedColumnRangeMetadata), false)
+            .findFirst().get();
+
+    // Assert the combined payload
+    assertEquals(combinedMetadataPayload, expectedColumnStatsRecord.getData());
+
+    Option<IndexedRecord> alternativelyCombinedMetadataPayloadAvro =
+        columnStatsRecord.getData().combineAndGetUpdateValue(updatedColumnStatsRecord.getData().getInsertValue(null).get(), null);
+
+    // Assert that using the legacy API yields the same value
+    assertEquals(combinedMetadataPayload.getInsertValue(null), alternativelyCombinedMetadataPayloadAvro);
+
+    ////////////////////////////////////////////////////////////////////////
+    // Case 2: Combining w/ deleted records
+    ////////////////////////////////////////////////////////////////////////
+
+    HoodieColumnRangeMetadata<Integer> c1StubbedMetadata =
+        HoodieColumnRangeMetadata.stub(fileName, targetColName);
+
+    HoodieRecord<HoodieMetadataPayload> deletedColumnStatsRecord =
+        HoodieMetadataPayload.createColumnStatsRecords(partitionPath, Collections.singletonList(c1StubbedMetadata), true)
+            .findFirst().get();
+
+    // NOTE: In this case, the deleted (tombstone) record deletes the previous
+    //       state of the record
+    HoodieMetadataPayload deletedCombinedMetadataPayload =
+        deletedColumnStatsRecord.getData().preCombine(columnStatsRecord.getData());
+
+    assertEquals(deletedColumnStatsRecord.getData(), deletedCombinedMetadataPayload);
+
+    // NOTE: In this case, the proper incoming record overwrites the previously
+    //       deleted one
+    HoodieMetadataPayload overwrittenCombinedMetadataPayload =
+        columnStatsRecord.getData().preCombine(deletedColumnStatsRecord.getData());
+
+    assertEquals(columnStatsRecord.getData(), overwrittenCombinedMetadataPayload);
+  }
+}
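
For reference, the merge rule that the mergeColumnStatsRecords hunks above implement can be summarized in a short standalone sketch. This is illustrative only: it uses a simplified stand-in record type (ColumnStatsSketch, a hypothetical name) with a single Long-valued column rather than Hudi's generated HoodieMetadataColumnStats, and it covers just the fields the hunks touch (the tombstone flag, min/max, and value count).

import java.util.Comparator;
import java.util.Objects;
import java.util.stream.Stream;

final class ColumnStatsSketch {
  final String fileName;
  final String columnName;
  final Long minValue;     // null when the column had no non-null values
  final Long maxValue;
  final long valueCount;
  final boolean isDeleted; // tombstone marker

  ColumnStatsSketch(String fileName, String columnName, Long minValue, Long maxValue,
                    long valueCount, boolean isDeleted) {
    this.fileName = fileName;
    this.columnName = columnName;
    this.minValue = minValue;
    this.maxValue = maxValue;
    this.valueCount = valueCount;
    this.isDeleted = isDeleted;
  }

  static ColumnStatsSketch merge(ColumnStatsSketch prev, ColumnStatsSketch next) {
    // Stats are only ever merged for the same file/column pair
    if (!Objects.equals(prev.fileName, next.fileName)
        || !Objects.equals(prev.columnName, next.columnName)) {
      throw new IllegalArgumentException("Mismatched file/column");
    }
    // If either side is a tombstone, the incoming record overwrites the previous
    // state outright (the patch extends this to cover a tombstoned previous record)
    if (next.isDeleted || prev.isDeleted) {
      return next;
    }
    // Otherwise combine: min of mins, max of maxes (ignoring nulls), counts accumulate.
    // Note that the max is computed from the two max values; the bug fixed in the
    // second hunk fed the min values into this computation instead.
    Long min = Stream.of(prev.minValue, next.minValue)
        .filter(Objects::nonNull)
        .min(Comparator.naturalOrder())
        .orElse(null);
    Long max = Stream.of(prev.maxValue, next.maxValue)
        .filter(Objects::nonNull)
        .max(Comparator.naturalOrder())
        .orElse(null);
    return new ColumnStatsSketch(prev.fileName, prev.columnName, min, max,
        prev.valueCount + next.valueCount, false);
  }
}

Under this rule, merging in either direction with a tombstone always yields the newer record, which is exactly what Case 2 of testColumnStatsPayloadMerging asserts for both orderings of preCombine.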
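The first test exercises the analogous merge for partition-files payloads: the newer record's deletions are applied to the previous file list, and its additions are layered on top (re-adding an existing file updates its recorded size). A rough stand-in sketch of that combine step, again with illustrative names only, not Hudi's actual internals:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class FileListingMergeSketch {
  // Stand-in for merging two partition-files payloads: start from the previous
  // file list, drop the newer record's deleted files, then apply its additions
  // (adding a file that already exists simply replaces its recorded size).
  static Map<String, Long> merge(Map<String, Long> prevFiles,
                                 Map<String, Long> newAddedFiles,
                                 List<String> newDeletedFiles) {
    Map<String, Long> combined = new HashMap<>(prevFiles);
    newDeletedFiles.forEach(combined::remove);
    combined.putAll(newAddedFiles);
    return combined;
  }
}

Applied to the data in testFileSystemMetadataPayloadMerging, this yields {file2.parquet=2000, file3.parquet=3333, file4.parquet=4000, file5.parquet=5000}, matching the expected combined payload asserted there.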