Skip to content

Commit

Permalink
Update attribute order, support NaN in v1 metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
yyanyy committed Nov 24, 2020
1 parent 505a671 commit 4fc8067
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 63 deletions.
14 changes: 4 additions & 10 deletions api/src/main/java/org/apache/iceberg/DataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package org.apache.iceberg;

import java.util.List;
import java.util.Map;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.BinaryType;
import org.apache.iceberg.types.Types.IntegerType;
Expand Down Expand Up @@ -51,6 +50,8 @@ public interface DataFile extends ContentFile<DataFile> {
IntegerType.get(), LongType.get()), "Map of column id to total count, including null and NaN");
Types.NestedField NULL_VALUE_COUNTS = optional(110, "null_value_counts", MapType.ofRequired(121, 122,
IntegerType.get(), LongType.get()), "Map of column id to null value count");
Types.NestedField NAN_VALUE_COUNTS = optional(137, "nan_value_counts", MapType.ofRequired(138, 139,
IntegerType.get(), LongType.get()), "Map of column id to number of NaN values in the column");
Types.NestedField LOWER_BOUNDS = optional(125, "lower_bounds", MapType.ofRequired(126, 127,
IntegerType.get(), BinaryType.get()), "Map of column id to lower bound");
Types.NestedField UPPER_BOUNDS = optional(128, "upper_bounds", MapType.ofRequired(129, 130,
Expand All @@ -60,8 +61,6 @@ public interface DataFile extends ContentFile<DataFile> {
"Splittable offsets");
Types.NestedField EQUALITY_IDS = optional(135, "equality_ids", ListType.ofRequired(136, IntegerType.get()),
"Equality comparison field IDs");
Types.NestedField NAN_VALUE_COUNTS = optional(137, "nan_value_counts", MapType.ofRequired(138, 139,
IntegerType.get(), LongType.get()), "Map of column id to number of NaN values in the column");

int PARTITION_ID = 102;
String PARTITION_NAME = "partition";
Expand All @@ -80,12 +79,12 @@ static StructType getType(StructType partitionType) {
COLUMN_SIZES,
VALUE_COUNTS,
NULL_VALUE_COUNTS,
NAN_VALUE_COUNTS,
LOWER_BOUNDS,
UPPER_BOUNDS,
KEY_METADATA,
SPLIT_OFFSETS,
EQUALITY_IDS,
NAN_VALUE_COUNTS
EQUALITY_IDS
);
}

Expand All @@ -101,9 +100,4 @@ default FileContent content() {
/**
 * Default implementation of the equality field ID accessor.
 *
 * <p>This default always returns {@code null}; it does not consult any state.
 *
 * @return {@code null} in this default implementation
 */
default List<Integer> equalityFieldIds() {
return null;
}

@Override
default Map<Integer, Long> nanValueCounts() {
return null;
}
}
5 changes: 5 additions & 0 deletions api/src/test/java/org/apache/iceberg/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,11 @@ public Map<Integer, Long> nullValueCounts() {
return nullValueCounts;
}

// Placeholder: this test implementation does not track NaN value counts yet and always returns null.
@Override
public Map<Integer, Long> nanValueCounts() {
return null; // will be updated in a separate PR soon
}

@Override
public Map<Integer, ByteBuffer> lowerBounds() {
return lowerBounds;
Expand Down
24 changes: 12 additions & 12 deletions core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,22 +252,22 @@ public void put(int i, Object value) {
this.nullValueCounts = (Map<Integer, Long>) value;
return;
case 9:
this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
this.nanValueCounts = (Map<Integer, Long>) value;
return;
case 10:
this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
this.lowerBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
return;
case 11:
this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
this.upperBounds = SerializableByteBufferMap.wrap((Map<Integer, ByteBuffer>) value);
return;
case 12:
this.splitOffsets = ArrayUtil.toLongArray((List<Long>) value);
this.keyMetadata = ByteBuffers.toByteArray((ByteBuffer) value);
return;
case 13:
this.equalityIds = ArrayUtil.toIntArray((List<Integer>) value);
this.splitOffsets = ArrayUtil.toLongArray((List<Long>) value);
return;
case 14:
this.nanValueCounts = (Map<Integer, Long>) value;
this.equalityIds = ArrayUtil.toIntArray((List<Integer>) value);
return;
case 15:
this.fileOrdinal = (long) value;
Expand Down Expand Up @@ -309,17 +309,17 @@ public Object get(int i) {
case 8:
return nullValueCounts;
case 9:
return lowerBounds;
return nanValueCounts;
case 10:
return upperBounds;
return lowerBounds;
case 11:
return keyMetadata();
return upperBounds;
case 12:
return splitOffsets();
return keyMetadata();
case 13:
return equalityFieldIds();
return splitOffsets();
case 14:
return nanValueCounts;
return equalityFieldIds();
case 15:
return pos;
default:
Expand Down
16 changes: 12 additions & 4 deletions core/src/main/java/org/apache/iceberg/V1Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ static Types.StructType dataFileSchema(Types.StructType partitionType) {
DataFile.COLUMN_SIZES,
DataFile.VALUE_COUNTS,
DataFile.NULL_VALUE_COUNTS,
DataFile.NAN_VALUE_COUNTS,
DataFile.LOWER_BOUNDS,
DataFile.UPPER_BOUNDS,
DataFile.KEY_METADATA,
Expand Down Expand Up @@ -343,20 +344,22 @@ public Object get(int pos) {
case 8:
return wrapped.nullValueCounts();
case 9:
return wrapped.lowerBounds();
return wrapped.nanValueCounts();
case 10:
return wrapped.upperBounds();
return wrapped.lowerBounds();
case 11:
return wrapped.keyMetadata();
return wrapped.upperBounds();
case 12:
return wrapped.keyMetadata();
case 13:
return wrapped.splitOffsets();
}
throw new IllegalArgumentException("Unknown field ordinal: " + pos);
}

@Override
public void put(int i, Object v) {
throw new UnsupportedOperationException("Cannot write into IndexedDataFile");
throw new UnsupportedOperationException("Cannot read into IndexedDataFile");
}

@Override
Expand Down Expand Up @@ -419,6 +422,11 @@ public Map<Integer, Long> nullValueCounts() {
return wrapped.nullValueCounts();
}

/**
 * Returns the NaN value counts by delegating directly to {@code wrapped}.
 *
 * @return whatever {@code wrapped.nanValueCounts()} returns, unmodified
 */
@Override
public Map<Integer, Long> nanValueCounts() {
return wrapped.nanValueCounts();
}

@Override
public Map<Integer, ByteBuffer> lowerBounds() {
return wrapped.lowerBounds();
Expand Down
18 changes: 9 additions & 9 deletions core/src/main/java/org/apache/iceberg/V2Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,12 @@ static Types.StructType fileType(Types.StructType partitionType) {
DataFile.COLUMN_SIZES,
DataFile.VALUE_COUNTS,
DataFile.NULL_VALUE_COUNTS,
DataFile.NAN_VALUE_COUNTS,
DataFile.LOWER_BOUNDS,
DataFile.UPPER_BOUNDS,
DataFile.KEY_METADATA,
DataFile.SPLIT_OFFSETS,
DataFile.EQUALITY_IDS,
DataFile.NAN_VALUE_COUNTS
DataFile.EQUALITY_IDS
);
}

Expand Down Expand Up @@ -398,24 +398,24 @@ public Object get(int pos) {
case 8:
return wrapped.nullValueCounts();
case 9:
return wrapped.lowerBounds();
return wrapped.nanValueCounts();
case 10:
return wrapped.upperBounds();
return wrapped.lowerBounds();
case 11:
return wrapped.keyMetadata();
return wrapped.upperBounds();
case 12:
return wrapped.splitOffsets();
return wrapped.keyMetadata();
case 13:
return wrapped.equalityFieldIds();
return wrapped.splitOffsets();
case 14:
return wrapped.nanValueCounts();
return wrapped.equalityFieldIds();
}
throw new IllegalArgumentException("Unknown field ordinal: " + pos);
}

@Override
public void put(int i, Object v) {
throw new UnsupportedOperationException("Cannot write into IndexedDataFile");
throw new UnsupportedOperationException("Cannot read into IndexedDataFile");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,7 @@ private void assertFullStats(DataFile dataFile) {
Assert.assertEquals(NULL_VALUE_COUNTS, dataFile.nullValueCounts());
Assert.assertEquals(LOWER_BOUNDS, dataFile.lowerBounds());
Assert.assertEquals(UPPER_BOUNDS, dataFile.upperBounds());

if (formatVersion == 1) {
Assert.assertNull(dataFile.nanValueCounts());
} else {
Assert.assertEquals(NAN_VALUE_COUNTS, dataFile.nanValueCounts());
}
Assert.assertEquals(NAN_VALUE_COUNTS, dataFile.nanValueCounts());
}

private void assertStatsDropped(DataFile dataFile) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class TestManifestWriterVersions {
ImmutableMap.of(1, 15L, 2, 122L, 3, 4021L, 4, 9411L, 5, 15L), // sizes
ImmutableMap.of(1, 100L, 2, 100L, 3, 100L, 4, 100L, 5, 100L), // value counts
ImmutableMap.of(1, 0L, 2, 0L, 3, 0L, 4, 0L, 5, 0L), // null value counts
ImmutableMap.of(5, 10L), // null value counts
ImmutableMap.of(5, 10L), // nan value counts
ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1)), // lower bounds
ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1))); // upper bounds
private static final List<Long> OFFSETS = ImmutableList.of(4L);
Expand All @@ -85,7 +85,7 @@ public class TestManifestWriterVersions {
public void testV1Write() throws IOException {
ManifestFile manifest = writeManifest(1);
checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA, false);
checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA);
}

@Test
Expand All @@ -101,15 +101,15 @@ public void testV1WriteWithInheritance() throws IOException {
checkManifest(manifest, 0L);

// v1 should be read using sequence number 0 because it was missing from the manifest list file
checkEntry(readManifest(manifest), 0L, FileContent.DATA, false);
checkEntry(readManifest(manifest), 0L, FileContent.DATA);
}

@Test
public void testV2Write() throws IOException {
ManifestFile manifest = writeManifest(2);
checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
Assert.assertEquals("Content", ManifestContent.DATA, manifest.content());
checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA, true);
checkEntry(readManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.DATA);
}

@Test
Expand All @@ -119,15 +119,15 @@ public void testV2WriteWithInheritance() throws IOException {
Assert.assertEquals("Content", ManifestContent.DATA, manifest.content());

// v2 should use the correct sequence number by inheriting it
checkEntry(readManifest(manifest), SEQUENCE_NUMBER, FileContent.DATA, true);
checkEntry(readManifest(manifest), SEQUENCE_NUMBER, FileContent.DATA);
}

@Test
public void testV2WriteDelete() throws IOException {
ManifestFile manifest = writeDeleteManifest(2);
checkManifest(manifest, ManifestWriter.UNASSIGNED_SEQ);
Assert.assertEquals("Content", ManifestContent.DELETES, manifest.content());
checkEntry(readDeleteManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.EQUALITY_DELETES, true);
checkEntry(readDeleteManifest(manifest), ManifestWriter.UNASSIGNED_SEQ, FileContent.EQUALITY_DELETES);
}

@Test
Expand All @@ -137,7 +137,7 @@ public void testV2WriteDeleteWithInheritance() throws IOException {
Assert.assertEquals("Content", ManifestContent.DELETES, manifest.content());

// v2 should use the correct sequence number by inheriting it
checkEntry(readDeleteManifest(manifest), SEQUENCE_NUMBER, FileContent.EQUALITY_DELETES, true);
checkEntry(readDeleteManifest(manifest), SEQUENCE_NUMBER, FileContent.EQUALITY_DELETES);
}

@Test
Expand All @@ -152,8 +152,7 @@ public void testV2ManifestListRewriteWithInheritance() throws IOException {
checkManifest(manifest2, 0L);

// should not inherit the v2 sequence number because it was a rewrite
// NaN count also won't be present since v1 manifest doesn't have this information
checkEntry(readManifest(manifest2), 0L, FileContent.DATA, false);
checkEntry(readManifest(manifest2), 0L, FileContent.DATA);
}

@Test
Expand All @@ -172,26 +171,24 @@ public void testV2ManifestRewriteWithInheritance() throws IOException {
checkRewrittenManifest(manifest2, SEQUENCE_NUMBER, 0L);

// should not inherit the v2 sequence number because it was written into the v2 manifest
// NaN count also won't be present since v1 manifest doesn't have this information
checkRewrittenEntry(readManifest(manifest2), 0L, FileContent.DATA, false);
checkRewrittenEntry(readManifest(manifest2), 0L, FileContent.DATA);
}

void checkEntry(ManifestEntry<?> entry, Long expectedSequenceNumber, FileContent content, boolean hasNaNCount) {
void checkEntry(ManifestEntry<?> entry, Long expectedSequenceNumber, FileContent content) {
Assert.assertEquals("Status", ManifestEntry.Status.ADDED, entry.status());
Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId());
Assert.assertEquals("Sequence number", expectedSequenceNumber, entry.sequenceNumber());
checkDataFile(entry.file(), content, hasNaNCount);
checkDataFile(entry.file(), content);
}

void checkRewrittenEntry(ManifestEntry<DataFile> entry, Long expectedSequenceNumber,
FileContent content, boolean hasNaNCount) {
void checkRewrittenEntry(ManifestEntry<DataFile> entry, Long expectedSequenceNumber, FileContent content) {
Assert.assertEquals("Status", ManifestEntry.Status.EXISTING, entry.status());
Assert.assertEquals("Snapshot ID", (Long) SNAPSHOT_ID, entry.snapshotId());
Assert.assertEquals("Sequence number", expectedSequenceNumber, entry.sequenceNumber());
checkDataFile(entry.file(), content, hasNaNCount);
checkDataFile(entry.file(), content);
}

void checkDataFile(ContentFile<?> dataFile, FileContent content, boolean hasNaNCount) {
void checkDataFile(ContentFile<?> dataFile, FileContent content) {
// DataFile is the superclass of DeleteFile, so this method can check both
Assert.assertEquals("Content", content, dataFile.content());
Assert.assertEquals("Path", PATH, dataFile.path());
Expand All @@ -201,18 +198,14 @@ void checkDataFile(ContentFile<?> dataFile, FileContent content, boolean hasNaNC
Assert.assertEquals("Column sizes", METRICS.columnSizes(), dataFile.columnSizes());
Assert.assertEquals("Value counts", METRICS.valueCounts(), dataFile.valueCounts());
Assert.assertEquals("Null value counts", METRICS.nullValueCounts(), dataFile.nullValueCounts());
Assert.assertEquals("NaN value counts", METRICS.nanValueCounts(), dataFile.nanValueCounts());
Assert.assertEquals("Lower bounds", METRICS.lowerBounds(), dataFile.lowerBounds());
Assert.assertEquals("Upper bounds", METRICS.upperBounds(), dataFile.upperBounds());
if (dataFile.content() == FileContent.EQUALITY_DELETES) {
Assert.assertEquals(EQUALITY_IDS, dataFile.equalityFieldIds());
} else {
Assert.assertNull(dataFile.equalityFieldIds());
}
if (hasNaNCount) {
Assert.assertEquals("NaN", METRICS.nanValueCounts(), dataFile.nanValueCounts());
} else {
Assert.assertNull("NaN", dataFile.nanValueCounts());
}
}

void checkManifest(ManifestFile manifest, long expectedSequenceNumber) {
Expand Down

0 comments on commit 4fc8067

Please sign in to comment.