
Commit

# This is a combination of 6 commits.
# This is the 1st commit message:

flush

# This is the commit message delta-io#2:

flush

# This is the commit message delta-io#3:

First sane version without isRowDeleted

# This is the commit message delta-io#4:

Hack RowIndexMarkingFilters

# This is the commit message delta-io#5:

Add support for non-vectorized readers

# This is the commit message delta-io#6:

Metadata column fix
andreaschat-db committed Apr 23, 2024
1 parent cd81332 commit 96d6f91
Showing 7 changed files with 201 additions and 10 deletions.
17 changes: 17 additions & 0 deletions spark/src/main/java/org/apache/spark/sql/delta/RowIndexFilter.java
@@ -16,6 +16,7 @@

package org.apache.spark.sql.delta;

import org.apache.spark.sql.vectorized.ColumnVector;
import org.apache.spark.sql.execution.vectorized.WritableColumnVector;

/**
@@ -34,6 +35,22 @@ public interface RowIndexFilter {
*/
void materializeIntoVector(long start, long end, WritableColumnVector batch);

/**
 * Materialise filter values for a batch of rows whose row indexes are provided
 * by a column vector rather than a contiguous range.
 *
 * @param batchSize number of rows in the batch
 * @param rowIndexColumn vector holding the file row index of each row in the batch
 * @param batch writable vector that receives one keep/drop value per row
 */
void materializeIntoVector(int batchSize, ColumnVector rowIndexColumn, WritableColumnVector batch);

/**
 * Materialise the filter value for a single row.
 *
 * @param rowNumber position in {@code batch} where the value is written
 * @param rowIndex file row index of the row to evaluate
 * @param batch writable vector that receives the keep/drop value
 */
void materializeIntoVector(int rowNumber, long rowIndex, WritableColumnVector batch);

/**
* Value that must be materialised for a row to be kept after filtering.
*/
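For context, a minimal caller-side sketch of the new batch overload (not part of this commit; markDeletedRows is a hypothetical helper, the types are the ones used elsewhere in this change):

    import org.apache.spark.sql.delta.RowIndexFilter
    import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
    import org.apache.spark.sql.types.ByteType
    import org.apache.spark.sql.vectorized.ColumnarBatch

    // Mark deleted rows for one batch using the Parquet-generated row-index
    // column instead of a running row counter.
    def markDeletedRows(
        filter: RowIndexFilter,
        batch: ColumnarBatch,
        rowIndexColumnIndex: Int): OnHeapColumnVector = {
      val size = batch.numRows()
      val isRowDeleted = new OnHeapColumnVector(size, ByteType)
      // One keep/drop byte per row, decided by that row's file row index.
      filter.materializeIntoVector(size, batch.column(rowIndexColumnIndex), isRowDeleted)
      isRowDeleted
    }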
@@ -18,7 +18,6 @@ package org.apache.spark.sql.delta

import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.RowIndexFilterType
import org.apache.spark.sql.delta.DeltaParquetFileFormat._
import org.apache.spark.sql.delta.actions.{DeletionVectorDescriptor, Metadata, Protocol}
@@ -338,10 +337,39 @@ case class DeltaParquetFileFormat(
// Used only when non-column row batches are received from the Parquet reader
val tempVector = new OnHeapColumnVector(1, ByteType)

// The position of the row index column is needed to look up deletion vector
// membership for each row.
val rowIndexColumnIndex = (rowIndexFilter, rowIndexColumn) match {
  case (Some(_), Some(rowIndexColumn)) => rowIndexColumn.index
  case _ =>
    throw new IllegalStateException(
      "Expected a row index column when a row index filter is present")
}

iterator.map { row =>
row match {
case batch: ColumnarBatch => // When vectorized Parquet reader is enabled
val size = batch.numRows()
trySafely(useOffHeapBuffers, size, Seq(isRowDeletedColumn.get)) { writableVectors =>
val indexVectorTuples = new ArrayBuffer[(Int, ColumnVector)]
isRowDeletedColumn.foreach { columnMetadata =>
  val isRowDeletedVector = writableVectors.head
  // Mark deleted rows using the row indexes produced by the Parquet reader
  // rather than a running row counter.
  rowIndexFilter.get
    .materializeIntoVector(size, batch.column(rowIndexColumnIndex), isRowDeletedVector)
  indexVectorTuples += (columnMetadata.index -> isRowDeletedVector)
}

// Swap the materialized is_row_deleted vector into the batch at its metadata column index.
replaceVectors(batch, indexVectorTuples.toSeq: _*)
}

/*
// Create vectors for all needed metadata columns.
// We can't use the one from Parquet reader as it set the
// [[WritableColumnVector.isAllNulls]] to true and it can't be reset with using any
@@ -357,7 +385,6 @@ case class DeltaParquetFileFormat(
index += 1
}
/*
rowIndexColumn.foreach { columnMetadata =>
val rowIndexVector = writableVectors(index)
// populate the row index column value
@@ -368,19 +395,20 @@
indexVectorTuples += (columnMetadata.index -> rowIndexVector)
index += 1
}
*/
val newBatch = replaceVectors(batch, indexVectorTuples.toSeq: _*)
rowIndex += size
newBatch
}
*/

case columnarRow: ColumnarBatchRow =>
// When the vectorized reader is enabled but returns immutable rows
// ([[ColumnarBatchRow]]) instead of columnar batches, we have to copy the row
// into a mutable [[InternalRow]] and set the `row_index` and `is_row_deleted`
// column values. This is not efficient, but it should only affect wide
// tables. https://github.com/delta-io/delta/issues/2246
/*
val newRow = columnarRow.copy();
isRowDeletedColumn.foreach { columnMetadata =>
rowIndexFilter.get.materializeIntoVector(rowIndex, rowIndex + 1, tempVector)
@@ -390,12 +418,30 @@
rowIndexColumn.foreach(columnMetadata => newRow.setLong(columnMetadata.index, rowIndex))
rowIndex += 1
newRow
*/
val newRow = columnarRow.copy();
isRowDeletedColumn.foreach { columnMetadata =>
rowIndexFilter.get
.materializeIntoVector(
columnarRow.rowId, // TODO check this
columnarRow.getLong(rowIndexColumnIndex),
tempVector)

newRow.setByte(columnMetadata.index, tempVector.getByte(0))
}

newRow

case rest: InternalRow => // When vectorized Parquet reader is disabled
// Temporary vector variable used to get DV values from RowIndexFilter
// Currently the RowIndexFilter only supports writing into a columnar vector
// and doesn't have methods to get DV value for a specific row index.
// TODO: This is not efficient, but it is ok given the default reader is vectorized
/*
isRowDeletedColumn.foreach { columnMetadata =>
rowIndexFilter.get.materializeIntoVector(rowIndex, rowIndex + 1, tempVector)
rest.setByte(columnMetadata.index, tempVector.getByte(0))
@@ -404,6 +450,18 @@
rowIndexColumn.foreach(columnMetadata => rest.setLong(columnMetadata.index, rowIndex))
rowIndex += 1
rest
*/
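// tempVector holds a single row: write the keep/drop decision for this row's
// file row index into position 0, then copy it into the is_row_deleted column.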
isRowDeletedColumn.foreach { columnMetadata =>
rowIndexFilter.get
.materializeIntoVector(
0,
rest.getLong(rowIndexColumnIndex),
tempVector)

rest.setByte(columnMetadata.index, tempVector.getByte(0))
}

rest
case others =>
throw new RuntimeException(
s"Parquet reader returned an unknown row type: ${others.getClass.getName}")
@@ -25,6 +25,9 @@ import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}

import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}

/**
@@ -117,8 +120,19 @@ object ScanWithDeletionVectors {
// Add a column for SKIP_ROW to the base output. A value of 0 means the row needs to be
// kept, any other value means the row needs to be skipped.
val skipRowField = IS_ROW_DELETED_STRUCT_FIELD
val newScanOutput = inputScan.output :+
AttributeReference(skipRowField.name, skipRowField.dataType)()
// val rowIndexField = ParquetFileFormat.ROW_INDEX_FIELD
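// If the scan already exposes the _metadata column, only the skip-row attribute needs to
// be appended; otherwise also surface the file metadata column so the row index is
// available to the deletion vector filter downstream.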
val newScanOutput = if (inputScan.output.map(_.name).contains(METADATA_NAME)) {
inputScan.output :+ AttributeReference(skipRowField.name, skipRowField.dataType)()
} else {
val fileMetadataCol = fileFormat.createFileMetadataCol()
/*
val rowIndexCol = AttributeReference(
s"${METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}",
ROW_INDEX_STRUCT_FIELD.dataType)()
*/
inputScan.output ++
Seq(AttributeReference(skipRowField.name, skipRowField.dataType)(), fileMetadataCol)
}

// Data schema and scan schema could be different. The scan schema may contain additional
// columns such as `_metadata.file_path` (metadata columns) which are populated in Spark scan
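As an aside, a sketch of how the row index surfaces to readers once the file metadata column is part of the scan output (not part of this commit; `path` is a placeholder, and it assumes a Spark/Delta combination that exposes the Parquet row index under `_metadata`):

    import org.apache.spark.sql.functions.col

    val withRowIndexes = spark.read.format("delta").load(path)
      .select(col("id"), col("_metadata.row_index"))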
@@ -77,10 +77,12 @@ object DMLWithDeletionVectorsHelper extends DeltaCommand {
private def replaceFileIndex(target: LogicalPlan, fileIndex: TahoeFileIndex): LogicalPlan = {
// val rowIndexCol =
// AttributeReference(ROW_INDEX_COLUMN_NAME, ROW_INDEX_STRUCT_FIELD.dataType)();
/*
val rowIndexCol =
AttributeReference(
s"${METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}",
ROW_INDEX_STRUCT_FIELD.dataType)()
*/
var fileMetadataCol: AttributeReference = null

val newTarget = target.transformUp {
@@ -91,19 +93,21 @@ object DMLWithDeletionVectorsHelper extends DeltaCommand {
val newDataSchema =
StructType(hfsr.dataSchema).add(ROW_INDEX_STRUCT_FIELD)
// Surface the file metadata column so that the row index is available downstream.
val finalOutput = l.output :+ fileMetadataCol
/*
// Disable splitting and filter pushdown in order to generate the row-indexes
val newFormat = format.copy(isSplittable = true, disablePushDowns = false)
val newBaseRelation = hfsr.copy(
location = fileIndex,
dataSchema = newDataSchema,
fileFormat = newFormat)(hfsr.sparkSession)
*/

l.copy(relation = hfsr, output = finalOutput)
case p @ Project(projectList, _) =>
if (fileMetadataCol == null) {
throw new IllegalStateException("File metadata column is not yet created.")
}
val newProjectList = projectList ++ Seq(rowIndexCol, fileMetadataCol)
val newProjectList = projectList :+ fileMetadataCol
p.copy(projectList = newProjectList)
}
newTarget
@@ -21,7 +21,7 @@ import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.vectorized.ColumnVector
import org.apache.spark.sql.execution.vectorized.WritableColumnVector

/**
@@ -46,6 +46,36 @@ abstract sealed class RowIndexMarkingFilters(bitmap: RoaringBitmapArray) extends
rowId += 1
}
}

override def materializeIntoVector(
batchSize: Int,
rowIndexColumn: ColumnVector,
batch: WritableColumnVector): Unit = {
for (rowNumber <- 0 until batchSize) {
val rowIndex = rowIndexColumn.getLong(rowNumber)

val isContained = bitmap.contains(rowIndex)
val filterOutput = if (isContained) {
valueWhenContained
} else {
valueWhenNotContained
}
batch.putByte(rowNumber, filterOutput)
}
}

override def materializeIntoVector(
rowNumber: Int,
rowIndex: Long,
batch: WritableColumnVector): Unit = {
val isContained = bitmap.contains(rowIndex)
val filterOutput = if (isContained) {
valueWhenContained
} else {
valueWhenNotContained
}
batch.putByte(rowNumber, filterOutput)
}
}

sealed trait RowIndexMarkingFiltersBuilder {
@@ -123,6 +153,21 @@ case object DropAllRowsFilter extends RowIndexFilter {
rowId += 1
}
}

override def materializeIntoVector(
batchSize: Int,
rowIndexColumn: ColumnVector,
batch: WritableColumnVector): Unit = {
for (rowId <- 0 until batchSize) {
batch.putByte(rowId, RowIndexFilter.DROP_ROW_VALUE)
}
}

override def materializeIntoVector(
rowNumber: Int,
rowIndex: Long,
batch: WritableColumnVector): Unit =
batch.putByte(rowNumber, RowIndexFilter.DROP_ROW_VALUE)
}

case object KeepAllRowsFilter extends RowIndexFilter {
@@ -134,4 +179,19 @@ case object KeepAllRowsFilter extends RowIndexFilter {
rowId += 1
}
}

override def materializeIntoVector(
batchSize: Int,
rowIndexColumn: ColumnVector,
batch: WritableColumnVector): Unit = {
for (rowId <- 0 until batchSize) {
batch.putByte(rowId, RowIndexFilter.KEEP_ROW_VALUE)
}
}

override def materializeIntoVector(
rowNumber: Int,
rowIndex: Long,
batch: WritableColumnVector): Unit =
batch.putByte(rowNumber, RowIndexFilter.KEEP_ROW_VALUE)
}
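For illustration only (not part of this commit), the new batch overload can be exercised with DropAllRowsFilter, which needs no deletion vector bitmap; the row index values below are arbitrary:

    import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
    import org.apache.spark.sql.types.{ByteType, LongType}

    val batchSize = 4
    // Row indexes as they would arrive from the Parquet reader's metadata column.
    val rowIndexes = new OnHeapColumnVector(batchSize, LongType)
    Seq(0L, 1L, 5L, 9L).zipWithIndex.foreach { case (value, i) => rowIndexes.putLong(i, value) }

    val output = new OnHeapColumnVector(batchSize, ByteType)
    DropAllRowsFilter.materializeIntoVector(batchSize, rowIndexes, output)
    // Every position is marked as dropped, regardless of its row index.
    assert((0 until batchSize).forall(i => output.getByte(i) == RowIndexFilter.DROP_ROW_VALUE))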
@@ -292,10 +292,11 @@ class TightBoundsSuite
withTempDeltaTable(
// .repartition(1)
dataDF = spark.range(0, 50000000, 1, 1).toDF("id"),
// dataDF = spark.range(0, 25000000, 1, 1).toDF("id"),
// dataDF = spark.range(0, 100000000, 1, 1).toDF("id"),
enableDVs = true
) { (targetTable, targetLog) =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> true.toString,
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> false.toString,
SQLConf.FILES_MAX_PARTITION_BYTES.key -> "128MB") {
targetTable().delete("id == 40000000")

@@ -304,16 +305,51 @@
val a = targetTable().toDF.filter("id != 1").collect()
val c = targetLog.update().allFiles.collect()
val b = 1
assert(a.length === 49999999)
assert(a.length === 49999998)
// assert(a.length === 29999999)

// a(40000000).getLong(0)
assert(a(40000000).getLong(0) === 40000000)
assert(a(1).getLong(0) === 2)
assert(a(39999998).getLong(0) === 39999999)
assert(a(39999999).getLong(0) === 40000001)
// assert(!a.map(_.getLong(0)).toSeq.contains(40000000))
// assert(a === Seq(0, 100000000).drop(2))
}
}
}

test("TEST 2") {
withTempDeltaTable(
// .repartition(1)
dataDF = spark.range(0, 100, 1, 1).toDF("id"),
enableDVs = true
) { (targetTable, targetLog) =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> true.toString,
SQLConf.FILES_MAX_PARTITION_BYTES.key -> "128MB") {
targetTable().delete("id == 4")
targetTable().delete("id == 5")

// Both deleted rows should be filtered out of the scan.
assert(targetTable().toDF.count() === 98)
}
}
}

test(s"TEST COMPLEX TMP VIEW") {
import testImplicits._
withTempView("v") {
withTable("tab") {
Seq((0, 3), (1, 2)).toDF("key", "value")
.write
.option(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key, true.toString)
.format("delta")
.saveAsTable("tab")
sql(s"CREATE OR REPLACE TEMP VIEW v AS SELECT value as key, key as value FROM tab")
sql(s"DELETE FROM v WHERE key >= 1 and value < 3")
spark.read.format("delta").table("v")
}
}
}

}

class TightBoundsColumnMappingSuite extends TightBoundsSuite with DeltaColumnMappingEnableIdMode
@@ -86,6 +86,8 @@ class DeletionVectorsSuite extends QueryTest
}

test("select metadata columns from a Delta table with deletion vectors") {
val a = spark.read.format("delta").load(table1Path).distinct().count()

assert(spark.read.format("delta").load(table1Path)
.select("_metadata.file_path").distinct().count() == 22)
}

