Avro metrics support #1935

Closed · wants to merge 1 commit
28 changes: 28 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/Comparators.java
@@ -157,6 +157,10 @@ public static Comparator<ByteBuffer> unsignedBytes() {
return UnsignedByteBufComparator.INSTANCE;
}

public static Comparator<byte[]> unsignedByteArray() {
return UnsignedByteArrayComparator.INSTANCE;
}

public static Comparator<ByteBuffer> signedBytes() {
return Comparator.naturalOrder();
}
@@ -272,6 +276,30 @@ public int compare(ByteBuffer buf1, ByteBuffer buf2) {
}
}

private static class UnsignedByteArrayComparator implements Comparator<byte[]> {
private static final UnsignedByteArrayComparator INSTANCE = new UnsignedByteArrayComparator();

private UnsignedByteArrayComparator() {
}

@Override
public int compare(byte[] array1, byte[] array2) {
int len = Math.min(array1.length, array2.length);

// find the first difference and return
for (int i = 0; i < len; i += 1) {
// Conversion to int is what Byte.toUnsignedInt would do
int cmp = Integer.compare(((int) array1[i]) & 0xff, ((int) array2[i]) & 0xff);
if (cmp != 0) {
return cmp;
}
}

// if there are no differences, then the shorter array sorts first
return Integer.compare(array1.length, array2.length);
}
}

private static class CharSeqComparator implements Comparator<CharSequence> {
private static final CharSeqComparator INSTANCE = new CharSeqComparator();

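For reference, a minimal sketch of how the new comparator orders byte arrays; the sample values are illustrative, not taken from the PR:

import java.util.Comparator;
import org.apache.iceberg.types.Comparators;

public class UnsignedCompareExample {
  public static void main(String[] args) {
    Comparator<byte[]> cmp = Comparators.unsignedByteArray();

    // (byte) 0xFF is -1 signed but 255 unsigned, so it sorts after 0x01
    System.out.println(cmp.compare(new byte[] {(byte) 0xFF}, new byte[] {0x01}) > 0);  // true

    // when one array is a prefix of the other, the shorter array sorts first
    System.out.println(cmp.compare(new byte[] {0x01}, new byte[] {0x01, 0x02}) < 0);  // true
  }
}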
14 changes: 6 additions & 8 deletions core/src/main/java/org/apache/iceberg/FieldMetrics.java
@@ -20,8 +20,6 @@
package org.apache.iceberg;


import java.nio.ByteBuffer;

/**
* Field-level metrics tracked internally by Iceberg.
*/
@@ -30,15 +28,15 @@ public class FieldMetrics {
private final long valueCount;
private final long nullValueCount;
private final long nanValueCount;
private final ByteBuffer lowerBound;
private final ByteBuffer upperBound;
private final Object lowerBound;
private final Object upperBound;

public FieldMetrics(int id,
long valueCount,
long nullValueCount,
long nanValueCount,
ByteBuffer lowerBound,
ByteBuffer upperBound) {
Object lowerBound,
Object upperBound) {
this.id = id;
this.valueCount = valueCount;
this.nullValueCount = nullValueCount;
@@ -78,14 +76,14 @@ public long nanValueCount() {
/**
* Returns the lower bound value of this field.
*/
public ByteBuffer lowerBound() {
public Object lowerBound() {
return lowerBound;
}

/**
* Returns the upper bound value of this field.
*/
public ByteBuffer upperBound() {
public Object upperBound() {
return upperBound;
}
}
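Since the bounds are now Object rather than ByteBuffer, writers can carry typed bound values and defer serialization. A minimal sketch, with a made-up field id and counts for a long column:

import org.apache.iceberg.FieldMetrics;

public class FieldMetricsExample {
  public static void main(String[] args) {
    // field id 1: 100 values, 2 nulls, 0 NaNs; bounds are boxed longs instead of ByteBuffers
    FieldMetrics metrics = new FieldMetrics(1, 100L, 2L, 0L, -5L, 42L);
    System.out.println(metrics.lowerBound());  // -5
    System.out.println(metrics.upperBound());  // 42
  }
}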
49 changes: 37 additions & 12 deletions core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -28,6 +28,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
@@ -37,7 +38,10 @@
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.specific.SpecificData;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.StructLike;
@@ -100,8 +104,9 @@ public static class WriteBuilder {
private final Map<String, String> metadata = Maps.newLinkedHashMap();
private org.apache.iceberg.Schema schema = null;
private String name = "table";
private Function<Schema, DatumWriter<?>> createWriterFunc = null;
private Function<Schema, MetricsAwareDatumWriter<?>> createWriterFunc = null;
private boolean overwrite;
private MetricsConfig metricsConfig;

private WriteBuilder(OutputFile file) {
this.file = file;
@@ -123,7 +128,7 @@ public WriteBuilder named(String newName) {
return this;
}

public WriteBuilder createWriterFunc(Function<Schema, DatumWriter<?>> writerFunction) {
public WriteBuilder createWriterFunc(Function<Schema, MetricsAwareDatumWriter<?>> writerFunction) {
this.createWriterFunc = writerFunction;
return this;
}
@@ -148,6 +153,11 @@ public WriteBuilder meta(Map<String, String> properties) {
return this;
}

public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
this.metricsConfig = newMetricsConfig;
return this;
}

public WriteBuilder overwrite() {
return overwrite(true);
}
@@ -170,7 +180,7 @@ public <D> FileAppender<D> build() throws IOException {
Preconditions.checkNotNull(schema, "Schema is required");
Preconditions.checkNotNull(name, "Table name is required and cannot be null");

Function<Schema, DatumWriter<?>> writerFunc;
Function<Schema, MetricsAwareDatumWriter<?>> writerFunc;
if (createWriterFunc != null) {
writerFunc = createWriterFunc;
} else {
@@ -181,7 +191,7 @@ public <D> FileAppender<D> build() throws IOException {
meta("iceberg.schema", SchemaParser.toJson(schema));

return new AvroFileAppender<>(
AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, overwrite);
schema, AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, metricsConfig, overwrite);
}
}
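A hedged end-to-end sketch of the builder with the new metricsConfig option. The output path is illustrative, and DataWriter::create (from org.apache.iceberg.data.avro) is assumed to satisfy the new MetricsAwareDatumWriter return type:

import org.apache.iceberg.Files;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.DataWriter;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.types.Types;

public class AvroMetricsWriteExample {
  public static void main(String[] args) throws Exception {
    Schema schema = new Schema(Types.NestedField.required(1, "id", Types.LongType.get()));

    FileAppender<Record> appender = Avro.write(Files.localOutput("/tmp/example.avro"))
        .schema(schema)
        .createWriterFunc(DataWriter::create)       // assumed to return a MetricsAwareDatumWriter
        .metricsConfig(MetricsConfig.getDefault())  // new in this PR
        .overwrite()
        .build();

    Record record = GenericRecord.create(schema);
    record.setField("id", 42L);
    appender.add(record);

    appender.close();                               // metrics() now requires a closed file
    Metrics metrics = appender.metrics();
    System.out.println(metrics.recordCount());      // 1
  }
}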

@@ -192,7 +202,7 @@ public static DeleteWriteBuilder writeDeletes(OutputFile file) {
public static class DeleteWriteBuilder {
private final WriteBuilder appenderBuilder;
private final String location;
private Function<Schema, DatumWriter<?>> createWriterFunc = null;
private Function<Schema, MetricsAwareDatumWriter<?>> createWriterFunc = null;
private org.apache.iceberg.Schema rowSchema;
private PartitionSpec spec;
private StructLike partition;
@@ -240,7 +250,7 @@ public DeleteWriteBuilder overwrite(boolean enabled) {
return this;
}

public DeleteWriteBuilder createWriterFunc(Function<Schema, DatumWriter<?>> writerFunction) {
public DeleteWriteBuilder createWriterFunc(Function<Schema, MetricsAwareDatumWriter<?>> writerFunction) {
this.createWriterFunc = writerFunction;
return this;
}
@@ -320,9 +330,11 @@ public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
/**
* A {@link DatumWriter} implementation that produces position deletes.
*/
private static class PositionDatumWriter implements DatumWriter<PositionDelete<?>> {
private static final ValueWriter<Object> PATH_WRITER = ValueWriters.strings();
private static final ValueWriter<Long> POS_WRITER = ValueWriters.longs();
private static class PositionDatumWriter implements MetricsAwareDatumWriter<PositionDelete<?>> {
private static final ValueWriter<CharSequence> PATH_WRITER =
ValueWriters.strings(MetadataColumns.DELETE_FILE_PATH.fieldId());
private static final ValueWriter<Long> POS_WRITER =
ValueWriters.longs(MetadataColumns.DELETE_FILE_POS.fieldId());

@Override
public void setSchema(Schema schema) {
@@ -333,16 +345,24 @@ public void write(PositionDelete<?> delete, Encoder out) throws IOException {
PATH_WRITER.write(delete.path(), out);
POS_WRITER.write(delete.pos(), out);
}

@Override
public Stream<FieldMetrics> metrics() {
return Stream.concat(PATH_WRITER.metrics(), POS_WRITER.metrics());
}
}
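The diff uses MetricsAwareDatumWriter without including its definition. From the call sites, it is presumably a DatumWriter that also exposes the field metrics collected while writing, roughly:

import java.util.stream.Stream;
import org.apache.avro.io.DatumWriter;
import org.apache.iceberg.FieldMetrics;

// assumed shape, inferred from createWriterFunc and the metrics() overrides in this diff
public interface MetricsAwareDatumWriter<D> extends DatumWriter<D> {
  /**
   * Returns a stream of the FieldMetrics gathered while writing.
   */
  Stream<FieldMetrics> metrics();
}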

/**
* A {@link DatumWriter} implementation that wraps another to produce position deletes with row data.
*
* @param <D> the type of datum written as a deleted row
*/
private static class PositionAndRowDatumWriter<D> implements DatumWriter<PositionDelete<D>> {
private static final ValueWriter<Object> PATH_WRITER = ValueWriters.strings();
private static final ValueWriter<Long> POS_WRITER = ValueWriters.longs();
private static class PositionAndRowDatumWriter<D> implements MetricsAwareDatumWriter<PositionDelete<D>> {
private static final ValueWriter<CharSequence> PATH_WRITER =
ValueWriters.strings(MetadataColumns.DELETE_FILE_PATH.fieldId());
private static final ValueWriter<Long> POS_WRITER =
ValueWriters.longs(MetadataColumns.DELETE_FILE_POS.fieldId());

private final DatumWriter<D> rowWriter;

private PositionAndRowDatumWriter(DatumWriter<D> rowWriter) {
@@ -363,6 +383,11 @@ public void write(PositionDelete<D> delete, Encoder out) throws IOException {
POS_WRITER.write(delete.pos(), out);
rowWriter.write(delete.row(), out);
}

@Override
public Stream<FieldMetrics> metrics() {
return Stream.concat(PATH_WRITER.metrics(), POS_WRITER.metrics());
}
}

public static ReadBuilder read(InputFile file) {
27 changes: 20 additions & 7 deletions core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
@@ -27,22 +27,31 @@
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class AvroFileAppender<D> implements FileAppender<D> {
private PositionOutputStream stream = null;
private DataFileWriter<D> writer = null;
private MetricsAwareDatumWriter<?> metricsAwareDatumWriter = null;
private org.apache.iceberg.Schema icebergSchema;
private MetricsConfig metricsConfig;
private long numRecords = 0L;
private boolean isClosed = false;

AvroFileAppender(Schema schema, OutputFile file,
Function<Schema, DatumWriter<?>> createWriterFunc,
AvroFileAppender(org.apache.iceberg.Schema icebergSchema, Schema schema, OutputFile file,
Function<Schema, MetricsAwareDatumWriter<?>> createWriterFunc,
CodecFactory codec, Map<String, String> metadata,
boolean overwrite) throws IOException {
MetricsConfig metricsConfig, boolean overwrite) throws IOException {
this.icebergSchema = icebergSchema;
this.stream = overwrite ? file.createOrOverwrite() : file.create();
this.writer = newAvroWriter(schema, stream, createWriterFunc, codec, metadata);
this.metricsAwareDatumWriter = createWriterFunc.apply(schema);
this.writer = newAvroWriter(schema, stream, metricsAwareDatumWriter, codec, metadata);
this.metricsConfig = metricsConfig;
}

@Override
@@ -57,7 +66,10 @@ public void add(D datum) {

@Override
public Metrics metrics() {
return new Metrics(numRecords, null, null, null);
Preconditions.checkState(isClosed,
"Cannot return metrics while appending to an open file.");

return AvroMetrics.fromWriter(metricsAwareDatumWriter, icebergSchema, numRecords, metricsConfig);
}

@Override
Expand All @@ -77,15 +89,16 @@ public void close() throws IOException {
if (writer != null) {
writer.close();
this.writer = null;
isClosed = true;
}
}

@SuppressWarnings("unchecked")
private static <D> DataFileWriter<D> newAvroWriter(
Schema schema, PositionOutputStream stream, Function<Schema, DatumWriter<?>> createWriterFunc,
Schema schema, PositionOutputStream stream, MetricsAwareDatumWriter<?> metricsAwareDatumWriter,
CodecFactory codec, Map<String, String> metadata) throws IOException {
DataFileWriter<D> writer = new DataFileWriter<>(
(DatumWriter<D>) createWriterFunc.apply(schema));
(DatumWriter<D>) metricsAwareDatumWriter);

writer.setCodec(codec);

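AvroMetrics.fromWriter is called above but is not part of this diff. A hedged sketch of what it presumably does, assuming the usual id()/valueCount()/nullValueCount() accessors on FieldMetrics; the real version would also serialize bounds and honor the MetricsConfig modes:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.avro.MetricsAwareDatumWriter;

class AvroMetricsSketch {
  // assumed shape, inferred from the call site in AvroFileAppender.metrics()
  static Metrics fromWriter(MetricsAwareDatumWriter<?> writer, long numRecords) {
    // collect once; each FieldMetrics describes one tracked field
    List<FieldMetrics> fieldMetrics = writer.metrics().collect(Collectors.toList());
    Map<Integer, Long> valueCounts = fieldMetrics.stream()
        .collect(Collectors.toMap(FieldMetrics::id, FieldMetrics::valueCount));
    Map<Integer, Long> nullValueCounts = fieldMetrics.stream()
        .collect(Collectors.toMap(FieldMetrics::id, FieldMetrics::nullValueCount));
    return new Metrics(numRecords, null, valueCounts, nullValueCounts);
  }
}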