diff --git a/api/src/main/java/org/apache/iceberg/types/Comparators.java b/api/src/main/java/org/apache/iceberg/types/Comparators.java
index 0dbbfa8d15aa..68b21464d63c 100644
--- a/api/src/main/java/org/apache/iceberg/types/Comparators.java
+++ b/api/src/main/java/org/apache/iceberg/types/Comparators.java
@@ -157,6 +157,10 @@ public static Comparator<ByteBuffer> unsignedBytes() {
     return UnsignedByteBufComparator.INSTANCE;
   }
 
+  public static Comparator<byte[]> unsignedByteArray() {
+    return UnsignedByteArrayComparator.INSTANCE;
+  }
+
   public static Comparator<ByteBuffer> signedBytes() {
     return Comparator.naturalOrder();
   }
@@ -272,6 +276,30 @@ public int compare(ByteBuffer buf1, ByteBuffer buf2) {
     }
   }
 
+  private static class UnsignedByteArrayComparator implements Comparator<byte[]> {
+    private static final UnsignedByteArrayComparator INSTANCE = new UnsignedByteArrayComparator();
+
+    private UnsignedByteArrayComparator() {
+    }
+
+    @Override
+    public int compare(byte[] array1, byte[] array2) {
+      int len = Math.min(array1.length, array2.length);
+
+      // find the first difference and return
+      for (int i = 0; i < len; i += 1) {
+        // Conversion to int is what Byte.toUnsignedInt would do
+        int cmp = Integer.compare(((int) array1[i]) & 0xff, ((int) array2[i]) & 0xff);
+        if (cmp != 0) {
+          return cmp;
+        }
+      }
+
+      // if there are no differences, then the shorter seq is first
+      return Integer.compare(array1.length, array2.length);
+    }
+  }
+
   private static class CharSeqComparator implements Comparator<CharSequence> {
     private static final CharSeqComparator INSTANCE = new CharSeqComparator();
 
diff --git a/core/src/main/java/org/apache/iceberg/FieldMetrics.java b/core/src/main/java/org/apache/iceberg/FieldMetrics.java
index d67faa94f1ab..6d13153932d1 100644
--- a/core/src/main/java/org/apache/iceberg/FieldMetrics.java
+++ b/core/src/main/java/org/apache/iceberg/FieldMetrics.java
@@ -20,8 +20,6 @@
 
 package org.apache.iceberg;
 
-import java.nio.ByteBuffer;
-
 /**
  * Iceberg internally tracked field level metrics.
  */
@@ -30,15 +28,15 @@ public class FieldMetrics {
   private final long valueCount;
   private final long nullValueCount;
   private final long nanValueCount;
-  private final ByteBuffer lowerBound;
-  private final ByteBuffer upperBound;
+  private final Object lowerBound;
+  private final Object upperBound;
 
   public FieldMetrics(int id,
                       long valueCount,
                       long nullValueCount,
                       long nanValueCount,
-                      ByteBuffer lowerBound,
-                      ByteBuffer upperBound) {
+                      Object lowerBound,
+                      Object upperBound) {
     this.id = id;
     this.valueCount = valueCount;
     this.nullValueCount = nullValueCount;
@@ -78,14 +76,14 @@ public long nanValueCount() {
   /**
    * Returns the lower bound value of this field.
    */
-  public ByteBuffer lowerBound() {
+  public Object lowerBound() {
     return lowerBound;
   }
 
   /**
    * Returns the upper bound value of this field.
*/ - public ByteBuffer upperBound() { + public Object upperBound() { return upperBound; } } diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 5a34eedc80d5..2aac1065723f 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -28,6 +28,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.avro.Conversions; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; @@ -37,7 +38,10 @@ import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Encoder; import org.apache.avro.specific.SpecificData; +import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetadataColumns; +import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.StructLike; @@ -100,8 +104,9 @@ public static class WriteBuilder { private final Map metadata = Maps.newLinkedHashMap(); private org.apache.iceberg.Schema schema = null; private String name = "table"; - private Function> createWriterFunc = null; + private Function> createWriterFunc = null; private boolean overwrite; + private MetricsConfig metricsConfig; private WriteBuilder(OutputFile file) { this.file = file; @@ -123,7 +128,7 @@ public WriteBuilder named(String newName) { return this; } - public WriteBuilder createWriterFunc(Function> writerFunction) { + public WriteBuilder createWriterFunc(Function> writerFunction) { this.createWriterFunc = writerFunction; return this; } @@ -148,6 +153,11 @@ public WriteBuilder meta(Map properties) { return this; } + public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) { + this.metricsConfig = newMetricsConfig; + return this; + } + public WriteBuilder overwrite() { return overwrite(true); } @@ -170,7 +180,7 @@ public FileAppender build() throws IOException { Preconditions.checkNotNull(schema, "Schema is required"); Preconditions.checkNotNull(name, "Table name is required and cannot be null"); - Function> writerFunc; + Function> writerFunc; if (createWriterFunc != null) { writerFunc = createWriterFunc; } else { @@ -181,7 +191,7 @@ public FileAppender build() throws IOException { meta("iceberg.schema", SchemaParser.toJson(schema)); return new AvroFileAppender<>( - AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, overwrite); + schema, AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, metricsConfig, overwrite); } } @@ -192,7 +202,7 @@ public static DeleteWriteBuilder writeDeletes(OutputFile file) { public static class DeleteWriteBuilder { private final WriteBuilder appenderBuilder; private final String location; - private Function> createWriterFunc = null; + private Function> createWriterFunc = null; private org.apache.iceberg.Schema rowSchema; private PartitionSpec spec; private StructLike partition; @@ -240,7 +250,7 @@ public DeleteWriteBuilder overwrite(boolean enabled) { return this; } - public DeleteWriteBuilder createWriterFunc(Function> writerFunction) { + public DeleteWriteBuilder createWriterFunc(Function> writerFunction) { this.createWriterFunc = writerFunction; return this; } @@ -320,9 +330,11 @@ public PositionDeleteWriter buildPositionWriter() throws IOException { /** * A {@link DatumWriter} implementation that wraps another to produce position deletes. 
*/ - private static class PositionDatumWriter implements DatumWriter> { - private static final ValueWriter PATH_WRITER = ValueWriters.strings(); - private static final ValueWriter POS_WRITER = ValueWriters.longs(); + private static class PositionDatumWriter implements MetricsAwareDatumWriter> { + private static final ValueWriter PATH_WRITER = + ValueWriters.strings(MetadataColumns.DELETE_FILE_PATH.fieldId()); + private static final ValueWriter POS_WRITER = + ValueWriters.longs(MetadataColumns.DELETE_FILE_POS.fieldId()); @Override public void setSchema(Schema schema) { @@ -333,6 +345,11 @@ public void write(PositionDelete delete, Encoder out) throws IOException { PATH_WRITER.write(delete.path(), out); POS_WRITER.write(delete.pos(), out); } + + @Override + public Stream metrics() { + return Stream.concat(PATH_WRITER.metrics(), POS_WRITER.metrics()); + } } /** @@ -340,9 +357,12 @@ public void write(PositionDelete delete, Encoder out) throws IOException { * * @param the type of datum written as a deleted row */ - private static class PositionAndRowDatumWriter implements DatumWriter> { - private static final ValueWriter PATH_WRITER = ValueWriters.strings(); - private static final ValueWriter POS_WRITER = ValueWriters.longs(); + private static class PositionAndRowDatumWriter implements MetricsAwareDatumWriter> { + private static final ValueWriter PATH_WRITER = + ValueWriters.strings(MetadataColumns.DELETE_FILE_PATH.fieldId()); + private static final ValueWriter POS_WRITER = + ValueWriters.longs(MetadataColumns.DELETE_FILE_POS.fieldId()); + private final DatumWriter rowWriter; private PositionAndRowDatumWriter(DatumWriter rowWriter) { @@ -363,6 +383,11 @@ public void write(PositionDelete delete, Encoder out) throws IOException { POS_WRITER.write(delete.pos(), out); rowWriter.write(delete.row(), out); } + + @Override + public Stream metrics() { + return Stream.concat(PATH_WRITER.metrics(), POS_WRITER.metrics()); + } } public static ReadBuilder read(InputFile file) { diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java b/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java index a77b07d0b8e0..048c6ca9b8f8 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java @@ -27,22 +27,31 @@ import org.apache.avro.file.DataFileWriter; import org.apache.avro.io.DatumWriter; import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.io.PositionOutputStream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; class AvroFileAppender implements FileAppender { private PositionOutputStream stream = null; private DataFileWriter writer = null; + private MetricsAwareDatumWriter metricsAwareDatumWriter = null; + private org.apache.iceberg.Schema icebergSchema; + private MetricsConfig metricsConfig; private long numRecords = 0L; + private boolean isClosed = false; - AvroFileAppender(Schema schema, OutputFile file, - Function> createWriterFunc, + AvroFileAppender(org.apache.iceberg.Schema icebergSchema, Schema schema, OutputFile file, + Function> createWriterFunc, CodecFactory codec, Map metadata, - boolean overwrite) throws IOException { + MetricsConfig metricsConfig, boolean overwrite) throws IOException { + this.icebergSchema = icebergSchema; this.stream = overwrite ? 
file.createOrOverwrite() : file.create(); - this.writer = newAvroWriter(schema, stream, createWriterFunc, codec, metadata); + this.metricsAwareDatumWriter = createWriterFunc.apply(schema); + this.writer = newAvroWriter(schema, stream, metricsAwareDatumWriter, codec, metadata); + this.metricsConfig = metricsConfig; } @Override @@ -57,7 +66,10 @@ public void add(D datum) { @Override public Metrics metrics() { - return new Metrics(numRecords, null, null, null); + Preconditions.checkState(isClosed, + "Cannot return metrics while appending to an open file."); + + return AvroMetrics.fromWriter(metricsAwareDatumWriter, icebergSchema, numRecords, metricsConfig); } @Override @@ -77,15 +89,16 @@ public void close() throws IOException { if (writer != null) { writer.close(); this.writer = null; + isClosed = true; } } @SuppressWarnings("unchecked") private static DataFileWriter newAvroWriter( - Schema schema, PositionOutputStream stream, Function> createWriterFunc, + Schema schema, PositionOutputStream stream, MetricsAwareDatumWriter metricsAwareDatumWriter, CodecFactory codec, Map metadata) throws IOException { DataFileWriter writer = new DataFileWriter<>( - (DatumWriter) createWriterFunc.apply(schema)); + (DatumWriter) metricsAwareDatumWriter); writer.setCodec(codec); diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroMetrics.java b/core/src/main/java/org/apache/iceberg/avro/AvroMetrics.java new file mode 100644 index 000000000000..5db96b3f234f --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/AvroMetrics.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.avro; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.apache.iceberg.FieldMetrics; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.MetricsModes; +import org.apache.iceberg.Schema; +import org.apache.iceberg.expressions.Literal; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.util.BinaryUtil; +import org.apache.iceberg.util.UnicodeUtil; + +public class AvroMetrics { + + private AvroMetrics() { + } + + static Metrics fromWriter(MetricsAwareDatumWriter datumWriter, Schema schema, long numRecords, + MetricsConfig inputMetricsConfig) { + MetricsConfig metricsConfig; + if (inputMetricsConfig == null) { + metricsConfig = MetricsConfig.getDefault(); + } else { + metricsConfig = inputMetricsConfig; + } + + Map valueCounts = new HashMap<>(); + Map nullValueCounts = new HashMap<>(); + Map nanValueCounts = new HashMap<>(); + Map lowerBounds = new HashMap<>(); + Map upperBounds = new HashMap<>(); + + datumWriter.metrics().forEach(metrics -> { + String columnName = schema.findColumnName(metrics.id()); + MetricsModes.MetricsMode metricsMode = metricsConfig.columnMode(columnName); + if (metricsMode == MetricsModes.None.get()) { + return; + } + + valueCounts.put(metrics.id(), metrics.valueCount()); + nullValueCounts.put(metrics.id(), metrics.nullValueCount()); + Type type = schema.findType(metrics.id()); + + if (type.typeId() == Type.TypeID.FLOAT || type.typeId() == Type.TypeID.DOUBLE) { + nanValueCounts.put(metrics.id(), metrics.nanValueCount()); + } + + if (metricsMode == MetricsModes.Counts.get()) { + return; + } + + updateLowerBound(metrics, type, metricsMode).ifPresent(lowerBound -> lowerBounds.put(metrics.id(), lowerBound)); + updateUpperBound(metrics, type, metricsMode).ifPresent(upperBound -> upperBounds.put(metrics.id(), upperBound)); + }); + + return new Metrics(numRecords, null, + valueCounts, nullValueCounts, nanValueCounts, lowerBounds, upperBounds); + } + + private static Optional updateLowerBound(FieldMetrics metrics, Type type, + MetricsModes.MetricsMode metricsMode) { + if (metrics.lowerBound() == null) { + return Optional.empty(); + } + + Object lowerBound = metrics.lowerBound(); + if (metricsMode instanceof MetricsModes.Truncate) { + MetricsModes.Truncate truncateMode = (MetricsModes.Truncate) metricsMode; + int truncateLength = truncateMode.length(); + switch (type.typeId()) { + case STRING: + lowerBound = UnicodeUtil.truncateStringMin( + Literal.of((CharSequence) metrics.lowerBound()), truncateLength).value(); + break; + case FIXED: + case BINARY: + lowerBound = BinaryUtil.truncateBinaryMin( + Literal.of((ByteBuffer) metrics.lowerBound()), truncateLength).value(); + break; + default: + break; + } + } + + return Optional.ofNullable(Conversions.toByteBuffer(type, lowerBound)); + } + + private static Optional updateUpperBound(FieldMetrics metrics, Type type, + MetricsModes.MetricsMode metricsMode) { + if (metrics.upperBound() == null) { + return Optional.empty(); + } + + Object upperBound = null; + if (metricsMode instanceof MetricsModes.Truncate) { + MetricsModes.Truncate truncateMode = (MetricsModes.Truncate) metricsMode; + int truncateLength = truncateMode.length(); + switch (type.typeId()) { + case STRING: + upperBound = Optional.ofNullable( + UnicodeUtil.truncateStringMax(Literal.of((CharSequence) metrics.upperBound()), truncateLength)) + .map(Literal::value) 
+ .orElse(null); + break; + case FIXED: + case BINARY: + upperBound = Optional.ofNullable( + BinaryUtil.truncateBinaryMax(Literal.of((ByteBuffer) metrics.upperBound()), truncateLength)) + .map(Literal::value) + .orElse(null); + break; + default: + break; + } + } + + if (upperBound == null) { + upperBound = metrics.upperBound(); + } + + return Optional.ofNullable(Conversions.toByteBuffer(type, upperBound)); + } +} diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java index 929fb5f1414f..54f4397e0a9e 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import org.apache.avro.JsonProperties; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; @@ -244,6 +245,28 @@ static boolean hasProperty(Schema schema, String propertyName) { return schema.getObjectProp(propertyName) != null; } + public static int fieldId(Schema currentSchema, Schema parentSchema, Supplier fieldNameGetter) { + Preconditions.checkNotNull(parentSchema, "Detected no parent schema for schema %s", currentSchema); + + switch (parentSchema.getType()) { + case RECORD: + String fieldName = fieldNameGetter.get(); + Schema.Field field = parentSchema.getField(fieldName); + Preconditions.checkNotNull(field, + "Cannot get field id with field name %s from schema %s for current schema %s", + fieldName, parentSchema, currentSchema); + + return getFieldId(field); + case ARRAY: + return getElementId(parentSchema); + case MAP: + return getValueId(parentSchema); + default: + throw new IllegalStateException(String.format( + "Cannot retrieve field ID of current schema %s from parent schema %s ", currentSchema, parentSchema)); + } + } + public static int getKeyId(Schema schema) { Preconditions.checkArgument(schema.getType() == MAP, "Cannot get map key id for non-map schema: %s", schema); @@ -408,4 +431,11 @@ private static String sanitize(char character) { } return "_x" + Integer.toHexString(character).toUpperCase(); } + + static boolean isMetricSupportedType(Schema.Type type) { + // ENUM will not be created by converting iceberg schema to avro schema, and thus not included + return type == Schema.Type.BOOLEAN || type == Schema.Type.INT || type == Schema.Type.LONG || + type == Schema.Type.FLOAT || type == Schema.Type.DOUBLE || type == Schema.Type.STRING || + type == Schema.Type.FIXED || type == Schema.Type.BYTES; + } } diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java index 281d45b51c2a..5691ea25567b 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java @@ -35,6 +35,7 @@ public static T visit(Schema schema, AvroSchemaVisitor visitor) { "Cannot process recursive Avro record %s", name); visitor.recordLevels.push(name); + visitor.parentSchemas.push(schema); List fields = schema.getFields(); List names = Lists.newArrayListWithExpectedSize(fields.size()); @@ -46,6 +47,7 @@ public static T visit(Schema schema, AvroSchemaVisitor visitor) { } visitor.recordLevels.pop(); + visitor.parentSchemas.pop(); return visitor.record(schema, names, results); @@ -59,13 +61,20 @@ public static T visit(Schema schema, AvroSchemaVisitor visitor) { case ARRAY: if 
(schema.getLogicalType() instanceof LogicalMap) { - return visitor.array(schema, visit(schema.getElementType(), visitor)); + T result = visit(schema.getElementType(), visitor); + return visitor.array(schema, result); } else { - return visitor.array(schema, visitWithName("element", schema.getElementType(), visitor)); + visitor.parentSchemas.push(schema); + T result = visitWithName("element", schema.getElementType(), visitor); + visitor.parentSchemas.pop(); + return visitor.array(schema, result); } case MAP: - return visitor.map(schema, visitWithName("value", schema.getValueType(), visitor)); + visitor.parentSchemas.push(schema); + T result = visitWithName("value", schema.getValueType(), visitor); + visitor.parentSchemas.pop(); + return visitor.map(schema, result); default: return visitor.primitive(schema); @@ -74,11 +83,20 @@ public static T visit(Schema schema, AvroSchemaVisitor visitor) { private Deque recordLevels = Lists.newLinkedList(); private Deque fieldNames = Lists.newLinkedList(); + private Deque parentSchemas = Lists.newLinkedList(); protected Deque fieldNames() { return fieldNames; } + protected String lastFieldName() { + return fieldNames.peekLast(); + } + + protected Schema parentSchema() { + return parentSchemas.peek(); + } + private static T visitWithName(String name, Schema schema, AvroSchemaVisitor visitor) { try { visitor.fieldNames.addLast(name); diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java index 2b28f54aaaaa..8d6f424f76e2 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroWithPartnerByStructureVisitor.java @@ -51,7 +51,11 @@ public static T visit(P partner, Schema schema, AvroWithPartnerByStructur Preconditions.checkArgument( visitor.isStringType(keyType), "Invalid map: %s is not a string", keyType); - return visitor.map(partner, schema, visit(visitor.mapValueType(partner), schema.getValueType(), visitor)); + + visitor.parentSchemas.push(schema); + T result = visitWithName("value", visitor.mapValueType(partner), schema.getValueType(), visitor); + visitor.parentSchemas.pop(); + return visitor.map(partner, schema, result); default: return visitor.primitive(partner, schema); @@ -68,6 +72,7 @@ private static T visitRecord(P struct, Schema record, AvroWithPartnerBySt List fields = record.getFields(); visitor.recordLevels.push(name); + visitor.parentSchemas.push(record); List names = Lists.newArrayListWithExpectedSize(fields.size()); List results = Lists.newArrayListWithExpectedSize(fields.size()); @@ -77,11 +82,12 @@ private static T visitRecord(P struct, Schema record, AvroWithPartnerBySt Schema.Field field = fields.get(i); Preconditions.checkArgument(AvroSchemaUtil.makeCompatibleName(fieldName).equals(field.name()), "Structs do not match: field %s != %s", fieldName, field.name()); - results.add(visit(nameAndType.second(), field.schema(), visitor)); + results.add(visitWithName(field.name(), nameAndType.second(), field.schema(), visitor)); names.add(fieldName); } visitor.recordLevels.pop(); + visitor.parentSchemas.pop(); return visitor.record(struct, record, names, results); } @@ -102,17 +108,36 @@ private static T visitUnion(P type, Schema union, AvroWithPartnerByStruct } private static T visitArray(P type, Schema array, AvroWithPartnerByStructureVisitor visitor) { + T result; + if (array.getLogicalType() instanceof LogicalMap || 
visitor.isMapType(type)) { Preconditions.checkState( AvroSchemaUtil.isKeyValueSchema(array.getElementType()), "Cannot visit invalid logical map type: %s", array); List keyValueFields = array.getElementType().getFields(); - return visitor.map(type, array, - visit(visitor.mapKeyType(type), keyValueFields.get(0).schema(), visitor), - visit(visitor.mapValueType(type), keyValueFields.get(1).schema(), visitor)); + visitor.parentSchemas.push(array.getElementType()); + result = visitor.map(type, array, + visitWithName("key", visitor.mapKeyType(type), keyValueFields.get(0).schema(), visitor), + visitWithName("value", visitor.mapValueType(type), keyValueFields.get(1).schema(), visitor)); } else { - return visitor.array(type, array, visit(visitor.arrayElementType(type), array.getElementType(), visitor)); + visitor.parentSchemas.push(array); + + result = visitor.array(type, array, + visitWithName("element", visitor.arrayElementType(type), array.getElementType(), visitor)); + } + + visitor.parentSchemas.pop(); + return result; + } + + private static T visitWithName(String name, P partner, Schema schema, + AvroWithPartnerByStructureVisitor visitor) { + try { + visitor.fieldNames.addLast(name); + return visit(partner, schema, visitor); + } finally { + visitor.fieldNames.removeLast(); } } @@ -161,4 +186,17 @@ public T map(P sMap, Schema map, T value) { public T primitive(P type, Schema primitive) { return null; } + + // ---------------------------------- Helpers --------------------------------------------- + + private Deque fieldNames = Lists.newLinkedList(); + private Deque parentSchemas = Lists.newLinkedList(); + + protected String lastFieldName() { + return fieldNames.peekLast(); + } + + protected Schema parentSchema() { + return parentSchemas.peek(); + } } diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java index 0498069a6bfb..8083495f2a57 100644 --- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java +++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java @@ -21,14 +21,15 @@ import java.io.IOException; import java.util.List; +import java.util.stream.Stream; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Encoder; +import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -class GenericAvroWriter implements DatumWriter { +class GenericAvroWriter implements MetricsAwareDatumWriter { private ValueWriter writer = null; GenericAvroWriter(Schema schema) { @@ -46,6 +47,11 @@ public void write(T datum, Encoder out) throws IOException { writer.write(datum, out); } + @Override + public Stream metrics() { + return writer.metrics(); + } + private static class WriteBuilder extends AvroSchemaVisitor> { private WriteBuilder() { } @@ -62,9 +68,9 @@ public ValueWriter union(Schema union, List> options) { Preconditions.checkArgument(options.size() == 2, "Cannot create writer for non-option union: %s", union); if (union.getTypes().get(0).getType() == Schema.Type.NULL) { - return ValueWriters.option(0, options.get(1)); + return ValueWriters.option(0, options.get(1), union.getTypes().get(1).getType()); } else { - return ValueWriters.option(1, options.get(0)); + return ValueWriters.option(1, options.get(0), union.getTypes().get(0).getType()); } } @@ -80,29 +86,32 @@ public ValueWriter array(Schema array, ValueWriter 
elementWriter) { @Override public ValueWriter map(Schema map, ValueWriter valueWriter) { - return ValueWriters.map(ValueWriters.strings(), valueWriter); + int keyId = AvroSchemaUtil.getKeyId(map); + return ValueWriters.map(ValueWriters.strings(keyId), valueWriter); } @Override public ValueWriter primitive(Schema primitive) { + int fieldId = AvroSchemaUtil.fieldId(primitive, parentSchema(), this::lastFieldName); + LogicalType logicalType = primitive.getLogicalType(); if (logicalType != null) { switch (logicalType.getName()) { case "date": - return ValueWriters.ints(); + return ValueWriters.ints(fieldId); case "time-micros": - return ValueWriters.longs(); + return ValueWriters.longs(fieldId); case "timestamp-micros": - return ValueWriters.longs(); + return ValueWriters.longs(fieldId); case "decimal": LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; - return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale()); + return ValueWriters.decimal(fieldId, decimal.getPrecision(), decimal.getScale()); case "uuid": - return ValueWriters.uuids(); + return ValueWriters.uuids(fieldId); default: throw new IllegalArgumentException("Unsupported logical type: " + logicalType); @@ -113,21 +122,21 @@ public ValueWriter primitive(Schema primitive) { case NULL: return ValueWriters.nulls(); case BOOLEAN: - return ValueWriters.booleans(); + return ValueWriters.booleans(fieldId); case INT: - return ValueWriters.ints(); + return ValueWriters.ints(fieldId); case LONG: - return ValueWriters.longs(); + return ValueWriters.longs(fieldId); case FLOAT: - return ValueWriters.floats(); + return ValueWriters.floats(fieldId); case DOUBLE: - return ValueWriters.doubles(); + return ValueWriters.doubles(fieldId); case STRING: - return ValueWriters.strings(); + return ValueWriters.strings(fieldId); case FIXED: - return ValueWriters.genericFixed(primitive.getFixedSize()); + return ValueWriters.genericFixed(fieldId, primitive.getFixedSize()); case BYTES: - return ValueWriters.byteBuffers(); + return ValueWriters.byteBuffers(fieldId); default: throw new IllegalArgumentException("Unsupported type: " + primitive); } diff --git a/core/src/main/java/org/apache/iceberg/avro/MetricsAwareDatumWriter.java b/core/src/main/java/org/apache/iceberg/avro/MetricsAwareDatumWriter.java new file mode 100644 index 000000000000..08aa194993ba --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/MetricsAwareDatumWriter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.avro; + +import java.util.stream.Stream; +import org.apache.avro.io.DatumWriter; +import org.apache.iceberg.FieldMetrics; + +/** + * Wrapper writer around {@link DatumWriter} with metrics support. 
+ */ +public interface MetricsAwareDatumWriter extends DatumWriter { + + /** + * Returns a stream of {@link FieldMetrics} that this MetricsAwareDatumWriter keeps track of. + */ + Stream metrics(); +} diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueWriter.java b/core/src/main/java/org/apache/iceberg/avro/ValueWriter.java index 1753adfdae1f..3570c0b6cb38 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueWriter.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueWriter.java @@ -20,8 +20,12 @@ package org.apache.iceberg.avro; import java.io.IOException; +import java.util.stream.Stream; import org.apache.avro.io.Encoder; +import org.apache.iceberg.FieldMetrics; public interface ValueWriter { void write(D datum, Encoder encoder) throws IOException; + + Stream metrics(); } diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java b/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java index 4e4d375ddd30..f387887db383 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueWriters.java @@ -24,18 +24,28 @@ import java.math.BigDecimal; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.io.Encoder; import org.apache.avro.util.Utf8; +import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.DecimalUtil; +import org.apache.iceberg.util.NaNUtil; public class ValueWriters { private ValueWriters() { @@ -45,68 +55,68 @@ public static ValueWriter nulls() { return NullWriter.INSTANCE; } - public static ValueWriter booleans() { - return BooleanWriter.INSTANCE; + public static ValueWriter booleans(int id) { + return new BooleanWriter(id); } - public static ValueWriter tinyints() { - return ByteToIntegerWriter.INSTANCE; + public static ValueWriter tinyints(int id) { + return new ByteToIntegerWriter(id); } - public static ValueWriter shorts() { - return ShortToIntegerWriter.INSTANCE; + public static ValueWriter shorts(int id) { + return new ShortToIntegerWriter(id); } - public static ValueWriter ints() { - return IntegerWriter.INSTANCE; + public static ValueWriter ints(int id) { + return new IntegerWriter(id); } - public static ValueWriter longs() { - return LongWriter.INSTANCE; + public static ValueWriter longs(int id) { + return new LongWriter(id); } - public static ValueWriter floats() { - return FloatWriter.INSTANCE; + public static ValueWriter floats(int id) { + return new FloatWriter(id); } - public static ValueWriter doubles() { - return DoubleWriter.INSTANCE; + public static ValueWriter doubles(int id) { + return new DoubleWriter(id); } - public static ValueWriter strings() { - return StringWriter.INSTANCE; + public static ValueWriter strings(int id) { + return new StringWriter(id); } - public static ValueWriter utf8s() { - return Utf8Writer.INSTANCE; + public static ValueWriter utf8s(int id) { + return new Utf8Writer(id); } - public static 
ValueWriter uuids() { - return UUIDWriter.INSTANCE; + public static ValueWriter uuids(int id) { + return new UUIDWriter(id); } - public static ValueWriter fixed(int length) { - return new FixedWriter(length); + public static ValueWriter fixed(int id, int length) { + return new FixedWriter(id, length); } - public static ValueWriter genericFixed(int length) { - return new GenericFixedWriter(length); + public static ValueWriter genericFixed(int id, int length) { + return new GenericFixedWriter(id, length); } - public static ValueWriter bytes() { - return BytesWriter.INSTANCE; + public static ValueWriter bytes(int id) { + return new BytesWriter(id); } - public static ValueWriter byteBuffers() { - return ByteBufferWriter.INSTANCE; + public static ValueWriter byteBuffers(int id) { + return new ByteBufferWriter(id); } - public static ValueWriter decimal(int precision, int scale) { - return new DecimalWriter(precision, scale); + public static ValueWriter decimal(int id, int precision, int scale) { + return new DecimalWriter(id, precision, scale); } - public static ValueWriter option(int nullIndex, ValueWriter writer) { - return new OptionWriter<>(nullIndex, writer); + public static ValueWriter option(int nullIndex, ValueWriter writer, Schema.Type type) { + return new OptionWriter<>(nullIndex, writer, type); } public static ValueWriter> array(ValueWriter elementWriter) { @@ -127,6 +137,11 @@ public static ValueWriter record(List> writers) { return new RecordWriter(writers); } + /** + * NullWriter is created as a placeholder so that when building writers from schema, + * visitor could use the existence of NullWriter for input verification when constructing optional fields. + * The actual writing of null values is handled by {@link OptionWriter}. + */ private static class NullWriter implements ValueWriter { private static final NullWriter INSTANCE = new NullWriter(); @@ -135,109 +150,106 @@ private NullWriter() { @Override public void write(Void ignored, Encoder encoder) throws IOException { - encoder.writeNull(); + throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro"); } - } - private static class BooleanWriter implements ValueWriter { - private static final BooleanWriter INSTANCE = new BooleanWriter(); + @Override + public Stream metrics() { + throw new IllegalStateException("[BUG] NullWriter shouldn't be used for writing nulls for Avro"); + } + } - private BooleanWriter() { + private static class BooleanWriter extends ComparableWriter { + private BooleanWriter(int id) { + super(id); } @Override - public void write(Boolean bool, Encoder encoder) throws IOException { + protected void writeVal(Boolean bool, Encoder encoder) throws IOException { encoder.writeBoolean(bool); } } - private static class ByteToIntegerWriter implements ValueWriter { - private static final ByteToIntegerWriter INSTANCE = new ByteToIntegerWriter(); - - private ByteToIntegerWriter() { + private static class ByteToIntegerWriter extends MetricsAwareTransformWriter { + private ByteToIntegerWriter(int id) { + super(id, Integer::compareTo, Byte::intValue); } @Override - public void write(Byte b, Encoder encoder) throws IOException { - encoder.writeInt(b.intValue()); + protected void writeVal(Integer intVal, Encoder encoder) throws IOException { + encoder.writeInt(intVal); } } - private static class ShortToIntegerWriter implements ValueWriter { - private static final ShortToIntegerWriter INSTANCE = new ShortToIntegerWriter(); - - private ShortToIntegerWriter() { + private static class 
ShortToIntegerWriter extends MetricsAwareTransformWriter { + private ShortToIntegerWriter(int id) { + super(id, Integer::compareTo, Short::intValue); } @Override - public void write(Short s, Encoder encoder) throws IOException { - encoder.writeInt(s.intValue()); + protected void writeVal(Integer intValue, Encoder encoder) throws IOException { + encoder.writeInt(intValue); } } - private static class IntegerWriter implements ValueWriter { - private static final IntegerWriter INSTANCE = new IntegerWriter(); - - private IntegerWriter() { + private static class IntegerWriter extends ComparableWriter { + private IntegerWriter(int id) { + super(id); } @Override - public void write(Integer i, Encoder encoder) throws IOException { + protected void writeVal(Integer i, Encoder encoder) throws IOException { encoder.writeInt(i); } } - private static class LongWriter implements ValueWriter { - private static final LongWriter INSTANCE = new LongWriter(); - - private LongWriter() { + private static class LongWriter extends ComparableWriter { + private LongWriter(int id) { + super(id); } @Override - public void write(Long l, Encoder encoder) throws IOException { + protected void writeVal(Long l, Encoder encoder) throws IOException { encoder.writeLong(l); } } - private static class FloatWriter implements ValueWriter { - private static final FloatWriter INSTANCE = new FloatWriter(); - - private FloatWriter() { + private static class FloatWriter extends FloatingPointWriter { + private FloatWriter(int id) { + super(id); } @Override - public void write(Float f, Encoder encoder) throws IOException { + protected void writeVal(Float f, Encoder encoder) throws IOException { encoder.writeFloat(f); } } - private static class DoubleWriter implements ValueWriter { - private static final DoubleWriter INSTANCE = new DoubleWriter(); - - private DoubleWriter() { + private static class DoubleWriter extends FloatingPointWriter { + private DoubleWriter(int id) { + super(id); } @Override - public void write(Double d, Encoder encoder) throws IOException { + protected void writeVal(Double d, Encoder encoder) throws IOException { encoder.writeDouble(d); } } - private static class StringWriter implements ValueWriter { - private static final StringWriter INSTANCE = new StringWriter(); - - private StringWriter() { + private static class StringWriter extends MetricsAwareWriter { + private StringWriter(int id) { + super(id, Comparators.charSequences()); } @Override - public void write(Object s, Encoder encoder) throws IOException { + public void write(CharSequence s, Encoder encoder) throws IOException { // use getBytes because it may return the backing byte array if available. 
// otherwise, it copies to a new byte array, which is still cheaper than Avro // calling toString, which incurs encoding costs if (s instanceof Utf8) { - encoder.writeString((Utf8) s); + super.write(s, encoder); } else if (s instanceof String) { - encoder.writeString(new Utf8((String) s)); + super.write(new Utf8((String) s), encoder); } else if (s == null) { throw new IllegalArgumentException("Cannot write null to required string column"); } else { @@ -245,35 +257,38 @@ public void write(Object s, Encoder encoder) throws IOException { "Cannot write unknown string type: " + s.getClass().getName() + ": " + s.toString()); } } - } - private static class Utf8Writer implements ValueWriter { - private static final Utf8Writer INSTANCE = new Utf8Writer(); + @Override + protected void writeVal(CharSequence s, Encoder encoder) throws IOException { + encoder.writeString((Utf8) s); + } + } - private Utf8Writer() { + private static class Utf8Writer extends ComparableWriter { + private Utf8Writer(int id) { + super(id); } @Override - public void write(Utf8 s, Encoder encoder) throws IOException { + protected void writeVal(Utf8 s, Encoder encoder) throws IOException { encoder.writeString(s); } } - private static class UUIDWriter implements ValueWriter { + private static class UUIDWriter extends MetricsAwareWriter { private static final ThreadLocal BUFFER = ThreadLocal.withInitial(() -> { ByteBuffer buffer = ByteBuffer.allocate(16); buffer.order(ByteOrder.BIG_ENDIAN); return buffer; }); - private static final UUIDWriter INSTANCE = new UUIDWriter(); - - private UUIDWriter() { + private UUIDWriter(int id) { + super(id, Comparators.forType(Types.UUIDType.get())); } @Override @SuppressWarnings("ByteBufferBackingArray") - public void write(UUID uuid, Encoder encoder) throws IOException { + protected void writeVal(UUID uuid, Encoder encoder) throws IOException { // TODO: direct conversion from string to byte buffer ByteBuffer buffer = BUFFER.get(); buffer.rewind(); @@ -283,73 +298,80 @@ public void write(UUID uuid, Encoder encoder) throws IOException { } } - private static class FixedWriter implements ValueWriter { + private static class FixedWriter extends MetricsAwareByteArrayWriter { private final int length; - private FixedWriter(int length) { + private FixedWriter(int id, int length) { + super(id); this.length = length; } @Override - public void write(byte[] bytes, Encoder encoder) throws IOException { + protected void writeVal(byte[] bytes, Encoder encoder) throws IOException { Preconditions.checkArgument(bytes.length == length, "Cannot write byte array of length %s as fixed[%s]", bytes.length, length); encoder.writeFixed(bytes); } } - private static class GenericFixedWriter implements ValueWriter { + private static class GenericFixedWriter extends ComparableWriter { private final int length; - private GenericFixedWriter(int length) { + private GenericFixedWriter(int id, int length) { + super(id); this.length = length; } @Override - public void write(GenericData.Fixed datum, Encoder encoder) throws IOException { + protected void writeVal(GenericData.Fixed datum, Encoder encoder) throws IOException { Preconditions.checkArgument(datum.bytes().length == length, "Cannot write byte array of length %s as fixed[%s]", datum.bytes().length, length); encoder.writeFixed(datum.bytes()); } - } - private static class BytesWriter implements ValueWriter { - private static final BytesWriter INSTANCE = new BytesWriter(); + @Override + public Stream metrics() { + // convert min/max to byte buffer to allow upper/lower bound 
truncation when gathering metrics. + return metrics(fixed -> ByteBuffer.wrap(fixed.bytes())); + } + } - private BytesWriter() { + private static class BytesWriter extends MetricsAwareByteArrayWriter { + private BytesWriter(int id) { + super(id); } @Override - public void write(byte[] bytes, Encoder encoder) throws IOException { + protected void writeVal(byte[] bytes, Encoder encoder) throws IOException { encoder.writeBytes(bytes); } } - private static class ByteBufferWriter implements ValueWriter { - private static final ByteBufferWriter INSTANCE = new ByteBufferWriter(); - - private ByteBufferWriter() { + private static class ByteBufferWriter extends MetricsAwareWriter { + private ByteBufferWriter(int id) { + super(id, Comparators.unsignedBytes()); } @Override - public void write(ByteBuffer bytes, Encoder encoder) throws IOException { + protected void writeVal(ByteBuffer bytes, Encoder encoder) throws IOException { encoder.writeBytes(bytes); } } - private static class DecimalWriter implements ValueWriter { + private static class DecimalWriter extends ComparableWriter { private final int precision; private final int scale; private final ThreadLocal bytes; - private DecimalWriter(int precision, int scale) { + private DecimalWriter(int id, int precision, int scale) { + super(id); this.precision = precision; this.scale = scale; this.bytes = ThreadLocal.withInitial(() -> new byte[TypeUtil.decimalRequiredBytes(precision)]); } @Override - public void write(BigDecimal decimal, Encoder encoder) throws IOException { + protected void writeVal(BigDecimal decimal, Encoder encoder) throws IOException { encoder.writeFixed(DecimalUtil.toReusedFixLengthBytes(precision, scale, decimal, bytes.get())); } } @@ -358,8 +380,10 @@ private static class OptionWriter implements ValueWriter { private final int nullIndex; private final int valueIndex; private final ValueWriter valueWriter; + private final Schema.Type type; + private long nullValueCount; - private OptionWriter(int nullIndex, ValueWriter valueWriter) { + private OptionWriter(int nullIndex, ValueWriter valueWriter, Schema.Type type) { this.nullIndex = nullIndex; if (nullIndex == 0) { this.valueIndex = 1; @@ -369,17 +393,42 @@ private OptionWriter(int nullIndex, ValueWriter valueWriter) { throw new IllegalArgumentException("Invalid option index: " + nullIndex); } this.valueWriter = valueWriter; + this.type = type; + this.nullValueCount = 0; } @Override public void write(T option, Encoder encoder) throws IOException { if (option == null) { encoder.writeIndex(nullIndex); + nullValueCount++; } else { encoder.writeIndex(valueIndex); valueWriter.write(option, encoder); } } + + @Override + public Stream metrics() { + if (AvroSchemaUtil.isMetricSupportedType(type)) { + return mergeNullCountIntoMetric(); + } else { + return valueWriter.metrics(); + } + } + + private Stream mergeNullCountIntoMetric() { + List fieldMetricsFromWriter = valueWriter.metrics().collect(Collectors.toList()); + Preconditions.checkState(fieldMetricsFromWriter.size() == 1, + "Optional field for type % shouldn't vend more than one field metrics", type); + + FieldMetrics metrics = fieldMetricsFromWriter.get(0); + return Stream.of( + new FieldMetrics(metrics.id(), + metrics.valueCount() + nullValueCount, nullValueCount, + metrics.nanValueCount(), metrics.lowerBound(), metrics.upperBound()) + ); + } } private static class CollectionWriter implements ValueWriter> { @@ -402,6 +451,11 @@ public void write(Collection array, Encoder encoder) throws IOException { } encoder.writeArrayEnd(); } + + 
@Override + public Stream metrics() { + return elementWriter.metrics(); + } } private static class ArrayMapWriter implements ValueWriter> { @@ -428,6 +482,11 @@ public void write(Map map, Encoder encoder) throws IOException { } encoder.writeArrayEnd(); } + + @Override + public Stream metrics() { + return Stream.concat(keyWriter.metrics(), valueWriter.metrics()); + } } private static class MapWriter implements ValueWriter> { @@ -454,6 +513,11 @@ public void write(Map map, Encoder encoder) throws IOException { } encoder.writeMapEnd(); } + + @Override + public Stream metrics() { + return Stream.concat(keyWriter.metrics(), valueWriter.metrics()); + } } public abstract static class StructWriter implements ValueWriter { @@ -479,6 +543,11 @@ public void write(S row, Encoder encoder) throws IOException { writers[i].write(get(row, i), encoder); } } + + @Override + public Stream metrics() { + return Arrays.stream(writers).flatMap(ValueWriter::metrics); + } } private static class RecordWriter extends StructWriter { @@ -492,4 +561,153 @@ protected Object get(IndexedRecord struct, int pos) { return struct.get(pos); } } + + private abstract static class FloatingPointWriter> + extends ComparableWriter { + private long nanValueCount; + + FloatingPointWriter(int id) { + super(id); + } + + @Override + public void write(T datum, Encoder encoder) throws IOException { + valueCount++; + + if (datum == null) { + nullValueCount++; + } else if (NaNUtil.isNaN(datum)) { + nanValueCount++; + } else { + if (max == null || datum.compareTo(max) > 0) { + this.max = datum; + } + if (min == null || datum.compareTo(min) < 0) { + this.min = datum; + } + } + + writeVal(datum, encoder); + } + + @Override + public Stream metrics() { + return Stream.of(new FieldMetrics(id, valueCount, nullValueCount, nanValueCount, min, max)); + } + } + + public abstract static class MetricsAwareStringWriter> extends ComparableWriter { + public MetricsAwareStringWriter(int id) { + super(id); + } + + @Override + public Stream metrics() { + // convert min/max to string to allow upper/lower bound truncation when gathering metrics, + // as in different implementations there's no guarantee that input to string writer will be char sequence + return metrics(Object::toString); + } + } + + private abstract static class MetricsAwareByteArrayWriter extends MetricsAwareWriter { + MetricsAwareByteArrayWriter(int id) { + super(id, Comparators.unsignedByteArray()); + } + + @Override + public Stream metrics() { + // convert min/max to byte buffer to allow upper/lower bound truncation when gathering metrics. + return metrics(ByteBuffer::wrap); + } + } + + public abstract static class ComparableWriter> extends MetricsAwareWriter { + public ComparableWriter(int id) { + super(id, Comparable::compareTo); + } + } + + /** + * A value writer wrapper that keeps track of column statistics (metrics) during writing. + * + * @param Input type + */ + public abstract static class MetricsAwareWriter extends MetricsAwareTransformWriter { + public MetricsAwareWriter(int id, Comparator comparator) { + super(id, comparator, Function.identity()); + } + + /** + * Helper class to transform the input type when collecting metrics. + * The transform function converts the stats information from the specific type that the underlying writer + * understands to a more general type that could be transformed to binary following iceberg single-value + * serialization spec. 
+ * + * @param func tranformation function + * @return a stream of field metrics with bounds converted by the given transformation + */ + protected Stream metrics(Function func) { + return Stream.of(new FieldMetrics(id, valueCount, nullValueCount, 0, + updateBound(min, func), updateBound(max, func))); + } + + private T4 updateBound(T3 bound, Function func) { + return bound == null ? null : func.apply(bound); + } + } + + /** + * A value writer wrapper that keeps track of column statistics (metrics) during writing, and accepts a + * transformation in its constructor. + * The transformation will apply to the input data to produce the type that the underlying writer accepts. + * Stats will also be tracked with the type after transformation. + * + * @param Input type + * @param Type after transformation + */ + @SuppressWarnings("checkstyle:VisibilityModifier") + public abstract static class MetricsAwareTransformWriter implements ValueWriter { + protected final int id; + protected long valueCount; + protected long nullValueCount; + protected T2 max; + protected T2 min; + protected final Function transformation; + + private final Comparator comparator; + + public MetricsAwareTransformWriter(int id, Comparator comparator, Function func) { + this.id = id; + this.comparator = comparator; + this.transformation = func; + } + + @Override + public void write(T1 datum, Encoder encoder) throws IOException { + valueCount++; + if (datum == null) { + nullValueCount++; + writeVal(null, encoder); + + } else { + T2 transformedDatum = transformation.apply(datum); + if (max == null || comparator.compare(transformedDatum, max) > 0) { + max = transformedDatum; + } + if (min == null || comparator.compare(transformedDatum, min) < 0) { + min = transformedDatum; + } + writeVal(transformedDatum, encoder); + + } + } + + protected abstract void writeVal(T2 datum, Encoder encoder) throws IOException; + + @Override + public Stream metrics() { + return Stream.of(new FieldMetrics(id, valueCount, nullValueCount, 0, min, max)); + } + } + } diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java index aa6cc6fcd339..5a475bd7914b 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java @@ -21,19 +21,21 @@ import java.io.IOException; import java.util.List; +import java.util.stream.Stream; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Encoder; +import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.avro.AvroSchemaVisitor; import org.apache.iceberg.avro.LogicalMap; +import org.apache.iceberg.avro.MetricsAwareDatumWriter; import org.apache.iceberg.avro.ValueWriter; import org.apache.iceberg.avro.ValueWriters; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -public class DataWriter implements DatumWriter { +public class DataWriter implements MetricsAwareDatumWriter { private ValueWriter writer = null; public static DataWriter create(Schema schema) { @@ -59,6 +61,11 @@ protected ValueWriter createStructWriter(List> fields) { return GenericWriters.struct(fields); } + @Override + public Stream metrics() { + return writer.metrics(); + } + private class WriteBuilder extends AvroSchemaVisitor> { @Override public ValueWriter record(Schema record, List names, List> 
fields) { @@ -72,9 +79,9 @@ public ValueWriter union(Schema union, List> options) { Preconditions.checkArgument(options.size() == 2, "Cannot create writer for non-option union: %s", union); if (union.getTypes().get(0).getType() == Schema.Type.NULL) { - return ValueWriters.option(0, options.get(1)); + return ValueWriters.option(0, options.get(1), union.getTypes().get(1).getType()); } else { - return ValueWriters.option(1, options.get(0)); + return ValueWriters.option(1, options.get(0), union.getTypes().get(0).getType()); } } @@ -90,32 +97,35 @@ public ValueWriter array(Schema array, ValueWriter elementWriter) { @Override public ValueWriter map(Schema map, ValueWriter valueWriter) { - return ValueWriters.map(ValueWriters.strings(), valueWriter); + int keyId = AvroSchemaUtil.getKeyId(map); + return ValueWriters.map(ValueWriters.strings(keyId), valueWriter); } @Override public ValueWriter primitive(Schema primitive) { + int fieldId = AvroSchemaUtil.fieldId(primitive, parentSchema(), this::lastFieldName); + LogicalType logicalType = primitive.getLogicalType(); if (logicalType != null) { switch (logicalType.getName()) { case "date": - return GenericWriters.dates(); + return GenericWriters.dates(fieldId); case "time-micros": - return GenericWriters.times(); + return GenericWriters.times(fieldId); case "timestamp-micros": if (AvroSchemaUtil.isTimestamptz(primitive)) { - return GenericWriters.timestamptz(); + return GenericWriters.timestamptz(fieldId); } - return GenericWriters.timestamps(); + return GenericWriters.timestamps(fieldId); case "decimal": LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; - return ValueWriters.decimal(decimal.getPrecision(), decimal.getScale()); + return ValueWriters.decimal(fieldId, decimal.getPrecision(), decimal.getScale()); case "uuid": - return ValueWriters.uuids(); + return ValueWriters.uuids(fieldId); default: throw new IllegalArgumentException("Unsupported logical type: " + logicalType); @@ -126,21 +136,21 @@ public ValueWriter primitive(Schema primitive) { case NULL: return ValueWriters.nulls(); case BOOLEAN: - return ValueWriters.booleans(); + return ValueWriters.booleans(fieldId); case INT: - return ValueWriters.ints(); + return ValueWriters.ints(fieldId); case LONG: - return ValueWriters.longs(); + return ValueWriters.longs(fieldId); case FLOAT: - return ValueWriters.floats(); + return ValueWriters.floats(fieldId); case DOUBLE: - return ValueWriters.doubles(); + return ValueWriters.doubles(fieldId); case STRING: - return ValueWriters.strings(); + return ValueWriters.strings(fieldId); case FIXED: - return ValueWriters.fixed(primitive.getFixedSize()); + return ValueWriters.fixed(fieldId, primitive.getFixedSize()); case BYTES: - return ValueWriters.byteBuffers(); + return ValueWriters.byteBuffers(fieldId); default: throw new IllegalArgumentException("Unsupported type: " + primitive); } diff --git a/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java b/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java index f8638946ccaf..17587509e878 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/GenericWriters.java @@ -37,20 +37,20 @@ class GenericWriters { private GenericWriters() { } - static ValueWriter dates() { - return DateWriter.INSTANCE; + static ValueWriter dates(int id) { + return new DateWriter(id); } - static ValueWriter times() { - return TimeWriter.INSTANCE; + static ValueWriter times(int id) { + return new TimeWriter(id); } - 
static ValueWriter timestamps() { - return TimestampWriter.INSTANCE; + static ValueWriter timestamps(int id) { + return new TimestampWriter(id); } - static ValueWriter timestamptz() { - return TimestamptzWriter.INSTANCE; + static ValueWriter timestamptz(int id) { + return new TimestamptzWriter(id); } static ValueWriter struct(List> writers) { @@ -60,51 +60,49 @@ static ValueWriter struct(List> writers) { private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC); private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate(); - private static class DateWriter implements ValueWriter { - private static final DateWriter INSTANCE = new DateWriter(); - - private DateWriter() { + private static class DateWriter extends ValueWriters.MetricsAwareTransformWriter { + private DateWriter(int id) { + super(id, Comparable::compareTo, date -> (int) ChronoUnit.DAYS.between(EPOCH_DAY, date)); } @Override - public void write(LocalDate date, Encoder encoder) throws IOException { - encoder.writeInt((int) ChronoUnit.DAYS.between(EPOCH_DAY, date)); + protected void writeVal(Integer date, Encoder encoder) throws IOException { + encoder.writeInt(date); } } - private static class TimeWriter implements ValueWriter { - private static final TimeWriter INSTANCE = new TimeWriter(); - - private TimeWriter() { + private static class TimeWriter extends ValueWriters.MetricsAwareTransformWriter { + private TimeWriter(int id) { + super(id, Comparable::compareTo, time -> time.toNanoOfDay() / 1000); } @Override - public void write(LocalTime time, Encoder encoder) throws IOException { - encoder.writeLong(time.toNanoOfDay() / 1000); + protected void writeVal(Long time, Encoder encoder) throws IOException { + encoder.writeLong(time); } } - private static class TimestampWriter implements ValueWriter { - private static final TimestampWriter INSTANCE = new TimestampWriter(); - - private TimestampWriter() { + private static class TimestampWriter extends ValueWriters.MetricsAwareTransformWriter { + private TimestampWriter(int id) { + super(id, Comparable::compareTo, + timestamp -> ChronoUnit.MICROS.between(EPOCH, timestamp.atOffset(ZoneOffset.UTC))); } @Override - public void write(LocalDateTime timestamp, Encoder encoder) throws IOException { - encoder.writeLong(ChronoUnit.MICROS.between(EPOCH, timestamp.atOffset(ZoneOffset.UTC))); + protected void writeVal(Long timestamp, Encoder encoder) throws IOException { + encoder.writeLong(timestamp); } } - private static class TimestamptzWriter implements ValueWriter { - private static final TimestamptzWriter INSTANCE = new TimestamptzWriter(); - - private TimestamptzWriter() { + private static class TimestamptzWriter extends ValueWriters.MetricsAwareTransformWriter { + private TimestamptzWriter(int id) { + super(id, Comparable::compareTo, + timestamptz -> ChronoUnit.MICROS.between(EPOCH, timestamptz)); } @Override - public void write(OffsetDateTime timestamptz, Encoder encoder) throws IOException { - encoder.writeLong(ChronoUnit.MICROS.between(EPOCH, timestamptz)); + protected void writeVal(Long timestamptz, Encoder encoder) throws IOException { + encoder.writeLong(timestamptz); } } diff --git a/core/src/test/java/org/apache/iceberg/TestMetrics.java b/core/src/test/java/org/apache/iceberg/TestMetrics.java index f1a65f6202fd..7642242a42fd 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetrics.java +++ b/core/src/test/java/org/apache/iceberg/TestMetrics.java @@ -274,7 +274,11 @@ public void testMetricsForNestedStructFields() throws IOException { 
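In GenericWriters the date/time writers stop being stateless singletons: each is constructed with its field ID and extends ValueWriters.MetricsAwareTransformWriter, supplying a comparator over the incoming Java value plus the transform to the encoded representation (for example LocalDate to days from epoch), and only implements writeVal for the already-transformed value. The superclass itself is not shown in this diff; the following is a minimal sketch of the idea, assuming a FieldMetrics(id, valueCount, nullValueCount, nanValueCount, lowerBound, upperBound) constructor and that bounds are reported in the transformed representation, which is what the new bounds tests below expect (date bounds come back as int days, for instance).

import java.io.IOException;
import java.util.Comparator;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.avro.io.Encoder;
import org.apache.iceberg.FieldMetrics;

// Illustrative sketch, not the actual ValueWriters.MetricsAwareTransformWriter.
abstract class TransformTrackingWriter<T, V> {
  private final int fieldId;
  private final Comparator<T> comparator;
  private final Function<T, V> transform;
  private long valueCount = 0L;
  private T min = null;
  private T max = null;

  TransformTrackingWriter(int fieldId, Comparator<T> comparator, Function<T, V> transform) {
    this.fieldId = fieldId;
    this.comparator = comparator;
    this.transform = transform;
  }

  protected abstract void writeVal(V value, Encoder encoder) throws IOException;

  public void write(T datum, Encoder encoder) throws IOException {
    valueCount += 1;
    if (min == null || comparator.compare(datum, min) < 0) {
      min = datum;
    }
    if (max == null || comparator.compare(datum, max) > 0) {
      max = datum;
    }
    // the file stores the transformed value (e.g. micros from epoch)
    writeVal(transform.apply(datum), encoder);
  }

  public Stream<FieldMetrics> metrics() {
    return Stream.of(new FieldMetrics(fieldId, valueCount, 0L, 0L,
        min == null ? null : transform.apply(min),
        max == null ? null : transform.apply(max)));
  }
}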
assertBounds(6, BinaryType.get(), ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics); assertCounts(7, 1L, 0L, 1L, metrics); - assertBounds(7, DoubleType.get(), Double.NaN, Double.NaN, metrics); + if (fileFormat() == FileFormat.AVRO) { + assertBounds(7, Types.DoubleType.get(), null, null, metrics); + } else { + assertBounds(7, Types.DoubleType.get(), Double.NaN, Double.NaN, metrics); + } } private Record buildNestedTestRecord() { @@ -325,11 +329,20 @@ public void testMetricsForListAndMapElements() throws IOException { assertCounts(4, null, null, metrics); assertCounts(6, null, null, metrics); } - assertBounds(1, IntegerType.get(), null, null, metrics); - assertBounds(2, StringType.get(), null, null, metrics); - assertBounds(4, IntegerType.get(), null, null, metrics); - assertBounds(6, StringType.get(), null, null, metrics); - assertBounds(7, structType, null, null, metrics); + + if (fileFormat() == FileFormat.AVRO) { + assertBounds(1, IntegerType.get(), 1, 1, metrics); + assertBounds(2, StringType.get(), CharBuffer.wrap("BBB"), CharBuffer.wrap("BBB"), metrics); + assertBounds(4, IntegerType.get(), 10, 12, metrics); + assertBounds(6, StringType.get(), CharBuffer.wrap("4"), CharBuffer.wrap("4"), metrics); + assertBounds(7, structType, null, null, metrics); + } else { + assertBounds(1, IntegerType.get(), null, null, metrics); + assertBounds(2, StringType.get(), null, null, metrics); + assertBounds(4, IntegerType.get(), null, null, metrics); + assertBounds(6, StringType.get(), null, null, metrics); + assertBounds(7, structType, null, null, metrics); + } } @Test @@ -355,8 +368,13 @@ public void testMetricsForNaNColumns() throws IOException { assertCounts(1, 2L, 0L, 2L, metrics); assertCounts(2, 2L, 0L, 2L, metrics); // below: current behavior; will be null once NaN is excluded from upper/lower bound - assertBounds(1, FloatType.get(), Float.NaN, Float.NaN, metrics); - assertBounds(2, DoubleType.get(), Double.NaN, Double.NaN, metrics); + if (fileFormat() == FileFormat.AVRO) { + assertBounds(1, FloatType.get(), null, null, metrics); + assertBounds(2, DoubleType.get(), null, null, metrics); + } else { + assertBounds(1, FloatType.get(), Float.NaN, Float.NaN, metrics); + assertBounds(2, DoubleType.get(), Double.NaN, Double.NaN, metrics); + } } @Test @@ -372,9 +390,12 @@ public void testColumnBoundsWithNaNValueAtFront() throws IOException { if (fileFormat() == FileFormat.ORC) { assertBounds(1, FloatType.get(), Float.NaN, Float.NaN, metrics); assertBounds(2, DoubleType.get(), Double.NaN, Double.NaN, metrics); - } else { + } else if (fileFormat() == FileFormat.PARQUET) { assertBounds(1, FloatType.get(), 1.2F, Float.NaN, metrics); assertBounds(2, DoubleType.get(), 3.4D, Double.NaN, metrics); + } else { // avro + assertBounds(1, FloatType.get(), 1.2F, 5.6F, metrics); + assertBounds(2, DoubleType.get(), 3.4D, 7.8D, metrics); } } @@ -388,7 +409,7 @@ public void testColumnBoundsWithNaNValueInMiddle() throws IOException { // below: current behavior; will be non-NaN values once NaN is excluded from upper/lower bound. ORC and Parquet's // behaviors differ due to their implementation of comparison being different. 
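The Avro expectations in these NaN tests follow from bounds being tracked by the value writers themselves, with NaN excluded: a column containing only NaN gets no bounds at all, while a column mixing NaN with finite values keeps the finite min/max (1.2F/5.6F and 3.4D/7.8D above), unlike the Parquet and ORC paths. A minimal sketch of that tracking rule, stated as an assumption about how the metrics-aware float writer behaves rather than as its actual implementation:

import java.util.stream.Stream;
import org.apache.iceberg.FieldMetrics;

// Sketch of the per-field tracking the Avro expectations above imply: NaN values are
// counted but never become a lower/upper bound. Illustrative only.
class FloatBoundsTracker {
  private final int fieldId;
  private long valueCount = 0L;
  private long nanCount = 0L;
  private Float min = null;
  private Float max = null;

  FloatBoundsTracker(int fieldId) {
    this.fieldId = fieldId;
  }

  void track(float value) {
    valueCount += 1;
    if (Float.isNaN(value)) {
      nanCount += 1;
      return;
    }
    if (min == null || value < min) {
      min = value;
    }
    if (max == null || value > max) {
      max = value;
    }
  }

  Stream<FieldMetrics> metrics() {
    return Stream.of(new FieldMetrics(fieldId, valueCount, 0L, nanCount, min, max));
  }
}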
- if (fileFormat() == FileFormat.ORC) { + if (fileFormat() != FileFormat.PARQUET) { assertBounds(1, FloatType.get(), 1.2F, 5.6F, metrics); assertBounds(2, DoubleType.get(), 3.4D, 7.8D, metrics); } else { @@ -407,7 +428,7 @@ public void testColumnBoundsWithNaNValueAtEnd() throws IOException { // below: current behavior; will be non-NaN values once NaN is excluded from upper/lower bound. ORC and Parquet's // behaviors differ due to their implementation of comparison being different. - if (fileFormat() == FileFormat.ORC) { + if (fileFormat() != FileFormat.PARQUET) { assertBounds(1, FloatType.get(), 1.2F, 5.6F, metrics); assertBounds(2, DoubleType.get(), 3.4D, 7.8D, metrics); } else { @@ -516,7 +537,7 @@ public void testNoneMetricsMode() throws IOException { MetricsConfig.fromProperties(ImmutableMap.of("write.metadata.metrics.default", "none")), buildNestedTestRecord()); Assert.assertEquals(1L, (long) metrics.recordCount()); - Assert.assertTrue(metrics.columnSizes().values().stream().allMatch(Objects::nonNull)); + assertNonNullColumnSizes(metrics); assertCounts(1, null, null, metrics); assertBounds(1, Types.IntegerType.get(), null, null, metrics); assertCounts(3, null, null, metrics); @@ -536,7 +557,7 @@ public void testCountsMetricsMode() throws IOException { MetricsConfig.fromProperties(ImmutableMap.of("write.metadata.metrics.default", "counts")), buildNestedTestRecord()); Assert.assertEquals(1L, (long) metrics.recordCount()); - Assert.assertTrue(metrics.columnSizes().values().stream().allMatch(Objects::nonNull)); + assertNonNullColumnSizes(metrics); assertCounts(1, 1L, 0L, metrics); assertBounds(1, Types.IntegerType.get(), null, null, metrics); assertCounts(3, 1L, 0L, metrics); @@ -556,7 +577,7 @@ public void testFullMetricsMode() throws IOException { MetricsConfig.fromProperties(ImmutableMap.of("write.metadata.metrics.default", "full")), buildNestedTestRecord()); Assert.assertEquals(1L, (long) metrics.recordCount()); - Assert.assertTrue(metrics.columnSizes().values().stream().allMatch(Objects::nonNull)); + assertNonNullColumnSizes(metrics); assertCounts(1, 1L, 0L, metrics); assertBounds(1, Types.IntegerType.get(), Integer.MAX_VALUE, Integer.MAX_VALUE, metrics); assertCounts(3, 1L, 0L, metrics); @@ -567,7 +588,11 @@ public void testFullMetricsMode() throws IOException { assertBounds(6, Types.BinaryType.get(), ByteBuffer.wrap("A".getBytes()), ByteBuffer.wrap("A".getBytes()), metrics); assertCounts(7, 1L, 0L, 1L, metrics); - assertBounds(7, Types.DoubleType.get(), Double.NaN, Double.NaN, metrics); + if (fileFormat() == FileFormat.AVRO) { + assertBounds(7, Types.DoubleType.get(), null, null, metrics); + } else { + assertBounds(7, Types.DoubleType.get(), Double.NaN, Double.NaN, metrics); + } } @Test @@ -589,7 +614,7 @@ public void testTruncateStringMetricsMode() throws IOException { CharBuffer expectedMinBound = CharBuffer.wrap("Lorem ipsu"); CharBuffer expectedMaxBound = CharBuffer.wrap("Lorem ipsv"); Assert.assertEquals(1L, (long) metrics.recordCount()); - Assert.assertTrue(metrics.columnSizes().values().stream().allMatch(Objects::nonNull)); + assertNonNullColumnSizes(metrics); assertCounts(1, 1L, 0L, metrics); assertBounds(1, Types.StringType.get(), expectedMinBound, expectedMaxBound, metrics); } @@ -613,7 +638,7 @@ public void testTruncateBinaryMetricsMode() throws IOException { ByteBuffer expectedMinBounds = ByteBuffer.wrap(new byte[]{ 0x1, 0x2, 0x3, 0x4, 0x5 }); ByteBuffer expectedMaxBounds = ByteBuffer.wrap(new byte[]{ 0x1, 0x2, 0x3, 0x4, 0x6 }); Assert.assertEquals(1L, (long) 
metrics.recordCount()); - Assert.assertTrue(metrics.columnSizes().values().stream().allMatch(Objects::nonNull)); + assertNonNullColumnSizes(metrics); assertCounts(1, 1L, 0L, metrics); assertBounds(1, Types.BinaryType.get(), expectedMinBounds, expectedMaxBounds, metrics); } @@ -645,4 +670,10 @@ protected void assertBounds(int fieldId, Type type, T lowerBound, T upperBou upperBounds.containsKey(fieldId) ? fromByteBuffer(type, upperBounds.get(fieldId)) : null); } + private void assertNonNullColumnSizes(Metrics metrics) { + if (fileFormat() != FileFormat.AVRO) { + Assert.assertTrue(metrics.columnSizes().values().stream().allMatch(Objects::nonNull)); + } + } + } diff --git a/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java b/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java index 96217229d879..aa1721a047de 100644 --- a/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java +++ b/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java @@ -90,7 +90,7 @@ public abstract class TestMergingMetrics { @Parameterized.Parameters(name = "fileFormat = {0}") public static Object[] parameters() { - return new Object[] {FileFormat.PARQUET }; + return new Object[] {FileFormat.PARQUET, FileFormat.AVRO}; } public TestMergingMetrics(FileFormat fileFormat) { diff --git a/data/src/test/java/org/apache/iceberg/avro/TestAvroMetrics.java b/data/src/test/java/org/apache/iceberg/avro/TestAvroMetrics.java new file mode 100644 index 000000000000..622fdd464f76 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/avro/TestAvroMetrics.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.avro; + +import java.io.File; +import java.io.IOException; +import java.util.Map; +import java.util.UUID; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.Schema; +import org.apache.iceberg.TestMetrics; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataWriter; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +public class TestAvroMetrics extends TestMetrics { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Override + public FileFormat fileFormat() { + return FileFormat.AVRO; + } + + @Override + public Metrics getMetrics(Schema schema, MetricsConfig metricsConfig, Record... 
records) throws IOException { + return getMetrics(schema, createOutputFile(), ImmutableMap.of(), metricsConfig, records); + } + + @Override + public Metrics getMetrics(Schema schema, Record... records) throws IOException { + return getMetrics(schema, MetricsConfig.getDefault(), records); + } + + private Metrics getMetrics(Schema schema, OutputFile file, Map properties, + MetricsConfig metricsConfig, Record... records) throws IOException { + FileAppender writer = Avro.write(file) + .schema(schema) + .setAll(properties) + .createWriterFunc(DataWriter::create) + .metricsConfig(metricsConfig) + .build(); + try (FileAppender appender = writer) { + appender.addAll(Lists.newArrayList(records)); + } + return writer.metrics(); + } + + @Override + protected Metrics getMetricsForRecordsWithSmallRowGroups(Schema schema, OutputFile outputFile, Record... records) { + throw new UnsupportedOperationException("supportsSmallRowGroups = " + supportsSmallRowGroups()); + } + + + @Override + public int splitCount(InputFile inputFile) throws IOException { + return 0; + } + + @Override + protected OutputFile createOutputFile() throws IOException { + File tmpFolder = temp.newFolder("avro"); + String filename = UUID.randomUUID().toString(); + return Files.localOutput(new File(tmpFolder, FileFormat.AVRO.addExtension(filename))); + } +} diff --git a/data/src/test/java/org/apache/iceberg/avro/TestAvroMetricsBounds.java b/data/src/test/java/org/apache/iceberg/avro/TestAvroMetricsBounds.java new file mode 100644 index 000000000000..a8b55f81a43c --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/avro/TestAvroMetricsBounds.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.avro; + +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.types.Conversions; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DateTimeUtil; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.types.Types.NestedField.optional; + +public abstract class TestAvroMetricsBounds { + // all supported types, except for UUID which is on deprecation path: see https://github.com/apache/iceberg/pull/1611 + protected static final Types.NestedField INT_FIELD = optional(1, "id", Types.IntegerType.get()); + protected static final Types.NestedField STRING_FIELD = optional(2, "string", Types.StringType.get()); + protected static final Types.NestedField FLOAT_FIELD = optional(3, "float", Types.FloatType.get()); + protected static final Types.NestedField DOUBLE_FIELD = optional(4, "double", Types.DoubleType.get()); + protected static final Types.NestedField DECIMAL_FIELD = optional(5, "decimal", Types.DecimalType.of(5, 3)); + protected static final Types.NestedField FIXED_FIELD = optional(6, "fixed", Types.FixedType.ofLength(4)); + protected static final Types.NestedField BINARY_FIELD = optional(7, "binary", Types.BinaryType.get()); + protected static final Types.NestedField LONG_FIELD = optional(8, "long", Types.LongType.get()); + protected static final Types.NestedField BOOLEAN_FIELD = optional(9, "boolean", Types.BooleanType.get()); + protected static final Types.NestedField DATE_FIELD = optional(10, "date", Types.DateType.get()); + protected static final Types.NestedField TIME_FIELD = optional(11, "time", Types.TimeType.get()); + protected static final Types.NestedField TIMESTAMPZ_FIELD = optional(12, "timestampz", + Types.TimestampType.withZone()); + protected static final Types.NestedField TIMESTAMP_FIELD = optional(13, "timestamp", + Types.TimestampType.withoutZone()); + + protected static final Map SINGLE_FIELD_SCHEMA_MAP = populateSingleFieldSchemaMap(); + + private static Map populateSingleFieldSchemaMap() { + Set allTypes = new HashSet<>(); + allTypes.add(INT_FIELD); + allTypes.add(STRING_FIELD); + allTypes.add(FLOAT_FIELD); + allTypes.add(DOUBLE_FIELD); + allTypes.add(DECIMAL_FIELD); + allTypes.add(FIXED_FIELD); + allTypes.add(BINARY_FIELD); + allTypes.add(LONG_FIELD); + allTypes.add(BOOLEAN_FIELD); + allTypes.add(DATE_FIELD); + allTypes.add(TIME_FIELD); + allTypes.add(TIMESTAMPZ_FIELD); + allTypes.add(TIMESTAMP_FIELD); + return allTypes.stream().collect(Collectors.toMap(Function.identity(), Schema::new)); + } + + protected Metrics writeAndGetMetrics(Types.NestedField field, Record... records) throws IOException { + return writeAndGetMetrics(SINGLE_FIELD_SCHEMA_MAP.get(field), records); + } + + protected abstract Metrics writeAndGetMetrics(Schema schema, Record... 
records) throws IOException; + + protected boolean supportTimeWithoutZoneField() { + return true; + } + + protected boolean supportTimeField() { + return true; + } + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void testIntFieldBound() throws IOException { + Metrics metrics = writeAndGetMetrics(INT_FIELD, + createRecord(INT_FIELD, 3), + createRecord(INT_FIELD, 8), + createRecord(INT_FIELD, -1), + createRecord(INT_FIELD, 0)); + assertBounds(INT_FIELD, -1, 8, metrics); + } + + @Test + public void testStringFieldBound() throws IOException { + Metrics metrics = writeAndGetMetrics(STRING_FIELD, + createRecord(STRING_FIELD, "aaa"), + createRecord(STRING_FIELD, "aab"), + createRecord(STRING_FIELD, "az"), + createRecord(STRING_FIELD, "a")); + assertBounds(STRING_FIELD, CharBuffer.wrap("a"), CharBuffer.wrap("az"), metrics); + } + + @Test + public void testFloatFieldBound() throws IOException { + Metrics metrics = writeAndGetMetrics(FLOAT_FIELD, + createRecord(FLOAT_FIELD, 3.2F), + createRecord(FLOAT_FIELD, 8.9F), + createRecord(FLOAT_FIELD, -1.1F), + createRecord(FLOAT_FIELD, Float.NaN), + createRecord(FLOAT_FIELD, 0F)); + assertBounds(FLOAT_FIELD, -1.1F, 8.9F, metrics); + } + + @Test + public void testDoubleFieldBound() throws IOException { + Metrics metrics = writeAndGetMetrics(DOUBLE_FIELD, + createRecord(DOUBLE_FIELD, 3.2D), + createRecord(DOUBLE_FIELD, 8.9D), + createRecord(DOUBLE_FIELD, -1.1D), + createRecord(DOUBLE_FIELD, Double.NaN), + createRecord(DOUBLE_FIELD, 0D)); + assertBounds(DOUBLE_FIELD, -1.1D, 8.9D, metrics); + } + + @Test + public void testDecimalFieldBound() throws IOException { + Metrics metrics = writeAndGetMetrics(DECIMAL_FIELD, + createRecord(DECIMAL_FIELD, new BigDecimal("3.599")), + createRecord(DECIMAL_FIELD, new BigDecimal("3.600")), + createRecord(DECIMAL_FIELD, new BigDecimal("-25.613")), + createRecord(DECIMAL_FIELD, new BigDecimal("-25.614"))); + assertBounds(DECIMAL_FIELD, new BigDecimal("-25.614"), new BigDecimal("3.600"), metrics); + } + + @Test + public void testFixedFieldBound() throws IOException { + Metrics metrics = writeAndGetMetrics(FIXED_FIELD, + createRecord(FIXED_FIELD, bytes("abcd")), + createRecord(FIXED_FIELD, bytes("abcz")), + createRecord(FIXED_FIELD, bytes("abce")), + createRecord(FIXED_FIELD, bytes("aacd"))); + assertBounds(FIXED_FIELD, byteBuffer("aacd"), byteBuffer("abcz"), metrics); + } + + @Test + public void testBinaryFieldBound() throws IOException { + Metrics metrics = writeAndGetMetrics(BINARY_FIELD, + createRecord(BINARY_FIELD, ByteBuffer.wrap(new byte[] { 0x0A, 0x01})), + createRecord(BINARY_FIELD, ByteBuffer.wrap(new byte[] { 0x01, 0x0A, 0x0B })), + createRecord(BINARY_FIELD, ByteBuffer.wrap(new byte[] { 0x10 })), + createRecord(BINARY_FIELD, ByteBuffer.wrap(new byte[] { 0x10, 0x01 }))); + assertBounds(BINARY_FIELD, + ByteBuffer.wrap(new byte[] { 0x01, 0x0A, 0x0B }), + ByteBuffer.wrap(new byte[] { 0x10, 0x01 }), + metrics); + } + + @Test + public void testLongFieldBound() throws IOException { + Metrics metrics = writeAndGetMetrics(LONG_FIELD, + createRecord(LONG_FIELD, 52L), + createRecord(LONG_FIELD, 8L), + createRecord(LONG_FIELD, -1L), + createRecord(LONG_FIELD, 0L)); + assertBounds(LONG_FIELD, -1L, 52L, metrics); + } + + @Test + public void testBooleanFieldBound() throws IOException { + Metrics metrics = writeAndGetMetrics(BOOLEAN_FIELD, + createRecord(BOOLEAN_FIELD, false), + createRecord(BOOLEAN_FIELD, false), + createRecord(BOOLEAN_FIELD, false), + createRecord(BOOLEAN_FIELD, true)); + 
assertBounds(BOOLEAN_FIELD, false, true, metrics); + } + + @Test + public void testDateFieldBound() throws IOException { + Metrics metrics = writeAndGetMetrics(DATE_FIELD, + createRecord(DATE_FIELD, DateTimeUtil.dateFromDays(-1)), + createRecord(DATE_FIELD, DateTimeUtil.dateFromDays(32)), + createRecord(DATE_FIELD, DateTimeUtil.dateFromDays(0)), + createRecord(DATE_FIELD, DateTimeUtil.dateFromDays(1276))); + assertBounds(DATE_FIELD, -1, 1276, metrics); + } + + @Test + public void testDateTimeBound() throws IOException { + Assume.assumeTrue("Skip test for engine that do not support time field", supportTimeField()); + + Metrics metrics = writeAndGetMetrics(TIME_FIELD, + createRecord(TIME_FIELD, DateTimeUtil.timeFromMicros(3000L)), + createRecord(TIME_FIELD, DateTimeUtil.timeFromMicros(80000000000L)), + createRecord(TIME_FIELD, DateTimeUtil.timeFromMicros(0L)), + createRecord(TIME_FIELD, DateTimeUtil.timeFromMicros(27000L))); + assertBounds(TIME_FIELD, 0L, 80000000000L, metrics); + } + + @Test + public void testDateTimestampBound() throws IOException { + Assume.assumeTrue("Skip test for engine that do not support timestamp without time zone field", + supportTimeWithoutZoneField()); + Metrics metrics = writeAndGetMetrics(TIMESTAMP_FIELD, + createRecord(TIMESTAMP_FIELD, DateTimeUtil.timestampFromMicros(3000L)), + createRecord(TIMESTAMP_FIELD, DateTimeUtil.timestampFromMicros(80000000000L)), + createRecord(TIMESTAMP_FIELD, DateTimeUtil.timestampFromMicros(-1L)), + createRecord(TIMESTAMP_FIELD, DateTimeUtil.timestampFromMicros(3000L))); + assertBounds(TIMESTAMP_FIELD, -1L, 80000000000L, metrics); + } + + @Test + public void testDateTimestampWithZoneBound() throws IOException { + Metrics metrics = writeAndGetMetrics(TIMESTAMPZ_FIELD, + createRecord(TIMESTAMPZ_FIELD, DateTimeUtil.timestamptzFromMicros(3000L)), + createRecord(TIMESTAMPZ_FIELD, DateTimeUtil.timestamptzFromMicros(80000000000L)), + createRecord(TIMESTAMPZ_FIELD, DateTimeUtil.timestamptzFromMicros(-1L)), + createRecord(TIMESTAMPZ_FIELD, DateTimeUtil.timestamptzFromMicros(3000L))); + assertBounds(TIMESTAMPZ_FIELD, -1L, 80000000000L, metrics); + } + + @Test + public void testArrayBound() throws IOException { + Types.NestedField listType = optional(1, "list", + Types.ListType.ofRequired(2, Types.IntegerType.get())); + Schema schema = new Schema(listType); + Metrics metrics = writeAndGetMetrics(schema, + createRecord(schema, "list", ImmutableList.of(3, 2, 1)), + createRecord(schema, "list", ImmutableList.of(6, 4, 5)), + createRecord(schema, "list", ImmutableList.of(8, 9, 7))); + + assertNullBounds(1, metrics); + assertBounds(2, Types.IntegerType.get(), 1, 9, metrics); + } + + @Test + public void testMapBound() throws IOException { + Types.NestedField mapField = optional(1, "map", + Types.MapType.ofOptional(2, 3, Types.StringType.get(), + Types.ListType.ofRequired(4, Types.IntegerType.get()))); + Schema schema = new Schema(mapField); + Metrics metrics = writeAndGetMetrics(schema, + createRecord(schema, "map", ImmutableMap.of("az", ImmutableList.of(3, 2, 1))), + createRecord(schema, "map", ImmutableMap.of("aaaaa", ImmutableList.of(6, 4, 5))), + createRecord(schema, "map", ImmutableMap.of("ab", ImmutableList.of(8, 9, 7)))); + + assertNullBounds(1, metrics); + assertBounds(2, Types.StringType.get(), CharBuffer.wrap("aaaaa"), CharBuffer.wrap("az"), metrics); + assertNullBounds(3, metrics); + assertBounds(4, Types.IntegerType.get(), 1, 9, metrics); + } + + @Test + public void testStructBound() throws IOException { + Types.StructType leafField = 
Types.StructType.of( + optional(2, "string", Types.StringType.get()), + optional(3, "list", Types.ListType.ofRequired(4, Types.IntegerType.get()))); + Types.NestedField structField = optional(1, "struct", leafField); + + Schema schema = new Schema(structField); + + Record leafStruct1 = GenericRecord.create(leafField); + leafStruct1.setField("string", "az"); + leafStruct1.setField("list", ImmutableList.of(3, 2, 1)); + + Record leafStruct2 = GenericRecord.create(leafField); + leafStruct2.setField("string", "aaaaa"); + leafStruct2.setField("list", ImmutableList.of(6, 5, 4)); + + Record leafStruct3 = GenericRecord.create(leafField); + leafStruct3.setField("string", "ab"); + leafStruct3.setField("list", ImmutableList.of(8, 9, 7)); + + Metrics metrics = writeAndGetMetrics(schema, + createRecord(schema, "struct", leafStruct1), + createRecord(schema, "struct", leafStruct2), + createRecord(schema, "struct", leafStruct3)); + + assertNullBounds(1, metrics); + assertBounds(2, Types.StringType.get(), CharBuffer.wrap("aaaaa"), CharBuffer.wrap("az"), metrics); + assertNullBounds(3, metrics); + assertBounds(4, Types.IntegerType.get(), 1, 9, metrics); + } + + private ByteBuffer byteBuffer(String str) { + return ByteBuffer.wrap(bytes(str)); + } + + private byte[] bytes(String str) { + return str.getBytes(StandardCharsets.UTF_8); + } + + private Record createRecord(Types.NestedField field, Object value) { + return createRecord(SINGLE_FIELD_SCHEMA_MAP.get(field), field.name(), value); + } + + private Record createRecord(Schema schema, String name, Object value) { + Record record = GenericRecord.create(schema); + record.setField(name, value); + return record; + + } + + private void assertBounds(Types.NestedField field, T lowerBound, T upperBound, Metrics metrics) { + assertBounds(field.fieldId(), field.type(), lowerBound, upperBound, metrics); + } + + private void assertBounds(int fieldId, Type type, T lowerBound, T upperBound, Metrics metrics) { + Map lowerBounds = metrics.lowerBounds(); + Map upperBounds = metrics.upperBounds(); + + Assert.assertEquals( + lowerBound, + lowerBounds.containsKey(fieldId) ? Conversions.fromByteBuffer(type, lowerBounds.get(fieldId)) : null); + Assert.assertEquals( + upperBound, + upperBounds.containsKey(fieldId) ? Conversions.fromByteBuffer(type, upperBounds.get(fieldId)) : null); + } + + private void assertNullBounds(int fieldId, Metrics metrics) { + Assert.assertNull(metrics.lowerBounds().get(fieldId)); + Assert.assertNull(metrics.upperBounds().get(fieldId)); + } +} diff --git a/data/src/test/java/org/apache/iceberg/avro/TestGenericAvroAppenderMetricsBounds.java b/data/src/test/java/org/apache/iceberg/avro/TestGenericAvroAppenderMetricsBounds.java new file mode 100644 index 000000000000..6100019d29a9 --- /dev/null +++ b/data/src/test/java/org/apache/iceberg/avro/TestGenericAvroAppenderMetricsBounds.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.avro; + +import java.io.IOException; +import java.util.Arrays; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppender; + +public class TestGenericAvroAppenderMetricsBounds extends TestAvroMetricsBounds { + + @Override + protected Metrics writeAndGetMetrics(Schema schema, Record... records) throws IOException { + FileAppender appender = new GenericAppenderFactory(schema).newAppender( + org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.AVRO); + try (FileAppender fileAppender = appender) { + Arrays.stream(records).forEach(fileAppender::add); + } + return appender.metrics(); + } + +} diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java index 0dc4e039f46c..29399008df71 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java @@ -23,18 +23,21 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Encoder; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.FieldMetrics; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.avro.MetricsAwareDatumWriter; import org.apache.iceberg.avro.ValueWriter; import org.apache.iceberg.avro.ValueWriters; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -public class FlinkAvroWriter implements DatumWriter { +public class FlinkAvroWriter implements MetricsAwareDatumWriter { private final RowType rowType; private ValueWriter writer = null; @@ -54,6 +57,11 @@ public void write(RowData datum, Encoder out) throws IOException { writer.write(datum, out); } + @Override + public Stream metrics() { + return writer.metrics(); + } + private static class WriteBuilder extends AvroWithFlinkSchemaVisitor> { @Override public ValueWriter record(LogicalType struct, Schema record, List names, List> fields) { @@ -68,9 +76,9 @@ public ValueWriter union(LogicalType type, Schema union, List> Preconditions.checkArgument(options.size() == 2, "Cannot create writer for non-option union: %s", union); if (union.getTypes().get(0).getType() == Schema.Type.NULL) { - return ValueWriters.option(0, options.get(1)); + return ValueWriters.option(0, options.get(1), union.getTypes().get(1).getType()); } else { - return ValueWriters.option(1, options.get(0)); + return ValueWriters.option(1, options.get(0), union.getTypes().get(0).getType()); } } @@ -81,7 +89,8 @@ public ValueWriter array(LogicalType sArray, Schema array, ValueWriter ele @Override public ValueWriter 
map(LogicalType sMap, Schema map, ValueWriter valueReader) { - return FlinkValueWriters.map(FlinkValueWriters.strings(), mapKeyType(sMap), valueReader, mapValueType(sMap)); + int keyId = AvroSchemaUtil.getKeyId(map); + return FlinkValueWriters.map(FlinkValueWriters.strings(keyId), mapKeyType(sMap), valueReader, mapValueType(sMap)); } @Override @@ -91,24 +100,26 @@ public ValueWriter map(LogicalType sMap, Schema map, ValueWriter keyWriter @Override public ValueWriter primitive(LogicalType type, Schema primitive) { + int fieldId = AvroSchemaUtil.fieldId(primitive, parentSchema(), this::lastFieldName); + org.apache.avro.LogicalType logicalType = primitive.getLogicalType(); if (logicalType != null) { switch (logicalType.getName()) { case "date": - return ValueWriters.ints(); + return ValueWriters.ints(fieldId); case "time-micros": - return FlinkValueWriters.timeMicros(); + return FlinkValueWriters.timeMicros(fieldId); case "timestamp-micros": - return FlinkValueWriters.timestampMicros(); + return FlinkValueWriters.timestampMicros(fieldId); case "decimal": LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; - return FlinkValueWriters.decimal(decimal.getPrecision(), decimal.getScale()); + return FlinkValueWriters.decimal(fieldId, decimal.getPrecision(), decimal.getScale()); case "uuid": - return ValueWriters.uuids(); + return ValueWriters.uuids(fieldId); default: throw new IllegalArgumentException("Unsupported logical type: " + logicalType); @@ -119,28 +130,28 @@ public ValueWriter primitive(LogicalType type, Schema primitive) { case NULL: return ValueWriters.nulls(); case BOOLEAN: - return ValueWriters.booleans(); + return ValueWriters.booleans(fieldId); case INT: switch (type.getTypeRoot()) { case TINYINT: - return ValueWriters.tinyints(); + return ValueWriters.tinyints(fieldId); case SMALLINT: - return ValueWriters.shorts(); + return ValueWriters.shorts(fieldId); default: - return ValueWriters.ints(); + return ValueWriters.ints(fieldId); } case LONG: - return ValueWriters.longs(); + return ValueWriters.longs(fieldId); case FLOAT: - return ValueWriters.floats(); + return ValueWriters.floats(fieldId); case DOUBLE: - return ValueWriters.doubles(); + return ValueWriters.doubles(fieldId); case STRING: - return FlinkValueWriters.strings(); + return FlinkValueWriters.strings(fieldId); case FIXED: - return ValueWriters.fixed(primitive.getFixedSize()); + return ValueWriters.fixed(fieldId, primitive.getFixedSize()); case BYTES: - return ValueWriters.bytes(); + return ValueWriters.bytes(fieldId); default: throw new IllegalArgumentException("Unsupported type: " + primitive); } diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java index d17978fbbb05..1aca4460521a 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkValueWriters.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.lang.reflect.Array; +import java.util.Arrays; import java.util.List; +import java.util.stream.Stream; import org.apache.avro.io.Encoder; import org.apache.avro.util.Utf8; import org.apache.flink.table.data.ArrayData; @@ -31,7 +33,9 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.avro.ValueWriter; +import 
org.apache.iceberg.avro.ValueWriters; import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.util.DecimalUtil; @@ -40,20 +44,20 @@ public class FlinkValueWriters { private FlinkValueWriters() { } - static ValueWriter strings() { - return StringWriter.INSTANCE; + static ValueWriter strings(int id) { + return new StringWriter(id); } - static ValueWriter timeMicros() { - return TimeMicrosWriter.INSTANCE; + static ValueWriter timeMicros(int id) { + return new TimeMicrosWriter(id); } - static ValueWriter timestampMicros() { - return TimestampMicrosWriter.INSTANCE; + static ValueWriter timestampMicros(int id) { + return new TimestampMicrosWriter(id); } - static ValueWriter decimal(int precision, int scale) { - return new DecimalWriter(precision, scale); + static ValueWriter decimal(int id, int precision, int scale) { + return new DecimalWriter(id, precision, scale); } static ValueWriter array(ValueWriter elementWriter, LogicalType elementType) { @@ -74,52 +78,61 @@ static ValueWriter row(List> writers, List return new RowWriter(writers, types); } - private static class StringWriter implements ValueWriter { - private static final StringWriter INSTANCE = new StringWriter(); - - private StringWriter() { + private static class StringWriter extends ValueWriters.MetricsAwareStringWriter { + private StringWriter(int id) { + super(id); } @Override - public void write(StringData s, Encoder encoder) throws IOException { + protected void writeVal(StringData s, Encoder encoder) throws IOException { // toBytes is cheaper than Avro calling toString, which incurs encoding costs encoder.writeString(new Utf8(s.toBytes())); } } - private static class DecimalWriter implements ValueWriter { + private static class DecimalWriter extends ValueWriters.ComparableWriter { private final int precision; private final int scale; private final ThreadLocal bytes; - private DecimalWriter(int precision, int scale) { + private DecimalWriter(int id, int precision, int scale) { + super(id); this.precision = precision; this.scale = scale; this.bytes = ThreadLocal.withInitial(() -> new byte[TypeUtil.decimalRequiredBytes(precision)]); } @Override - public void write(DecimalData d, Encoder encoder) throws IOException { + protected void writeVal(DecimalData d, Encoder encoder) throws IOException { encoder.writeFixed(DecimalUtil.toReusedFixLengthBytes(precision, scale, d.toBigDecimal(), bytes.get())); } + + @Override + public Stream metrics() { + return metrics(DecimalData::toBigDecimal); + } } - private static class TimeMicrosWriter implements ValueWriter { - private static final TimeMicrosWriter INSTANCE = new TimeMicrosWriter(); + private static class TimeMicrosWriter extends ValueWriters.MetricsAwareTransformWriter { + private TimeMicrosWriter(int id) { + super(id, Long::compareTo, timeMills -> (long) timeMills * 1000); + } @Override - public void write(Integer timeMills, Encoder encoder) throws IOException { - encoder.writeLong(timeMills * 1000); + protected void writeVal(Long time, Encoder encoder) throws IOException { + encoder.writeLong(time); } } - private static class TimestampMicrosWriter implements ValueWriter { - private static final TimestampMicrosWriter INSTANCE = new TimestampMicrosWriter(); + private static class TimestampMicrosWriter extends ValueWriters.MetricsAwareTransformWriter { + private TimestampMicrosWriter(int id) { + super(id, Long::compareTo, + timestampData -> timestampData.getMillisecond() * 1000 + timestampData.getNanoOfMillisecond() / 1000); + } @Override - public void write(TimestampData 
timestampData, Encoder encoder) throws IOException { - long micros = timestampData.getMillisecond() * 1000 + timestampData.getNanoOfMillisecond() / 1000; - encoder.writeLong(micros); + protected void writeVal(Long datum, Encoder encoder) throws IOException { + encoder.writeLong(datum); } } @@ -144,6 +157,11 @@ public void write(ArrayData array, Encoder encoder) throws IOException { } encoder.writeArrayEnd(); } + + @Override + public Stream metrics() { + return elementWriter.metrics(); + } } private static class ArrayMapWriter implements ValueWriter { @@ -175,6 +193,11 @@ public void write(MapData map, Encoder encoder) throws IOException { } encoder.writeArrayEnd(); } + + @Override + public Stream metrics() { + return Stream.concat(keyWriter.metrics(), valueWriter.metrics()); + } } private static class MapWriter implements ValueWriter { @@ -206,6 +229,11 @@ public void write(MapData map, Encoder encoder) throws IOException { } encoder.writeMapEnd(); } + + @Override + public Stream metrics() { + return Stream.concat(keyWriter.metrics(), valueWriter.metrics()); + } } static class RowWriter implements ValueWriter { @@ -237,5 +265,11 @@ private void write(RowData row, int pos, ValueWriter writer, Encoder enco throws IOException { writer.write((T) getters[pos].getFieldOrNull(row), encoder); } + + @Override + public Stream metrics() { + return Arrays.stream(writers).flatMap(ValueWriter::metrics); + } + } } diff --git a/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkAvroMetricsBounds.java b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkAvroMetricsBounds.java new file mode 100644 index 000000000000..3a481a3fa79e --- /dev/null +++ b/flink/src/test/java/org/apache/iceberg/flink/source/TestFlinkAvroMetricsBounds.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.flink.source; + +import java.io.IOException; +import java.util.Arrays; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.TestAvroMetricsBounds; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.flink.RowDataConverter; +import org.apache.iceberg.flink.sink.FlinkAppenderFactory; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; + +public class TestFlinkAvroMetricsBounds extends TestAvroMetricsBounds { + + @Override + protected Metrics writeAndGetMetrics(Schema schema, Record... 
records) throws IOException { + RowType flinkSchema = FlinkSchemaUtil.convert(schema); + + FileAppender appender = + new FlinkAppenderFactory(schema, flinkSchema, ImmutableMap.of(), PartitionSpec.unpartitioned()) + .newAppender(org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.AVRO); + try (FileAppender fileAppender = appender) { + Arrays.stream(records).map(r -> RowDataConverter.convert(schema, r)).forEach(fileAppender::add); + } + + return appender.metrics(); + } +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java index b1625d7df9b1..19740f523525 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java @@ -23,11 +23,14 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Encoder; +import org.apache.iceberg.FieldMetrics; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.avro.MetricsAwareDatumWriter; import org.apache.iceberg.avro.ValueWriter; import org.apache.iceberg.avro.ValueWriters; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -37,7 +40,7 @@ import org.apache.spark.sql.types.ShortType; import org.apache.spark.sql.types.StructType; -public class SparkAvroWriter implements DatumWriter { +public class SparkAvroWriter implements MetricsAwareDatumWriter { private final StructType dsSchema; private ValueWriter writer = null; @@ -57,6 +60,11 @@ public void write(InternalRow datum, Encoder out) throws IOException { writer.write(datum, out); } + @Override + public Stream metrics() { + return writer.metrics(); + } + private static class WriteBuilder extends AvroWithSparkSchemaVisitor> { @Override public ValueWriter record(DataType struct, Schema record, List names, List> fields) { @@ -71,9 +79,9 @@ public ValueWriter union(DataType type, Schema union, List> op Preconditions.checkArgument(options.size() == 2, "Cannot create writer for non-option union: %s", union); if (union.getTypes().get(0).getType() == Schema.Type.NULL) { - return ValueWriters.option(0, options.get(1)); + return ValueWriters.option(0, options.get(1), union.getTypes().get(1).getType()); } else { - return ValueWriters.option(1, options.get(0)); + return ValueWriters.option(1, options.get(0), union.getTypes().get(0).getType()); } } @@ -84,7 +92,9 @@ public ValueWriter array(DataType sArray, Schema array, ValueWriter elemen @Override public ValueWriter map(DataType sMap, Schema map, ValueWriter valueReader) { - return SparkValueWriters.map(SparkValueWriters.strings(), mapKeyType(sMap), valueReader, mapValueType(sMap)); + return SparkValueWriters.map( + SparkValueWriters.strings(AvroSchemaUtil.getKeyId(map)), mapKeyType(sMap), + valueReader, mapValueType(sMap)); } @Override @@ -94,23 +104,25 @@ public ValueWriter map(DataType sMap, Schema map, ValueWriter keyWriter, V @Override public ValueWriter primitive(DataType type, Schema primitive) { + int fieldId = AvroSchemaUtil.fieldId(primitive, parentSchema(), this::lastFieldName); + LogicalType logicalType = primitive.getLogicalType(); if (logicalType != null) { switch (logicalType.getName()) { case "date": // Spark uses the same representation - return 
ValueWriters.ints(); + return ValueWriters.ints(fieldId); case "timestamp-micros": // Spark uses the same representation - return ValueWriters.longs(); + return ValueWriters.longs(fieldId); case "decimal": LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; - return SparkValueWriters.decimal(decimal.getPrecision(), decimal.getScale()); + return SparkValueWriters.decimal(fieldId, decimal.getPrecision(), decimal.getScale()); case "uuid": - return ValueWriters.uuids(); + return ValueWriters.uuids(fieldId); default: throw new IllegalArgumentException("Unsupported logical type: " + logicalType); @@ -121,26 +133,26 @@ public ValueWriter primitive(DataType type, Schema primitive) { case NULL: return ValueWriters.nulls(); case BOOLEAN: - return ValueWriters.booleans(); + return ValueWriters.booleans(fieldId); case INT: if (type instanceof ByteType) { - return ValueWriters.tinyints(); + return ValueWriters.tinyints(fieldId); } else if (type instanceof ShortType) { - return ValueWriters.shorts(); + return ValueWriters.shorts(fieldId); } - return ValueWriters.ints(); + return ValueWriters.ints(fieldId); case LONG: - return ValueWriters.longs(); + return ValueWriters.longs(fieldId); case FLOAT: - return ValueWriters.floats(); + return ValueWriters.floats(fieldId); case DOUBLE: - return ValueWriters.doubles(); + return ValueWriters.doubles(fieldId); case STRING: - return SparkValueWriters.strings(); + return SparkValueWriters.strings(fieldId); case FIXED: - return ValueWriters.fixed(primitive.getFixedSize()); + return ValueWriters.fixed(fieldId, primitive.getFixedSize()); case BYTES: - return ValueWriters.bytes(); + return ValueWriters.bytes(fieldId); default: throw new IllegalArgumentException("Unsupported type: " + primitive); } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueWriters.java index 24a69c1d7f11..4fe2ed88aa21 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueWriters.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkValueWriters.java @@ -21,14 +21,17 @@ import java.io.IOException; import java.lang.reflect.Array; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; +import java.util.Arrays; import java.util.List; -import java.util.UUID; +import java.util.stream.Stream; import org.apache.avro.io.Encoder; import org.apache.avro.util.Utf8; +import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.avro.ValueWriter; +import org.apache.iceberg.avro.ValueWriters; +import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; import org.apache.iceberg.util.DecimalUtil; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.util.ArrayData; @@ -42,16 +45,12 @@ public class SparkValueWriters { private SparkValueWriters() { } - static ValueWriter strings() { - return StringWriter.INSTANCE; + static ValueWriter strings(int id) { + return new StringWriter(id); } - static ValueWriter uuids() { - return UUIDWriter.INSTANCE; - } - - static ValueWriter decimal(int precision, int scale) { - return new DecimalWriter(precision, scale); + static ValueWriter decimal(int id, int precision, int scale) { + return new DecimalWriter(id, precision, scale); } static ValueWriter array(ValueWriter elementWriter, DataType elementType) { @@ -72,14 +71,13 @@ static ValueWriter struct(List> writers, List { - private static final StringWriter INSTANCE = new 
StringWriter(); - - private StringWriter() { + private static class StringWriter extends ValueWriters.MetricsAwareStringWriter { + private StringWriter(int id) { + super(id); } @Override - public void write(UTF8String s, Encoder encoder) throws IOException { + protected void writeVal(UTF8String s, Encoder encoder) throws IOException { // use getBytes because it may return the backing byte array if available. // otherwise, it copies to a new byte array, which is still cheaper than Avro // calling toString, which incurs encoding costs @@ -87,46 +85,27 @@ public void write(UTF8String s, Encoder encoder) throws IOException { } } - private static class UUIDWriter implements ValueWriter { - private static final ThreadLocal BUFFER = ThreadLocal.withInitial(() -> { - ByteBuffer buffer = ByteBuffer.allocate(16); - buffer.order(ByteOrder.BIG_ENDIAN); - return buffer; - }); - - private static final UUIDWriter INSTANCE = new UUIDWriter(); - - private UUIDWriter() { - } - - @Override - @SuppressWarnings("ByteBufferBackingArray") - public void write(UTF8String s, Encoder encoder) throws IOException { - // TODO: direct conversion from string to byte buffer - UUID uuid = UUID.fromString(s.toString()); - ByteBuffer buffer = BUFFER.get(); - buffer.rewind(); - buffer.putLong(uuid.getMostSignificantBits()); - buffer.putLong(uuid.getLeastSignificantBits()); - encoder.writeFixed(buffer.array()); - } - } - - private static class DecimalWriter implements ValueWriter { + private static class DecimalWriter extends ValueWriters.MetricsAwareWriter { private final int precision; private final int scale; private final ThreadLocal bytes; - private DecimalWriter(int precision, int scale) { + private DecimalWriter(int id, int precision, int scale) { + super(id, Comparators.forType(Types.DecimalType.of(precision, scale))); this.precision = precision; this.scale = scale; this.bytes = ThreadLocal.withInitial(() -> new byte[TypeUtil.decimalRequiredBytes(precision)]); } @Override - public void write(Decimal d, Encoder encoder) throws IOException { + protected void writeVal(Decimal d, Encoder encoder) throws IOException { encoder.writeFixed(DecimalUtil.toReusedFixLengthBytes(precision, scale, d.toJavaBigDecimal(), bytes.get())); } + + @Override + public Stream metrics() { + return metrics(Decimal::toJavaBigDecimal); + } } private static class ArrayWriter implements ValueWriter { @@ -150,6 +129,11 @@ public void write(ArrayData array, Encoder encoder) throws IOException { } encoder.writeArrayEnd(); } + + @Override + public Stream metrics() { + return elementWriter.metrics(); + } } private static class ArrayMapWriter implements ValueWriter { @@ -181,6 +165,11 @@ public void write(MapData map, Encoder encoder) throws IOException { } encoder.writeArrayEnd(); } + + @Override + public Stream metrics() { + return Stream.concat(keyWriter.metrics(), valueWriter.metrics()); + } } private static class MapWriter implements ValueWriter { @@ -212,6 +201,11 @@ public void write(MapData map, Encoder encoder) throws IOException { } encoder.writeMapEnd(); } + + @Override + public Stream metrics() { + return Stream.concat(keyWriter.metrics(), valueWriter.metrics()); + } } static class StructWriter implements ValueWriter { @@ -243,6 +237,11 @@ public void write(InternalRow row, Encoder encoder) throws IOException { } } + @Override + public Stream metrics() { + return Arrays.stream(writers).flatMap(ValueWriter::metrics); + } + @SuppressWarnings("unchecked") private void write(InternalRow row, int pos, ValueWriter writer, Encoder encoder) throws 
IOException { diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAvroMetricsBounds.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAvroMetricsBounds.java new file mode 100644 index 000000000000..676b05428a7a --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkAvroMetricsBounds.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.TestAvroMetricsBounds; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.spark.sql.catalyst.InternalRow; + +public class TestSparkAvroMetricsBounds extends TestAvroMetricsBounds { + + @Override + protected boolean supportTimeWithoutZoneField() { + return false; + } + + @Override + protected boolean supportTimeField() { + return false; + } + + @Override + protected Metrics writeAndGetMetrics(Schema schema, Record... records) throws IOException { + FileAppender appender = + new SparkAppenderFactory(new HashMap<>(), schema, SparkSchemaUtil.convert(schema)).newAppender( + org.apache.iceberg.Files.localOutput(temp.newFile()), FileFormat.AVRO); + try (FileAppender fileAppender = appender) { + Arrays.stream(records).map(r -> new StructInternalRow(schema.asStruct()).setStruct(r)).forEach(fileAppender::add); + } + + return appender.metrics(); + } +}
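Each engine writer now implements MetricsAwareDatumWriter and exposes metrics() as a stream of FieldMetrics, with container writers concatenating their children's streams. How those per-field results become the id-keyed maps behind the file-level Metrics is handled by the Avro appender and the MetricsConfig handling, which this diff does not show; the following is only a hedged sketch of that folding step, using Conversions.toByteBuffer and Schema.findType to serialize bounds the same way the new tests read them back.

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.MetricsAwareDatumWriter;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Conversions;

// Hedged sketch, not the actual appender code: collect serialized lower bounds per field ID.
class BoundsCollector {
  static Map<Integer, ByteBuffer> lowerBounds(Schema schema, MetricsAwareDatumWriter<?> writer) {
    Map<Integer, ByteBuffer> bounds = Maps.newHashMap();
    writer.metrics().forEach(field -> {
      if (field.lowerBound() != null) {
        bounds.put(field.id(),
            Conversions.toByteBuffer(schema.findType(field.id()), field.lowerBound()));
      }
    });
    return bounds;
  }
}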