Skip to content

Commit

Permalink
# This is a combination of 12 commits.
Browse files Browse the repository at this point in the history
# This is the 1st commit message:

flush

# This is the commit message delta-io#2:

flush

# This is the commit message delta-io#3:

First sane version without isRowDeleted

# This is the commit message delta-io#4:

Hack RowIndexMarkingFilters

# This is the commit message delta-io#5:

Add support for non-vectorized readers

# This is the commit message delta-io#6:

Metadata column fix

# This is the commit message delta-io#7:

Avoid non-deterministic UDF to filter deleted rows

# This is the commit message delta-io#8:

metadata with Expression ID

# This is the commit message delta-io#9:

Fix complex views issue

# This is the commit message delta-io#10:

Tests

# This is the commit message delta-io#11:

cleaning

# This is the commit message delta-io#12:

More tests and fixes
  • Loading branch information
andreaschat-db committed Apr 23, 2024
1 parent 48fc958 commit ea7b635
Show file tree
Hide file tree
Showing 6 changed files with 262 additions and 92 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ trait DeltaTableOperations extends AnalysisHelper { self: DeltaTable =>
val delete = DeleteFromTable(
self.toDF.queryExecution.analyzed,
condition.getOrElse(Literal.TrueLiteral))
val a = toDataset(sparkSession, delete)
a
toDataset(sparkSession, delete)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ case class DeltaParquetFileFormat(
isRowDeletedColumn.foreach { columnMetadata =>
rowIndexFilter.get
.materializeIntoVector(
columnarRow.rowId, // TODO check this
0,
columnarRow.getLong(rowIndexColumnIndex),
tempVector)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project
import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

/**
* Plan transformer to inject a filter that removes the rows marked as deleted according to
Expand All @@ -51,9 +52,28 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRela
*/
trait PreprocessTableWithDVs extends SubqueryTransformerHelper {
def preprocessTablesWithDVs(plan: LogicalPlan): LogicalPlan = {
transformWithSubqueries(plan) {
val newPlan = transformWithSubqueries(plan) {
case ScanWithDeletionVectors(dvScan) => dvScan
}

/*
transformWithSubqueries(newPlan) {
case a: AttributeReference if a.name == METADATA_NAME => newPlan
}
*/
/*
transformWithSubqueries(newPlan) {
case a: AttributeReference if a.name == METADATA_NAME =>
val x = UnresolvedAttribute(a.qualifier :+ a.name)
val newPlan = FakeLogicalPlan(Seq(x), newPlan)
val spark = SparkSession.getActiveSession.get
val y = spark.sessionState.analyzer.execute(newPlan)
// val resolvedExprs = resolveExprs(Seq(x), newPlan)
newPlan
}
*/

newPlan
}
}

Expand Down Expand Up @@ -120,24 +140,40 @@ object ScanWithDeletionVectors {
// Add a column for SKIP_ROW to the base output. Value of 0 means the row needs be kept, any
// other values mean the row needs be skipped.
val skipRowField = IS_ROW_DELETED_STRUCT_FIELD
// val rowIndexCol = AttributeReference(
// s"${METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}",
// ROW_INDEX_STRUCT_FIELD.dataType)()
// val rowIndexField = ParquetFileFormat.ROW_INDEX_FIELD
val newScanOutput = if (inputScan.output.map(_.name).contains(METADATA_NAME)) {
inputScan.output :+ AttributeReference(skipRowField.name, skipRowField.dataType)()

val withReplacedMetadata = inputScan.output.collect {
case a: AttributeReference if a.name == METADATA_NAME &&
!a.dataType.asInstanceOf[StructType].fieldNames.contains(ParquetFileFormat.ROW_INDEX) =>
fileFormat.createFileMetadataCol().withExprId(a.exprId)
case o => o
}

val newScanOutput = if (withReplacedMetadata.map(_.name).contains(METADATA_NAME)) {
withReplacedMetadata ++ Seq(AttributeReference(skipRowField.name, skipRowField.dataType)())
} else {
val fileMetadataCol = fileFormat.createFileMetadataCol()
/*
val rowIndexCol = AttributeReference(
s"${METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}",
ROW_INDEX_STRUCT_FIELD.dataType)()
*/
inputScan.output ++
Seq(AttributeReference(skipRowField.name, skipRowField.dataType)(), fileMetadataCol)
withReplacedMetadata ++
Seq(AttributeReference(skipRowField.name, skipRowField.dataType)(),
fileFormat.createFileMetadataCol())
}

/*
val newScanOutput = inputScan.output.filterNot(_.name == METADATA_NAME) ++
Seq(
AttributeReference(skipRowField.name, skipRowField.dataType)(),
fileMetadataCol.withExprId(aaa.exprId))
*/

// Data schema and scan schema could be different. The scan schema may contain additional
// columns such as `_metadata.file_path` (metadata columns) which are populated in Spark scan
// operator after the data is read from the underlying file reader.
val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField)
val rowIndexField =
StructField(s"${ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME}", LongType)
// ParquetFileFormat.ROW_INDEX_FIELD
val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField) // .add(rowIndexField)

val newFileFormat = fileFormat.disableSplittingAndPushdown(tahoeFileIndex.path.toString)
val newRelation = hadoopFsRelation.copy(
Expand All @@ -153,7 +189,19 @@ object ScanWithDeletionVectors {
require(skipRowColumnRefs.size == 1,
s"Expected only one column with name=$IS_ROW_DELETED_COLUMN_NAME")
val skipRowColumnRef = skipRowColumnRefs.head


/*
val rowIndexCol = AttributeReference(
s"${METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}",
ROW_INDEX_STRUCT_FIELD.dataType)()
*/
// val rowIndexCol = Column(s"${METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}").expr
// val spark = SparkSession.getActiveSession.get
// val projection = Project(Seq(Alias(rowIndexCol, ParquetFileFormat.ROW_INDEX)()), newScan)
// val analyzed = spark.sessionState.analyzer.execute(projection)
// val rowIndexProjection = Project(Seq(rowIndexCol), newScan)
// val newPlan = spark.sessionState.analyzer.execute(rowIndexProjection)

Filter(EqualTo(skipRowColumnRef, Literal(RowIndexFilter.KEEP_ROW_VALUE)), newScan)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,19 @@ object DMLWithDeletionVectorsHelper extends DeltaCommand {
fileMetadataCol = format.createFileMetadataCol()
// Take the existing schema and add additional metadata columns
val newDataSchema =
StructType(hfsr.dataSchema).add(ROW_INDEX_STRUCT_FIELD)
StructType(hfsr.dataSchema) // .add(ROW_INDEX_STRUCT_FIELD)
val finalOutput = l.output :+ fileMetadataCol // Seq(rowIndexCol, fileMetadataCol)
/*

// Disable splitting and filter pushdown in order to generate the row-indexes
val newFormat = format.copy(isSplittable = true, disablePushDowns = false)
// val newFormat = format.copy(isSplittable = true, disablePushDowns = false)
val newBaseRelation = hfsr.copy(
location = fileIndex,
dataSchema = newDataSchema,
fileFormat = newFormat)(hfsr.sparkSession)
*/
location = fileIndex // ,
// dataSchema = newDataSchema,
// fileFormat = newFormat
)(hfsr.sparkSession)


l.copy(relation = hfsr, output = finalOutput)
l.copy(relation = newBaseRelation, output = finalOutput)
case p @ Project(projectList, _) =>
if (fileMetadataCol == null) {
throw new IllegalStateException("File metadata column is not yet created.")
Expand All @@ -111,7 +112,6 @@ object DMLWithDeletionVectorsHelper extends DeltaCommand {
p.copy(projectList = newProjectList)
}
newTarget
// target
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

package org.apache.spark.sql.delta

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.FILES_MAX_PARTITION_BYTES

import scala.collection.mutable.ArrayBuffer

// scalastyle:off import.ordering.noEmptyLine
Expand Down Expand Up @@ -287,69 +284,6 @@ class TightBoundsSuite
assert(statsAfterDelete === expectedStatsAfterDelete)
}
}

test("TEST") {
withTempDeltaTable(
// .repartition(1)
dataDF = spark.range(0, 50000000, 1, 1).toDF("id"),
// dataDF = spark.range(0, 25000000, 1, 1).toDF("id"),
// dataDF = spark.range(0, 100000000, 1, 1).toDF("id"),
enableDVs = true
) { (targetTable, targetLog) =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> false.toString,
SQLConf.FILES_MAX_PARTITION_BYTES.key -> "128MB") {
targetTable().delete("id == 40000000")

// val d = targetTable().toDF.filter("id != 1").queryExecution.executedPlan
// .filter("id != 1")
val a = targetTable().toDF.filter("id != 1").collect()
val c = targetLog.update().allFiles.collect()
val b = 1
assert(a.length === 49999998)
// assert(a.length === 29999999)

// a(40000000).getLong(0)
assert(a(1).getLong(0) === 2)
assert(a(39999998).getLong(0) === 39999999)
assert(a(39999999).getLong(0) === 40000001)
// assert(!a.map(_.getLong(0)).toSeq.contains(40000000))
// assert(a === Seq(0, 100000000).drop(2))
}
}
}

test("TEST 2") {
withTempDeltaTable(
// .repartition(1)
dataDF = spark.range(0, 100, 1, 1).toDF("id"),
enableDVs = true
) { (targetTable, targetLog) =>
withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> true.toString,
SQLConf.FILES_MAX_PARTITION_BYTES.key -> "128MB") {
targetTable().delete("id == 4")
targetTable().delete("id == 5")

val a = 1
}
}
}

test(s"TEST COMPLEX TMP VIEW") {
import testImplicits._
withTempView("v") {
withTable("tab") {
Seq((0, 3), (1, 2)).toDF("key", "value")
.write
.option(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key, true.toString)
.format("delta")
.saveAsTable("tab")
sql(s"CREATE OR REPLACE TEMP VIEW v AS SELECT value as key, key as value FROM tab")
sql(s"DELETE FROM v WHERE key >= 1 and value < 3")
spark.read.format("delta").table("v")
}
}
}

}

class TightBoundsColumnMappingSuite extends TightBoundsSuite with DeltaColumnMappingEnableIdMode
Loading

0 comments on commit ea7b635

Please sign in to comment.