Commit

Merge branch 'master' into parallelize-tests

tdas authored Apr 19, 2024
2 parents 8263637 + 9d4e4f5 commit 11c0fa3
Showing 4 changed files with 309 additions and 18 deletions.
@@ -568,6 +568,18 @@ trait DeltaColumnMappingBase extends DeltaLogging {
    physicalSchemaFieldPaths.zip(originalSchemaFields).toMap
  }

  /**
   * Returns a map from logical name paths to physical name paths for the given schema. A logical
   * name path is the result of splitting a multi-part identifier, and the corresponding physical
   * name path is the result of replacing each name in the logical path with its physical name.
   */
  def getLogicalNameToPhysicalNameMap(schema: StructType): Map[Seq[String], Seq[String]] = {
    val physicalSchema = renameColumns(schema)
    val logicalSchemaFieldPaths = SchemaMergingUtils.explode(schema).map(_._1)
    val physicalSchemaFieldPaths = SchemaMergingUtils.explode(physicalSchema).map(_._1)
    logicalSchemaFieldPaths.zip(physicalSchemaFieldPaths).toMap
  }

  /**
   * Returns true if Column Mapping mode is enabled and the newMetadata's schema, when compared to
   * the currentMetadata's schema, is indicative of a DROP COLUMN operation.
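For intuition, here is a minimal, self-contained sketch of the mapping this helper builds: explode both schemas into field paths, then zip the logical paths with the physical ones. Everything below is hypothetical (the `phys_` prefix stands in for the physical names that Delta's column mapping metadata would actually assign, and `explodePaths` is a simplified stand-in for `SchemaMergingUtils.explode`):

import org.apache.spark.sql.types._

// Collect every field path in a schema, including nested struct fields
// (simplified stand-in for SchemaMergingUtils.explode).
def explodePaths(schema: StructType, prefix: Seq[String] = Nil): Seq[Seq[String]] =
  schema.fields.toSeq.flatMap { f =>
    val path = prefix :+ f.name
    path +: (f.dataType match {
      case s: StructType => explodePaths(s, path)
      case _ => Nil
    })
  }

// Simulate renameColumns with a hypothetical "phys_" prefix; real physical
// names come from the table's column mapping metadata.
def toPhysical(schema: StructType): StructType = StructType(schema.fields.map { f =>
  val dt = f.dataType match {
    case s: StructType => toPhysical(s)
    case other => other
  }
  f.copy(name = "phys_" + f.name, dataType = dt)
})

val logical = StructType(Seq(
  StructField("a", IntegerType),
  StructField("s", StructType(Seq(StructField("b", LongType))))))

val logicalToPhysical = explodePaths(logical).zip(explodePaths(toPhysical(logical))).toMap
assert(logicalToPhysical(Seq("s", "b")) == Seq("phys_s", "phys_b"))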
@@ -35,12 +35,13 @@ import org.apache.parquet.hadoop.util.ContextUtil
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
import org.apache.spark.sql.execution.datasources.OutputWriterFactory
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{ByteType, LongType, MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchRow, ColumnVector}
import org.apache.spark.util.SerializableConfiguration
@@ -101,6 +102,25 @@ case class DeltaParquetFileFormat(
    } else schema
  }

  /**
   * Prepares filters so that they can be pushed down into the Parquet reader.
   *
   * If column mapping is enabled, logical column names in the filters are replaced with their
   * corresponding physical column names. This is necessary because the Parquet files store
   * physical column names, and the requested schema pushed down to the Parquet reader also uses
   * physical column names.
   */
  private def prepareFiltersForRead(filters: Seq[Filter]): Seq[Filter] = {
    if (disablePushDowns) {
      Seq.empty
    } else if (columnMappingMode != NoMapping) {
      val physicalNameMap = DeltaColumnMapping.getLogicalNameToPhysicalNameMap(referenceSchema)
      filters.flatMap(translateFilterForColumnMapping(_, physicalNameMap))
    } else {
      filters
    }
  }

  override def isSplitable(
      sparkSession: SparkSession, options: Map[String, String], path: Path): Boolean = isSplittable

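The actual rewriting happens in `translateFilterForColumnMapping`, added further down in `object DeltaParquetFileFormat`. For a feel of the name-resolution step it relies on, a hedged sketch using only the imports the change itself adds (the physical names `col-2`/`col-3` are made up):

import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._

// Hypothetical mapping for a nested column s.b whose physical names are
// col-2 and col-3; real values come from column mapping metadata.
val physicalNameMap: Map[Seq[String], Seq[String]] =
  Map(Seq("s", "b") -> Seq("col-2", "col-3"))

// parseColumnPath splits the multi-part reference, the map lookup swaps in
// the physical path, and quoted reassembles a single reference string.
val translated = physicalNameMap.get(parseColumnPath("s.b")).map(_.quoted)
// translated is roughly Some("col-2.col-3"), with backtick quoting applied
// only where a name would otherwise be ambiguous.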
@@ -131,15 +151,13 @@
      filters: Seq[Filter],
      options: Map[String, String],
      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {

    val parquetDataReader: PartitionedFile => Iterator[InternalRow] =
      super.buildReaderWithPartitionValues(
        sparkSession,
        prepareSchemaForRead(dataSchema),
        prepareSchemaForRead(partitionSchema),
        prepareSchemaForRead(requiredSchema),
        prepareFiltersForRead(filters),
        options,
        hadoopConf)

@@ -469,6 +487,68 @@ object DeltaParquetFileFormat {
  /** Helper class that encapsulates an [[RowIndexFilterType]]. */
  case class DeletionVectorDescriptorWithFilterType(
      descriptor: DeletionVectorDescriptor,
      filterType: RowIndexFilterType)

  /**
   * Translates the filter to use physical column names instead of logical column names.
   * This is needed when the column mapping mode is set to `NameMapping` or `IdMapping`
   * to match the requested schema that's passed to the [[ParquetFileFormat]].
   */
  private def translateFilterForColumnMapping(
      filter: Filter,
      physicalNameMap: Map[Seq[String], Seq[String]]): Option[Filter] = {
    object PhysicalAttribute {
      def unapply(attribute: String): Option[String] = {
        import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
        physicalNameMap.get(parseColumnPath(attribute)).map(_.quoted)
      }
    }

    filter match {
      case EqualTo(PhysicalAttribute(physicalAttribute), value) =>
        Some(EqualTo(physicalAttribute, value))
      case EqualNullSafe(PhysicalAttribute(physicalAttribute), value) =>
        Some(EqualNullSafe(physicalAttribute, value))
      case GreaterThan(PhysicalAttribute(physicalAttribute), value) =>
        Some(GreaterThan(physicalAttribute, value))
      case GreaterThanOrEqual(PhysicalAttribute(physicalAttribute), value) =>
        Some(GreaterThanOrEqual(physicalAttribute, value))
      case LessThan(PhysicalAttribute(physicalAttribute), value) =>
        Some(LessThan(physicalAttribute, value))
      case LessThanOrEqual(PhysicalAttribute(physicalAttribute), value) =>
        Some(LessThanOrEqual(physicalAttribute, value))
      case In(PhysicalAttribute(physicalAttribute), values) =>
        Some(In(physicalAttribute, values))
      case IsNull(PhysicalAttribute(physicalAttribute)) =>
        Some(IsNull(physicalAttribute))
      case IsNotNull(PhysicalAttribute(physicalAttribute)) =>
        Some(IsNotNull(physicalAttribute))
      case And(left, right) =>
        val newLeft = translateFilterForColumnMapping(left, physicalNameMap)
        val newRight = translateFilterForColumnMapping(right, physicalNameMap)
        (newLeft, newRight) match {
          case (Some(l), Some(r)) => Some(And(l, r))
          case (Some(l), None) => Some(l)
          case (_, _) => newRight
        }
      case Or(left, right) =>
        val newLeft = translateFilterForColumnMapping(left, physicalNameMap)
        val newRight = translateFilterForColumnMapping(right, physicalNameMap)
        (newLeft, newRight) match {
          case (Some(l), Some(r)) => Some(Or(l, r))
          case (_, _) => None
        }
      case Not(child) =>
        translateFilterForColumnMapping(child, physicalNameMap).map(Not)
      case StringStartsWith(PhysicalAttribute(physicalAttribute), value) =>
        Some(StringStartsWith(physicalAttribute, value))
      case StringEndsWith(PhysicalAttribute(physicalAttribute), value) =>
        Some(StringEndsWith(physicalAttribute, value))
      case StringContains(PhysicalAttribute(physicalAttribute), value) =>
        Some(StringContains(physicalAttribute, value))
      case AlwaysTrue() => Some(AlwaysTrue())
      case AlwaysFalse() => Some(AlwaysFalse())
      case _ => None
    }
  }
}
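One subtlety in the `And`/`Or` handling above deserves a note: when only one side of an `And` translates, the translated side is still kept, but an `Or` with an untranslatable side is dropped entirely. A pushed-down filter only has to be *weaker* than the query's filter, since rows that slip through are re-filtered by Spark, while rows wrongly skipped in the reader are lost for good. Dropping a conjunct weakens `And(l, r)` to `l`, which is safe; dropping a disjunct would strengthen `Or(l, r)` to `l`, which is not. (By the same logic, `Not` is only safe when its child translates in full, which is the expected case since every attribute in a pushed filter comes from the table schema.) A tiny, self-contained truth-table check of those implication claims:

// Exhaustive truth-table check: And(l, r) implies l, so dropping a
// conjunct is safe; Or(l, r) does not imply l, so dropping a disjunct
// would wrongly skip matching rows.
val cases = for (l <- Seq(true, false); r <- Seq(true, false)) yield (l, r)
assert(cases.forall { case (l, r) => !(l && r) || l })  // And(l, r) => l
assert(!cases.forall { case (l, r) => !(l || r) || l }) // Or(l, r) =/=> l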
@@ -1946,4 +1946,44 @@ class DeltaColumnMappingSuite extends QueryTest
      }
    }
  }

test("filters pushed down to parquet use physical names") {
val tableName = "table_name"
withTable(tableName) {
// Create a table with column mapping **disabled**
sql(
s"""CREATE TABLE $tableName (a INT, b INT)
|USING DELTA
|TBLPROPERTIES (
| '${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'none',
| '${DeltaConfigs.MIN_READER_VERSION.key}' = '2',
| '${DeltaConfigs.MIN_WRITER_VERSION.key}' = '5'
|)
|""".stripMargin)

sql(s"INSERT INTO $tableName VALUES (100, 1000)")

sql(
s"""ALTER TABLE $tableName
|SET TBLPROPERTIES ('${DeltaConfigs.COLUMN_MAPPING_MODE.key}' = 'name')
|""".stripMargin)

// Confirm that the physical names are equal to the logical names
val schema = DeltaLog.forTable(spark, TableIdentifier(tableName)).update().schema
assert(DeltaColumnMapping.getPhysicalName(schema("a")) == "a")
assert(DeltaColumnMapping.getPhysicalName(schema("b")) == "b")

// Rename the columns so that the logical name of the second column is equal to the physical
// name of the first column.
sql(s"ALTER TABLE $tableName RENAME COLUMN a TO c")
sql(s"ALTER TABLE $tableName RENAME COLUMN b TO a")

// Filter the table by the second column. This will return empty results if the filter was
// (incorrectly) pushed down without translating the logical names to physical names.
checkAnswer(
sql(s"SELECT * FROM $tableName WHERE a = 1000"),
Seq(Row(100, 1000))
)
}
}
}
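Why the renames set a trap: after them, logical `c` resolves to physical `a` (the 100s) while logical `a` resolves to physical `b` (the 1000s), so an untranslated pushdown of `a = 1000` would be evaluated against the wrong Parquet column and match no rows. A sketch, reusing the test's own APIs, that would make the post-rename mapping explicit (assuming the suite's `spark` session and the table above are in scope):

// Inspect the post-rename mapping with the same APIs the test uses above.
val deltaLog = DeltaLog.forTable(spark, TableIdentifier("table_name"))
val renamedSchema = deltaLog.update().schema
assert(DeltaColumnMapping.getPhysicalName(renamedSchema("c")) == "a")
assert(DeltaColumnMapping.getPhysicalName(renamedSchema("a")) == "b")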