diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowBatchReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowBatchReader.java
index 51edf742fc71..bfa30f358330 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowBatchReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowBatchReader.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.arrow.vectorized;
 
 import java.util.List;
+import org.apache.iceberg.parquet.BaseBatchReader;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -27,9 +28,17 @@
  * holders. This class owns the Arrow vectors and is responsible for closing the Arrow vectors.
  */
 class ArrowBatchReader extends BaseBatchReader<ColumnarBatch> {
+  private VectorHolder[] vectorHolders;
 
-  ArrowBatchReader(List<VectorizedReader<?>> readers) {
-    super(readers);
+  ArrowBatchReader() {}
+
+  @Override
+  public void initialize(List<VectorizedReader<?>> readers) {
+    this.readers =
+        readers.stream()
+            .map(VectorizedArrowReader.class::cast)
+            .toArray(VectorizedArrowReader[]::new);
+    this.vectorHolders = new VectorHolder[readers.size()];
   }
 
   @Override
@@ -43,7 +52,7 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
     ColumnVector[] columnVectors = new ColumnVector[readers.length];
     for (int i = 0; i < readers.length; i += 1) {
-      vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
+      vectorHolders[i] = ((VectorizedArrowReader) readers[i]).read(vectorHolders[i], numRowsToRead);
       int numRowsInVector = vectorHolders[i].numValues();
       Preconditions.checkState(
           numRowsInVector == numRowsToRead,
@@ -55,4 +64,22 @@ public final ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
     }
     return new ColumnarBatch(numRowsToRead, columnVectors);
   }
+
+  protected void closeVectors() {
+    for (int i = 0; i < vectorHolders.length; i++) {
+      if (vectorHolders[i] != null) {
+        // Release any resources used by the vector
+        if (vectorHolders[i].vector() != null) {
+          vectorHolders[i].vector().close();
+        }
+        vectorHolders[i] = null;
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    closeVectors();
+  }
 }
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
index dd2b1e0221e4..e5abbf4c14ab 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowReader.java
@@ -376,16 +376,20 @@ private InputFile getInputFile(FileScanTask task) {
      */
     private static ArrowBatchReader buildReader(
         Schema expectedSchema, MessageType fileSchema, boolean setArrowValidityVector) {
+      VectorizedReaderBuilder vectorizedReaderBuilder = new VectorizedReaderBuilder();
+      vectorizedReaderBuilder.initialize(
+          expectedSchema,
+          fileSchema,
+          setArrowValidityVector,
+          ImmutableMap.of(),
+          readers -> {
+            ArrowBatchReader batchReader = new ArrowBatchReader();
+            batchReader.initialize(readers);
+            return batchReader;
+          });
       return (ArrowBatchReader)
           TypeWithSchemaVisitor.visit(
-              expectedSchema.asStruct(),
-              fileSchema,
-              new VectorizedReaderBuilder(
-                  expectedSchema,
-                  fileSchema,
-                  setArrowValidityVector,
-                  ImmutableMap.of(),
-                  ArrowBatchReader::new));
+              expectedSchema.asStruct(), fileSchema, vectorizedReaderBuilder);
     }
   }
 }
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
index 3915ff1f1a32..869a0ec105b7 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedReaderBuilder.java
@@ -40,15 +40,18 @@
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
+@SuppressWarnings({"VisibilityModifier", "checkstyle:HiddenField"})
 public class VectorizedReaderBuilder extends TypeWithSchemaVisitor<VectorizedReader<?>> {
-  private final MessageType parquetSchema;
-  private final Schema icebergSchema;
-  private final BufferAllocator rootAllocator;
-  private final Map<Integer, ?> idToConstant;
-  private final boolean setArrowValidityVector;
-  private final Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory;
+  protected MessageType parquetSchema;
+  protected Schema icebergSchema;
+  protected BufferAllocator rootAllocator;
+  protected Map<Integer, ?> idToConstant;
+  protected boolean setArrowValidityVector;
+  protected Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory;
 
-  public VectorizedReaderBuilder(
+  public VectorizedReaderBuilder() {}
+
+  public void initialize(
       Schema expectedSchema,
       MessageType parquetSchema,
       boolean setArrowValidityVector,
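[Reviewer note: with this change both the builder and the batch readers follow a no-arg-constructor-plus-initialize() lifecycle, so implementations can be instantiated reflectively and configured afterwards. A minimal sketch of the new wiring, mirroring the ArrowReader.buildReader() change above; expectedSchema and fileSchema stand in for caller-supplied values:

    VectorizedReaderBuilder builder = new VectorizedReaderBuilder();
    builder.initialize(
        expectedSchema,     // Iceberg schema to read
        fileSchema,         // Parquet message type
        false,              // setArrowValidityVector
        ImmutableMap.of(),  // idToConstant: no constant columns
        readers -> {        // readerFactory assembles the enclosing batch reader
          ArrowBatchReader batchReader = new ArrowBatchReader();
          batchReader.initialize(readers);
          return batchReader;
        });
    VectorizedReader<?> reader =
        TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, builder);
]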
diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/BaseBatchReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/BaseBatchReader.java
new file mode 100644
index 000000000000..a8ad322d7897
--- /dev/null
+++ b/parquet/src/main/java/org/apache/iceberg/parquet/BaseBatchReader.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.parquet;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+
+/** A base BatchReader class that contains common functionality */
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public abstract class BaseBatchReader<T> implements VectorizedReader<T> {
+  protected VectorizedReader<?>[] readers;
+
+  public BaseBatchReader() {}
+
+  public void initialize(List<VectorizedReader<?>> readerList) {}
+
+  @Override
+  public void setRowGroupInfo(
+      PageReadStore pageStore, Map<ColumnPath, ColumnChunkMetaData> metaData, long rowPosition) {
+    for (VectorizedReader<?> reader : readers) {
+      if (reader != null) {
+        reader.setRowGroupInfo(pageStore, metaData, rowPosition);
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    for (VectorizedReader<?> reader : readers) {
+      if (reader != null) {
+        reader.close();
+      }
+    }
+  }
+
+  @Override
+  public void setBatchSize(int batchSize) {
+    for (VectorizedReader<?> reader : readers) {
+      if (reader != null) {
+        reader.setBatchSize(batchSize);
+      }
+    }
+  }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
index e3b01b8375b6..491b130cf099 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkConfParser.java
@@ -50,6 +50,10 @@ class SparkConfParser {
     this.options = options;
   }
 
+  public RuntimeConfig getSessionConf() {
+    return sessionConf;
+  }
+
   public BooleanConfParser booleanConf() {
     return new BooleanConfParser();
   }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 2990d981d009..60248ec4dde4 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -21,6 +21,7 @@
 import static org.apache.iceberg.PlanningMode.LOCAL;
 
 import java.util.Map;
+import java.util.Properties;
 import org.apache.iceberg.PlanningMode;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
@@ -270,6 +271,38 @@ public boolean preserveDataGrouping() {
         .parse();
   }
 
+  public String getCustomizedVectorizationImpl() {
+    return confParser
+        .stringConf()
+        .sessionConf(SparkSQLProperties.CUSTOMIZED_VECTORIZATION_IMPL)
+        .defaultValue("")
+        .parse();
+  }
+
+  public String getCustomizedVectorizationPropertyPrefix() {
+    return confParser
+        .stringConf()
+        .sessionConf(SparkSQLProperties.CUSTOMIZED_VECTORIZATION_PROPERTY_PREFIX)
+        .defaultValue("")
+        .parse();
+  }
+
+  public Properties getCustomizedVectorizationProperties() {
+    Properties properties = new Properties();
+
+    java.util.Map<String, String> map =
+        scala.collection.JavaConverters.mapAsJavaMapConverter(confParser.getSessionConf().getAll())
+            .asJava();
+
+    for (Map.Entry<String, String> entry : map.entrySet()) {
+      if (entry.getKey().contains(getCustomizedVectorizationPropertyPrefix())) {
+        properties.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    return properties;
+  }
+
   public boolean aggregatePushDownEnabled() {
     return confParser
         .booleanConf()
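[Reviewer note: getCustomizedVectorizationProperties() copies every session conf whose key contains the configured prefix (String.contains, not startsWith) into a java.util.Properties object, keys kept verbatim. A usage sketch; the myreader.* keys are hypothetical:

    SparkSession spark = SparkSession.active();
    spark.conf().set("spark.sql.iceberg.customized.vectorization.property.prefix", "myreader.");
    spark.conf().set("myreader.offheap.enabled", "true"); // hypothetical custom key
    spark.conf().set("myreader.batch.pool.size", "4");    // hypothetical custom key
    // SparkReadConf.getCustomizedVectorizationProperties() now contains both
    // myreader.* entries and hands them to the custom reader's initialize().
]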
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
index ea8f6fe0718b..d4d10f80653f 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java
@@ -27,6 +27,12 @@ private SparkSQLProperties() {}
   // Controls whether vectorized reads are enabled
   public static final String VECTORIZATION_ENABLED = "spark.sql.iceberg.vectorization.enabled";
 
+  public static final String CUSTOMIZED_VECTORIZATION_IMPL =
+      "spark.sql.iceberg.customized.vectorization.impl";
+
+  public static final String CUSTOMIZED_VECTORIZATION_PROPERTY_PREFIX =
+      "spark.sql.iceberg.customized.vectorization.property.prefix";
+
   // Controls whether to perform the nullability check during writes
   public static final String CHECK_NULLABILITY = "spark.sql.iceberg.check-nullability";
   public static final boolean CHECK_NULLABILITY_DEFAULT = true;
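[Reviewer note: as wired up in VectorizedSparkParquetReaders below, the impl property is a class-name prefix rather than a full class name: "VectorizedReaderBuilder" and "ColumnarBatchReader" are appended to it. Sketch with a hypothetical com.example package:

    spark.conf().set("spark.sql.iceberg.customized.vectorization.impl", "com.example.read.My");
    // resolves to com.example.read.MyVectorizedReaderBuilder (must extend
    // SparkVectorizedReaderBuilder) and com.example.read.MyColumnarBatchReader
    // (must extend BaseColumnarBatchReader); both need public no-arg constructors
]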
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/BaseColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/BaseColumnarBatchReader.java
new file mode 100644
index 000000000000..a5d9fd60d730
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/BaseColumnarBatchReader.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.List;
+import java.util.Properties;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.parquet.BaseBatchReader;
+import org.apache.iceberg.parquet.VectorizedReader;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+/** A base ColumnarBatchReader class that contains common functionality */
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public abstract class BaseColumnarBatchReader<T> extends BaseBatchReader<T> {
+  protected boolean hasIsDeletedColumn;
+  protected DeleteFilter<InternalRow> deletes = null;
+  protected long rowStartPosInBatch = 0;
+
+  protected Schema schema;
+
+  public BaseColumnarBatchReader() {}
+
+  public void initialize(
+      List<VectorizedReader<?>> readers, Schema schema1, Properties properties) {}
+
+  public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
+    this.deletes = deleteFilter;
+  }
+}
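[Reviewer note: a custom reader plugs in by subclassing this class. A hypothetical skeleton, not part of this PR, showing the contract implied by the readerFactory wiring below; imports elided:

    public class MyColumnarBatchReader extends BaseColumnarBatchReader<ColumnarBatch> {
      public MyColumnarBatchReader() {} // required by the reflective loading in VectorizedUtil

      @Override
      public void initialize(
          List<VectorizedReader<?>> readerList, Schema readSchema, Properties props) {
        this.readers = readerList.toArray(new VectorizedReader[0]);
        this.schema = readSchema;
        // interpret prefix-filtered props here, e.g. buffer sizes or pooling flags
      }

      @Override
      public ColumnarBatch read(ColumnarBatch reuse, int numRowsToRead) {
        // produce a batch from this.readers, honoring this.deletes when set
        throw new UnsupportedOperationException("sketch only");
      }
    }
]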
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
index f07d8c545e35..7f1b5ef282cb 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/ColumnarBatchReader.java
@@ -21,10 +21,11 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import org.apache.iceberg.arrow.vectorized.BaseBatchReader;
+import java.util.Properties;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.arrow.vectorized.VectorHolder;
 import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader;
 import org.apache.iceberg.arrow.vectorized.VectorizedArrowReader.DeletedVectorReader;
-import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.deletes.PositionDeleteIndex;
 import org.apache.iceberg.parquet.VectorizedReader;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -41,13 +42,18 @@
  * read path. The {@link ColumnarBatch} returned is created by passing in the Arrow vectors
  * populated via delegated read calls to {@linkplain VectorizedArrowReader VectorReader(s)}.
  */
-public class ColumnarBatchReader extends BaseBatchReader<ColumnarBatch> {
-  private final boolean hasIsDeletedColumn;
-  private DeleteFilter<InternalRow> deletes = null;
-  private long rowStartPosInBatch = 0;
+public class ColumnarBatchReader extends BaseColumnarBatchReader<ColumnarBatch> {
+  private VectorHolder[] vectorHolders;
 
-  public ColumnarBatchReader(List<VectorizedReader<?>> readers) {
-    super(readers);
+  public ColumnarBatchReader() {}
+
+  @Override
+  public void initialize(List<VectorizedReader<?>> readers, Schema schema, Properties properties) {
+    this.readers =
+        readers.stream()
+            .map(VectorizedArrowReader.class::cast)
+            .toArray(VectorizedArrowReader[]::new);
+    this.vectorHolders = new VectorHolder[readers.size()];
     this.hasIsDeletedColumn =
         readers.stream().anyMatch(reader -> reader instanceof DeletedVectorReader);
   }
@@ -59,8 +65,22 @@ public void setRowGroupInfo(
     this.rowStartPosInBatch = rowPosition;
   }
 
-  public void setDeleteFilter(DeleteFilter<InternalRow> deleteFilter) {
-    this.deletes = deleteFilter;
+  protected void closeVectors() {
+    for (int i = 0; i < vectorHolders.length; i++) {
+      if (vectorHolders[i] != null) {
+        // Release any resources used by the vector
+        if (vectorHolders[i].vector() != null) {
+          vectorHolders[i].vector().close();
+        }
+        vectorHolders[i] = null;
+      }
+    }
+  }
+
+  @Override
+  public void close() {
+    super.close();
+    closeVectors();
   }
 
   @Override
@@ -120,7 +140,8 @@ ColumnVector[] readDataToColumnVectors() {
     ColumnVectorBuilder columnVectorBuilder = new ColumnVectorBuilder();
 
     for (int i = 0; i < readers.length; i += 1) {
-      vectorHolders[i] = readers[i].read(vectorHolders[i], numRowsToRead);
+      vectorHolders[i] =
+          ((VectorizedArrowReader) readers[i]).read(vectorHolders[i], numRowsToRead);
       int numRowsInVector = vectorHolders[i].numValues();
       Preconditions.checkState(
           numRowsInVector == numRowsToRead,
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/SparkVectorizedReaderBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/SparkVectorizedReaderBuilder.java
new file mode 100644
index 000000000000..d8077abf9daf
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/SparkVectorizedReaderBuilder.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.function.Function;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.arrow.ArrowAllocation;
+import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder;
+import org.apache.iceberg.data.DeleteFilter;
+import org.apache.iceberg.parquet.VectorizedReader;
+import org.apache.parquet.schema.MessageType;
+import org.apache.spark.sql.catalyst.InternalRow;
+
+@SuppressWarnings({"VisibilityModifier", "checkstyle:HiddenField"})
+public class SparkVectorizedReaderBuilder extends VectorizedReaderBuilder {
+  protected DeleteFilter<InternalRow> deleteFilter;
+
+  public SparkVectorizedReaderBuilder() {}
+
+  public void initialize(
+      Schema expectedSchema,
+      MessageType parquetSchema,
+      boolean setArrowValidityVector,
+      Map<Integer, ?> idToConstant,
+      Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory,
+      DeleteFilter<InternalRow> deleteFilter,
+      Properties customizedVectorizationProperties) {
+    this.parquetSchema = parquetSchema;
+    this.icebergSchema = expectedSchema;
+    this.rootAllocator =
+        ArrowAllocation.rootAllocator()
+            .newChildAllocator("VectorizedReadBuilder", 0, Long.MAX_VALUE);
+    this.setArrowValidityVector = setArrowValidityVector;
+    this.idToConstant = idToConstant;
+    this.readerFactory = readerFactory;
+    this.deleteFilter = deleteFilter;
+  }
+
+  protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorderedFields) {
+    VectorizedReader<?> reader = readerFactory.apply(reorderedFields);
+    if (deleteFilter != null) {
+      ((BaseColumnarBatchReader<?>) reader).setDeleteFilter(deleteFilter);
+    }
+    return reader;
+  }
+}
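[Reviewer note: the builder side is extended the same way. A hypothetical subclass that customizes only reader construction might look like:

    public class MyVectorizedReaderBuilder extends SparkVectorizedReaderBuilder {
      public MyVectorizedReaderBuilder() {} // no-arg constructor for DynConstructors

      @Override
      protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorderedFields) {
        // the inherited factory initializes the configured batch reader and
        // attaches the delete filter; extra wiring can happen around this call
        return super.vectorizedReader(reorderedFields);
      }
    }
]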
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
index e47152c79398..850b1e1ebf26 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedSparkParquetReaders.java
@@ -18,12 +18,10 @@
  */
 package org.apache.iceberg.spark.data.vectorized;
 
-import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
+import java.util.Properties;
 import org.apache.arrow.vector.NullCheckingForGet;
 import org.apache.iceberg.Schema;
-import org.apache.iceberg.arrow.vectorized.VectorizedReaderBuilder;
 import org.apache.iceberg.data.DeleteFilter;
 import org.apache.iceberg.parquet.TypeWithSchemaVisitor;
 import org.apache.iceberg.parquet.VectorizedReader;
@@ -51,22 +49,39 @@ public class VectorizedSparkParquetReaders {
 
   private VectorizedSparkParquetReaders() {}
 
-  public static ColumnarBatchReader buildReader(
+  public static VectorizedReader<?> buildReader(
       Schema expectedSchema,
       MessageType fileSchema,
       Map<Integer, ?> idToConstant,
-      DeleteFilter<InternalRow> deleteFilter) {
-    return (ColumnarBatchReader)
-        TypeWithSchemaVisitor.visit(
-            expectedSchema.asStruct(),
-            fileSchema,
-            new ReaderBuilder(
-                expectedSchema,
-                fileSchema,
-                NullCheckingForGet.NULL_CHECKING_ENABLED,
-                idToConstant,
-                ColumnarBatchReader::new,
-                deleteFilter));
+      DeleteFilter<InternalRow> deleteFilter,
+      String vectorizationImpl,
+      Properties customizedVectorization) {
+    String builderImpl = "";
+    String columnarBatchReaderImpl = "";
+    if (!vectorizationImpl.isEmpty()) {
+      builderImpl = vectorizationImpl + "VectorizedReaderBuilder";
+      columnarBatchReaderImpl = vectorizationImpl + "ColumnarBatchReader";
+    } else {
+      builderImpl = SparkVectorizedReaderBuilder.class.getName();
+      columnarBatchReaderImpl = ColumnarBatchReader.class.getName();
+    }
+    SparkVectorizedReaderBuilder vectorizedReaderBuilder =
+        VectorizedUtil.getSparkVectorizedReaderBuilder(builderImpl);
+    BaseColumnarBatchReader<?> reader =
+        VectorizedUtil.getBaseColumnarBatchReader(columnarBatchReaderImpl);
+    vectorizedReaderBuilder.initialize(
+        expectedSchema,
+        fileSchema,
+        NullCheckingForGet.NULL_CHECKING_ENABLED,
+        idToConstant,
+        readers -> {
+          reader.initialize(readers, expectedSchema, customizedVectorization);
+          return reader;
+        },
+        deleteFilter,
+        customizedVectorization);
+    return TypeWithSchemaVisitor.visit(
+        expectedSchema.asStruct(), fileSchema, vectorizedReaderBuilder);
   }
 
   // enables unsafe memory access to avoid costly checks to see if index is within bounds
@@ -101,28 +116,4 @@ private static String confValue(String propName, String envName) {
     return System.getenv(envName);
   }
-
-  private static class ReaderBuilder extends VectorizedReaderBuilder {
-    private final DeleteFilter<InternalRow> deleteFilter;
-
-    ReaderBuilder(
-        Schema expectedSchema,
-        MessageType parquetSchema,
-        boolean setArrowValidityVector,
-        Map<Integer, ?> idToConstant,
-        Function<List<VectorizedReader<?>>, VectorizedReader<?>> readerFactory,
-        DeleteFilter<InternalRow> deleteFilter) {
-      super(expectedSchema, parquetSchema, setArrowValidityVector, idToConstant, readerFactory);
-      this.deleteFilter = deleteFilter;
-    }
-
-    @Override
-    protected VectorizedReader<?> vectorizedReader(List<VectorizedReader<?>> reorderedFields) {
-      VectorizedReader<?> reader = super.vectorizedReader(reorderedFields);
-      if (deleteFilter != null) {
-        ((ColumnarBatchReader) reader).setDeleteFilter(deleteFilter);
-      }
-      return reader;
-    }
-  }
 }
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedUtil.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedUtil.java
new file mode 100644
index 000000000000..43a7d416fda2
--- /dev/null
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/data/vectorized/VectorizedUtil.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.data.vectorized;
+
+import org.apache.iceberg.common.DynConstructors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VectorizedUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(VectorizedUtil.class);
+
+  private VectorizedUtil() {}
+
+  public static SparkVectorizedReaderBuilder getSparkVectorizedReaderBuilder(String impl) {
+    LOG.info("Loading SparkVectorizedReaderBuilder implementation: {}", impl);
+    DynConstructors.Ctor<SparkVectorizedReaderBuilder> ctor;
+    try {
+      ctor =
+          DynConstructors.builder(SparkVectorizedReaderBuilder.class)
+              .loader(VectorizedUtil.class.getClassLoader())
+              .impl(impl)
+              .buildChecked();
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot initialize SparkVectorizedReaderBuilder, missing no-arg constructor: %s",
+              impl),
+          e);
+    }
+
+    SparkVectorizedReaderBuilder vectorizedReaderBuilder;
+    try {
+      vectorizedReaderBuilder = ctor.newInstance();
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot initialize SparkVectorizedReaderBuilder, %s does not implement SparkVectorizedReaderBuilder.",
+              impl),
+          e);
+    }
+
+    return vectorizedReaderBuilder;
+  }
+
+  public static BaseColumnarBatchReader<?> getBaseColumnarBatchReader(String impl) {
+    LOG.info("Loading BaseColumnarBatchReader implementation: {}", impl);
+    DynConstructors.Ctor<BaseColumnarBatchReader<?>> ctor;
+    try {
+      ctor =
+          DynConstructors.builder(BaseColumnarBatchReader.class)
+              .loader(VectorizedUtil.class.getClassLoader())
+              .impl(impl)
+              .buildChecked();
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot initialize BaseColumnarBatchReader, missing no-arg constructor: %s", impl),
+          e);
+    }
+
+    BaseColumnarBatchReader<?> baseColumnarBatchReader;
+    try {
+      baseColumnarBatchReader = ctor.newInstance();
+    } catch (ClassCastException e) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Cannot initialize BaseColumnarBatchReader, %s does not implement BaseColumnarBatchReader.",
+              impl),
+          e);
+    }
+
+    return baseColumnarBatchReader;
+  }
+}
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
index c05b694a60dc..116d5888cfd2 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java
@@ -19,6 +19,7 @@
 package org.apache.iceberg.spark.source;
 
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.MetadataColumns;
@@ -39,6 +40,8 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
   private final int batchSize;
+  private String customizedVectorizationImpl;
+  private Properties customizedVectorizationProperties;
 
   BaseBatchReader(
       Table table,
@@ -51,6 +54,15 @@ abstract class BaseBatchReader<T extends ScanTask> extends BaseReader<ColumnarBatch, T> {
+  public void setCustomizedVectorizationImpl(String customizedVectorizationImpl) {
+    this.customizedVectorizationImpl = customizedVectorizationImpl;
+  }
+
+  public void setCustomizedVectorizationProperties(
+      Properties customizedVectorizationProperties) {
+    this.customizedVectorizationProperties = customizedVectorizationProperties;
+  }
+
   protected CloseableIterable<ColumnarBatch> newBatchIterable(
       InputFile inputFile,
       FileFormat format,
@@ -88,7 +100,12 @@ private CloseableIterable<ColumnarBatch> newParquetIterable(
         .createBatchedReaderFunc(
             fileSchema ->
                 VectorizedSparkParquetReaders.buildReader(
-                    requiredSchema, fileSchema, idToConstant, deleteFilter))
+                    requiredSchema,
+                    fileSchema,
+                    idToConstant,
+                    deleteFilter,
+                    customizedVectorizationImpl,
+                    customizedVectorizationProperties))
         .recordsPerBatch(batchSize)
         .filter(residual)
         .caseSensitive(caseSensitive())
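[Reviewer note: usage sketch for the loader; the implementation class name is hypothetical:

    SparkVectorizedReaderBuilder builder =
        VectorizedUtil.getSparkVectorizedReaderBuilder(
            "com.example.read.MyVectorizedReaderBuilder");
    // throws IllegalArgumentException when the class lacks a no-arg constructor
    // or is not a SparkVectorizedReaderBuilder
]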
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
index fd6783f3e1f7..2615b7966f16 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java
@@ -20,6 +20,7 @@
 import java.util.List;
 import java.util.Objects;
+import java.util.Properties;
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.MetadataColumns;
@@ -115,11 +116,14 @@ private String[][] computePreferredLocations() {
   public PartitionReaderFactory createReaderFactory() {
     if (useParquetBatchReads()) {
       int batchSize = readConf.parquetBatchSize();
-      return new SparkColumnarReaderFactory(batchSize);
+      return new SparkColumnarReaderFactory(
+          batchSize,
+          readConf.getCustomizedVectorizationImpl(),
+          readConf.getCustomizedVectorizationProperties());
 
     } else if (useOrcBatchReads()) {
       int batchSize = readConf.orcBatchSize();
-      return new SparkColumnarReaderFactory(batchSize);
+      return new SparkColumnarReaderFactory(batchSize, "", new Properties());
 
     } else {
       return new SparkRowReaderFactory();
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java
index 655e20a50e11..5f8a4d2f8fd4 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.source;
 
+import java.util.Properties;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -28,10 +29,16 @@
 class SparkColumnarReaderFactory implements PartitionReaderFactory {
   private final int batchSize;
+  private final String customizedVectorizationImpl;
 
-  SparkColumnarReaderFactory(int batchSize) {
+  private final Properties customizedVectorizationProperties;
+
+  SparkColumnarReaderFactory(
+      int batchSize, String vectorizationImpl, Properties customizedVectorizationProperties) {
     Preconditions.checkArgument(batchSize > 1, "Batch size must be > 1");
     this.batchSize = batchSize;
+    this.customizedVectorizationImpl = vectorizationImpl;
+    this.customizedVectorizationProperties = customizedVectorizationProperties;
   }
 
@@ -49,7 +56,10 @@ public PartitionReader<ColumnarBatch> createColumnarReader(InputPartition inputP
     SparkInputPartition partition = (SparkInputPartition) inputPartition;
 
     if (partition.allTasksOfType(FileScanTask.class)) {
-      return new BatchDataReader(partition, batchSize);
+      BatchDataReader batchDataReader = new BatchDataReader(partition, batchSize);
+      batchDataReader.setCustomizedVectorizationImpl(customizedVectorizationImpl);
+      batchDataReader.setCustomizedVectorizationProperties(customizedVectorizationProperties);
+      return batchDataReader;
 
     } else {
       throw new UnsupportedOperationException(
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
index 044ea3d93c0b..6ff2be6dae36 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetReadMetadataColumns.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Properties;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -190,7 +191,12 @@ public void testReadRowNumbersWithDelete() throws IOException {
     builder.createBatchedReaderFunc(
         fileSchema ->
             VectorizedSparkParquetReaders.buildReader(
-                PROJECTION_SCHEMA, fileSchema, Maps.newHashMap(), deleteFilter));
+                PROJECTION_SCHEMA,
+                fileSchema,
+                Maps.newHashMap(),
+                deleteFilter,
+                "",
+                new Properties()));
     builder.recordsPerBatch(RECORDS_PER_BATCH);
 
     validate(expectedRowsAfterDelete, builder);
@@ -266,7 +272,7 @@ private void readAndValidate(
       builder.createBatchedReaderFunc(
           fileSchema ->
               VectorizedSparkParquetReaders.buildReader(
-                  PROJECTION_SCHEMA, fileSchema, Maps.newHashMap(), null));
+                  PROJECTION_SCHEMA, fileSchema, Maps.newHashMap(), null, "", new Properties()));
       builder.recordsPerBatch(RECORDS_PER_BATCH);
     } else {
       builder =
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
index 5c4b216aff94..267797fe20d3 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/parquet/vectorized/TestParquetVectorizedReads.java
@@ -27,6 +27,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.Properties;
 import org.apache.avro.generic.GenericData;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.Schema;
@@ -144,7 +145,7 @@ void assertRecordsMatch(
         .createBatchedReaderFunc(
             type ->
                 VectorizedSparkParquetReaders.buildReader(
-                    schema, type, Maps.newHashMap(), null));
+                    schema, type, Maps.newHashMap(), null, "", new Properties()));
     if (reuseContainers) {
       readBuilder.reuseContainers();
     }
@@ -207,7 +208,9 @@ public void testNestedStruct() {
                     new MessageType(
                         "struct", new GroupType(Type.Repetition.OPTIONAL, "struct").withId(1)),
                     Maps.newHashMap(),
-                    null))
+                    null,
+                    "",
+                    new Properties()))
         .isInstanceOf(UnsupportedOperationException.class)
         .hasMessage("Vectorized reads are not supported yet for struct fields");
   }