Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avro metrics support: create MetricsAwareDatumWriter and some refactors for Avro #1946

Merged
merged 2 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion api/src/main/java/org/apache/iceberg/types/Comparators.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ public static Comparator<ByteBuffer> unsignedBytes() {
return UnsignedByteBufComparator.INSTANCE;
}

/**
 * Returns a comparator that orders byte arrays lexicographically, comparing each byte as an
 * unsigned value (0-255); if one array is a prefix of the other, the shorter array sorts first.
 *
 * @return a singleton unsigned-lexicographic {@code Comparator<byte[]>}
 */
public static Comparator<byte[]> unsignedByteArrays() {
return UnsignedByteArrayComparator.INSTANCE;
}

public static Comparator<ByteBuffer> signedBytes() {
return Comparator.naturalOrder();
}
Expand Down Expand Up @@ -267,11 +271,35 @@ public int compare(ByteBuffer buf1, ByteBuffer buf2) {
}
}

// if there are no differences, then the shorter seq is first
// if there are no differences, then the shorter seq is smaller
return Integer.compare(buf1.remaining(), buf2.remaining());
}
}

/**
 * Singleton comparator that orders byte arrays lexicographically, treating each byte as an
 * unsigned value (0-255). When all shared positions are equal, the shorter array sorts first,
 * matching the ordering of {@code UnsignedByteBufComparator} for ByteBuffers.
 */
private static class UnsignedByteArrayComparator implements Comparator<byte[]> {
  private static final UnsignedByteArrayComparator INSTANCE = new UnsignedByteArrayComparator();

  private UnsignedByteArrayComparator() {
  }

  @Override
  public int compare(byte[] left, byte[] right) {
    int commonLength = Math.min(left.length, right.length);

    // scan for the first position at which the two arrays disagree
    for (int pos = 0; pos < commonLength; pos += 1) {
      int leftByte = Byte.toUnsignedInt(left[pos]);
      int rightByte = Byte.toUnsignedInt(right[pos]);
      if (leftByte != rightByte) {
        return leftByte < rightByte ? -1 : 1;
      }
    }

    // every shared position matched: order by length, shorter first
    return Integer.compare(left.length, right.length);
  }
}

private static class CharSeqComparator implements Comparator<CharSequence> {
private static final CharSeqComparator INSTANCE = new CharSeqComparator();

Expand Down
26 changes: 23 additions & 3 deletions core/src/main/java/org/apache/iceberg/avro/Avro.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
Expand All @@ -37,7 +38,9 @@
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.specific.SpecificData;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.StructLike;
Expand Down Expand Up @@ -102,6 +105,7 @@ public static class WriteBuilder {
private String name = "table";
private Function<Schema, DatumWriter<?>> createWriterFunc = null;
private boolean overwrite;
private MetricsConfig metricsConfig;

private WriteBuilder(OutputFile file) {
this.file = file;
Expand Down Expand Up @@ -148,6 +152,11 @@ public WriteBuilder meta(Map<String, String> properties) {
return this;
}

public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
this.metricsConfig = newMetricsConfig;
return this;
}

public WriteBuilder overwrite() {
return overwrite(true);
}
Expand Down Expand Up @@ -181,7 +190,7 @@ public <D> FileAppender<D> build() throws IOException {
meta("iceberg.schema", SchemaParser.toJson(schema));

return new AvroFileAppender<>(
AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, overwrite);
schema, AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, metricsConfig, overwrite);
}
}

Expand Down Expand Up @@ -320,7 +329,7 @@ public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
/**
* A {@link DatumWriter} implementation that wraps another to produce position deletes.
*/
private static class PositionDatumWriter implements DatumWriter<PositionDelete<?>> {
private static class PositionDatumWriter implements MetricsAwareDatumWriter<PositionDelete<?>> {
private static final ValueWriter<Object> PATH_WRITER = ValueWriters.strings();
private static final ValueWriter<Long> POS_WRITER = ValueWriters.longs();

Expand All @@ -333,16 +342,22 @@ public void write(PositionDelete<?> delete, Encoder out) throws IOException {
PATH_WRITER.write(delete.path(), out);
POS_WRITER.write(delete.pos(), out);
}

@Override
public Stream<FieldMetrics> metrics() {
return Stream.concat(PATH_WRITER.metrics(), POS_WRITER.metrics());
}
}

/**
* A {@link DatumWriter} implementation that wraps another to produce position deletes with row data.
*
* @param <D> the type of datum written as a deleted row
*/
private static class PositionAndRowDatumWriter<D> implements DatumWriter<PositionDelete<D>> {
private static class PositionAndRowDatumWriter<D> implements MetricsAwareDatumWriter<PositionDelete<D>> {
private static final ValueWriter<Object> PATH_WRITER = ValueWriters.strings();
private static final ValueWriter<Long> POS_WRITER = ValueWriters.longs();

private final DatumWriter<D> rowWriter;

private PositionAndRowDatumWriter(DatumWriter<D> rowWriter) {
Expand All @@ -363,6 +378,11 @@ public void write(PositionDelete<D> delete, Encoder out) throws IOException {
POS_WRITER.write(delete.pos(), out);
rowWriter.write(delete.row(), out);
}

@Override
public Stream<FieldMetrics> metrics() {
return Stream.concat(PATH_WRITER.metrics(), POS_WRITER.metrics());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should also include metrics from the rowWriter, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be fixed in a follow-up.

}
}

public static ReadBuilder read(InputFile file) {
Expand Down
25 changes: 19 additions & 6 deletions core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,31 @@
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class AvroFileAppender<D> implements FileAppender<D> {
private PositionOutputStream stream = null;
private DataFileWriter<D> writer = null;
private DatumWriter<?> datumWriter = null;
private org.apache.iceberg.Schema icebergSchema;
private MetricsConfig metricsConfig;
private long numRecords = 0L;
private boolean isClosed = false;

AvroFileAppender(Schema schema, OutputFile file,
AvroFileAppender(org.apache.iceberg.Schema icebergSchema, Schema schema, OutputFile file,
rdblue marked this conversation as resolved.
Show resolved Hide resolved
Function<Schema, DatumWriter<?>> createWriterFunc,
CodecFactory codec, Map<String, String> metadata,
boolean overwrite) throws IOException {
MetricsConfig metricsConfig, boolean overwrite) throws IOException {
this.icebergSchema = icebergSchema;
this.stream = overwrite ? file.createOrOverwrite() : file.create();
this.writer = newAvroWriter(schema, stream, createWriterFunc, codec, metadata);
this.datumWriter = createWriterFunc.apply(schema);
this.writer = newAvroWriter(schema, stream, datumWriter, codec, metadata);
this.metricsConfig = metricsConfig;
}

@Override
Expand All @@ -57,7 +66,10 @@ public void add(D datum) {

@Override
public Metrics metrics() {
return new Metrics(numRecords, null, null, null);
Preconditions.checkState(isClosed,
"Cannot return metrics while appending to an open file.");
rdblue marked this conversation as resolved.
Show resolved Hide resolved

return AvroMetrics.fromWriter(datumWriter, icebergSchema, numRecords, metricsConfig);
}

@Override
Expand All @@ -77,15 +89,16 @@ public void close() throws IOException {
if (writer != null) {
writer.close();
this.writer = null;
isClosed = true;
}
}

@SuppressWarnings("unchecked")
private static <D> DataFileWriter<D> newAvroWriter(
Schema schema, PositionOutputStream stream, Function<Schema, DatumWriter<?>> createWriterFunc,
Schema schema, PositionOutputStream stream, DatumWriter<?> metricsAwareDatumWriter,
CodecFactory codec, Map<String, String> metadata) throws IOException {
DataFileWriter<D> writer = new DataFileWriter<>(
(DatumWriter<D>) createWriterFunc.apply(schema));
(DatumWriter<D>) metricsAwareDatumWriter);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: I don't think this needs to be a MetricsAwareDatumWriter, right? It isn't in the type signature, so we should name it just datumWriter.


writer.setCodec(codec);

Expand Down
37 changes: 37 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/AvroMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.avro;

import org.apache.avro.io.DatumWriter;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;

/**
 * Helpers for building {@link Metrics} from an Avro {@link DatumWriter}.
 */
public class AvroMetrics {

  private AvroMetrics() {
  }

  /**
   * Builds {@link Metrics} for an Avro file that was written by the given datum writer.
   *
   * @param datumWriter the writer used to produce the file's records
   * @param schema the Iceberg schema of the written records
   * @param numRecords the number of records appended to the file
   * @param metricsConfig the metrics configuration for the table (currently unused)
   * @return metrics for the written file; currently only the record count is populated
   */
  static Metrics fromWriter(DatumWriter<?> datumWriter, Schema schema, long numRecords,
                            MetricsConfig metricsConfig) {
    // TODO will populate in following PRs if datum writer is a MetricsAwareDatumWriter
    return new Metrics(numRecords, null, null, null);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@

import java.io.IOException;
import java.util.List;
import java.util.stream.Stream;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class GenericAvroWriter<T> implements DatumWriter<T> {
class GenericAvroWriter<T> implements MetricsAwareDatumWriter<T> {
private ValueWriter<T> writer = null;

GenericAvroWriter(Schema schema) {
Expand All @@ -46,6 +47,11 @@ public void write(T datum, Encoder out) throws IOException {
writer.write(datum, out);
}

@Override
public Stream<FieldMetrics> metrics() {
return writer.metrics();
}

private static class WriteBuilder extends AvroSchemaVisitor<ValueWriter<?>> {
private WriteBuilder() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.avro;

import java.util.stream.Stream;
import org.apache.avro.io.DatumWriter;
import org.apache.iceberg.FieldMetrics;

/**
* Wrapper writer around {@link DatumWriter} with metrics support.
*/
/**
 * A {@link DatumWriter} that also exposes the {@link FieldMetrics} it collects while writing,
 * so callers can derive file metrics after records have been written.
 *
 * @param <D> the type of datum written
 */
public interface MetricsAwareDatumWriter<D> extends DatumWriter<D> {

/**
 * Returns a stream of {@link FieldMetrics} that this MetricsAwareDatumWriter keeps track of.
 */
Stream<FieldMetrics> metrics();
}
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/ValueWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@
package org.apache.iceberg.avro;

import java.io.IOException;
import java.util.stream.Stream;
import org.apache.avro.io.Encoder;
import org.apache.iceberg.FieldMetrics;

/**
 * Writes values of type {@code D} to an Avro {@link Encoder}.
 *
 * @param <D> the type of value written
 */
public interface ValueWriter<D> {
/**
 * Writes a single datum to the encoder.
 *
 * @param datum the value to write
 * @param encoder the Avro encoder to write to
 * @throws IOException if the encoder fails to write
 */
void write(D datum, Encoder encoder) throws IOException;

/**
 * Returns the {@link FieldMetrics} collected by this writer; empty by default.
 */
default Stream<FieldMetrics> metrics() {
return Stream.empty(); // TODO will populate in following PRs
}
}
11 changes: 9 additions & 2 deletions core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,21 @@

import java.io.IOException;
import java.util.List;
import java.util.stream.Stream;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.avro.AvroSchemaVisitor;
import org.apache.iceberg.avro.LogicalMap;
import org.apache.iceberg.avro.MetricsAwareDatumWriter;
import org.apache.iceberg.avro.ValueWriter;
import org.apache.iceberg.avro.ValueWriters;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class DataWriter<T> implements DatumWriter<T> {
public class DataWriter<T> implements MetricsAwareDatumWriter<T> {
private ValueWriter<T> writer = null;

public static <D> DataWriter<D> create(Schema schema) {
Expand All @@ -59,6 +61,11 @@ protected ValueWriter<?> createStructWriter(List<ValueWriter<?>> fields) {
return GenericWriters.struct(fields);
}

@Override
public Stream<FieldMetrics> metrics() {
return writer.metrics();
}

private class WriteBuilder extends AvroSchemaVisitor<ValueWriter<?>> {
@Override
public ValueWriter<?> record(Schema record, List<String> names, List<ValueWriter<?>> fields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFo
return Avro.write(outputFile)
.schema(schema)
.createWriterFunc(DataWriter::create)
.metricsConfig(metricsConfig)
.setAll(config)
.overwrite()
.build();
Expand Down
Loading