Skip to content

Commit

Permalink
Core: Add a way to set sort order in writer classes (apache#2214)
Browse files Browse the repository at this point in the history
  • Loading branch information
yyanyy authored and aokolnychyi committed Apr 24, 2021
1 parent 1fa858c commit cb2ec36
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 3 deletions.
10 changes: 9 additions & 1 deletion core/src/main/java/org/apache/iceberg/avro/Avro.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.iceberg.MetricsConfig;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.deletes.EqualityDeleteWriter;
Expand Down Expand Up @@ -207,6 +208,7 @@ public static class DeleteWriteBuilder {
private StructLike partition;
private EncryptionKeyMetadata keyMetadata = null;
private int[] equalityFieldIds = null;
private SortOrder sortOrder;

private DeleteWriteBuilder(OutputFile file) {
this.appenderBuilder = write(file);
Expand Down Expand Up @@ -284,6 +286,11 @@ public DeleteWriteBuilder equalityFieldIds(int... fieldIds) {
return this;
}

/**
 * Sets the sort order that {@code buildEqualityWriter()} forwards to the
 * {@code EqualityDeleteWriter} it constructs.
 *
 * <p>The backing field has no initializer, so the forwarded value is {@code null}
 * unless this method is called.
 *
 * @param newSortOrder sort order to forward to the equality delete writer
 * @return this builder, to allow call chaining
 */
public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) {
this.sortOrder = newSortOrder;
return this;
}

public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema`");
Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
Expand All @@ -300,7 +307,8 @@ public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
appenderBuilder.createWriterFunc(createWriterFunc);

return new EqualityDeleteWriter<>(
appenderBuilder.build(), FileFormat.AVRO, location, spec, partition, keyMetadata, equalityFieldIds);
appenderBuilder.build(), FileFormat.AVRO, location, spec, partition, keyMetadata, sortOrder,
equalityFieldIds);
}

public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
Expand Down
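6 changes: 5 additions & 1 deletion core/src/main/java/org/apache/iceberg/deletes/EqualityDeleteWriter.java
Original file line number Diff line number Diff line change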
Expand Up @@ -26,6 +26,7 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.io.FileAppender;
Expand All @@ -39,17 +40,19 @@ public class EqualityDeleteWriter<T> implements Closeable {
private final StructLike partition;
private final ByteBuffer keyMetadata;
private final int[] equalityFieldIds;
private final SortOrder sortOrder;
private DeleteFile deleteFile = null;

/**
 * Creates an equality delete writer around an existing file appender.
 *
 * @param appender appender whose length and metrics are read in {@code close()}
 * @param format file format stored on this writer
 * @param location location string stored on this writer
 * @param spec partition spec stored on this writer
 * @param partition partition value stored on this writer
 * @param keyMetadata encryption key metadata; may be {@code null}, otherwise only the
 *     {@code ByteBuffer} returned by {@code buffer()} is kept
 * @param sortOrder sort order handed to the delete file builder's {@code withSortOrder}
 *     in {@code close()}
 * @param equalityFieldIds ids of the fields used for equality deletes
 */
public EqualityDeleteWriter(FileAppender<T> appender, FileFormat format, String location,
PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata,
int... equalityFieldIds) {
SortOrder sortOrder, int... equalityFieldIds) {
this.appender = appender;
this.format = format;
this.location = location;
this.spec = spec;
this.partition = partition;
// Null-safe: a missing key metadata object is stored as a null buffer.
this.keyMetadata = keyMetadata != null ? keyMetadata.buffer() : null;
this.sortOrder = sortOrder;
this.equalityFieldIds = equalityFieldIds;
}

Expand Down Expand Up @@ -77,6 +80,7 @@ public void close() throws IOException {
.withEncryptionKeyMetadata(keyMetadata)
.withFileSizeInBytes(appender.length())
.withMetrics(appender.metrics())
.withSortOrder(sortOrder)
.build();
}
}
Expand Down
9 changes: 9 additions & 0 deletions core/src/main/java/org/apache/iceberg/io/DataWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.encryption.EncryptionKeyMetadata;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
Expand All @@ -37,16 +38,23 @@ public class DataWriter<T> implements Closeable {
private final PartitionSpec spec;
private final StructLike partition;
private final ByteBuffer keyMetadata;
private final SortOrder sortOrder;
private DataFile dataFile = null;

/**
 * Creates a data writer without an explicit sort order.
 *
 * <p>Delegates to the seven-argument constructor, passing {@code null} as the sort order.
 * NOTE(review): how the data file builder treats a {@code null} sort order is not
 * visible here — confirm against {@code DataFiles.Builder#withSortOrder}.
 *
 * @param appender appender that receives the rows
 * @param format file format stored on this writer
 * @param location location string stored on this writer
 * @param spec partition spec stored on this writer
 * @param partition partition value stored on this writer
 * @param keyMetadata encryption key metadata; may be {@code null}
 */
public DataWriter(FileAppender<T> appender, FileFormat format, String location,
PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata) {
this(appender, format, location, spec, partition, keyMetadata, null);
}

/**
 * Creates a data writer that also carries a sort order.
 *
 * <p>The sort order is stored as given and later passed to the data file builder's
 * {@code withSortOrder} when the writer is closed.
 *
 * @param appender appender that receives the rows
 * @param format file format stored on this writer
 * @param location location string stored on this writer
 * @param spec partition spec stored on this writer
 * @param partition partition value stored on this writer
 * @param keyMetadata encryption key metadata; may be {@code null}
 * @param sortOrder sort order to attach to the produced data file; the six-argument
 *     constructor passes {@code null} here
 */
public DataWriter(FileAppender<T> appender, FileFormat format, String location,
                  PartitionSpec spec, StructLike partition, EncryptionKeyMetadata keyMetadata,
                  SortOrder sortOrder) {
  // Values stored exactly as supplied.
  this.appender = appender;
  this.format = format;
  this.location = location;
  this.spec = spec;
  this.partition = partition;
  this.sortOrder = sortOrder;

  // Keep only the ByteBuffer returned by buffer(); an absent key metadata object
  // is recorded as a null buffer.
  if (keyMetadata == null) {
    this.keyMetadata = null;
  } else {
    this.keyMetadata = keyMetadata.buffer();
  }
}

public void add(T row) {
Expand All @@ -69,6 +77,7 @@ public void close() throws IOException {
.withFileSizeInBytes(appender.length())
.withMetrics(appender.metrics())
.withSplitOffsets(appender.splitOffsets())
.withSortOrder(sortOrder)
.build();
}
}
Expand Down
10 changes: 9 additions & 1 deletion parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SchemaParser;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.avro.AvroSchemaUtil;
Expand Down Expand Up @@ -280,6 +281,7 @@ public static class DeleteWriteBuilder {
private StructLike partition = null;
private EncryptionKeyMetadata keyMetadata = null;
private int[] equalityFieldIds = null;
private SortOrder sortOrder;
private Function<CharSequence, ?> pathTransformFunc = Function.identity();

private DeleteWriteBuilder(OutputFile file) {
Expand Down Expand Up @@ -365,6 +367,11 @@ public DeleteWriteBuilder transformPaths(Function<CharSequence, ?> newPathTransf
return this;
}

/**
 * Sets the sort order that {@code buildEqualityWriter()} forwards to the
 * {@code EqualityDeleteWriter} it constructs.
 *
 * <p>The backing field has no initializer, so the forwarded value is {@code null}
 * unless this method is called.
 *
 * @param newSortOrder sort order to forward to the equality delete writer
 * @return this builder, to allow call chaining
 */
public DeleteWriteBuilder withSortOrder(SortOrder newSortOrder) {
this.sortOrder = newSortOrder;
return this;
}

public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
Preconditions.checkState(rowSchema != null, "Cannot create equality delete file without a schema`");
Preconditions.checkState(equalityFieldIds != null, "Cannot create equality delete file without delete field ids");
Expand All @@ -381,7 +388,8 @@ public <T> EqualityDeleteWriter<T> buildEqualityWriter() throws IOException {
appenderBuilder.createWriterFunc(createWriterFunc);

return new EqualityDeleteWriter<>(
appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata, equalityFieldIds);
appenderBuilder.build(), FileFormat.PARQUET, location, spec, partition, keyMetadata,
sortOrder, equalityFieldIds);
}

public <T> PositionDeleteWriter<T> buildPositionWriter() throws IOException {
Expand Down

0 comments on commit cb2ec36

Please sign in to comment.