
Commit 0826e53

# This is a combination of 16 commits.
# This is the 1st commit message:

flush

# This is the commit message delta-io#2:

flush

# This is the commit message delta-io#3:

First sane version without isRowDeleted

# This is the commit message delta-io#4:

Hack RowIndexMarkingFilters

# This is the commit message delta-io#5:

Add support for non-vectorized readers

# This is the commit message delta-io#6:

Metadata column fix

# This is the commit message delta-io#7:

Avoid non-deterministic UDF to filter deleted rows

# This is the commit message delta-io#8:

metadata with Expression ID

# This is the commit message delta-io#9:

Fix complex views issue

# This is the commit message delta-io#10:

Tests

# This is the commit message delta-io#11:

cleaning

# This is the commit message delta-io#12:

More tests and fixes

# This is the commit message delta-io#13:

Partial cleaning

# This is the commit message delta-io#14:

cleaning and improvements

# This is the commit message delta-io#15:

cleaning and improvements

# This is the commit message delta-io#16:

Clean RowIndexFilter
andreaschat-db committed Apr 26, 2024
1 parent 1165c7e commit 0826e53
Showing 6 changed files with 135 additions and 115 deletions.
31 changes: 19 additions & 12 deletions spark/src/main/java/org/apache/spark/sql/delta/RowIndexFilter.java
@@ -27,29 +27,36 @@ public interface RowIndexFilter {

/**
* Materialize filtering information for all rows in the range [start, end)
* by filling a boolean column vector batch.
* by filling a boolean column vector batch. Assumes the indexes of the rows in the batch are
* consecutive and start from 0.
*
* @param start Beginning index of the filtering range (inclusive)
* @param end End index of the filtering range (exclusive)
* @param batch The column vector for the current batch to materialize the range into
* @param start Beginning index of the filtering range (inclusive).
* @param end End index of the filtering range (exclusive).
* @param batch The column vector for the current batch to materialize the range into.
*/
void materializeIntoVector(long start, long end, WritableColumnVector batch);

/**
* Materialize filtering information for all rows in the batch. This is achieved by probing
* the roaring bitmap with the row index of every row in the batch.
*
* @param batchSize
* @param rowIndexColumn
* @param batch
* @param batchSize The size of the batch.
* @param rowIndexColumn A column vector that contains the row index of each row in the batch.
* @param batch The column vector for the current batch to materialize the range into.
*/
void materializeIntoVector(int batchSize, ColumnVector rowIndexColumn, WritableColumnVector batch);
void materializeIntoVectorWithRowIndex(
int batchSize,
ColumnVector rowIndexColumn,
WritableColumnVector batch);

/**
* Materialize filtering information for batches with a single row.
*
* @param rowNumber
* @param rowIndex
* @param batch
* @param rowIndex The index of the row for which to materialize the filtering information.
* @param batch The column vector for the current batch to materialize the range into.
* We assume it contains a single row.
*/
void materializeIntoVector(int rowNumber, long rowIndex, WritableColumnVector batch);
void materializeSingleRowWithRowIndex(long rowIndex, WritableColumnVector batch);

/**
* Value that must be materialised for a row to be kept after filtering.
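The interface changes above split row-deletion materialization into a range-based call and two row-index-based calls. Below is a minimal sketch of how a reader loop might drive them; the `markDeletedRows` helper is illustrative and not part of the commit, and `filter` stands for any RowIndexFilter implementation (e.g. the DropMarkedRowsFilter/KeepMarkedRowsFilter variants imported further down in this diff).

```scala
// Illustrative only -- not part of this commit.
import org.apache.spark.sql.delta.RowIndexFilter
import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
import org.apache.spark.sql.types.ByteType
import org.apache.spark.sql.vectorized.ColumnVector

// Hypothetical helper: materialize the deleted-row markers for one batch.
def markDeletedRows(
    filter: RowIndexFilter,
    batchSize: Int,
    rowIndexColumn: Option[ColumnVector], // per-row indexes from the Parquet reader, if any
    firstRowIndex: Long): OnHeapColumnVector = {
  val isRowDeleted = new OnHeapColumnVector(batchSize, ByteType)
  rowIndexColumn match {
    case Some(indexes) =>
      // Row indexes are available per row (e.g. _metadata.row_index): probe the DV with them.
      filter.materializeIntoVectorWithRowIndex(batchSize, indexes, isRowDeleted)
    case None =>
      // No per-row indexes: rows are assumed consecutive, so a [start, end) range suffices.
      filter.materializeIntoVector(firstRowIndex, firstRowIndex + batchSize, isRowDeleted)
  }
  isRowDeleted
}
```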
@@ -22,8 +22,10 @@ import scala.util.control.NonFatal
import org.apache.spark.sql.delta.RowIndexFilterType
import org.apache.spark.sql.delta.DeltaParquetFileFormat._
import org.apache.spark.sql.delta.actions.{DeletionVectorDescriptor, Metadata, Protocol}
import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsReadable
import org.apache.spark.sql.delta.deletionvectors.{DropMarkedRowsFilter, KeepAllRowsFilter, KeepMarkedRowsFilter}
import org.apache.spark.sql.delta.schema.SchemaMergingUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.Job
@@ -61,8 +63,14 @@ case class DeltaParquetFileFormat(
extends ParquetFileFormat {
// Validate either we have all arguments for DV enabled read or none of them.
if (hasTablePath) {
// require(!isSplittable && disablePushDowns,
// "Wrong arguments for Delta table scan with deletion vectors")
SparkSession.getActiveSession.map { session =>
val predicatePushdownEnabled =
session.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_PREDICATE_PUSHDOWN_ENABLED)
if (!predicatePushdownEnabled) {
require(tablePath.isDefined && !isSplittable && disablePushDowns,
"Wrong arguments for Delta table scan with deletion vectors")
}
}
}

TypeWidening.assertTableReadable(protocol, metadata)
@@ -147,6 +155,10 @@ case class DeltaParquetFileFormat(
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {

val predicateConf = DeltaSQLConf.DELETION_VECTORS_PREDICATE_PUSHDOWN_ENABLED
val predicatePushdownEnabled = sparkSession.sessionState.conf.getConf(predicateConf)

val parquetDataReader: PartitionedFile => Iterator[InternalRow] =
super.buildReaderWithPartitionValues(
sparkSession,
@@ -166,19 +178,25 @@
}
results.headOption.map(e => ColumnMetadata(e._2, e._1))
}

val isRowDeletedColumn = findColumn(IS_ROW_DELETED_COLUMN_NAME)
// val rowIndexColumn = findColumn(ROW_INDEX_COLUMN_NAME)
val rowIndexColumn = findColumn(ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
val rowIndexColumnName = if (predicatePushdownEnabled) {
ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
} else {
ROW_INDEX_COLUMN_NAME
}
val rowIndexColumn = findColumn(rowIndexColumnName)

// if (isRowDeletedColumn.isEmpty && rowIndexColumn.isEmpty) {
if (isRowDeletedColumn.isEmpty) {
return parquetDataReader // no additional metadata is needed.
// No additional metadata is needed.
if (predicatePushdownEnabled) {
if (isRowDeletedColumn.isEmpty) return parquetDataReader
} else {
if (isRowDeletedColumn.isEmpty && rowIndexColumn.isEmpty) return parquetDataReader

// verify the file splitting and filter pushdown are disabled. The new additional
// metadata columns cannot be generated with file splitting and filter pushdowns
// require(!isSplittable, "Cannot generate row index related metadata with file splitting")
// require(disablePushDowns,
// "Cannot generate row index related metadata with filter pushdown")
require(!isSplittable, "Cannot generate row index related metadata with file splitting")
require(disablePushDowns, "Cannot generate row index related metadata with filter pushdown")
}

if (hasTablePath && isRowDeletedColumn.isEmpty) {
Expand All @@ -199,7 +217,8 @@ case class DeltaParquetFileFormat(
isRowDeletedColumn,
rowIndexColumn,
useOffHeapBuffers,
serializableHadoopConf)
serializableHadoopConf,
predicatePushdownEnabled = predicatePushdownEnabled)
iterToReturn.asInstanceOf[Iterator[InternalRow]]
} catch {
case NonFatal(e) =>
@@ -220,23 +239,21 @@
}

override def metadataSchemaFields: Seq[StructField] = {
// achatzis
val rowTrackingFields =
RowTracking.createMetadataStructFields(protocol, metadata, nullableRowTrackingFields)
// TODO(SPARK-47731): Parquet reader in Spark has a bug where a file containing 2b+ rows
// in a single rowgroup causes it to run out of the `Integer` range.
// For Delta Parquet readers don't expose the row_index field as a metadata field.
if (!RowId.isEnabled(protocol, metadata)) {
super.metadataSchemaFields // .filter(_ != ParquetFileFormat.ROW_INDEX_FIELD)
// For Delta Parquet readers don't expose the row_index field as a metadata field when it is
// not strictly required. We do expose it when Row Tracking or DVs are enabled.
// In general, having 2b+ rows in a single rowgroup is not a common use case. When the issue is
// hit an exception is thrown.
if (RowId.isEnabled(protocol, metadata)) {
super.metadataSchemaFields ++
RowTracking.createMetadataStructFields(protocol, metadata, nullableRowTrackingFields)
} else if (deletionVectorsReadable(protocol, metadata)) {
super.metadataSchemaFields
} else {
// It is fine to expose the row_index field as a metadata field when Row Tracking
// is enabled because it is needed to generate the Row ID field, and it is not a
// big problem if we use 2b+ rows in a single rowgroup, it will throw an exception and
// we can then use less rows per rowgroup. Also, 2b+ rows in a single rowgroup is
// not a common use case.
super.metadataSchemaFields ++ rowTrackingFields
super.metadataSchemaFields.filter(_ != ParquetFileFormat.ROW_INDEX_FIELD)
}
}
}

override def prepareWrite(
sparkSession: SparkSession,
@@ -279,10 +296,13 @@ case class DeltaParquetFileFormat(
.updated(DefaultRowCommitVersion.METADATA_STRUCT_FIELD_NAME, extractDefaultRowCommitVersion)
}

def disableSplittingAndPushdown(tablePath: String): DeltaParquetFileFormat = {
def disableSplittingAndPushdown(
tablePath: String,
predicatePushdownEnabled: Boolean): DeltaParquetFileFormat = {
// When predicate pushdown is enabled we allow both splits and predicate pushdown.
this.copy(
// isSplittable = true,
// disablePushDowns = false,
isSplittable = predicatePushdownEnabled,
disablePushDowns = !predicatePushdownEnabled,
tablePath = Some(tablePath))
}

@@ -291,15 +311,17 @@
* following metadata columns.
* - [[IS_ROW_DELETED_COLUMN_NAME]] - row deleted status from deletion vector corresponding
* to this file
* - [[ROW_INDEX_COLUMN_NAME]] - index of the row within the file.
* - [[ROW_INDEX_COLUMN_NAME]] - index of the row within the file. Note, this column is only
* populated when we are not using the _metadata.row_index column.
*/
private def iteratorWithAdditionalMetadataColumns(
partitionedFile: PartitionedFile,
iterator: Iterator[Object],
isRowDeletedColumn: Option[ColumnMetadata],
rowIndexColumn: Option[ColumnMetadata],
useOffHeapBuffers: Boolean,
serializableHadoopConf: SerializableConfiguration): Iterator[Object] = {
serializableHadoopConf: SerializableConfiguration,
predicatePushdownEnabled: Boolean): Iterator[Object] = {
val rowIndexFilter = isRowDeletedColumn.map { col =>
// Fetch the DV descriptor from the broadcast map and create a row index filter
val dvDescriptorOpt = partitionedFile.otherConstantMetadataColumnValues
@@ -328,18 +350,13 @@

val metadataColumns = Seq(isRowDeletedColumn, rowIndexColumn).filter(_.nonEmpty).map(_.get)

// Achatzis
// Unfortunately there is no way to verify the Parquet index is starting from 0.
// We disable the splits, so the assumption is ParquetFileFormat respects that
// When metadata.row_index is not used there is no way to verify the Parquet index is
// starting from 0. We disable the splits, so the assumption is that ParquetFileFormat respects that.
var rowIndex: Long = 0

// Used only when non-column row batches are received from the Parquet reader
val tempVector = new OnHeapColumnVector(1, ByteType)

if (rowIndexColumn.isDefined == false) {
val c = 1
}

val rowIndexColumnIndex = (rowIndexFilter, rowIndexColumn) match {
case (Some(_: KeepAllRowsFilter.type), Some(rowIndexColumn)) => rowIndexColumn.index
case (Some(_), Some(rowIndexColumn)) => rowIndexColumn.index
@@ -357,8 +374,8 @@
isRowDeletedColumn.foreach { columnMetadata =>
val isRowDeletedVector = writableVectors.head

rowIndexFilter.get
.materializeIntoVector(size, batch.column(rowIndexColumnIndex), isRowDeletedVector)
rowIndexFilter.get.materializeIntoVectorWithRowIndex(
size, batch.column(rowIndexColumnIndex), isRowDeletedVector)
// rowIndexFilter.get
// .materializeIntoVector(rowIndex, rowIndex + size, isRowDeletedVector)
indexVectorTuples += (columnMetadata.index -> isRowDeletedVector)
@@ -419,23 +436,25 @@
newRow
*/
val newRow = columnarRow.copy();
isRowDeletedColumn.foreach { columnMetadata =>
rowIndexFilter.get
.materializeIntoVector(
0,
columnarRow.getLong(rowIndexColumnIndex),
tempVector)
if (predicatePushdownEnabled) {
isRowDeletedColumn.foreach { columnMetadata =>
val rowIndex = columnarRow.getLong(rowIndexColumnIndex)
rowIndexFilter.get.materializeSingleRowWithRowIndex(rowIndex, tempVector)
newRow.setByte(columnMetadata.index, tempVector.getByte(0))
}
} else {
isRowDeletedColumn.foreach { columnMetadata =>
rowIndexFilter.get.materializeIntoVector(rowIndex, rowIndex + 1, tempVector)
newRow.setByte(columnMetadata.index, tempVector.getByte(0))
}

newRow.setByte(columnMetadata.index, tempVector.getByte(0))
rowIndexColumn.foreach(columnMetadata => newRow.setLong(columnMetadata.index, rowIndex))
rowIndex += 1
}

// rowIndexColumn
// .foreach(columnMetadata => newRow.setLong(columnMetadata.index, rowIndex))
// rowIndex += 1
newRow

case rest: InternalRow => // When vectorized Parquet reader is disabled
// achatzis
// Temporary vector variable used to get DV values from RowIndexFilter
// Currently the RowIndexFilter only supports writing into a columnar vector
// and doesn't have methods to get DV value for a specific row index.
Expand All @@ -451,12 +470,8 @@ case class DeltaParquetFileFormat(
rest
*/
isRowDeletedColumn.foreach { columnMetadata =>
rowIndexFilter.get
.materializeIntoVector(
0,
rest.getLong(rowIndexColumnIndex),
tempVector)

val rowIndex = rest.getLong(rowIndexColumnIndex)
rowIndexFilter.get.materializeSingleRowWithRowIndex(rowIndex, tempVector)
rest.setByte(columnMetadata.index, tempVector.getByte(0))
}

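To summarize the behaviour the DeltaParquetFileFormat changes above introduce: when the new DELETION_VECTORS_PREDICATE_PUSHDOWN_ENABLED flag is on, the reader consumes Parquet's temporary row-index column and keeps file splitting and filter pushdown enabled; when it is off, the legacy Delta-generated row-index column is used and both stay disabled. A minimal sketch under those assumptions follows; `ScanMode` and `scanModeForDvRead` are illustrative names, not part of the commit.

```scala
// Illustrative only -- condenses the flag handling shown in the diff above.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaParquetFileFormat
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

// Hypothetical wrapper for the three settings the flag controls.
case class ScanMode(rowIndexColumnName: String, isSplittable: Boolean, disablePushDowns: Boolean)

def scanModeForDvRead(session: SparkSession): ScanMode = {
  val pushdownEnabled =
    session.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_PREDICATE_PUSHDOWN_ENABLED)
  if (pushdownEnabled) {
    // Row indexes come from Parquet's temporary row-index column, so file splitting
    // and filter pushdown can stay enabled.
    ScanMode(
      rowIndexColumnName = ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME,
      isSplittable = true,
      disablePushDowns = false)
  } else {
    // Legacy path: Delta generates its own row-index column, which requires disabling
    // splits and pushdown so that row indexes stay consecutive and start from 0.
    ScanMode(
      rowIndexColumnName = DeltaParquetFileFormat.ROW_INDEX_COLUMN_NAME,
      isSplittable = false,
      disablePushDowns = true)
  }
}
```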
@@ -22,7 +22,7 @@ import org.apache.spark.sql.delta.commands.DeletionVectorUtils.deletionVectorsRe
import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
import org.apache.spark.sql.delta.sources.DeltaSQLConf

import org.apache.spark.sql.Column
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
@@ -86,7 +86,8 @@ object ScanWithDeletionVectors {
// a previous invocation of this rule on this table
if (fileFormat.hasTablePath) return None

// See if any files actually have a DV
// See if any files actually have a DV.
val spark = SparkSession.getActiveSession.get
val filesWithDVs = index
.matchingFiles(partitionFilters = Seq(TrueLiteral), dataFilters = Seq(TrueLiteral))
.filter(_.deletionVector != null)
@@ -97,7 +98,7 @@
// `LogicalRelation` that has the same output as this `LogicalRelation`
val planOutput = scan.output

val newScan = createScanWithSkipRowColumn(scan, fileFormat, index, hadoopRelation)
val newScan = createScanWithSkipRowColumn(spark, scan, fileFormat, index, hadoopRelation)

// On top of the scan add a filter that filters out the rows which have
// skip row column value non-zero
@@ -112,12 +113,13 @@
* an extra column which indicates whether the row needs to be skipped or not.
*/
private def createScanWithSkipRowColumn(
spark: SparkSession,
inputScan: LogicalRelation,
fileFormat: DeltaParquetFileFormat,
tahoeFileIndex: TahoeFileIndex,
hadoopFsRelation: HadoopFsRelation): LogicalRelation = {
val predicatePushdownEnabled =
spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_ENABLE_PREDICATE_PUSHDOWN)
spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_PREDICATE_PUSHDOWN_ENABLED)

// Create a new `LogicalRelation` that has modified `DeltaFileFormat` and output with an extra
// column to indicate whether to skip the row or not
@@ -150,7 +152,10 @@ object ScanWithDeletionVectors {
// operator after the data is read from the underlying file reader.
val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField)

val newFileFormat = fileFormat.disableSplittingAndPushdown(tahoeFileIndex.path.toString)
val newFileFormat = fileFormat.disableSplittingAndPushdown(
tablePath = tahoeFileIndex.path.toString,
predicatePushdownEnabled = predicatePushdownEnabled)

val newRelation = hadoopFsRelation.copy(
fileFormat = newFileFormat,
dataSchema = newDataSchema)(hadoopFsRelation.sparkSession)
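The rule above rewrites the scan so that it emits an extra skip-row column and then places a filter on top of it, keeping only rows whose skip-row value is zero (rows with a non-zero value were marked deleted by the deletion vector). A minimal sketch of that final step follows; `keepOnlyUndeletedRows` and `skipRowAttr` are hypothetical names used for illustration only.

```scala
// Illustrative only -- the shape of the plan produced by the rule: a Filter on top of
// the rewritten scan that drops rows whose skip-row column is non-zero.
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}

def keepOnlyUndeletedRows(newScan: LogicalPlan, skipRowAttr: AttributeReference): LogicalPlan = {
  // Rows marked by the RowIndexFilter with a non-zero skip-row value were deleted via DVs.
  Filter(EqualTo(skipRowAttr, Literal(0.toByte)), newScan)
}
```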
@@ -86,7 +86,7 @@ object DMLWithDeletionVectorsHelper extends DeltaCommand {
target: LogicalPlan,
fileIndex: TahoeFileIndex): LogicalPlan = {
val predicatePushdownEnabled =
spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_ENABLE_PREDICATE_PUSHDOWN)
spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_PREDICATE_PUSHDOWN_ENABLED)
val rowIndexCol = AttributeReference(ROW_INDEX_COLUMN_NAME, ROW_INDEX_STRUCT_FIELD.dataType)()

var fileMetadataCol: AttributeReference = null
@@ -386,8 +386,8 @@ object DeletionVectorBitmapGenerator {
condition: Expression,
fileNameColumnOpt: Option[Column] = None,
rowIndexColumnOpt: Option[Column] = None): Seq[DeletionVectorResult] = {
val predicatePushdownEnabled =
sparkSession.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_ENABLE_PREDICATE_PUSHDOWN)
val predicatePushdownConf = DeltaSQLConf.DELETION_VECTORS_PREDICATE_PUSHDOWN_ENABLED
val predicatePushdownEnabled = sparkSession.sessionState.conf.getConf(predicatePushdownConf)
val fileNameColumn = fileNameColumnOpt.getOrElse(col(s"${METADATA_NAME}.${FILE_PATH}"))
val rowIndexColumn = if (predicatePushdownEnabled) {
rowIndexColumnOpt.getOrElse(col(s"${METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}"))
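A minimal sketch of how callers such as DeletionVectorBitmapGenerator above pick the row-index column after the conf rename, assuming the flag is read exactly as in the diff; `rowIndexColumnFor` is an illustrative helper, not a real API.

```scala
// Illustrative only -- mirrors the conf read and column choice shown above.
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.delta.DeltaParquetFileFormat.ROW_INDEX_COLUMN_NAME
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.functions.col

def rowIndexColumnFor(spark: SparkSession): Column = {
  val predicatePushdownEnabled =
    spark.sessionState.conf.getConf(DeltaSQLConf.DELETION_VECTORS_PREDICATE_PUSHDOWN_ENABLED)
  if (predicatePushdownEnabled) {
    // With pushdown enabled, use the row index the Parquet reader exposes via _metadata.
    col(s"_metadata.${ParquetFileFormat.ROW_INDEX}")
  } else {
    // Otherwise use the row-index column that Delta generates for the scan.
    col(ROW_INDEX_COLUMN_NAME)
  }
}
```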