From ad044ef4294953ae02620a6b9d9d1c2cddfa2b6e Mon Sep 17 00:00:00 2001
From: Andreas Chatzistergiou
Date: Tue, 23 Apr 2024 20:08:58 +0200
Subject: [PATCH] Support predicate pushdown and file splitting in scans with
 deletion vectors

When the deletionVectors.predicatePushdownEnabled conf (renamed from
deletionVectors.enablePredicatePushdown) is enabled, Delta scans on tables
with deletion vectors no longer need to disable file splitting and Parquet
predicate pushdown. Instead of deriving row indexes from a counter that is
only correct for unsplit, unfiltered files, the reader probes the deletion
vector bitmap with the row index produced by the Parquet reader
(ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME), in both the vectorized
and the non-vectorized code paths. The previous behaviour is preserved when
the conf is disabled.

The RowIndexFilter overloads are renamed to materializeIntoVectorWithRowIndex
and materializeSingleRowWithRowIndex, the RowIndexMarkingFilters share a
single bitmap-probing helper, and DeltaParquetFileFormat exposes the
row_index metadata field only when Row Tracking or deletion vectors need it.
---
 .../spark/sql/delta/RowIndexFilter.java | 31 +++--
 .../sql/delta/DeltaParquetFileFormat.scala | 125 ++++++++++--------
 .../sql/delta/PreprocessTableWithDVs.scala | 15 ++-
 .../DMLWithDeletionVectorsHelper.scala | 6 +-
 .../RowIndexMarkingFilters.scala | 69 +++++-----
 .../sql/delta/sources/DeltaSQLConf.scala | 4 +-
 6 files changed, 135 insertions(+), 115 deletions(-)

diff --git a/spark/src/main/java/org/apache/spark/sql/delta/RowIndexFilter.java b/spark/src/main/java/org/apache/spark/sql/delta/RowIndexFilter.java
index 3c13b0184eb..ea77bfc1072 100644
--- a/spark/src/main/java/org/apache/spark/sql/delta/RowIndexFilter.java
+++ b/spark/src/main/java/org/apache/spark/sql/delta/RowIndexFilter.java
@@ -27,29 +27,36 @@ public interface RowIndexFilter {
 
   /**
    * Materialize filtering information for all rows in the range [start, end)
-   * by filling a boolean column vector batch.
+   * by filling a boolean column vector batch. Assumes the indexes of the rows in the batch are
+   * consecutive and start from 0.
    *
-   * @param start Beginning index of the filtering range (inclusive)
-   * @param end End index of the filtering range (exclusive)
-   * @param batch The column vector for the current batch to materialize the range into
+   * @param start Beginning index of the filtering range (inclusive).
+   * @param end End index of the filtering range (exclusive).
+   * @param batch The column vector for the current batch to materialize the range into.
    */
   void materializeIntoVector(long start, long end, WritableColumnVector batch);
 
   /**
+   * Materialize filtering information for all rows in the batch. This is achieved by probing
+   * the roaring bitmap with the row index of every row in the batch.
    *
-   * @param batchSize
-   * @param rowIndexColumn
-   * @param batch
+   * @param batchSize The size of the batch.
+   * @param rowIndexColumn A column vector that contains the row index of each row in the batch.
+   * @param batch The column vector for the current batch to materialize the range into.
    */
-  void materializeIntoVector(int batchSize, ColumnVector rowIndexColumn, WritableColumnVector batch);
+  void materializeIntoVectorWithRowIndex(
+      int batchSize,
+      ColumnVector rowIndexColumn,
+      WritableColumnVector batch);
 
   /**
+   * Materialize filtering information for batches with a single row.
    *
-   * @param rowNumber
-   * @param rowIndex
-   * @param batch
+   * @param rowIndex The index of the row for which to materialize the filtering information.
+ * @param batch The column vector for the current batch to materialize the range into. + * We assume it contains a single row. */ - void materializeIntoVector(int rowNumber, long rowIndex, WritableColumnVector batch); + void materializeSingleRowWithRowIndex(long rowIndex, WritableColumnVector batch); /** * Value that must be materialised for a row to be kept after filtering. diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala index 2df2d133ef1..76afa329728 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala @@ -22,8 +22,10 @@ import scala.util.control.NonFatal import org.apache.spark.sql.delta.RowIndexFilterType import org.apache.spark.sql.delta.DeltaParquetFileFormat._ import org.apache.spark.sql.delta.actions.{DeletionVectorDescriptor, Metadata, Protocol} +import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable import org.apache.spark.sql.delta.deletionvectors.{DropMarkedRowsFilter, KeepAllRowsFilter, KeepMarkedRowsFilter} import org.apache.spark.sql.delta.schema.SchemaMergingUtils +import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.Job @@ -61,8 +63,14 @@ case class DeltaParquetFileFormat( extends ParquetFileFormat { // Validate either we have all arguments for DV enabled read or none of them. if (hasTablePath) { - // require(!isSplittable && disablePushDowns, - // "Wrong arguments for Delta table scan with deletion vectors") + SparkSession.getActiveSession.map { session => + val predicatePushdownEnabled = + session.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_PREDICATE_PUSHDOWN_ENABLED) + if (!predicatePushdownEnabled) { + require(tablePath.isDefined && !isSplittable && disablePushDowns, + "Wrong arguments for Delta table scan with deletion vectors") + } + } } TypeWidening.assertTableReadable(protocol, metadata) @@ -147,6 +155,10 @@ case class DeltaParquetFileFormat( filters: Seq[Filter], options: Map[String, String], hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = { + + val predicateConf = DeltaSQLConf.DELETION_VECTORS_PREDICATE_PUSHDOWN_ENABLED + val predicatePushdownEnabled = sparkSession.sessionState.conf.getConf(predicateConf) + val parquetDataReader: PartitionedFile => Iterator[InternalRow] = super.buildReaderWithPartitionValues( sparkSession, @@ -166,19 +178,25 @@ case class DeltaParquetFileFormat( } results.headOption.map(e => ColumnMetadata(e._2, e._1)) } + val isRowDeletedColumn = findColumn(IS_ROW_DELETED_COLUMN_NAME) - // val rowIndexColumn = findColumn(ROW_INDEX_COLUMN_NAME) - val rowIndexColumn = findColumn(ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME) + val rowIndexColumnName = if (predicatePushdownEnabled) { + ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME + } else { + ROW_INDEX_COLUMN_NAME + } + val rowIndexColumn = findColumn(rowIndexColumnName) - // if (isRowDeletedColumn.isEmpty && rowIndexColumn.isEmpty) { - if (isRowDeletedColumn.isEmpty) { - return parquetDataReader // no additional metadata is needed. + // No additional metadata is needed. 
+ if (predicatePushdownEnabled) { + if (isRowDeletedColumn.isEmpty) return parquetDataReader } else { + if (isRowDeletedColumn.isEmpty && rowIndexColumn.isEmpty) return parquetDataReader + // verify the file splitting and filter pushdown are disabled. The new additional // metadata columns cannot be generated with file splitting and filter pushdowns - // require(!isSplittable, "Cannot generate row index related metadata with file splitting") - // require(disablePushDowns, - // "Cannot generate row index related metadata with filter pushdown") + require(!isSplittable, "Cannot generate row index related metadata with file splitting") + require(disablePushDowns, "Cannot generate row index related metadata with filter pushdown") } if (hasTablePath && isRowDeletedColumn.isEmpty) { @@ -199,7 +217,8 @@ case class DeltaParquetFileFormat( isRowDeletedColumn, rowIndexColumn, useOffHeapBuffers, - serializableHadoopConf) + serializableHadoopConf, + predicatePushdownEnabled = predicatePushdownEnabled) iterToReturn.asInstanceOf[Iterator[InternalRow]] } catch { case NonFatal(e) => @@ -220,23 +239,21 @@ case class DeltaParquetFileFormat( } override def metadataSchemaFields: Seq[StructField] = { - // achatzis - val rowTrackingFields = - RowTracking.createMetadataStructFields(protocol, metadata, nullableRowTrackingFields) // TODO(SPARK-47731): Parquet reader in Spark has a bug where a file containing 2b+ rows // in a single rowgroup causes it to run out of the `Integer` range. - // For Delta Parquet readers don't expose the row_index field as a metadata field. - if (!RowId.isEnabled(protocol, metadata)) { - super.metadataSchemaFields // .filter(_ != ParquetFileFormat.ROW_INDEX_FIELD) + // For Delta Parquet readers don't expose the row_index field as a metadata field when it is + // not strictly required. We do expose it when Row Tracking or DVs are enabled. + // In general, having 2b+ rows in a single rowgroup is not a common use case. When the issue is + // hit an exception is thrown. + if (RowId.isEnabled(protocol, metadata)) { + super.metadataSchemaFields ++ + RowTracking.createMetadataStructFields(protocol, metadata, nullableRowTrackingFields) + } else if (deletionVectorsReadable(protocol, metadata)) { + super.metadataSchemaFields } else { - // It is fine to expose the row_index field as a metadata field when Row Tracking - // is enabled because it is needed to generate the Row ID field, and it is not a - // big problem if we use 2b+ rows in a single rowgroup, it will throw an exception and - // we can then use less rows per rowgroup. Also, 2b+ rows in a single rowgroup is - // not a common use case. - super.metadataSchemaFields ++ rowTrackingFields + super.metadataSchemaFields.filter(_ != ParquetFileFormat.ROW_INDEX_FIELD) + } } - } override def prepareWrite( sparkSession: SparkSession, @@ -279,10 +296,13 @@ case class DeltaParquetFileFormat( .updated(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME, extractDefaultRowCommitVersion) } - def disableSplittingAndPushdown(tablePath: String): DeltaParquetFileFormat = { + def disableSplittingAndPushdown( + tablePath: String, + predicatePushdownEnabled: Boolean): DeltaParquetFileFormat = { + // When predicate pushdown is enabled we allow both splits and predicate pushdown. this.copy( - // isSplittable = true, - // disablePushDowns = false, + isSplittable = predicatePushdownEnabled, + disablePushDowns = !predicatePushdownEnabled, tablePath = Some(tablePath)) } @@ -291,7 +311,8 @@ case class DeltaParquetFileFormat( * following metadata columns. 
* - [[IS_ROW_DELETED_COLUMN_NAME]] - row deleted status from deletion vector corresponding * to this file - * - [[ROW_INDEX_COLUMN_NAME]] - index of the row within the file. + * - [[ROW_INDEX_COLUMN_NAME]] - index of the row within the file. Note, this column is only + * populated when are not using _metadata.row_index column. */ private def iteratorWithAdditionalMetadataColumns( partitionedFile: PartitionedFile, @@ -299,7 +320,8 @@ case class DeltaParquetFileFormat( isRowDeletedColumn: Option[ColumnMetadata], rowIndexColumn: Option[ColumnMetadata], useOffHeapBuffers: Boolean, - serializableHadoopConf: SerializableConfiguration): Iterator[Object] = { + serializableHadoopConf: SerializableConfiguration, + predicatePushdownEnabled: Boolean): Iterator[Object] = { val rowIndexFilter = isRowDeletedColumn.map { col => // Fetch the DV descriptor from the broadcast map and create a row index filter val dvDescriptorOpt = partitionedFile.otherConstantMetadataColumnValues @@ -328,18 +350,13 @@ case class DeltaParquetFileFormat( val metadataColumns = Seq(isRowDeletedColumn, rowIndexColumn).filter(_.nonEmpty).map(_.get) - // Achatzis - // Unfortunately there is no way to verify the Parquet index is starting from 0. - // We disable the splits, so the assumption is ParquetFileFormat respects that + // When metadata.row_index is not used there is no way to verify the Parquet index is + // starting from 0. We disable the splits, so the assumption is ParquetFileFormat respects that var rowIndex: Long = 0 // Used only when non-column row batches are received from the Parquet reader val tempVector = new OnHeapColumnVector(1, ByteType) - if (rowIndexColumn.isDefined == false) { - val c = 1 - } - val rowIndexColumnIndex = (rowIndexFilter, rowIndexColumn) match { case (Some(_: KeepAllRowsFilter.type), Some(rowIndexColumn)) => rowIndexColumn.index case (Some(_), Some(rowIndexColumn)) => rowIndexColumn.index @@ -357,8 +374,8 @@ case class DeltaParquetFileFormat( isRowDeletedColumn.foreach { columnMetadata => val isRowDeletedVector = writableVectors.head - rowIndexFilter.get - .materializeIntoVector(size, batch.column(rowIndexColumnIndex), isRowDeletedVector) + rowIndexFilter.get.materializeIntoVectorWithRowIndex( + size, batch.column(rowIndexColumnIndex), isRowDeletedVector) // rowIndexFilter.get // .materializeIntoVector(rowIndex, rowIndex + size, isRowDeletedVector) indexVectorTuples += (columnMetadata.index -> isRowDeletedVector) @@ -419,23 +436,25 @@ case class DeltaParquetFileFormat( newRow */ val newRow = columnarRow.copy(); - isRowDeletedColumn.foreach { columnMetadata => - rowIndexFilter.get - .materializeIntoVector( - 0, - columnarRow.getLong(rowIndexColumnIndex), - tempVector) + if (predicatePushdownEnabled) { + isRowDeletedColumn.foreach { columnMetadata => + val rowIndex = columnarRow.getLong(rowIndexColumnIndex) + rowIndexFilter.get.materializeSingleRowWithRowIndex(rowIndex, tempVector) + newRow.setByte(columnMetadata.index, tempVector.getByte(0)) + } + } else { + isRowDeletedColumn.foreach { columnMetadata => + rowIndexFilter.get.materializeIntoVector(rowIndex, rowIndex + 1, tempVector) + newRow.setByte(columnMetadata.index, tempVector.getByte(0)) + } - newRow.setByte(columnMetadata.index, tempVector.getByte(0)) + rowIndexColumn.foreach(columnMetadata => newRow.setLong(columnMetadata.index, rowIndex)) + rowIndex += 1 } - // rowIndexColumn - // .foreach(columnMetadata => newRow.setLong(columnMetadata.index, rowIndex)) - // rowIndex += 1 newRow case rest: InternalRow => // When vectorized Parquet 
reader is disabled - // achatzis // Temporary vector variable used to get DV values from RowIndexFilter // Currently the RowIndexFilter only supports writing into a columnar vector // and doesn't have methods to get DV value for a specific row index. @@ -451,12 +470,8 @@ case class DeltaParquetFileFormat( rest */ isRowDeletedColumn.foreach { columnMetadata => - rowIndexFilter.get - .materializeIntoVector( - 0, - rest.getLong(rowIndexColumnIndex), - tempVector) - + val rowIndex = rest.getLong(rowIndexColumnIndex) + rowIndexFilter.get.materializeSingleRowWithRowIndex(rowIndex, tempVector) rest.setByte(columnMetadata.index, tempVector.getByte(0)) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala index b8b53db8aad..8b1b459bdbb 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsRe import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex} import org.apache.spark.sql.delta.sources.DeltaSQLConf -import org.apache.spark.sql.Column +import org.apache.spark.sql.{Column, SparkSession} import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal} import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} @@ -86,7 +86,8 @@ object ScanWithDeletionVectors { // a previous invocation of this rule on this table if (fileFormat.hasTablePath) return None - // See if any files actually have a DV + // See if any files actually have a DV. + val spark = SparkSession.getActiveSession.get val filesWithDVs = index .matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral)) .filter(_.deletionVector != null) @@ -97,7 +98,7 @@ object ScanWithDeletionVectors { // `LogicalRelation` that has the same output as this `LogicalRelation` val planOutput = scan.output - val newScan = createScanWithSkipRowColumn(scan, fileFormat, index, hadoopRelation) + val newScan = createScanWithSkipRowColumn(spark, scan, fileFormat, index, hadoopRelation) // On top of the scan add a filter that filters out the rows which have // skip row column value non-zero @@ -112,12 +113,13 @@ object ScanWithDeletionVectors { * an extra column which indicates whether the row needs to be skipped or not. */ private def createScanWithSkipRowColumn( + spark: SparkSession, inputScan: LogicalRelation, fileFormat: DeltaParquetFileFormat, tahoeFileIndex: TahoeFileIndex, hadoopFsRelation: HadoopFsRelation): LogicalRelation = { val predicatePushdownEnabled = - spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_ENABLE_PREDICATE_PUSHDOWN) + spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_PREDICATE_PUSHDOWN_ENABLED) // Create a new `LogicalRelation` that has modified `DeltaFileFormat` and output with an extra // column to indicate whether to skip the row or not @@ -150,7 +152,10 @@ object ScanWithDeletionVectors { // operator after the data is read from the underlying file reader. 
val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField) - val newFileFormat = fileFormat.disableSplittingAndPushdown(tahoeFileIndex.path.toString) + val newFileFormat = fileFormat.disableSplittingAndPushdown( + tablePath = tahoeFileIndex.path.toString, + predicatePushdownEnabled = predicatePushdownEnabled) + val newRelation = hadoopFsRelation.copy( fileFormat = newFileFormat, dataSchema = newDataSchema)(hadoopFsRelation.sparkSession) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala index e828948cfb6..db8ff27defc 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala @@ -86,7 +86,7 @@ object DMLWithDeletionVectorsHelper extends DeltaCommand { target: LogicalPlan, fileIndex: TahoeFileIndex): LogicalPlan = { val predicatePushdownEnabled = - spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_ENABLE_PREDICATE_PUSHDOWN) + spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_PREDICATE_PUSHDOWN_ENABLED) val rowIndexCol = AttributeReference(ROW_INDEX_COLUMN_NAME, ROW_INDEX_STRUCT_FIELD.dataType)() var fileMetadataCol: AttributeReference = null @@ -386,8 +386,8 @@ object DeletionVectorBitmapGenerator { condition: Expression, fileNameColumnOpt: Option[Column] = None, rowIndexColumnOpt: Option[Column] = None): Seq[DeletionVectorResult] = { - val predicatePushdownEnabled = - sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_ENABLE_PREDICATE_PUSHDOWN) + val predicatePushdownConf = DeltaSQLConf.DELETION_VECTORS_PREDICATE_PUSHDOWN_ENABLED + val predicatePushdownEnabled = sparkSession.sessionState.conf.getConf(predicatePushdownConf) val fileNameColumn = fileNameColumnOpt.getOrElse(col(s"${METADATA_NAME}.${FILE_PATH}")) val rowIndexColumn = if (predicatePushdownEnabled) { rowIndexColumnOpt.getOrElse(col(s"${METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}")) diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/deletionvectors/RowIndexMarkingFilters.scala b/spark/src/main/scala/org/apache/spark/sql/delta/deletionvectors/RowIndexMarkingFilters.scala index 69fa53decac..628a4e419a6 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/deletionvectors/RowIndexMarkingFilters.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/deletionvectors/RowIndexMarkingFilters.scala @@ -26,55 +26,48 @@ import org.apache.spark.sql.execution.vectorized.WritableColumnVector /** * Base class for row index filters. - * @param bitmap Represents the deletion vector + * @param bitmap Represents the deletion vector. 
  */
 abstract sealed class RowIndexMarkingFilters(bitmap: RoaringBitmapArray) extends RowIndexFilter {
   val valueWhenContained: Byte
   val valueWhenNotContained: Byte
 
+  private def isContainedInBitmap(rowIndex: Long): Byte = {
+    val isContained = bitmap.contains(rowIndex)
+    if (isContained) {
+      valueWhenContained
+    } else {
+      valueWhenNotContained
+    }
+  }
+
   override def materializeIntoVector(start: Long, end: Long, batch: WritableColumnVector): Unit = {
     val batchSize = (end - start).toInt
     var rowId = 0
     while (rowId < batchSize) {
-      val isContained = bitmap.contains(start + rowId.toLong)
-      val filterOutput = if (isContained) {
-        valueWhenContained
-      } else {
-        valueWhenNotContained
-      }
-      batch.putByte(rowId, filterOutput)
+      val filterOutput = isContainedInBitmap(start + rowId.toLong)
+      batch.putByte(rowId, filterOutput)
       rowId += 1
     }
   }
 
-  override def materializeIntoVector(
+  override def materializeIntoVectorWithRowIndex(
       batchSize: Int,
       rowIndexColumn: ColumnVector,
       batch: WritableColumnVector): Unit = {
     for (rowNumber <- 0 to batchSize - 1) {
       val rowIndex = rowIndexColumn.getLong(rowNumber)
-
-      val isContained = bitmap.contains(rowIndex)
-      val filterOutput = if (isContained) {
-        valueWhenContained
-      } else {
-        valueWhenNotContained
-      }
-      batch.putByte(rowNumber, filterOutput)
+      val filterOutput = isContainedInBitmap(rowIndex)
+      batch.putByte(rowNumber, filterOutput)
     }
   }
 
-  override def materializeIntoVector(
-      rowNumber: Int,
+  override def materializeSingleRowWithRowIndex(
       rowIndex: Long,
       batch: WritableColumnVector): Unit = {
-    val isContained = bitmap.contains(rowIndex)
-    val filterOutput = if (isContained) {
-      valueWhenContained
-    } else {
-      valueWhenNotContained
-    }
-    batch.putByte(rowNumber, filterOutput)
+    val filterOutput = isContainedInBitmap(rowIndex)
+    // Assumes the batch has only one element.
+    batch.putByte(0, filterOutput)
   }
 }
 
@@ -154,7 +147,7 @@ case object DropAllRowsFilter extends RowIndexFilter {
     }
   }
 
-  override def materializeIntoVector(
+  override def materializeIntoVectorWithRowIndex(
       batchSize: Int,
       rowIndexColumn: ColumnVector,
       batch: WritableColumnVector): Unit = {
@@ -163,11 +156,11 @@ case object DropAllRowsFilter extends RowIndexFilter {
     }
   }
 
-  override def materializeIntoVector(
-    rowNumber: Int,
-    rowIndex: Long,
-    batch: WritableColumnVector): Unit =
-    batch.putByte(rowNumber, RowIndexFilter.DROP_ROW_VALUE)
+  override def materializeSingleRowWithRowIndex(
+    rowIndex: Long,
+    batch: WritableColumnVector): Unit =
+    // Assumes the batch has only one element.
+    batch.putByte(0, RowIndexFilter.DROP_ROW_VALUE)
 }
 
 case object KeepAllRowsFilter extends RowIndexFilter {
@@ -180,7 +173,7 @@ case object KeepAllRowsFilter extends RowIndexFilter {
     }
   }
 
-  override def materializeIntoVector(
+  override def materializeIntoVectorWithRowIndex(
       batchSize: Int,
       rowIndexColumn: ColumnVector,
       batch: WritableColumnVector): Unit = {
@@ -189,9 +182,9 @@ case object KeepAllRowsFilter extends RowIndexFilter {
     }
   }
 
-  override def materializeIntoVector(
-    rowNumber: Int,
-    rowIndex: Long,
-    batch: WritableColumnVector): Unit =
-    batch.putByte(rowNumber, RowIndexFilter.KEEP_ROW_VALUE)
+  override def materializeSingleRowWithRowIndex(
+    rowIndex: Long,
+    batch: WritableColumnVector): Unit =
+    // Assumes the batch has only one element.
+ batch.putByte(0, RowIndexFilter.KEEP_ROW_VALUE) } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala index 9d08dd6b304..fd28c87fd13 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala @@ -1525,8 +1525,8 @@ trait DeltaSQLConfBase { .booleanConf .createWithDefault(false) - val DELETION_VECTORS_ENABLE_PREDICATE_PUSHDOWN = - buildConf("deletionVectors.enablePredicatePushdown") + val DELETION_VECTORS_PREDICATE_PUSHDOWN_ENABLED = + buildConf("deletionVectors.predicatePushdownEnabled") .internal() .doc("""Controls whether we generate pushdown predicates in scans with DVs.""") .booleanConf
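
Usage sketch (illustrative, not part of the diff): the snippet below shows how the new conf is
expected to be exercised end to end. It assumes the key receives the usual
"spark.databricks.delta." prefix that DeltaSQLConf applies to its entries, and the table name
target_table is made up for the example.

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().appName("dv-pushdown-sketch").getOrCreate()

  // Opt in to keeping file splitting and Parquet predicate pushdown for scans over tables
  // that carry deletion vectors.
  spark.conf.set("spark.databricks.delta.deletionVectors.predicatePushdownEnabled", "true")

  // A table with deletion vectors enabled: DELETE marks rows in a DV instead of rewriting files.
  spark.sql(
    """CREATE TABLE target_table (id BIGINT, value STRING) USING delta
      |TBLPROPERTIES ('delta.enableDeletionVectors' = 'true')""".stripMargin)
  spark.sql("INSERT INTO target_table SELECT id, CAST(id AS STRING) FROM range(1000000)")
  spark.sql("DELETE FROM target_table WHERE id % 10 = 0")

  // With the conf enabled the scan keeps splits and pushdown; deleted rows are dropped by
  // probing the DV bitmap with the Parquet-generated row index rather than a counter that
  // assumes unsplit, unfiltered files.
  spark.sql("SELECT count(*) FROM target_table WHERE value LIKE '1%'").show()

A second sketch of the reader-side contract, mirroring the non-vectorized branch in
DeltaParquetFileFormat: the filter writes KEEP_ROW_VALUE or DROP_ROW_VALUE into a one-slot
vector for the row index carried by the row itself. The helper name isRowDeleted is made up
for illustration.

  import org.apache.spark.sql.delta.RowIndexFilter
  import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
  import org.apache.spark.sql.types.ByteType

  def isRowDeleted(filter: RowIndexFilter, rowIndex: Long): Boolean = {
    // Single-slot scratch vector, as in iteratorWithAdditionalMetadataColumns.
    val tempVector = new OnHeapColumnVector(1, ByteType)
    filter.materializeSingleRowWithRowIndex(rowIndex, tempVector)
    tempVector.getByte(0) == RowIndexFilter.DROP_ROW_VALUE
  }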