Dynamically support Spark native engine in Iceberg
Huaxin Gao committed Feb 13, 2024
1 parent 50fb400 commit be8c23c
Showing 17 changed files with 472 additions and 78 deletions.
@@ -19,6 +19,7 @@
package org.apache.iceberg.arrow.vectorized;

import java.util.List;
import org.apache.iceberg.parquet.BaseBatchReader;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

@@ -27,9 +28,17 @@
* holders. This class owns the Arrow vectors and is responsible for closing the Arrow vectors.
*/
class ArrowBatchReader extends BaseBatchReader<ColumnarBatch> {
private VectorHolder[] vectorHolders;

ArrowBatchReader(List<VectorizedReader<?>> readers) {
super(readers);
ArrowBatchReader() {}

@Override
public void initialize(List<VectorizedReader<?>> readers) {
this.readers =
readers.stream()
.map(VectorizedArrowReader.class::cast)
.toArray(VectorizedArrowReader[]::new);
this.vectorHolders = new VectorHolder[readers.size()];
}

@Override
@@ -43,7 +52,7 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {

ColumnVector[] columnVectors = new ColumnVector[readers.length];
for (int i = 0; i < readers.length; i += 1) {
vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
vectorHolders[i] = ((VectorizedArrowReader) readers[i]).read(vectorHolders[i], numRowsToRead);
int numRowsInVector = vectorHolders[i].numValues();
Preconditions.checkState(
numRowsInVector == numRowsToRead,
@@ -55,4 +64,22 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
}
return new ColumnarBatch(numRowsToRead, columnVectors);
}

protected void closeVectors() {
for (int i = 0; i < vectorHolders.length; i++) {
if (vectorHolders[i] != null) {
// Release any resources used by the vector
if (vectorHolders[i].vector() != null) {
vectorHolders[i].vector().close();
}
vectorHolders[i] = null;
}
}
}

@Override
public void close() {
super.close();
closeVectors();
}
}
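
Note (not part of the diff): ArrowBatchReader's one-argument constructor is replaced by a no-arg constructor plus initialize(...), and the VectorHolder bookkeeping and vector cleanup are now handled in this class (its new base class, org.apache.iceberg.parquet.BaseBatchReader, does not own them). A minimal sketch of the resulting two-phase lifecycle, assuming the readers were produced by VectorizedReaderBuilder and are already positioned on a row group; the sketch class and method names are illustrative only.

```java
package org.apache.iceberg.arrow.vectorized;

import java.util.List;
import org.apache.iceberg.parquet.VectorizedReader;

class ArrowBatchReaderUsageSketch {
  // Assumes setRowGroupInfo(...) was already called on the underlying readers.
  ColumnarBatch readOneBatch(List<VectorizedReader<?>> readers, int numRows) {
    ArrowBatchReader reader = new ArrowBatchReader(); // no-arg constructor added by this commit
    reader.initialize(readers);                       // column readers are injected after construction
    ColumnarBatch batch = reader.read(null, numRows); // delegates to the Arrow column readers
    // reader.close() now also releases the Arrow vectors behind the batch,
    // so it should only be called once the batch is no longer needed.
    return batch;
  }
}
```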
@@ -376,16 +376,20 @@ private InputFile getInputFile(FileScanTask task) {
*/
private static ArrowBatchReader buildReader(
Schema expectedSchema, MessageType fileSchema, boolean setArrowValidityVector) {
VectorizedReaderBuilder vectorizedReaderBuilder = new VectorizedReaderBuilder();
vectorizedReaderBuilder.initialize(
expectedSchema,
fileSchema,
setArrowValidityVector,
ImmutableMap.of(),
readers -> {
ArrowBatchReader batchReader = new ArrowBatchReader();
batchReader.initialize(readers);
return batchReader;
});
return (ArrowBatchReader)
TypeWithSchemaVisitor.visit(
expectedSchema.asStruct(),
fileSchema,
new VectorizedReaderBuilder(
expectedSchema,
fileSchema,
setArrowValidityVector,
ImmutableMap.of(),
ArrowBatchReader::new));
expectedSchema.asStruct(), fileSchema, vectorizedReaderBuilder);
}
}
}
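
Note (not part of the diff): buildReader now constructs the builder empty and passes it a reader factory, so the concrete batch-reader implementation is decided by whatever Function the caller supplies. A hedged sketch of a factory that picks the implementation by class name; the reflective lookup and the helper name are assumptions about how a pluggable engine could hook in here, not code from this commit.

```java
import java.util.List;
import java.util.function.Function;
import org.apache.iceberg.parquet.BaseBatchReader;
import org.apache.iceberg.parquet.VectorizedReader;

public class ReaderFactorySketch {
  // Builds a factory equivalent to the lambda above, but choosing the implementation at runtime.
  static Function<List<VectorizedReader<?>>, VectorizedReader<?>> byClassName(String implClassName) {
    return readers -> {
      try {
        BaseBatchReader<?> reader =
            (BaseBatchReader<?>)
                Class.forName(implClassName).getDeclaredConstructor().newInstance();
        reader.initialize(readers); // two-phase construction enabled by this commit
        return reader;
      } catch (ReflectiveOperationException e) {
        throw new RuntimeException("Cannot instantiate batch reader " + implClassName, e);
      }
    };
  }
}
```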
@@ -40,15 +40,18 @@
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;

@SuppressWarnings({"VisibilityModifier", "checkstyle:HiddenField"})
public class VectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader<?>> {
private final MessageType parquetSchema;
private final Schema icebergSchema;
private final BufferAllocator rootAllocator;
private final Map<Integer, ?> idToConstant;
private final boolean setArrowValidityVector;
private final Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory;
protected MessageType parquetSchema;
protected Schema icebergSchema;
protected BufferAllocator rootAllocator;
protected Map<Integer, ?> idToConstant;
protected boolean setArrowValidityVector;
protected Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory;

public VectorizedReaderBuilder(
public VectorizedReaderBuilder() {}

public void initialize(
Schema expectedSchema,
MessageType parquetSchema,
boolean setArrowValidityVector,
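
Note (not part of the diff): the builder's fields are widened to protected and construction is split into a no-arg constructor plus initialize(...), which lets an engine-specific builder subclass VectorizedReaderBuilder and reuse its state. A sketch of such a subclass; the class name is a placeholder and the full initialize(...) parameter list is inferred from the ArrowReader call site shown earlier in this diff.

```java
package org.apache.iceberg.arrow.vectorized;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.parquet.schema.MessageType;

public class NativeReaderBuilderSketch extends VectorizedReaderBuilder {
  @Override
  public void initialize(
      Schema expectedSchema,
      MessageType parquetSchema,
      boolean setArrowValidityVector,
      Map<Integer, ?> idToConstant,
      Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory) {
    super.initialize(
        expectedSchema, parquetSchema, setArrowValidityVector, idToConstant, readerFactory);
    // An engine-specific builder can now read the protected state
    // (icebergSchema, rootAllocator, readerFactory, ...) to set up its own column readers.
  }
}
```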
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.parquet;

import java.util.List;
import java.util.Map;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;

/** A base BatchReader class that contains common functionality */
@SuppressWarnings("checkstyle:VisibilityModifier")
public abstract class BaseBatchReader<T> implements VectorizedReader<T> {
protected VectorizedReader[] readers;

public BaseBatchReader() {}

public void initialize(List<VectorizedReader<?>> readerList) {}

@Override
public void setRowGroupInfo(
PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
for (VectorizedReader reader : readers) {
if (reader != null) {
reader.setRowGroupInfo(pageStore, metaData, rowPosition);
}
}
}

@Override
public void close() {
for (VectorizedReader<?> reader : readers) {
if (reader != null) {
reader.close();
}
}
}

@Override
public void setBatchSize(int batchSize) {
for (VectorizedReader reader : readers) {
if (reader != null) {
reader.setBatchSize(batchSize);
}
}
}
}
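
Note (not part of the diff): the new parquet-module BaseBatchReader only keeps the shared reader array and fans setRowGroupInfo, setBatchSize, and close out to it; a concrete reader supplies initialize(...) and read(...). A minimal sketch of a subclass, with its name and its trivial read(...) behavior as illustrative assumptions.

```java
package org.apache.iceberg.parquet;

import java.util.List;

public class RowCountingBatchReaderSketch extends BaseBatchReader<Long> {

  @Override
  public void initialize(List<VectorizedReader<?>> readerList) {
    // The protected readers array is declared by BaseBatchReader.
    this.readers = readerList.toArray(new VectorizedReader[0]);
  }

  @Override
  public Long read(Long reuse, int numRowsToRead) {
    // Drive every column reader for the requested rows and just report the count.
    for (VectorizedReader<?> reader : readers) {
      reader.read(null, numRowsToRead);
    }
    return (long) numRowsToRead;
  }
}
```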
@@ -50,6 +50,10 @@ class SparkConfParser {
this.options = options;
}

public RuntimeConfig getSessionConf() {
return sessionConf;
}

public BooleanConfParser booleanConf() {
return new BooleanConfParser();
}
@@ -21,6 +21,7 @@
import static org.apache.iceberg.PlanningMode.LOCAL;

import java.util.Map;
import java.util.Properties;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
@@ -270,6 +271,38 @@ public boolean preserveDataGrouping() {
.parse();
}

public String getCustomizedVectorizationImpl() {
return confParser
.stringConf()
.sessionConf(SparkSQLProperties.CUSTOMIZED_VECTORIZATION_IMPL)
.defaultValue("")
.parse();
}

public String getCustomizedVectorizationPropertyPrefix() {
return confParser
.stringConf()
.sessionConf(SparkSQLProperties.CUSTOMIZED_VECTORIZATION_PROPERTY_PREFIX)
.defaultValue("")
.parse();
}

public Properties getCustomizedVectorizationProperties() {
Properties properties = new Properties();

java.util.Map<String, String> map =
scala.collection.JavaConverters.mapAsJavaMapConverter(confParser.getSessionConf().getAll())
.asJava();

for (Map.Entry<String, String> entry : map.entrySet()) {
if (entry.getKey().contains(getCustomizedVectorizationPropertyPrefix())) {
properties.put(entry.getKey(), entry.getValue());
}
}

return properties;
}

public boolean aggregatePushDownEnabled() {
return confParser
.booleanConf()
@@ -27,6 +27,12 @@ private SparkSQLProperties() {}
// Controls whether vectorized reads are enabled
public static final String VECTORIZATION_ENABLED = "spark.sql.iceberg.vectorization.enabled";

public static final String CUSTOMIZED_VECTORIZATION_IMPL =
"spark.sql.iceberg.customized.vectorization.impl";

public static final String CUSTOMIZED_VECTORIZATION_PROPERTY_PREFIX =
"spark.sql.iceberg.customized.vectorization.property.prefix";

// Controls whether to perform the nullability check during writes
public static final String CHECK_NULLABILITY = "spark.sql.iceberg.check-nullability";
public static final boolean CHECK_NULLABILITY_DEFAULT = true;
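
Note (not part of the diff): together with the SparkReadConf getters added above, these two properties let a job name a pluggable vectorization implementation and pass engine-specific settings through to it. A hedged example of setting them on a session; the implementation class name, the prefix value, the extra property, and the table name are placeholders, and how the impl value is consumed is not visible in these hunks.

```java
import org.apache.spark.sql.SparkSession;

public class CustomVectorizationConfigExample {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .appName("iceberg-customized-vectorization-example")
            // Presumably the fully qualified class name of the reader implementation to load.
            .config(
                "spark.sql.iceberg.customized.vectorization.impl",
                "com.example.NativeColumnarBatchReader")
            // Every session conf whose key contains this prefix is collected into the
            // Properties returned by SparkReadConf#getCustomizedVectorizationProperties().
            .config(
                "spark.sql.iceberg.customized.vectorization.property.prefix",
                "spark.sql.iceberg.native")
            // Example engine-specific property picked up through the prefix above.
            .config("spark.sql.iceberg.native.offheap.enabled", "true")
            .getOrCreate();

    spark.sql("SELECT * FROM demo.db.table LIMIT 10").show();
    spark.stop();
  }
}
```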
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.data.vectorized;

import java.util.List;
import java.util.Properties;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.parquet.BaseBatchReader;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.spark.sql.catalyst.InternalRow;

/** A base ColumnBatchReader class that contains common functionality */
@SuppressWarnings("checkstyle:VisibilityModifier")
public abstract class BaseColumnarBatchReader<T> extends BaseBatchReader<T> {
protected boolean hasIsDeletedColumn;
protected DeleteFilter<InternalRow> deletes = null;
protected long rowStartPosInBatch = 0;

protected Schema schema;

public BaseColumnarBatchReader() {}

public void initialize(
List<VectorizedReader<?>> readers, Schema schema1, Properties properties) {}

public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
this.deletes = deleteFilter;
}
}
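
Note (not part of the diff): this Spark-side base class is the extension point the commit title points at; it holds the delete-filter, schema, and row-position state and leaves initialize(...) and read(...) to the concrete reader. A sketch of what an engine-provided subclass might look like; the class name and the placeholder read(...) body are assumptions, not real native-engine code.

```java
package org.apache.iceberg.spark.data.vectorized;

import java.util.List;
import java.util.Properties;
import org.apache.iceberg.Schema;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.vectorized.ColumnarBatch;

public class NativeColumnarBatchReaderSketch extends BaseColumnarBatchReader<ColumnarBatch> {
  private Properties engineProperties;

  @Override
  public void initialize(
      List<VectorizedReader<?>> readerList, Schema expectedSchema, Properties properties) {
    this.readers = readerList.toArray(new VectorizedReader[0]); // protected, from BaseBatchReader
    this.schema = expectedSchema;                               // protected, from BaseColumnarBatchReader
    this.engineProperties = properties;                         // engine-specific tuning knobs
  }

  @Override
  public ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
    // A real implementation would decode the row group with the native engine here;
    // this placeholder just returns an empty batch with the requested row count.
    return new ColumnarBatch(new ColumnVector[0], numRowsToRead);
  }
}
```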
@@ -21,10 +21,11 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.arrow.vectorized.BaseBatchReader;
import java.util.Properties;
import org.apache.iceberg.Schema;
import org.apache.iceberg.arrow.vectorized.VectorHolder;
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.DeletedVectorReader;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.deletes.PositionDeleteIndex;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -41,13 +42,18 @@
* read path. The {@link ColumnarBatch} returned is created by passing in the Arrow vectors
* populated via delegated read calls to {@linkplain VectorizedArrowReader VectorReader(s)}.
*/
public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
private final boolean hasIsDeletedColumn;
private DeleteFilter<InternalRow> deletes = null;
private long rowStartPosInBatch = 0;
public class ColumnarBatchReader extends BaseColumnarBatchReader<ColumnarBatch> {
private VectorHolder[] vectorHolders;

public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
super(readers);
public ColumnarBatchReader() {}

@Override
public void initialize(List<VectorizedReader<?>> readers, Schema schema, Properties properties) {
this.readers =
readers.stream()
.map(VectorizedArrowReader.class::cast)
.toArray(VectorizedArrowReader[]::new);
this.vectorHolders = new VectorHolder[readers.size()];
this.hasIsDeletedColumn =
readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader);
}
@@ -59,8 +65,22 @@ public void setRowGroupInfo(
this.rowStartPosInBatch = rowPosition;
}

public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
this.deletes = deleteFilter;
protected void closeVectors() {
for (int i = 0; i < vectorHolders.length; i++) {
if (vectorHolders[i] != null) {
// Release any resources used by the vector
if (vectorHolders[i].vector() != null) {
vectorHolders[i].vector().close();
}
vectorHolders[i] = null;
}
}
}

@Override
public void close() {
super.close();
closeVectors();
}

@Override
@@ -120,7 +140,8 @@ ColumnVector[] readDataToColumnVectors() {

ColumnVectorBuilder columnVectorBuilder = new ColumnVectorBuilder();
for (int i = 0; i < readers.length; i += 1) {
vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
vectorHolders[i] =
((VectorizedArrowReader) readers[i]).read(vectorHolders[i], numRowsToRead);
int numRowsInVector = vectorHolders[i].numValues();
Preconditions.checkState(
numRowsInVector == numRowsToRead,
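
Note (not part of the diff): ColumnarBatchReader follows the same split as ArrowBatchReader, but its initialize(...) also receives the expected schema and the engine Properties, and the delete filter is now attached through setDeleteFilter(...) defined on the base class. A sketch of how a caller wires it up; the surrounding method and variable names are illustrative.

```java
package org.apache.iceberg.spark.data.vectorized;

import java.util.List;
import java.util.Properties;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.DeleteFilter;
import org.apache.iceberg.parquet.VectorizedReader;
import org.apache.spark.sql.catalyst.InternalRow;

class ColumnarBatchReaderWiringSketch {
  ColumnarBatchReader newBatchReader(
      List<VectorizedReader<?>> readers,
      Schema expectedSchema,
      Properties engineProperties,
      DeleteFilter<InternalRow> deleteFilter) {
    ColumnarBatchReader reader = new ColumnarBatchReader();       // no-arg constructor
    reader.initialize(readers, expectedSchema, engineProperties); // schema and engine properties injected
    if (deleteFilter != null) {
      reader.setDeleteFilter(deleteFilter);                       // row-level delete handling
    }
    return reader;
  }
}
```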