Skip to content

Commit

Permalink
Create MetricsAwareDatumWriter and refactor Avro writers to support metrics collection
Browse files Browse the repository at this point in the history
  • Loading branch information
yyanyy committed Jan 5, 2021
1 parent 0f2a164 commit 6b6a15a
Show file tree
Hide file tree
Showing 12 changed files with 185 additions and 17 deletions.
28 changes: 28 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/Comparators.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,10 @@ public static Comparator<ByteBuffer> unsignedBytes() {
return UnsignedByteBufComparator.INSTANCE;
}

/**
 * Returns a comparator that orders byte arrays lexicographically, treating each byte as an
 * unsigned value (0-255); when one array is a prefix of the other, the shorter array sorts first.
 */
public static Comparator<byte[]> unsignedByteArrays() {
  return UnsignedByteArrayComparator.INSTANCE;
}

/**
 * Returns a comparator that orders {@link ByteBuffer}s by their natural ordering, which
 * compares the remaining bytes as signed values.
 */
public static Comparator<ByteBuffer> signedBytes() {
  return Comparator.naturalOrder();
}
Expand Down Expand Up @@ -272,6 +276,30 @@ public int compare(ByteBuffer buf1, ByteBuffer buf2) {
}
}

/**
 * Singleton comparator that orders byte arrays lexicographically by unsigned byte value.
 */
private static class UnsignedByteArrayComparator implements Comparator<byte[]> {
  private static final UnsignedByteArrayComparator INSTANCE = new UnsignedByteArrayComparator();

  private UnsignedByteArrayComparator() {
  }

  @Override
  public int compare(byte[] left, byte[] right) {
    int commonLength = Math.min(left.length, right.length);

    // scan the shared prefix for the first position where the arrays disagree
    for (int pos = 0; pos < commonLength; pos += 1) {
      // compare each byte as an unsigned value in [0, 255]
      int result = Integer.compare(Byte.toUnsignedInt(left[pos]), Byte.toUnsignedInt(right[pos]));
      if (result != 0) {
        return result;
      }
    }

    // the shared prefix is identical, so the shorter array orders first
    return Integer.compare(left.length, right.length);
  }
}

private static class CharSeqComparator implements Comparator<CharSequence> {
private static final CharSeqComparator INSTANCE = new CharSeqComparator();

Expand Down
26 changes: 23 additions & 3 deletions core/src/main/java/org/apache/iceberg/avro/Avro.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
Expand All @@ -37,7 +38,9 @@
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.specific.SpecificData;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.StructLike;
Expand Down Expand Up @@ -102,6 +105,7 @@ public static class WriteBuilder {
private String name = "table";
private Function<Schema, DatumWriter<?>> createWriterFunc = null;
private boolean overwrite;
private MetricsConfig metricsConfig;

private WriteBuilder(OutputFile file) {
this.file = file;
Expand Down Expand Up @@ -148,6 +152,11 @@ public WriteBuilder meta(Map<String, String> properties) {
return this;
}

/**
 * Sets the {@link MetricsConfig} that will be used when collecting metrics for written files.
 *
 * @param newMetricsConfig the metrics configuration to use
 * @return this builder for method chaining
 */
public WriteBuilder metricsConfig(MetricsConfig newMetricsConfig) {
  this.metricsConfig = newMetricsConfig;
  return this;
}

/**
 * Enables overwriting an existing output file; shorthand for {@code overwrite(true)}.
 *
 * @return this builder for method chaining
 */
public WriteBuilder overwrite() {
  return overwrite(true);
}
Expand Down Expand Up @@ -181,7 +190,7 @@ public <D> FileAppender<D> build() throws IOException {
meta("iceberg.schema", SchemaParser.toJson(schema));

return new AvroFileAppender<>(
AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, overwrite);
schema, AvroSchemaUtil.convert(schema, name), file, writerFunc, codec(), metadata, metricsConfig, overwrite);
}
}

Expand Down Expand Up @@ -320,7 +329,7 @@ public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
/**
* A {@link DatumWriter} implementation that produces position deletes.
*/
private static class PositionDatumWriter implements DatumWriter<PositionDelete<?>> {
private static class PositionDatumWriter implements MetricsAwareDatumWriter<PositionDelete<?>> {
private static final ValueWriter<Object> PATH_WRITER = ValueWriters.strings();
private static final ValueWriter<Long> POS_WRITER = ValueWriters.longs();

Expand All @@ -333,16 +342,22 @@ public void write(PositionDelete<?> delete, Encoder out) throws IOException {
PATH_WRITER.write(delete.path(), out);
POS_WRITER.write(delete.pos(), out);
}

@Override
public Stream<FieldMetrics> metrics() {
  // expose metrics collected by the path and position value writers
  return Stream.concat(PATH_WRITER.metrics(), POS_WRITER.metrics());
}
}

/**
* A {@link DatumWriter} implementation that wraps another to produce position deletes with row data.
*
* @param <D> the type of datum written as a deleted row
*/
private static class PositionAndRowDatumWriter<D> implements DatumWriter<PositionDelete<D>> {
private static class PositionAndRowDatumWriter<D> implements MetricsAwareDatumWriter<PositionDelete<D>> {
private static final ValueWriter<Object> PATH_WRITER = ValueWriters.strings();
private static final ValueWriter<Long> POS_WRITER = ValueWriters.longs();

private final DatumWriter<D> rowWriter;

private PositionAndRowDatumWriter(DatumWriter<D> rowWriter) {
Expand All @@ -363,6 +378,11 @@ public void write(PositionDelete<D> delete, Encoder out) throws IOException {
POS_WRITER.write(delete.pos(), out);
rowWriter.write(delete.row(), out);
}

/**
 * Returns metrics for the path and position columns. If the wrapped row writer is also
 * metrics-aware, its per-field metrics for the deleted row data are included; the original
 * implementation silently dropped them.
 */
@Override
public Stream<FieldMetrics> metrics() {
  Stream<FieldMetrics> deleteMetrics = Stream.concat(PATH_WRITER.metrics(), POS_WRITER.metrics());
  if (rowWriter instanceof MetricsAwareDatumWriter) {
    return Stream.concat(deleteMetrics, ((MetricsAwareDatumWriter<?>) rowWriter).metrics());
  }
  return deleteMetrics;
}
}

public static ReadBuilder read(InputFile file) {
Expand Down
25 changes: 19 additions & 6 deletions core/src/main/java/org/apache/iceberg/avro/AvroFileAppender.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,31 @@
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class AvroFileAppender<D> implements FileAppender<D> {
private PositionOutputStream stream = null;
private DataFileWriter<D> writer = null;
private DatumWriter<?> datumWriter = null;
private org.apache.iceberg.Schema icebergSchema;
private MetricsConfig metricsConfig;
private long numRecords = 0L;
private boolean isClosed = false;

/**
 * Creates an appender that writes Avro datums to {@code file} and can report Iceberg metrics.
 *
 * @param icebergSchema the Iceberg schema, used to interpret collected field metrics
 * @param schema the Avro schema of written datums
 * @param file the destination output file
 * @param createWriterFunc factory producing the datum writer for the Avro schema
 * @param codec compression codec for the Avro file
 * @param metadata key/value metadata to store in the Avro file header
 * @param metricsConfig configuration controlling metrics collection
 * @param overwrite whether an existing file may be overwritten
 * @throws IOException if the output stream or Avro writer cannot be created
 */
AvroFileAppender(org.apache.iceberg.Schema icebergSchema, Schema schema, OutputFile file,
                 Function<Schema, DatumWriter<?>> createWriterFunc,
                 CodecFactory codec, Map<String, String> metadata,
                 MetricsConfig metricsConfig, boolean overwrite) throws IOException {
  this.icebergSchema = icebergSchema;
  this.stream = overwrite ? file.createOrOverwrite() : file.create();
  // keep a reference to the datum writer so metrics() can query it after the file is closed
  this.datumWriter = createWriterFunc.apply(schema);
  this.writer = newAvroWriter(schema, stream, datumWriter, codec, metadata);
  this.metricsConfig = metricsConfig;
}

@Override
Expand All @@ -57,7 +66,10 @@ public void add(D datum) {

/**
 * Returns the {@link Metrics} for this file.
 *
 * <p>Metrics are only complete once the file has been fully written and closed ({@code isClosed}
 * is set in {@code close()}), so calling this on an open file is rejected.
 *
 * @throws IllegalStateException if the appender has not been closed yet
 */
@Override
public Metrics metrics() {
  Preconditions.checkState(isClosed,
      "Cannot return metrics while appending to an open file.");

  return AvroMetrics.fromWriter(datumWriter, icebergSchema, numRecords, metricsConfig);
}

@Override
Expand All @@ -77,15 +89,16 @@ public void close() throws IOException {
if (writer != null) {
writer.close();
this.writer = null;
isClosed = true;
}
}

@SuppressWarnings("unchecked")
private static <D> DataFileWriter<D> newAvroWriter(
Schema schema, PositionOutputStream stream, Function<Schema, DatumWriter<?>> createWriterFunc,
Schema schema, PositionOutputStream stream, DatumWriter<?> metricsAwareDatumWriter,
CodecFactory codec, Map<String, String> metadata) throws IOException {
DataFileWriter<D> writer = new DataFileWriter<>(
(DatumWriter<D>) createWriterFunc.apply(schema));
(DatumWriter<D>) metricsAwareDatumWriter);

writer.setCodec(codec);

Expand Down
37 changes: 37 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/AvroMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.avro;

import org.apache.avro.io.DatumWriter;
import org.apache.iceberg.Metrics;
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.Schema;

/**
 * Helper for building {@link Metrics} from an Avro write.
 */
public class AvroMetrics {

  // static utility class; not meant to be instantiated
  private AvroMetrics() {
  }

  /**
   * Builds {@link Metrics} for a completed write.
   *
   * @param writer the datum writer used for the write
   * @param icebergSchema the Iceberg schema of the written data
   * @param recordCount the number of records written
   * @param metricsConfig configuration controlling metrics collection
   * @return metrics for the written file; currently only the record count is populated
   */
  static Metrics fromWriter(DatumWriter<?> writer, Schema icebergSchema, long recordCount,
                            MetricsConfig metricsConfig) {
    // TODO will populate in following PRs if datum writer is a MetricsAwareDatumWriter
    return new Metrics(recordCount, null, null, null);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@

import java.io.IOException;
import java.util.List;
import java.util.stream.Stream;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class GenericAvroWriter<T> implements DatumWriter<T> {
class GenericAvroWriter<T> implements MetricsAwareDatumWriter<T> {
private ValueWriter<T> writer = null;

GenericAvroWriter(Schema schema) {
Expand All @@ -46,6 +47,11 @@ public void write(T datum, Encoder out) throws IOException {
writer.write(datum, out);
}

@Override
public Stream<FieldMetrics> metrics() {
  // delegate to the root value writer; its default metrics() currently reports no fields
  return writer.metrics();
}

private static class WriteBuilder extends AvroSchemaVisitor<ValueWriter<?>> {
private WriteBuilder() {
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.avro;

import java.util.stream.Stream;
import org.apache.avro.io.DatumWriter;
import org.apache.iceberg.FieldMetrics;

/**
 * Wrapper writer around {@link DatumWriter} with metrics support.
 *
 * <p>Implementations write datums like a plain {@link DatumWriter} and additionally expose
 * per-field metrics gathered during writing via {@link #metrics()}.
 */
public interface MetricsAwareDatumWriter<D> extends DatumWriter<D> {

  /**
   * Returns a stream of {@link FieldMetrics} that this MetricsAwareDatumWriter keeps track of.
   */
  Stream<FieldMetrics> metrics();
}
6 changes: 6 additions & 0 deletions core/src/main/java/org/apache/iceberg/avro/ValueWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,14 @@
package org.apache.iceberg.avro;

import java.io.IOException;
import java.util.stream.Stream;
import org.apache.avro.io.Encoder;
import org.apache.iceberg.FieldMetrics;

/**
 * Writer for a single value within an Avro-encoded datum.
 *
 * @param <D> the Java type of values this writer encodes
 */
public interface ValueWriter<D> {
  /**
   * Writes {@code datum} to the given Avro {@code encoder}.
   */
  void write(D datum, Encoder encoder) throws IOException;

  /**
   * Returns field metrics collected by this writer; by default no metrics are reported.
   */
  default Stream<FieldMetrics> metrics() {
    return Stream.empty(); // TODO will populate in following PRs
  }
}
11 changes: 9 additions & 2 deletions core/src/main/java/org/apache/iceberg/data/avro/DataWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,21 @@

import java.io.IOException;
import java.util.List;
import java.util.stream.Stream;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.avro.AvroSchemaVisitor;
import org.apache.iceberg.avro.LogicalMap;
import org.apache.iceberg.avro.MetricsAwareDatumWriter;
import org.apache.iceberg.avro.ValueWriter;
import org.apache.iceberg.avro.ValueWriters;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class DataWriter<T> implements DatumWriter<T> {
public class DataWriter<T> implements MetricsAwareDatumWriter<T> {
private ValueWriter<T> writer = null;

public static <D> DataWriter<D> create(Schema schema) {
Expand All @@ -59,6 +61,11 @@ protected ValueWriter<?> createStructWriter(List<ValueWriter<?>> fields) {
return GenericWriters.struct(fields);
}

@Override
public Stream<FieldMetrics> metrics() {
  // surface any field metrics tracked by the underlying value writer tree
  return writer.metrics();
}

private class WriteBuilder extends AvroSchemaVisitor<ValueWriter<?>> {
@Override
public ValueWriter<?> record(Schema record, List<String> names, List<ValueWriter<?>> fields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public FileAppender<Record> newAppender(OutputFile outputFile, FileFormat fileFo
return Avro.write(outputFile)
.schema(schema)
.createWriterFunc(DataWriter::create)
.metricsConfig(metricsConfig)
.setAll(config)
.overwrite()
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,20 @@
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.FieldMetrics;
import org.apache.iceberg.avro.MetricsAwareDatumWriter;
import org.apache.iceberg.avro.ValueWriter;
import org.apache.iceberg.avro.ValueWriters;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public class FlinkAvroWriter implements DatumWriter<RowData> {
public class FlinkAvroWriter implements MetricsAwareDatumWriter<RowData> {
private final RowType rowType;
private ValueWriter<RowData> writer = null;

Expand All @@ -54,6 +56,11 @@ public void write(RowData datum, Encoder out) throws IOException {
writer.write(datum, out);
}

@Override
public Stream<FieldMetrics> metrics() {
  // surface any field metrics tracked by the underlying value writer tree
  return writer.metrics();
}

private static class WriteBuilder extends AvroWithFlinkSchemaVisitor<ValueWriter<?>> {
@Override
public ValueWriter<?> record(LogicalType struct, Schema record, List<String> names, List<ValueWriter<?>> fields) {
Expand Down
Loading

0 comments on commit 6b6a15a

Please sign in to comment.