diff --git a/api/src/main/java/org/apache/iceberg/types/Comparators.java b/api/src/main/java/org/apache/iceberg/types/Comparators.java index 0dbbfa8d15aa..13ea6b9bb0af 100644 --- a/api/src/main/java/org/apache/iceberg/types/Comparators.java +++ b/api/src/main/java/org/apache/iceberg/types/Comparators.java @@ -157,6 +157,10 @@ public static Comparator unsignedBytes() { return UnsignedByteBufComparator.INSTANCE; } + public static Comparator unsignedByteArrays() { + return UnsignedByteArrayComparator.INSTANCE; + } + public static Comparator signedBytes() { return Comparator.naturalOrder(); } @@ -272,6 +276,30 @@ public int compare(ByteBuffer buf1, ByteBuffer buf2) { } } + private static class UnsignedByteArrayComparator implements Comparator { + private static final UnsignedByteArrayComparator INSTANCE = new UnsignedByteArrayComparator(); + + private UnsignedByteArrayComparator() { + } + + @Override + public int compare(byte[] array1, byte[] array2) { + int len = Math.min(array1.length, array2.length); + + // find the first difference and return + for (int i = 0; i < len; i += 1) { + // Conversion to int is what Byte.toUnsignedInt would do + int cmp = Integer.compare(((int) array1[i]) & 0xff, ((int) array2[i]) & 0xff); + if (cmp != 0) { + return cmp; + } + } + + // if there are no differences, then the shorter seq is first + return Integer.compare(array1.length, array2.length); + } + } + private static class CharSeqComparator implements Comparator { private static final CharSeqComparator INSTANCE = new CharSeqComparator(); diff --git a/core/src/main/java/org/apache/iceberg/avro/Avro.java b/core/src/main/java/org/apache/iceberg/avro/Avro.java index 5a34eedc80d5..b7e56a82b2b5 100644 --- a/core/src/main/java/org/apache/iceberg/avro/Avro.java +++ b/core/src/main/java/org/apache/iceberg/avro/Avro.java @@ -28,6 +28,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.avro.Conversions; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; @@ -37,7 +38,9 @@ import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Encoder; import org.apache.avro.specific.SpecificData; +import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.StructLike; @@ -102,6 +105,7 @@ public static class WriteBuilder { private String name = "table"; private Function> createWriterFunc = null; private boolean overwrite; + private MetricsConfig metricsConfig; private WriteBuilder(OutputFile file) { this.file = file; @@ -148,6 +152,11 @@ public WriteBuilder meta(Map properties) { return this; } + public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) { + this.metricsConfig = newMetricsConfig; + return this; + } + public WriteBuilder overwrite() { return overwrite(true); } @@ -181,7 +190,7 @@ public FileAppender build() throws IOException { meta("iceberg.schema", SchemaParser.toJson(schema)); return new AvroFileAppender<>( - AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, overwrite); + schema, AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, metricsConfig, overwrite); } } @@ -320,7 +329,7 @@ public PositionDeleteWriter buildPositionWriter() throws IOException { /** * A {@link DatumWriter} implementation that wraps another to produce position deletes. */ - private static class PositionDatumWriter implements DatumWriter> { + private static class PositionDatumWriter implements MetricsAwareDatumWriter> { private static final ValueWriter PATH_WRITER = ValueWriters.strings(); private static final ValueWriter POS_WRITER = ValueWriters.longs(); @@ -333,6 +342,11 @@ public void write(PositionDelete delete, Encoder out) throws IOException { PATH_WRITER.write(delete.path(), out); POS_WRITER.write(delete.pos(), out); } + + @Override + public Stream metrics() { + return Stream.concat(PATH_WRITER.metrics(), POS_WRITER.metrics()); + } } /** @@ -340,9 +354,10 @@ public void write(PositionDelete delete, Encoder out) throws IOException { * * @param the type of datum written as a deleted row */ - private static class PositionAndRowDatumWriter implements DatumWriter> { + private static class PositionAndRowDatumWriter implements MetricsAwareDatumWriter> { private static final ValueWriter PATH_WRITER = ValueWriters.strings(); private static final ValueWriter POS_WRITER = ValueWriters.longs(); + private final DatumWriter rowWriter; private PositionAndRowDatumWriter(DatumWriter rowWriter) { @@ -363,6 +378,11 @@ public void write(PositionDelete delete, Encoder out) throws IOException { POS_WRITER.write(delete.pos(), out); rowWriter.write(delete.row(), out); } + + @Override + public Stream metrics() { + return Stream.concat(PATH_WRITER.metrics(), POS_WRITER.metrics()); + } } public static ReadBuilder read(InputFile file) { diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java b/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java index a77b07d0b8e0..f4f6c8482917 100644 --- a/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java +++ b/core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java @@ -27,22 +27,31 @@ import org.apache.avro.file.DataFileWriter; import org.apache.avro.io.DatumWriter; import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.FileAppender; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.io.PositionOutputStream; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; class AvroFileAppender implements FileAppender { private PositionOutputStream stream = null; private DataFileWriter writer = null; + private DatumWriter datumWriter = null; + private org.apache.iceberg.Schema icebergSchema; + private MetricsConfig metricsConfig; private long numRecords = 0L; + private boolean isClosed = false; - AvroFileAppender(Schema schema, OutputFile file, + AvroFileAppender(org.apache.iceberg.Schema icebergSchema, Schema schema, OutputFile file, Function> createWriterFunc, CodecFactory codec, Map metadata, - boolean overwrite) throws IOException { + MetricsConfig metricsConfig, boolean overwrite) throws IOException { + this.icebergSchema = icebergSchema; this.stream = overwrite ? file.createOrOverwrite() : file.create(); - this.writer = newAvroWriter(schema, stream, createWriterFunc, codec, metadata); + this.datumWriter = createWriterFunc.apply(schema); + this.writer = newAvroWriter(schema, stream, datumWriter, codec, metadata); + this.metricsConfig = metricsConfig; } @Override @@ -57,7 +66,10 @@ public void add(D datum) { @Override public Metrics metrics() { - return new Metrics(numRecords, null, null, null); + Preconditions.checkState(isClosed, + "Cannot return metrics while appending to an open file."); + + return AvroMetrics.fromWriter(datumWriter, icebergSchema, numRecords, metricsConfig); } @Override @@ -77,15 +89,16 @@ public void close() throws IOException { if (writer != null) { writer.close(); this.writer = null; + isClosed = true; } } @SuppressWarnings("unchecked") private static DataFileWriter newAvroWriter( - Schema schema, PositionOutputStream stream, Function> createWriterFunc, + Schema schema, PositionOutputStream stream, DatumWriter metricsAwareDatumWriter, CodecFactory codec, Map metadata) throws IOException { DataFileWriter writer = new DataFileWriter<>( - (DatumWriter) createWriterFunc.apply(schema)); + (DatumWriter) metricsAwareDatumWriter); writer.setCodec(codec); diff --git a/core/src/main/java/org/apache/iceberg/avro/AvroMetrics.java b/core/src/main/java/org/apache/iceberg/avro/AvroMetrics.java new file mode 100644 index 000000000000..83a35666e5a0 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/AvroMetrics.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.avro; + +import org.apache.avro.io.DatumWriter; +import org.apache.iceberg.Metrics; +import org.apache.iceberg.MetricsConfig; +import org.apache.iceberg.Schema; + +public class AvroMetrics { + + private AvroMetrics() { + } + + static Metrics fromWriter(DatumWriter datumWriter, Schema schema, long numRecords, + MetricsConfig inputMetricsConfig) { + // TODO will populate in following PRs if datum writer is a MetricsAwareDatumWriter + return new Metrics(numRecords, null, null, null); + } +} diff --git a/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java b/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java index 0498069a6bfb..9ee5de32fe7d 100644 --- a/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java +++ b/core/src/main/java/org/apache/iceberg/avro/GenericAvroWriter.java @@ -21,14 +21,15 @@ import java.io.IOException; import java.util.List; +import java.util.stream.Stream; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Encoder; +import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -class GenericAvroWriter implements DatumWriter { +class GenericAvroWriter implements MetricsAwareDatumWriter { private ValueWriter writer = null; GenericAvroWriter(Schema schema) { @@ -46,6 +47,11 @@ public void write(T datum, Encoder out) throws IOException { writer.write(datum, out); } + @Override + public Stream metrics() { + return writer.metrics(); + } + private static class WriteBuilder extends AvroSchemaVisitor> { private WriteBuilder() { } diff --git a/core/src/main/java/org/apache/iceberg/avro/MetricsAwareDatumWriter.java b/core/src/main/java/org/apache/iceberg/avro/MetricsAwareDatumWriter.java new file mode 100644 index 000000000000..08aa194993ba --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/avro/MetricsAwareDatumWriter.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.avro; + +import java.util.stream.Stream; +import org.apache.avro.io.DatumWriter; +import org.apache.iceberg.FieldMetrics; + +/** + * Wrapper writer around {@link DatumWriter} with metrics support. + */ +public interface MetricsAwareDatumWriter extends DatumWriter { + + /** + * Returns a stream of {@link FieldMetrics} that this MetricsAwareDatumWriter keeps track of. + */ + Stream metrics(); +} diff --git a/core/src/main/java/org/apache/iceberg/avro/ValueWriter.java b/core/src/main/java/org/apache/iceberg/avro/ValueWriter.java index 1753adfdae1f..5059d125658e 100644 --- a/core/src/main/java/org/apache/iceberg/avro/ValueWriter.java +++ b/core/src/main/java/org/apache/iceberg/avro/ValueWriter.java @@ -20,8 +20,14 @@ package org.apache.iceberg.avro; import java.io.IOException; +import java.util.stream.Stream; import org.apache.avro.io.Encoder; +import org.apache.iceberg.FieldMetrics; public interface ValueWriter { void write(D datum, Encoder encoder) throws IOException; + + default Stream metrics() { + return Stream.empty(); // TODO will populate in following PRs + } } diff --git a/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java index aa6cc6fcd339..bf4d9e6cdd19 100644 --- a/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java +++ b/core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java @@ -21,19 +21,21 @@ import java.io.IOException; import java.util.List; +import java.util.stream.Stream; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Encoder; +import org.apache.iceberg.FieldMetrics; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.avro.AvroSchemaVisitor; import org.apache.iceberg.avro.LogicalMap; +import org.apache.iceberg.avro.MetricsAwareDatumWriter; import org.apache.iceberg.avro.ValueWriter; import org.apache.iceberg.avro.ValueWriters; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -public class DataWriter implements DatumWriter { +public class DataWriter implements MetricsAwareDatumWriter { private ValueWriter writer = null; public static DataWriter create(Schema schema) { @@ -59,6 +61,11 @@ protected ValueWriter createStructWriter(List> fields) { return GenericWriters.struct(fields); } + @Override + public Stream metrics() { + return writer.metrics(); + } + private class WriteBuilder extends AvroSchemaVisitor> { @Override public ValueWriter record(Schema record, List names, List> fields) { diff --git a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java index b9b77fd571b5..b764567fda83 100644 --- a/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java +++ b/data/src/main/java/org/apache/iceberg/data/GenericAppenderFactory.java @@ -92,6 +92,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat fileFo return Avro.write(outputFile) .schema(schema) .createWriterFunc(DataWriter::create) + .metricsConfig(metricsConfig) .setAll(config) .overwrite() .build(); diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java index 0dc4e039f46c..b069a35d3bbb 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java +++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkAvroWriter.java @@ -23,18 +23,20 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Encoder; import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.LogicalType; import org.apache.flink.table.types.logical.RowType; +import org.apache.iceberg.FieldMetrics; +import org.apache.iceberg.avro.MetricsAwareDatumWriter; import org.apache.iceberg.avro.ValueWriter; import org.apache.iceberg.avro.ValueWriters; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -public class FlinkAvroWriter implements DatumWriter { +public class FlinkAvroWriter implements MetricsAwareDatumWriter { private final RowType rowType; private ValueWriter writer = null; @@ -54,6 +56,11 @@ public void write(RowData datum, Encoder out) throws IOException { writer.write(datum, out); } + @Override + public Stream metrics() { + return writer.metrics(); + } + private static class WriteBuilder extends AvroWithFlinkSchemaVisitor> { @Override public ValueWriter record(LogicalType struct, Schema record, List names, List> fields) { diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java index 1ddf0929fbdc..59d7f76c92aa 100644 --- a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java +++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkAppenderFactory.java @@ -102,6 +102,7 @@ public FileAppender newAppender(OutputFile outputFile, FileFormat forma .createWriterFunc(ignore -> new FlinkAvroWriter(flinkSchema)) .setAll(props) .schema(schema) + .metricsConfig(metricsConfig) .overwrite() .build(); diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java index b1625d7df9b1..7582125128a7 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkAvroWriter.java @@ -23,11 +23,13 @@ import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; -import org.apache.avro.io.DatumWriter; import org.apache.avro.io.Encoder; +import org.apache.iceberg.FieldMetrics; +import org.apache.iceberg.avro.MetricsAwareDatumWriter; import org.apache.iceberg.avro.ValueWriter; import org.apache.iceberg.avro.ValueWriters; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; @@ -37,7 +39,7 @@ import org.apache.spark.sql.types.ShortType; import org.apache.spark.sql.types.StructType; -public class SparkAvroWriter implements DatumWriter { +public class SparkAvroWriter implements MetricsAwareDatumWriter { private final StructType dsSchema; private ValueWriter writer = null; @@ -57,6 +59,11 @@ public void write(InternalRow datum, Encoder out) throws IOException { writer.write(datum, out); } + @Override + public Stream metrics() { + return writer.metrics(); + } + private static class WriteBuilder extends AvroWithSparkSchemaVisitor> { @Override public ValueWriter record(DataType struct, Schema record, List names, List> fields) {