From 8edd480e350ae5b81f41e18004ec20ffb39d8dde Mon Sep 17 00:00:00 2001
From: Andreas Chatzistergiou
Date: Tue, 23 Apr 2024 19:09:54 +0200
Subject: [PATCH] WIP: Use the Parquet row index metadata column when
 materializing deletion vectors

This is a combination (squash) of 6 commits:
1. flush
2. flush
3. First sane version without isRowDeleted
4. Hack RowIndexMarkingFilters
5. Add support for non-vectorized readers
6. Metadata column fix
---
 .../spark/sql/delta/RowIndexFilter.java      | 17 +++++
 .../sql/delta/DeltaParquetFileFormat.scala   | 64 ++++++++++++++++++-
 .../sql/delta/PreprocessTableWithDVs.scala   | 18 +++++-
 .../DMLWithDeletionVectorsHelper.scala       |  6 +-
 .../RowIndexMarkingFilters.scala             | 62 +++++++++++++++++-
 .../spark/sql/delta/TightBoundsSuite.scala   | 42 +++++++++++-
 .../DeletionVectorsSuite.scala               |  2 +
 7 files changed, 201 insertions(+), 10 deletions(-)

diff --git a/spark/src/main/java/org/apache/spark/sql/delta/RowIndexFilter.java b/spark/src/main/java/org/apache/spark/sql/delta/RowIndexFilter.java
index c2d689e2f7..3c13b0184e 100644
--- a/spark/src/main/java/org/apache/spark/sql/delta/RowIndexFilter.java
+++ b/spark/src/main/java/org/apache/spark/sql/delta/RowIndexFilter.java
@@ -16,6 +16,7 @@
 
 package org.apache.spark.sql.delta;
 
+import org.apache.spark.sql.vectorized.ColumnVector;
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
 /**
@@ -34,6 +35,22 @@ public interface RowIndexFilter {
    */
  void materializeIntoVector(long start, long end, WritableColumnVector batch);
 
+  /**
+   * Materialize filtering information for a batch of rows, using the row index of each row.
+   * @param batchSize Number of rows in the batch
+   * @param rowIndexColumn Column vector holding the row index of each row in the batch
+   * @param batch The column vector for the current batch to materialize the filter values into
+   */
+  void materializeIntoVector(int batchSize, ColumnVector rowIndexColumn, WritableColumnVector batch);
+
+  /**
+   * Materialize filtering information for a single row with the given row index.
+   * @param rowNumber Position within {@code batch} to write the filter value to
+   * @param rowIndex Row index of the row within the file
+   * @param batch The column vector to materialize the filter value into
+   */
+  void materializeIntoVector(int rowNumber, long rowIndex, WritableColumnVector batch);
+
  /**
   * Value that must be materialised for a row to be kept after filtering.
   */
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
index 988d9d586e..a7debad78d 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
@@ -18,7 +18,6 @@ package org.apache.spark.sql.delta
 
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
-
 import org.apache.spark.sql.delta.RowIndexFilterType
 import org.apache.spark.sql.delta.DeltaParquetFileFormat._
 import org.apache.spark.sql.delta.actions.{DeletionVectorDescriptor, Metadata, Protocol}
@@ -338,10 +337,39 @@ case class DeltaParquetFileFormat(
     // Used only when non-column row batches are received from the Parquet reader
     val tempVector = new OnHeapColumnVector(1, ByteType)
 
+    require(
+      rowIndexColumn.isDefined,
+      "Expected the row index metadata column to be present when filtering with deletion vectors")
+
+    val rowIndexColumnIndex = (rowIndexFilter, rowIndexColumn) match {
+      // Every filter, including KeepAllRowsFilter, is driven by the row index column.
+      case (Some(_), Some(rowIndexColumn)) => rowIndexColumn.index
+      case _ =>
+        throw new IllegalStateException("Missing row index column for deletion vector filtering")
+    }
+
     iterator.map { row =>
       row match {
         case batch: ColumnarBatch => // When vectorized Parquet reader is enabled
           val size = batch.numRows()
+          trySafely(useOffHeapBuffers, size, Seq(isRowDeletedColumn.get)) { writableVectors =>
+            val indexVectorTuples = new ArrayBuffer[(Int, ColumnVector)]
+            // val index = 0
+            isRowDeletedColumn.foreach { columnMetadata =>
+              val isRowDeletedVector = writableVectors.head
+
+              rowIndexFilter.get
+                .materializeIntoVector(size, batch.column(rowIndexColumnIndex), isRowDeletedVector)
+              // rowIndexFilter.get
+              //   .materializeIntoVector(rowIndex, rowIndex + size, isRowDeletedVector)
+              indexVectorTuples += (columnMetadata.index -> isRowDeletedVector)
+              // index += 1
+            }
+
+            replaceVectors(batch, indexVectorTuples.toSeq: _*)
+          }
+
+          /*
           // Create vectors for all needed metadata columns.
           // We can't use the one from Parquet reader as it set the
           // [[WritableColumnVector.isAllNulls]] to true and it can't be reset with using any
@@ -357,7 +385,6 @@ case class DeltaParquetFileFormat(
                index += 1
              }
 
-              /*
              rowIndexColumn.foreach { columnMetadata =>
                val rowIndexVector = writableVectors(index)
                // populate the row index column value
@@ -368,12 +395,12 @@ case class DeltaParquetFileFormat(
                indexVectorTuples += (columnMetadata.index -> rowIndexVector)
                index += 1
              }
-              */
 
              val newBatch = replaceVectors(batch, indexVectorTuples.toSeq: _*)
              rowIndex += size
              newBatch
            }
+          */
 
        case columnarRow: ColumnarBatchRow =>
          // When vectorized reader is enabled but returns immutable rows instead of
@@ -381,6 +408,7 @@ case class DeltaParquetFileFormat(
          // mutable [[InternalRow]] and set the `row_index` and `is_row_deleted`
          // column values. This is not efficient. It should affect only the wide
          // tables. https://github.com/delta-io/delta/issues/2246
+          /*
          val newRow = columnarRow.copy();
          isRowDeletedColumn.foreach { columnMetadata =>
            rowIndexFilter.get.materializeIntoVector(rowIndex, rowIndex + 1, tempVector)
@@ -390,12 +418,30 @@ case class DeltaParquetFileFormat(
          rowIndexColumn.foreach(columnMetadata => newRow.setLong(columnMetadata.index, rowIndex))
          rowIndex += 1
          newRow
+          */
+          val newRow = columnarRow.copy()
+          isRowDeletedColumn.foreach { columnMetadata =>
+            rowIndexFilter.get
+              .materializeIntoVector(
+                0, // single-slot temp vector; the value is read back from position 0 below
+                columnarRow.getLong(rowIndexColumnIndex),
+                tempVector)
+
+            newRow.setByte(columnMetadata.index, tempVector.getByte(0))
+          }
+
+          // rowIndexColumn
+          //   .foreach(columnMetadata => newRow.setLong(columnMetadata.index, rowIndex))
+          // rowIndex += 1
+          newRow
 
        case rest: InternalRow => // When vectorized Parquet reader is disabled
+          // Read the row index from the metadata column instead of a running counter.
          // Temporary vector variable used to get DV values from RowIndexFilter
          // Currently the RowIndexFilter only supports writing into a columnar vector
          // and doesn't have methods to get DV value for a specific row index.
          // TODO: This is not efficient, but it is ok given the default reader is vectorized
+          /*
          isRowDeletedColumn.foreach { columnMetadata =>
            rowIndexFilter.get.materializeIntoVector(rowIndex, rowIndex + 1, tempVector)
            rest.setByte(columnMetadata.index, tempVector.getByte(0))
@@ -404,6 +450,18 @@ case class DeltaParquetFileFormat(
          rowIndexColumn.foreach(columnMetadata => rest.setLong(columnMetadata.index, rowIndex))
          rowIndex += 1
          rest
+          */
+          isRowDeletedColumn.foreach { columnMetadata =>
+            rowIndexFilter.get
+              .materializeIntoVector(
+                0,
+                rest.getLong(rowIndexColumnIndex),
+                tempVector)
+
+            rest.setByte(columnMetadata.index, tempVector.getByte(0))
+          }
+
+          rest
 
        case others => throw new RuntimeException(
          s"Parquet reader returned an unknown row type: ${others.getClass.getName}")
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala
index 51bdd1a9de..bfd2442274 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala
@@ -25,6 +25,9 @@ import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.expressions.AttributeReference
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+
+import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 
 /**
@@ -117,8 +120,19 @@ object ScanWithDeletionVectors {
     // Add a column for SKIP_ROW to the base output. Value of 0 means the row needs be kept, any
     // other values mean the row needs be skipped.
     val skipRowField = IS_ROW_DELETED_STRUCT_FIELD
-    val newScanOutput = inputScan.output :+
-      AttributeReference(skipRowField.name, skipRowField.dataType)()
+    // val rowIndexField = ParquetFileFormat.ROW_INDEX_FIELD
+    val newScanOutput = if (inputScan.output.map(_.name).contains(METADATA_NAME)) {
+      inputScan.output :+ AttributeReference(skipRowField.name, skipRowField.dataType)()
+    } else {
+      val fileMetadataCol = fileFormat.createFileMetadataCol()
+      /*
+      val rowIndexCol = AttributeReference(
+        s"${METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}",
+        ROW_INDEX_STRUCT_FIELD.dataType)()
+      */
+      inputScan.output ++
+        Seq(AttributeReference(skipRowField.name, skipRowField.dataType)(), fileMetadataCol)
+    }
 
     // Data schema and scan schema could be different. The scan schema may contain additional
     // columns such as `_metadata.file_path` (metadata columns) which are populated in Spark scan
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala
index 36c0170379..c4e06d4c04 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala
@@ -77,10 +77,12 @@ object DMLWithDeletionVectorsHelper extends DeltaCommand {
   private def replaceFileIndex(target: LogicalPlan, fileIndex: TahoeFileIndex): LogicalPlan = {
     // val rowIndexCol =
     //  AttributeReference(ROW_INDEX_COLUMN_NAME, ROW_INDEX_STRUCT_FIELD.dataType)();
+    /*
     val rowIndexCol =
       AttributeReference(
         s"${METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}",
         ROW_INDEX_STRUCT_FIELD.dataType)()
+    */
     var fileMetadataCol: AttributeReference = null
 
     val newTarget = target.transformUp {
@@ -91,19 +93,21 @@ object DMLWithDeletionVectorsHelper extends DeltaCommand {
         val newDataSchema = StructType(hfsr.dataSchema).add(ROW_INDEX_STRUCT_FIELD)
         val finalOutput = l.output :+ fileMetadataCol // Seq(rowIndexCol, fileMetadataCol)
+        /*
         // Disable splitting and filter pushdown in order to generate the row-indexes
         val newFormat = format.copy(isSplittable = true, disablePushDowns = false)
         val newBaseRelation = hfsr.copy(
           location = fileIndex,
           dataSchema = newDataSchema,
           fileFormat = newFormat)(hfsr.sparkSession)
+        */
         l.copy(relation = hfsr, output = finalOutput)
 
       case p @ Project(projectList, _) =>
         if (fileMetadataCol == null) {
           throw new IllegalStateException("File metadata column is not yet created.")
         }
-        val newProjectList = projectList ++ Seq(rowIndexCol, fileMetadataCol)
+        val newProjectList = projectList :+ fileMetadataCol
         p.copy(projectList = newProjectList)
     }
     newTarget
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/deletionvectors/RowIndexMarkingFilters.scala b/spark/src/main/scala/org/apache/spark/sql/delta/deletionvectors/RowIndexMarkingFilters.scala
index c6cdc1868f..69fa53deca 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/deletionvectors/RowIndexMarkingFilters.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/deletionvectors/RowIndexMarkingFilters.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
 import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
-
+import org.apache.spark.sql.vectorized.ColumnVector
 import org.apache.spark.sql.execution.vectorized.WritableColumnVector
 
 /**
@@ -46,6 +46,36 @@ abstract sealed class RowIndexMarkingFilters(bitmap: RoaringBitmapArray) extends
       rowId += 1
     }
   }
+
+  override def materializeIntoVector(
+      batchSize: Int,
+      rowIndexColumn: ColumnVector,
+      batch: WritableColumnVector): Unit = {
+    for (rowNumber <- 0 until batchSize) {
+      val rowIndex = rowIndexColumn.getLong(rowNumber)
+
+      val isContained = bitmap.contains(rowIndex)
+      val filterOutput = if (isContained) {
+        valueWhenContained
+      } else {
+        valueWhenNotContained
+      }
+      batch.putByte(rowNumber, filterOutput)
+    }
+  }
+
+  override def materializeIntoVector(
+      rowNumber: Int,
+      rowIndex: Long,
+      batch: WritableColumnVector): Unit = {
+    val isContained = bitmap.contains(rowIndex)
+    val filterOutput = if (isContained) {
+      valueWhenContained
+    } else {
+      valueWhenNotContained
+    }
+    batch.putByte(rowNumber, filterOutput)
+  }
 }
 
 sealed trait RowIndexMarkingFiltersBuilder {
@@ -123,6 +153,21 @@ case object DropAllRowsFilter extends RowIndexFilter {
       rowId += 1
     }
   }
+
+  override def materializeIntoVector(
+      batchSize: Int,
+      rowIndexColumn: ColumnVector,
+      batch: WritableColumnVector): Unit = {
+    for (rowId <- 0 until batchSize) {
+      batch.putByte(rowId, RowIndexFilter.DROP_ROW_VALUE)
+    }
+  }
+
+  override def materializeIntoVector(
+      rowNumber: Int,
+      rowIndex: Long,
+      batch: WritableColumnVector): Unit =
+    batch.putByte(rowNumber, RowIndexFilter.DROP_ROW_VALUE)
 }
 
 case object KeepAllRowsFilter extends RowIndexFilter {
@@ -134,4 +179,19 @@ case object KeepAllRowsFilter extends RowIndexFilter {
       rowId += 1
     }
   }
+
+  override def materializeIntoVector(
+      batchSize: Int,
+      rowIndexColumn: ColumnVector,
+      batch: WritableColumnVector): Unit = {
+    for (rowId <- 0 until batchSize) {
+      batch.putByte(rowId, RowIndexFilter.KEEP_ROW_VALUE)
+    }
+  }
+
+  override def materializeIntoVector(
+      rowNumber: Int,
+      rowIndex: Long,
+      batch: WritableColumnVector): Unit =
+    batch.putByte(rowNumber, RowIndexFilter.KEEP_ROW_VALUE)
 }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/TightBoundsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/TightBoundsSuite.scala
index 7cbc298fb7..629bb7ac93 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/TightBoundsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/TightBoundsSuite.scala
@@ -292,10 +292,11 @@ class TightBoundsSuite
     withTempDeltaTable(
       // .repartition(1)
       dataDF = spark.range(0, 50000000, 1, 1).toDF("id"),
+      // dataDF = spark.range(0, 25000000, 1, 1).toDF("id"),
       // dataDF = spark.range(0, 100000000, 1, 1).toDF("id"),
       enableDVs = true
    ) { (targetTable, targetLog) =>
-      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> true.toString,
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> false.toString,
        SQLConf.FILES_MAX_PARTITION_BYTES.key -> "128MB") {
        targetTable().delete("id == 40000000")
 
@@ -304,16 +305,51 @@ class TightBoundsSuite
        val a = targetTable().toDF.filter("id != 1").collect()
        val c = targetLog.update().allFiles.collect()
        val b = 1
-        assert(a.length === 49999999)
+        assert(a.length === 49999998)
+        // assert(a.length === 29999999)
        // a(40000000).getLong(0)
-        assert(a(40000000).getLong(0) === 40000000)
+        assert(a(1).getLong(0) === 2)
+        assert(a(39999998).getLong(0) === 39999999)
+        assert(a(39999999).getLong(0) === 40000001)
        // assert(!a.map(_.getLong(0)).toSeq.contains(40000000))
        // assert(a === Seq(0, 100000000).drop(2))
      }
    }
  }
 
+  test("deletion vectors with consecutive DELETEs on a small table") {
+    withTempDeltaTable(
+      // .repartition(1)
+      dataDF = spark.range(0, 100, 1, 1).toDF("id"),
+      enableDVs = true
+    ) { (targetTable, targetLog) =>
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> true.toString,
+        SQLConf.FILES_MAX_PARTITION_BYTES.key -> "128MB") {
+        targetTable().delete("id == 4")
+        targetTable().delete("id == 5")
+
+        assert(targetTable().toDF.filter("id IN (4, 5)").count() === 0)
+      }
+    }
+  }
+
+  test("DELETE with deletion vectors through a temp view with remapped columns") {
+    import testImplicits._
+    withTempView("v") {
+      withTable("tab") {
+        Seq((0, 3), (1, 2)).toDF("key", "value")
+          .write
+          .option(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key, true.toString)
+          .format("delta")
+          .saveAsTable("tab")
+        sql("CREATE OR REPLACE TEMP VIEW v AS SELECT value as key, key as value FROM tab")
+        sql("DELETE FROM v WHERE key >= 1 and value < 3")
+        checkAnswer(spark.read.format("delta").table("v"), Seq.empty)
+      }
+    }
+  }
+
 }
 
 class TightBoundsColumnMappingSuite extends TightBoundsSuite with DeltaColumnMappingEnableIdMode
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala
index db8a781c31..db776ddf76 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala
@@ -86,6 +86,8 @@ class DeletionVectorsSuite extends QueryTest
   }
 
   test("select metadata columns from a Delta table with deletion vectors") {
+    // Also exercise a full scan through the DV filtering path, not only the metadata query below.
+    spark.read.format("delta").load(table1Path).distinct().count()
     assert(spark.read.format("delta").load(table1Path)
       .select("_metadata.file_path").distinct().count() == 22)
   }
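
Note: the sketch below is a minimal, self-contained model of the row-index-driven
materialization that the new RowIndexFilter overloads implement. RowIndexFilterSketch
and its plain Set[Long] are illustrative stand-ins, not Delta classes; the real filters
are backed by a RoaringBitmapArray and write into Spark WritableColumnVectors.

    object RowIndexFilterSketch {
      // 0 keeps a row, non-zero drops it (mirroring RowIndexFilter.KEEP_ROW_VALUE / DROP_ROW_VALUE).
      val KEEP_ROW_VALUE: Byte = 0
      val DROP_ROW_VALUE: Byte = 1

      // `deletedRows` stands in for the deletion vector's bitmap; `rowIndexes` stands in for
      // the values of the Parquet row index metadata column for one batch.
      def materialize(deletedRows: Set[Long], rowIndexes: Array[Long]): Array[Byte] =
        rowIndexes.map(idx => if (deletedRows.contains(idx)) DROP_ROW_VALUE else KEEP_ROW_VALUE)

      def main(args: Array[String]): Unit = {
        // A batch whose row indexes start at 2 (e.g. a reader that skipped earlier rows),
        // with rows 4 and 5 deleted.
        val skipRow = materialize(Set(4L, 5L), Array(2L, 3L, 4L, 5L, 6L))
        println(skipRow.mkString(", ")) // 0, 0, 1, 1, 0
      }
    }

Because the keep/drop decision is keyed on the row index carried by the batch rather than
on a running counter, the filter no longer depends on rows arriving contiguously from the
start of the file, which is what lets replaceFileIndex stop disabling file splitting and
filter pushdown in DMLWithDeletionVectorsHelper.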