From 5a1b122ef4441174aae461779fdc801d2d27ff78 Mon Sep 17 00:00:00 2001 From: Yan Yan Date: Tue, 5 Jan 2021 14:02:49 -0800 Subject: [PATCH 1/2] Core: Implement NaN counts in ORC --- .../java/org/apache/iceberg/TestMetrics.java | 4 +- .../iceberg/data/orc/GenericOrcWriter.java | 17 ++++- .../iceberg/data/orc/GenericOrcWriters.java | 53 +++++++++++++-- .../apache/iceberg/TestMergingMetrics.java | 2 +- .../TestGenericMergingMetrics.java | 4 +- .../iceberg/flink/data/FlinkOrcWriter.java | 31 ++++++++- .../iceberg/flink/data/FlinkOrcWriters.java | 18 +++++ .../flink/data/FlinkSchemaVisitor.java | 68 +++++++++++++++++-- .../org/apache/iceberg/orc/ORCSchemaUtil.java | 2 +- .../apache/iceberg/orc/OrcFileAppender.java | 2 +- .../org/apache/iceberg/orc/OrcMetrics.java | 16 +++-- .../org/apache/iceberg/orc/OrcRowWriter.java | 7 ++ .../apache/iceberg/orc/OrcValueWriter.java | 9 +++ .../iceberg/parquet/ParquetValueWriter.java | 4 -- .../spark/data/SparkOrcValueWriter.java | 13 ++++ .../spark/data/SparkOrcValueWriters.java | 61 ++++++++++++++--- .../iceberg/spark/data/SparkOrcWriter.java | 22 +++++- 17 files changed, 293 insertions(+), 40 deletions(-) rename data/src/test/java/org/apache/iceberg/{ => parquet}/TestGenericMergingMetrics.java (92%) diff --git a/core/src/test/java/org/apache/iceberg/TestMetrics.java b/core/src/test/java/org/apache/iceberg/TestMetrics.java index f1a65f6202fd..2e7cdbf5d355 100644 --- a/core/src/test/java/org/apache/iceberg/TestMetrics.java +++ b/core/src/test/java/org/apache/iceberg/TestMetrics.java @@ -628,9 +628,7 @@ protected void assertCounts(int fieldId, Long valueCount, Long nullValueCount, L Map nanValueCounts = metrics.nanValueCounts(); Assert.assertEquals(valueCount, valueCounts.get(fieldId)); Assert.assertEquals(nullValueCount, nullValueCounts.get(fieldId)); - if (fileFormat() != FileFormat.ORC) { - Assert.assertEquals(nanValueCount, nanValueCounts.get(fieldId)); - } + Assert.assertEquals(nanValueCount, nanValueCounts.get(fieldId)); } protected void assertBounds(int fieldId, Type type, T lowerBound, T upperBound, Metrics metrics) { diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java index cf7082cd035f..a04d777df1d6 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriter.java @@ -20,8 +20,11 @@ package org.apache.iceberg.data.orc; import java.util.List; +import java.util.stream.Stream; +import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.Schema; import org.apache.iceberg.data.Record; +import org.apache.iceberg.orc.ORCSchemaUtil; import org.apache.iceberg.orc.OrcRowWriter; import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; import org.apache.iceberg.orc.OrcValueWriter; @@ -79,9 +82,9 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescriptio case LONG: return GenericOrcWriters.longs(); case FLOAT: - return GenericOrcWriters.floats(); + return GenericOrcWriters.floats(ORCSchemaUtil.fieldId(primitive)); case DOUBLE: - return GenericOrcWriters.doubles(); + return GenericOrcWriters.doubles(ORCSchemaUtil.fieldId(primitive)); case DATE: return GenericOrcWriters.dates(); case TIME: @@ -125,6 +128,11 @@ public void write(Record value, VectorizedRowBatch output) { } } + @Override + public Stream metrics() { + return writer.metrics(); + } + private static class RecordWriter implements OrcValueWriter { private final List> writers; @@ -150,5 
+158,10 @@ public void nonNullWrite(int rowId, Record data, ColumnVector output) { child.write(rowId, data.get(c, child.getJavaClass()), cv.fields[c]); } } + + @Override + public Stream metrics() { + return writers.stream().flatMap(OrcValueWriter::metrics); + } } } diff --git a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java index b532faa8c951..91de897bdb67 100644 --- a/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java +++ b/data/src/main/java/org/apache/iceberg/data/orc/GenericOrcWriters.java @@ -32,6 +32,9 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.stream.Stream; +import org.apache.iceberg.FieldMetrics; +import org.apache.iceberg.FloatFieldMetrics; import org.apache.iceberg.orc.OrcValueWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -77,12 +80,12 @@ public static OrcValueWriter longs() { return LongWriter.INSTANCE; } - public static OrcValueWriter floats() { - return FloatWriter.INSTANCE; + public static OrcValueWriter floats(int id) { + return new FloatWriter(id); } - public static OrcValueWriter doubles() { - return DoubleWriter.INSTANCE; + public static OrcValueWriter doubles(int id) { + return new DoubleWriter(id); } public static OrcValueWriter strings() { @@ -216,7 +219,13 @@ public void nonNullWrite(int rowId, Long data, ColumnVector output) { } private static class FloatWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new FloatWriter(); + private final int id; + private long nanCount; + + private FloatWriter(int id) { + this.id = id; + this.nanCount = 0; + } @Override public Class getJavaClass() { @@ -226,11 +235,25 @@ public Class getJavaClass() { @Override public void nonNullWrite(int rowId, Float data, ColumnVector output) { ((DoubleColumnVector) output).vector[rowId] = data; + if (Float.isNaN(data)) { + nanCount++; + } + } + + @Override + public Stream metrics() { + return Stream.of(new FloatFieldMetrics(id, nanCount)); } } private static class DoubleWriter implements OrcValueWriter { - private static final OrcValueWriter INSTANCE = new DoubleWriter(); + private final int id; + private long nanCount; + + private DoubleWriter(Integer id) { + this.id = id; + this.nanCount = 0; + } @Override public Class getJavaClass() { @@ -240,6 +263,14 @@ public Class getJavaClass() { @Override public void nonNullWrite(int rowId, Double data, ColumnVector output) { ((DoubleColumnVector) output).vector[rowId] = data; + if (Double.isNaN(data)) { + nanCount++; + } + } + + @Override + public Stream metrics() { + return Stream.of(new FloatFieldMetrics(id, nanCount)); } } @@ -436,6 +467,11 @@ public void nonNullWrite(int rowId, List value, ColumnVector output) { element.write((int) (e + cv.offsets[rowId]), value.get(e), cv.child); } } + + @Override + public Stream metrics() { + return element.metrics(); + } } private static class MapWriter implements OrcValueWriter> { @@ -475,5 +511,10 @@ public void nonNullWrite(int rowId, Map map, ColumnVector output) { valueWriter.write(pos, values.get(e), cv.values); } } + + @Override + public Stream metrics() { + return Stream.concat(keyWriter.metrics(), valueWriter.metrics()); + } } } diff --git a/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java b/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java index 96217229d879..fa588302e8e2 100644 --- 
a/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java +++ b/data/src/test/java/org/apache/iceberg/TestMergingMetrics.java @@ -90,7 +90,7 @@ public abstract class TestMergingMetrics { @Parameterized.Parameters(name = "fileFormat = {0}") public static Object[] parameters() { - return new Object[] {FileFormat.PARQUET }; + return new Object[] { FileFormat.PARQUET, FileFormat.ORC }; } public TestMergingMetrics(FileFormat fileFormat) { diff --git a/data/src/test/java/org/apache/iceberg/TestGenericMergingMetrics.java b/data/src/test/java/org/apache/iceberg/parquet/TestGenericMergingMetrics.java similarity index 92% rename from data/src/test/java/org/apache/iceberg/TestGenericMergingMetrics.java rename to data/src/test/java/org/apache/iceberg/parquet/TestGenericMergingMetrics.java index e7181feadaf5..72db36a9908c 100644 --- a/data/src/test/java/org/apache/iceberg/TestGenericMergingMetrics.java +++ b/data/src/test/java/org/apache/iceberg/parquet/TestGenericMergingMetrics.java @@ -17,10 +17,12 @@ * under the License. */ -package org.apache.iceberg; +package org.apache.iceberg.parquet; import java.io.IOException; import java.util.List; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.TestMergingMetrics; import org.apache.iceberg.data.GenericAppenderFactory; import org.apache.iceberg.data.Record; import org.apache.iceberg.io.FileAppender; diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java index 592307ded257..9aff0c127449 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriter.java @@ -19,14 +19,18 @@ package org.apache.iceberg.flink.data; +import java.util.Deque; import java.util.List; +import java.util.stream.Stream; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.Schema; import org.apache.iceberg.data.orc.GenericOrcWriters; import org.apache.iceberg.orc.OrcRowWriter; import org.apache.iceberg.orc.OrcValueWriter; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -63,10 +67,27 @@ public void write(RowData row, VectorizedRowBatch output) { } } + @Override + public Stream metrics() { + return writer.metrics(); + } + private static class WriteBuilder extends FlinkSchemaVisitor> { + private final Deque fieldIds = Lists.newLinkedList(); + private WriteBuilder() { } + @Override + public void beforeField(Types.NestedField field) { + fieldIds.push(field.fieldId()); + } + + @Override + public void afterField(Types.NestedField field) { + fieldIds.pop(); + } + @Override public OrcValueWriter record(Types.StructType iStruct, List> results, @@ -101,9 +122,15 @@ public OrcValueWriter primitive(Type.PrimitiveType iPrimitive, LogicalType fl case LONG: return GenericOrcWriters.longs(); case FLOAT: - return GenericOrcWriters.floats(); + Preconditions.checkArgument(fieldIds.peek() != null, + String.format("[BUG] Cannot find field id for primitive field with type %s. 
This is likely because id " + + "information is not properly pushed during schema visiting.", iPrimitive)); + return GenericOrcWriters.floats(fieldIds.peek()); case DOUBLE: - return GenericOrcWriters.doubles(); + Preconditions.checkArgument(fieldIds.peek() != null, + String.format("[BUG] Cannot find field id for primitive field with type %s. This is likely because id " + + "information is not properly pushed during schema visiting.", iPrimitive)); + return GenericOrcWriters.doubles(fieldIds.peek()); case DATE: return FlinkOrcWriters.dates(); case TIME: diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java index dae9cb72d60f..f157bf0c27e3 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkOrcWriters.java @@ -23,6 +23,7 @@ import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.List; +import java.util.stream.Stream; import org.apache.flink.table.data.ArrayData; import org.apache.flink.table.data.DecimalData; import org.apache.flink.table.data.MapData; @@ -30,6 +31,7 @@ import org.apache.flink.table.data.StringData; import org.apache.flink.table.data.TimestampData; import org.apache.flink.table.types.logical.LogicalType; +import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.orc.OrcValueWriter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -254,6 +256,12 @@ public void nonNullWrite(int rowId, ArrayData data, ColumnVector output) { elementWriter.write((int) (e + cv.offsets[rowId]), (T) value, cv.child); } } + + @Override + public Stream metrics() { + return elementWriter.metrics(); + } + } static class MapWriter implements OrcValueWriter { @@ -296,6 +304,11 @@ public void nonNullWrite(int rowId, MapData data, ColumnVector output) { valueWriter.write(pos, (V) valueGetter.getElementOrNull(valArray, e), cv.values); } } + + @Override + public Stream metrics() { + return Stream.concat(keyWriter.metrics(), valueWriter.metrics()); + } } static class StructWriter implements OrcValueWriter { @@ -329,5 +342,10 @@ public void nonNullWrite(int rowId, RowData data, ColumnVector output) { writer.write(rowId, fieldGetters.get(c).getFieldOrNull(data), cv.fields[c]); } } + + @Override + public Stream metrics() { + return writers.stream().flatMap(OrcValueWriter::metrics); + } } } diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java index 363d2bde4918..0909e1b53a85 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkSchemaVisitor.java @@ -44,17 +44,39 @@ private static T visit(LogicalType flinkType, Type iType, FlinkSchemaVisitor case MAP: MapType mapType = (MapType) flinkType; Types.MapType iMapType = iType.asMapType(); - - T key = visit(mapType.getKeyType(), iMapType.keyType(), visitor); - T value = visit(mapType.getValueType(), iMapType.valueType(), visitor); + T key; + T value; + + Types.NestedField keyField = iMapType.field(iMapType.keyId()); + visitor.beforeMapKey(keyField); + try { + key = visit(mapType.getKeyType(), iMapType.keyType(), visitor); + } finally { + visitor.afterMapKey(keyField); + } + + Types.NestedField valueField = iMapType.field(iMapType.valueId()); + 
visitor.beforeMapValue(valueField); + try { + value = visit(mapType.getValueType(), iMapType.valueType(), visitor); + } finally { + visitor.afterMapValue(valueField); + } return visitor.map(iMapType, key, value, mapType.getKeyType(), mapType.getValueType()); case LIST: ArrayType listType = (ArrayType) flinkType; Types.ListType iListType = iType.asListType(); + T element; - T element = visit(listType.getElementType(), iListType.elementType(), visitor); + Types.NestedField elementField = iListType.field(iListType.elementId()); + visitor.beforeListElement(elementField); + try { + element = visit(listType.getElementType(), iListType.elementType(), visitor); + } finally { + visitor.afterListElement(elementField); + } return visitor.list(iListType, element, listType.getElementType()); @@ -82,7 +104,13 @@ private static T visitRecord(LogicalType flinkType, Types.StructType struct, LogicalType fieldFlinkType = rowType.getTypeAt(fieldIndex); fieldTypes.add(fieldFlinkType); - results.add(visit(fieldFlinkType, iField.type(), visitor)); + + visitor.beforeField(iField); + try { + results.add(visit(fieldFlinkType, iField.type(), visitor)); + } finally { + visitor.afterField(iField); + } } return visitor.record(struct, results, fieldTypes); @@ -103,4 +131,34 @@ public T map(Types.MapType iMap, T key, T value, LogicalType keyType, LogicalTyp public T primitive(Type.PrimitiveType iPrimitive, LogicalType flinkPrimitive) { return null; } + + public void beforeField(Types.NestedField field) { + } + + public void afterField(Types.NestedField field) { + } + + public void beforeListElement(Types.NestedField elementField) { + beforeField(elementField); + } + + public void afterListElement(Types.NestedField elementField) { + afterField(elementField); + } + + public void beforeMapKey(Types.NestedField keyField) { + beforeField(keyField); + } + + public void afterMapKey(Types.NestedField keyField) { + afterField(keyField); + } + + public void beforeMapValue(Types.NestedField valueField) { + beforeField(valueField); + } + + public void afterMapValue(Types.NestedField valueField) { + afterField(valueField); + } } diff --git a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java index b7496d1e914c..af1d2cf66a41 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java +++ b/orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java @@ -387,7 +387,7 @@ static Optional icebergID(TypeDescription orcType) { .map(Integer::parseInt); } - static int fieldId(TypeDescription orcType) { + public static int fieldId(TypeDescription orcType) { String idStr = orcType.getAttributeValue(ICEBERG_ID_ATTRIBUTE); Preconditions.checkNotNull(idStr, "Missing expected '%s' property", ICEBERG_ID_ATTRIBUTE); return Integer.parseInt(idStr); diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java index 34f71fcc232e..e946cda3f3a8 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcFileAppender.java @@ -94,7 +94,7 @@ public void add(D datum) { public Metrics metrics() { Preconditions.checkState(isClosed, "Cannot return metrics while appending to an open file."); - return OrcMetrics.fromWriter(writer, metricsConfig); + return OrcMetrics.fromWriter(writer, valueWriter.metrics(), metricsConfig); } @Override diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java 
b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
index aacd621cd3bb..67b608441399 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcMetrics.java
@@ -27,11 +27,14 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Stream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FieldMetrics;
 import org.apache.iceberg.Metrics;
 import org.apache.iceberg.MetricsConfig;
 import org.apache.iceberg.MetricsModes;
 import org.apache.iceberg.MetricsModes.MetricsMode;
+import org.apache.iceberg.MetricsUtil;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.exceptions.RuntimeIOException;
 import org.apache.iceberg.expressions.Literal;
@@ -83,22 +86,25 @@ public static Metrics fromInputFile(InputFile file, MetricsConfig metricsConfig,
   static Metrics fromInputFile(InputFile file, Configuration config, MetricsConfig metricsConfig, NameMapping mapping) {
     try (Reader orcReader = ORC.newFileReader(file, config)) {
       return buildOrcMetrics(orcReader.getNumberOfRows(), orcReader.getSchema(), orcReader.getStatistics(),
-          metricsConfig, mapping);
+          Stream.empty(), metricsConfig, mapping);
     } catch (IOException ioe) {
       throw new RuntimeIOException(ioe, "Failed to open file: %s", file.location());
     }
   }
 
-  static Metrics fromWriter(Writer writer, MetricsConfig metricsConfig) {
+  static Metrics fromWriter(Writer writer, Stream<FieldMetrics> fieldMetricsStream, MetricsConfig metricsConfig) {
     try {
-      return buildOrcMetrics(writer.getNumberOfRows(), writer.getSchema(), writer.getStatistics(), metricsConfig, null);
+      return buildOrcMetrics(writer.getNumberOfRows(), writer.getSchema(), writer.getStatistics(),
+          fieldMetricsStream, metricsConfig, null);
     } catch (IOException ioe) {
       throw new RuntimeIOException(ioe, "Failed to get statistics from writer");
     }
   }
 
   private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescription orcSchema,
-                                         final ColumnStatistics[] colStats, final MetricsConfig metricsConfig,
+                                         final ColumnStatistics[] colStats,
+                                         final Stream<FieldMetrics> fieldMetricsStream,
+                                         final MetricsConfig metricsConfig,
                                          final NameMapping mapping) {
     final TypeDescription orcSchemaWithIds = (!ORCSchemaUtil.hasIds(orcSchema) && mapping != null) ?
         ORCSchemaUtil.applyNameMapping(orcSchema, mapping) : orcSchema;
@@ -115,6 +121,7 @@ private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescripti
         valueCounts,
         nullCounts,
         null,
+        null,
         null);
   }
 
@@ -167,6 +174,7 @@ private static Metrics buildOrcMetrics(final long numOfRows, final TypeDescripti
         columnSizes,
         valueCounts,
         nullCounts,
+        MetricsUtil.createNanValueCounts(fieldMetricsStream, effectiveMetricsConfig, schema),
         lowerBounds,
         upperBounds);
   }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
index df494b9cc3e1..c3463ebacdef 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java
@@ -20,6 +20,8 @@
 package org.apache.iceberg.orc;
 
 import java.io.IOException;
+import java.util.stream.Stream;
+import org.apache.iceberg.FieldMetrics;
 import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
 /**
@@ -35,4 +37,9 @@ public interface OrcRowWriter<T> {
    * @throws IOException if there's any IO error while writing the data value.
   */
   void write(T row, VectorizedRowBatch output) throws IOException;
+
+  /**
+   * Returns a stream of {@link FieldMetrics} that this OrcRowWriter keeps track of.
+   */
+  Stream<FieldMetrics> metrics();
 }
diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
index 9bbc1ddc6f0c..2f72fc20e053 100644
--- a/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
+++ b/orc/src/main/java/org/apache/iceberg/orc/OrcValueWriter.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg.orc;
 
+import java.util.stream.Stream;
+import org.apache.iceberg.FieldMetrics;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
 
 public interface OrcValueWriter<T> {
@@ -43,4 +45,11 @@ default void write(int rowId, T data, ColumnVector output) {
   }
 
   void nonNullWrite(int rowId, T data, ColumnVector output);
+
+  /**
+   * Returns a stream of {@link FieldMetrics} that this OrcValueWriter keeps track of.
+   */
+  default Stream<FieldMetrics> metrics() {
+    return Stream.empty();
+  }
 }
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java
index 0c94555c9cd6..7692ee58028d 100644
--- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueWriter.java
@@ -33,10 +33,6 @@ public interface ParquetValueWriter<T> {
 
   /**
    * Returns a stream of {@link FieldMetrics} that this ParquetValueWriter keeps track of.
-   * <p>
-   * Since Parquet keeps track of most metrics in its footer, for now ParquetValueWriter only keeps track of NaN
-   * counter, and only return non-empty stream if the writer writes double or float values either by itself or
-   * transitively.
    */
   default Stream<FieldMetrics> metrics() {
     return Stream.empty();
   }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java
index ef0ccf28e95f..131a93d6f9d9 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriter.java
@@ -19,6 +19,8 @@
 
 package org.apache.iceberg.spark.data;
 
+import java.util.stream.Stream;
+import org.apache.iceberg.FieldMetrics;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
 import org.apache.spark.sql.catalyst.expressions.SpecializedGetters;
 
@@ -43,4 +45,15 @@ default void write(int rowId, int column, SpecializedGetters data, ColumnVector
   }
 
   void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output);
+
+  /**
+   * Returns a stream of {@link FieldMetrics} that this SparkOrcValueWriter keeps track of.
+   * <p>
+   * Since ORC keeps track of most metrics via column statistics, for now SparkOrcValueWriter only keeps track of NaN
+   * counters, and only returns a non-empty stream if the writer writes double or float values either by itself or
+   * transitively.
+   */
+  default Stream<FieldMetrics> metrics() {
+    return Stream.empty();
+  }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
index 73f8969e492b..730dce4fff0e 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcValueWriters.java
@@ -19,6 +19,9 @@
 
 package org.apache.iceberg.spark.data;
 
+import java.util.stream.Stream;
+import org.apache.iceberg.FieldMetrics;
+import org.apache.iceberg.FloatFieldMetrics;
 import org.apache.orc.storage.common.type.HiveDecimal;
 import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
 import org.apache.orc.storage.ql.exec.vector.ColumnVector;
@@ -56,12 +59,12 @@ static SparkOrcValueWriter longs() {
     return LongWriter.INSTANCE;
   }
 
-  static SparkOrcValueWriter floats() {
-    return FloatWriter.INSTANCE;
+  static SparkOrcValueWriter floats(int id) {
+    return new FloatWriter(id);
   }
 
-  static SparkOrcValueWriter doubles() {
-    return DoubleWriter.INSTANCE;
+  static SparkOrcValueWriter doubles(int id) {
+    return new DoubleWriter(id);
   }
 
   static SparkOrcValueWriter byteArrays() {
@@ -138,20 +141,52 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV
   }
 
   private static class FloatWriter implements SparkOrcValueWriter {
-    private static final FloatWriter INSTANCE = new FloatWriter();
+    private final int id;
+    private long nanCount;
+
+    private FloatWriter(int id) {
+      this.id = id;
+      this.nanCount = 0;
+    }
 
     @Override
     public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      ((DoubleColumnVector) output).vector[rowId] = data.getFloat(column);
+      float floatValue = data.getFloat(column);
+      ((DoubleColumnVector) output).vector[rowId] = floatValue;
+
+      if (Float.isNaN(floatValue)) {
+        nanCount++;
+      }
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FloatFieldMetrics(id, nanCount));
     }
   }
 
   private static class DoubleWriter implements SparkOrcValueWriter {
-    private static final DoubleWriter INSTANCE = new DoubleWriter();
+    private final int id;
+    private long nanCount;
+
+    private DoubleWriter(int id) {
+      this.id = id;
+      this.nanCount = 0;
+    }
 
     @Override
     public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnVector output) {
-      ((DoubleColumnVector) output).vector[rowId] = data.getDouble(column);
+      double doubleValue = data.getDouble(column);
+      ((DoubleColumnVector) output).vector[rowId] = doubleValue;
+
+      if (Double.isNaN(doubleValue)) {
+        nanCount++;
+      }
+    }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.of(new FloatFieldMetrics(id, nanCount));
    }
  }
 
@@ -244,6 +279,11 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV
       writer.write((int) (e + cv.offsets[rowId]), e, value, cv.child);
     }
   }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return writer.metrics();
+    }
   }
 
   private static class MapWriter implements SparkOrcValueWriter {
@@ -275,5 +315,10 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV
       valueWriter.write(pos, e, value, cv.values);
     }
   }
+
+    @Override
+    public Stream<FieldMetrics> metrics() {
+      return Stream.concat(keyWriter.metrics(), valueWriter.metrics());
+    }
   }
 }
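
The Spark writers above follow the same shape as the generic and Flink ones: each leaf float/double writer counts NaNs inline in nonNullWrite() and exposes the tally lazily through metrics(), while container writers (struct, list, map) only concatenate their children's streams. Below is a minimal, self-contained sketch of that aggregation pattern; NanMetrics, ValueWriter, and the other names are illustrative stand-ins, not Iceberg's actual FieldMetrics/FloatFieldMetrics types.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class NanCountSketch {

      /** Stand-in for a FloatFieldMetrics-like type: a field id plus its NaN tally. */
      static final class NanMetrics {
        final int fieldId;
        final long nanCount;

        NanMetrics(int fieldId, long nanCount) {
          this.fieldId = fieldId;
          this.nanCount = nanCount;
        }
      }

      /** Leaves report their own counters; containers re-expose their children's. */
      interface ValueWriter<T> {
        void write(T value);

        default Stream<NanMetrics> metrics() {
          return Stream.empty();
        }
      }

      static final class DoubleWriter implements ValueWriter<Double> {
        private final int fieldId;
        private long nanCount = 0;

        DoubleWriter(int fieldId) {
          this.fieldId = fieldId;
        }

        @Override
        public void write(Double value) {
          // Counted inline at write time; no second pass over the data is needed.
          if (Double.isNaN(value)) {
            nanCount++;
          }
        }

        @Override
        public Stream<NanMetrics> metrics() {
          return Stream.of(new NanMetrics(fieldId, nanCount));
        }
      }

      /** Aggregates child metrics the same way the struct/record writers above do. */
      static final class StructWriter implements ValueWriter<List<Double>> {
        private final List<DoubleWriter> children;

        StructWriter(List<DoubleWriter> children) {
          this.children = children;
        }

        @Override
        public void write(List<Double> row) {
          for (int i = 0; i < children.size(); i++) {
            children.get(i).write(row.get(i));
          }
        }

        @Override
        public Stream<NanMetrics> metrics() {
          return children.stream().flatMap(ValueWriter::metrics);
        }
      }

      public static void main(String[] args) {
        StructWriter writer = new StructWriter(Arrays.asList(new DoubleWriter(1), new DoubleWriter(2)));
        writer.write(Arrays.asList(Double.NaN, 1.0));
        writer.write(Arrays.asList(2.0, Double.NaN));
        writer.write(Arrays.asList(Double.NaN, 3.0));

        Map<Integer, Long> nanCounts = writer.metrics()
            .collect(Collectors.toMap(m -> m.fieldId, m -> m.nanCount));
        System.out.println(nanCounts); // prints {1=2, 2=1}
      }
    }

Since metrics() materializes the counters only when the stream is consumed, it should be read after writing finishes; this is the same reason OrcFileAppender.metrics() above requires the file to be closed first.
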
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java index 4508a102d447..283740eb0055 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java @@ -20,7 +20,10 @@ package org.apache.iceberg.spark.data; import java.util.List; +import java.util.stream.Stream; +import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.Schema; +import org.apache.iceberg.orc.ORCSchemaUtil; import org.apache.iceberg.orc.OrcRowWriter; import org.apache.iceberg.orc.OrcSchemaWithTypeVisitor; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -61,6 +64,11 @@ public void write(InternalRow value, VectorizedRowBatch output) { } } + @Override + public Stream metrics() { + return writer.metrics(); + } + private static class WriteBuilder extends OrcSchemaWithTypeVisitor { private WriteBuilder() { } @@ -98,9 +106,9 @@ public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescript case LONG: return SparkOrcValueWriters.longs(); case FLOAT: - return SparkOrcValueWriters.floats(); + return SparkOrcValueWriters.floats(getFieldId(primitive)); case DOUBLE: - return SparkOrcValueWriters.doubles(); + return SparkOrcValueWriters.doubles(getFieldId(primitive)); case BINARY: return SparkOrcValueWriters.byteArrays(); case STRING: @@ -115,6 +123,10 @@ public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescript throw new IllegalArgumentException("Unhandled type " + primitive); } } + + private int getFieldId(TypeDescription typeDescription) { + return ORCSchemaUtil.fieldId(typeDescription); + } } private static class StructWriter implements SparkOrcValueWriter { @@ -136,5 +148,11 @@ public void nonNullWrite(int rowId, int column, SpecializedGetters data, ColumnV writers.get(c).write(rowId, c, value, cv.fields[c]); } } + + @Override + public Stream metrics() { + return writers.stream().flatMap(SparkOrcValueWriter::metrics); + } + } } From 376524e3dc10e7f1fbe2ead47b7116cbdbd707ba Mon Sep 17 00:00:00 2001 From: Yan Yan Date: Mon, 11 Jan 2021 17:48:30 -0800 Subject: [PATCH 2/2] address comments --- .../main/java/org/apache/iceberg/orc/OrcRowWriter.java | 4 +++- .../org/apache/iceberg/spark/data/SparkOrcWriter.java | 8 ++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java index c3463ebacdef..413634e3e100 100644 --- a/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java +++ b/orc/src/main/java/org/apache/iceberg/orc/OrcRowWriter.java @@ -41,5 +41,7 @@ public interface OrcRowWriter { /** * Returns a stream of {@link FieldMetrics} that this OrcRowWriter keeps track of. 
   */
-  Stream<FieldMetrics> metrics();
+  default Stream<FieldMetrics> metrics() {
+    return Stream.empty();
+  }
 }
diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
index 283740eb0055..ce1b2bec0ec1 100644
--- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
+++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkOrcWriter.java
@@ -106,9 +106,9 @@ public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescript
       case LONG:
         return SparkOrcValueWriters.longs();
       case FLOAT:
-        return SparkOrcValueWriters.floats(getFieldId(primitive));
+        return SparkOrcValueWriters.floats(ORCSchemaUtil.fieldId(primitive));
       case DOUBLE:
-        return SparkOrcValueWriters.doubles(getFieldId(primitive));
+        return SparkOrcValueWriters.doubles(ORCSchemaUtil.fieldId(primitive));
       case BINARY:
         return SparkOrcValueWriters.byteArrays();
       case STRING:
@@ -123,10 +123,6 @@ public SparkOrcValueWriter primitive(Type.PrimitiveType iPrimitive, TypeDescript
         throw new IllegalArgumentException("Unhandled type " + primitive);
       }
     }
-
-    private int getFieldId(TypeDescription typeDescription) {
-      return ORCSchemaUtil.fieldId(typeDescription);
-    }
   }
 
   private static class StructWriter implements SparkOrcValueWriter {
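
End to end, the plumbing added by these two patches is: leaf OrcValueWriter.metrics() streams are concatenated upward through the struct/list/map writers into OrcRowWriter.metrics(), and OrcFileAppender passes that stream to OrcMetrics.fromWriter, where MetricsUtil.createNanValueCounts folds it into the file's Metrics. A sketch of how the result surfaces through the generic write path follows; the temp-file setup and the two-column schema are illustrative only, while the builder calls are intended to be the standard ORC.write API.

    import java.io.File;
    import java.io.IOException;
    import org.apache.iceberg.Files;
    import org.apache.iceberg.Metrics;
    import org.apache.iceberg.Schema;
    import org.apache.iceberg.data.GenericRecord;
    import org.apache.iceberg.data.Record;
    import org.apache.iceberg.data.orc.GenericOrcWriter;
    import org.apache.iceberg.io.FileAppender;
    import org.apache.iceberg.orc.ORC;
    import org.apache.iceberg.types.Types;

    public class OrcNanMetricsDemo {
      public static void main(String[] args) throws IOException {
        // Field ids 1 and 2 are assigned here and key the nanValueCounts map below.
        Schema schema = new Schema(
            Types.NestedField.required(1, "x", Types.FloatType.get()),
            Types.NestedField.required(2, "y", Types.DoubleType.get()));

        File tmp = File.createTempFile("nan-demo", ".orc");
        tmp.delete(); // the ORC writer expects to create a fresh file

        FileAppender<Record> appender = ORC.write(Files.localOutput(tmp))
            .schema(schema)
            .createWriterFunc(GenericOrcWriter::buildWriter)
            .build();
        try {
          Record row = GenericRecord.create(schema);
          row.setField("x", Float.NaN);
          row.setField("y", 1.0D);
          appender.add(row);

          Record row2 = GenericRecord.create(schema);
          row2.setField("x", 2.0F);
          row2.setField("y", Double.NaN);
          appender.add(row2);
        } finally {
          appender.close();
        }

        // With these patches, ORC now populates NaN counts per field id.
        Metrics metrics = appender.metrics();
        System.out.println(metrics.nanValueCounts()); // expected for this schema: {1=1, 2=1}
      }
    }

Note that OrcFileAppender.metrics() may only be called once the appender is closed, which is why the counts are read after the finally block.
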