First pass at Limit Push Down support
scottsand-db committed Nov 22, 2022
1 parent e60d8b6 commit 72cbf9b
Showing 6 changed files with 470 additions and 9 deletions.
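For context, limit push down answers a bare LIMIT query from the per-file numRecords statistics, so only as many files as the limit requires are listed and scanned. A minimal usage sketch, assuming `spark` is an active SparkSession, `/tmp/delta/events` is an existing Delta table, and the new DELTA_LIMIT_PUSHDOWN_ENABLED entry follows the usual DeltaSQLConf pattern of exposing a `.key`:

import org.apache.spark.sql.delta.sources.DeltaSQLConf

// Hedged sketch; the conf key access and the table path are assumptions, not part of this diff.
spark.conf.set(DeltaSQLConf.DELTA_LIMIT_PUSHDOWN_ENABLED.key, "true")
spark.read.format("delta")
  .load("/tmp/delta/events")
  .limit(10)   // LocalLimit with no data filters, so the new code path can kick in
  .collect()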
@@ -519,6 +519,29 @@ trait OptimisticTransactionImpl extends TransactionalWrite
scan
}

/** Returns a [[DeltaScan]] based on the limit clause when there are no filters or projections. */
override def filesForScan(limit: Long): DeltaScan = {
val scan = snapshot.filesForScan(limit)
readFiles ++= scan.files
scan
}

/** Returns a [[DeltaScan]] based on the given partition filters, projections and limits. */
override def filesForScan(
limit: Long,
partitionFilters: Seq[Expression]): DeltaScan = {
partitionFilters.foreach { f =>
assert(
DeltaTableUtils.isPredicatePartitionColumnsOnly(f, metadata.partitionColumns, spark),
s"Only filters on partition columns [${metadata.partitionColumns.mkString(", ")}]" +
s" expected, found $f")
}
val scan = snapshot.filesForScan(limit, partitionFilters)
readPredicates += partitionFilters.reduceLeftOption(And).getOrElse(Literal(true))
readFiles ++= scan.files
scan
}

override def filesWithStatsForScan(partitionFilters: Seq[Expression]): DataFrame = {
val metadata = snapshot.filesWithStatsForScan(partitionFilters)
readPredicates += partitionFilters.reduceLeftOption(And).getOrElse(Literal(true))
@@ -17,8 +17,12 @@
package org.apache.spark.sql.delta.stats

// scalastyle:off import.ordering.noEmptyLine
import java.io.Closeable

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaLog, DeltaTableUtils}
import org.apache.spark.sql.delta.actions.{AddFile, Metadata, SingleAction}
import org.apache.spark.sql.delta.actions.{AddFile, Metadata}
import org.apache.spark.sql.delta.implicits._
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -37,6 +41,24 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{AtomicType, BooleanType, CalendarIntervalType, DataType, DateType, NumericType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

/**
* Used to hold the list of files and scan stats after pruning files using the limit.
*/
case class ScanAfterLimit(
files: Seq[AddFile],
byteSize: Option[Long],
numPhysicalRecords: Option[Long],
numLogicalRecords: Option[Long])

/**
* Used in deduplicateAndFilterRemovedLocally/getFilesAndNumRecords iterator for grouping
* physical and logical number of records.
*
* @param numPhysicalRecords The number of records physically present in the file.
* @param numLogicalRecords The number of physical records minus the Deletion Vector cardinality.
*/
case class NumRecords(numPhysicalRecords: java.lang.Long, numLogicalRecords: java.lang.Long)
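
A worked example of the physical/logical distinction with made-up numbers: a file holding 1,000 rows on disk whose deletion vector marks 100 rows as deleted exposes 900 logical rows.

// Illustration only; the counts are invented.
val physicalRecords = 1000L
val dvCardinality = 100L
val counts = NumRecords(
  numPhysicalRecords = Long.box(physicalRecords),
  numLogicalRecords = Long.box(physicalRecords - dvCardinality)) // 900 logical rows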

/**
* Represents a stats column (MIN, MAX, etc) for a given (nested) user table column name. Used to
* keep track of which stats columns a data skipping query depends on.
@@ -918,6 +940,84 @@ trait DataSkippingReaderBase
}
}

/**
* Gathers files that should be included in a scan based on the limit clause, when there is
* no filter or projection present. Statistics about the amount of data that will be read
* are gathered and returned.
*/
override def filesForScan(limit: Long): DeltaScan =
recordDeltaOperation(deltaLog, "delta.skipping.limit") {
val startTime = System.currentTimeMillis()
val scan = pruneFilesByLimit(withStats, limit)

val totalDataSize = new DataSize(
Some(sizeInBytes),
None,
Some(numOfFiles)
)

val scannedDataSize = new DataSize(
scan.byteSize,
scan.numPhysicalRecords,
Some(scan.files.size)
)

DeltaScan(
version = version,
files = scan.files,
total = totalDataSize,
partition = null,
scanned = scannedDataSize)(
scannedSnapshot = snapshotToScan,
partitionFilters = ExpressionSet(Nil),
dataFilters = ExpressionSet(Nil),
unusedFilters = ExpressionSet(Nil),
scanDurationMs = System.currentTimeMillis() - startTime,
dataSkippingType = DeltaDataSkippingType.limit
)
}
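
A hedged sketch of how a caller could inspect the result, using only names visible in this diff (`version`, `files`, `total`, `scanned`, and the `rows`/`files` members of DataSize); `snapshot` stands in for any Snapshot that mixes in this trait:

val scan = snapshot.filesForScan(limit = 1000L)
// `total` covers every file in the snapshot, `scanned` only the pruned subset.
println(s"selected ${scan.files.size} of ${scan.total.files.getOrElse(0L)} files, " +
  s"roughly ${scan.scanned.rows.getOrElse(-1L)} physical rows at version ${scan.version}")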

/**
* Gathers files that should be included in a scan based on the given predicates and limit.
* This will be called only when all predicates are on partitioning columns.
* Statistics about the amount of data that will be read are gathered and returned.
*/
override def filesForScan(limit: Long, partitionFilters: Seq[Expression]): DeltaScan =
recordDeltaOperation(deltaLog, "delta.skipping.filteredLimit") {
val startTime = System.currentTimeMillis()
val finalPartitionFilters = constructPartitionFilters(partitionFilters)

val scan = {
pruneFilesByLimit(withStats.where(finalPartitionFilters), limit)
}

val totalDataSize = new DataSize(
Some(sizeInBytes),
None,
Some(numOfFiles)
)

val scannedDataSize = new DataSize(
scan.byteSize,
scan.numPhysicalRecords,
Some(scan.files.size)
)

DeltaScan(
version = version,
files = scan.files,
total = totalDataSize,
partition = null,
scanned = scannedDataSize)(
scannedSnapshot = snapshotToScan,
partitionFilters = ExpressionSet(partitionFilters),
dataFilters = ExpressionSet(Nil),
unusedFilters = ExpressionSet(Nil),
scanDurationMs = System.currentTimeMillis() - startTime,
dataSkippingType = DeltaDataSkippingType.filteredLimit
)
}

/**
* Get AddFile (with stats) actions corresponding to given set of paths in the Snapshot.
* If a path doesn't exist in snapshot, it will be ignored and no [[AddFile]] will be returned
@@ -933,9 +1033,86 @@
}
}

/** Get the files and number of records within each file, to perform limit pushdown. */
def getFilesAndNumRecords(
df: DataFrame): Iterator[(AddFile, NumRecords)] with Closeable = recordFrameProfile(
"Delta", "DataSkippingReaderEdge.getFilesAndNumRecords") {
import org.apache.spark.sql.delta.implicits._

val numLogicalRecords = col("stats.numRecords")

val result = df.withColumn("numPhysicalRecords", col("stats.numRecords")) // Physical
.withColumn("numLogicalRecords", numLogicalRecords) // Logical
.withColumn("stats", nullStringLiteral)
.select(struct(col("*")).as[AddFile],
col("numPhysicalRecords").as[java.lang.Long], col("numLogicalRecords").as[java.lang.Long])
.collectAsList()

new Iterator[(AddFile, NumRecords)] with Closeable {
private val underlying = result.iterator
override def hasNext: Boolean = underlying.hasNext
override def next(): (AddFile, NumRecords) = {
val next = underlying.next()
(next._1, NumRecords(numPhysicalRecords = next._2, numLogicalRecords = next._3))
}

override def close(): Unit = {
}

}
}
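
Because the returned iterator is also Closeable, callers are expected to release it when done, as pruneFilesByLimit below does. A minimal consumption sketch, assuming the `withStats` DataFrame used elsewhere in this trait is in scope:

val filesWithCounts = getFilesAndNumRecords(withStats)
try {
  filesWithCounts.foreach { case (addFile, counts) =>
    // Files without stats surface null counts, which pruneFilesByLimit handles separately.
    println(s"${addFile.path}: ${counts.numLogicalRecords} logical records")
  }
} finally {
  filesWithCounts.close()
}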

protected def convertDataFrameToAddFiles(df: DataFrame): Array[AddFile] = {
df.as[AddFile].collect()
}

protected def pruneFilesByLimit(df: DataFrame, limit: Long): ScanAfterLimit = {
val withNumRecords = {
getFilesAndNumRecords(df)
}

var logicalRowsToScan = 0L
var physicalRowsToScan = 0L
var bytesToScan = 0L
var bytesToIgnore = 0L
var rowsUnknown = false

val filesAfterLimit = try {
val iter = withNumRecords
val filesToScan = ArrayBuffer[AddFile]()
val filesToIgnore = ArrayBuffer[AddFile]()
while (iter.hasNext && logicalRowsToScan < limit) {
val file = iter.next
if (file._2.numPhysicalRecords == null || file._2.numLogicalRecords == null) {
// this file has no stats, ignore for now
bytesToIgnore += file._1.size
filesToIgnore += file._1
} else {
physicalRowsToScan += file._2.numPhysicalRecords.toLong
logicalRowsToScan += file._2.numLogicalRecords.toLong
bytesToScan += file._1.size
filesToScan += file._1
}
}

// If the files that have stats do not contain enough rows, fall back to reading all files
if (logicalRowsToScan < limit && filesToIgnore.nonEmpty) {
filesToScan ++= filesToIgnore
bytesToScan += bytesToIgnore
rowsUnknown = true
}
filesToScan.toSeq
} finally {
withNumRecords.close()
}

if (rowsUnknown) {
ScanAfterLimit(filesAfterLimit, Some(bytesToScan), None, None)
} else {
ScanAfterLimit(filesAfterLimit, Some(bytesToScan),
Some(physicalRowsToScan), Some(logicalRowsToScan))
}
}
}
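
A self-contained illustration of the greedy selection above, with made-up per-file logical record counts and the no-stats fallback omitted: with counts of 400, 500 and 300 and a limit of 800, the loop keeps the first two files and never reads stats for the third.

val perFileLogicalCounts = Seq(400L, 500L, 300L)
val limit = 800L
var filesTaken = 0
var logicalRows = 0L
while (filesTaken < perFileLogicalCounts.length && logicalRows < limit) {
  logicalRows += perFileLogicalCounts(filesTaken)
  filesTaken += 1
}
// filesTaken == 2 and logicalRows == 900 >= limit, so the scan stops early.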

trait DataSkippingReader extends DataSkippingReaderBase
@@ -16,15 +16,13 @@

package org.apache.spark.sql.delta.stats

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.stats.DeltaDataSkippingType.DeltaDataSkippingType
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.PartitioningUtils

/**
* DataSize describes following attributes for data that consists of a list of input files
@@ -42,7 +40,7 @@ case class DataSize(
rows: Option[Long] = None,
@JsonDeserialize(contentAs = classOf[java.lang.Long])
files: Option[Long] = None
)
)

object DataSize {
def apply(a: ArrayAccumulator): DataSize = {
@@ -21,8 +21,8 @@ import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

/** Trait representing a class that can generate [[DeltaScan]] given filters, etc. */
trait DeltaScanGeneratorBase {
/** Trait representing a class that can generate [[DeltaScan]] given filters and a limit. */
trait DeltaScanGenerator {
/** The snapshot that the scan is being generated on. */
val snapshotToScan: Snapshot

@@ -34,7 +34,11 @@

/** Returns a [[DeltaScan]] based on the given filters. */
def filesForScan(filters: Seq[Expression], keepNumRecords: Boolean = false): DeltaScan
}

/** Returns a [[DeltaScan]] based on the limit clause when there are no filters or projections. */
def filesForScan(limit: Long): DeltaScan

/** Returns a [[DeltaScan]] based on the given partition filters and limits. */
def filesForScan(limit: Long, partitionFilters: Seq[Expression]): DeltaScan
}

trait DeltaScanGenerator extends DeltaScanGeneratorBase
@@ -109,13 +109,22 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan]

/**
* Scan files using the given `filters` and return `DeltaScan`.
*
* Note: when `limitOpt` is non-empty, `filters` must contain only partition filters. Otherwise,
* it can contain arbitrary filters. See `DeltaTableScan` for more details.
*/
protected def filesForScan(
scanGenerator: DeltaScanGenerator,
limitOpt: Option[Int],
filters: Seq[Expression],
delta: LogicalRelation): DeltaScan = {
withStatusCode("DELTA", "Filtering files for query") {
if (limitOpt.nonEmpty) {
// If we trigger limit push down, the filters must be partition filters. Since
// there are no data filters, we don't need to apply Generated Columns
// optimization. See `DeltaTableScan` for more details.
return scanGenerator.filesForScan(limitOpt.get, filters)
}
val filtersForScan =
if (!GeneratedColumn.partitionFilterOptimizationEnabled(spark)) {
filters
@@ -174,6 +183,12 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan]
filters: Seq[Expression],
limit: Option[Int],
delta: LogicalRelation): LogicalPlan = {
if (limit.nonEmpty) {
// If we trigger limit push down, the filters must be partition filters. Since
// there are no data filters, we don't need to apply Generated Columns
// optimization. See `DeltaTableScan` for more details.
return DeltaTableUtils.replaceFileIndex(scan, preparedIndex)
}
if (!GeneratedColumn.partitionFilterOptimizationEnabled(spark)) {
DeltaTableUtils.replaceFileIndex(scan, preparedIndex)
} else {
@@ -248,6 +263,7 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan]
* object `plan` and tries to give back the arguments as a [[DeltaTableScanType]].
*/
def unapply(plan: LogicalPlan): Option[DeltaTableScanType] = {
val limitPushdownEnabled = spark.conf.get(DeltaSQLConf.DELTA_LIMIT_PUSHDOWN_ENABLED)

// Remove projections as a plan differentiator because it does not affect file listing
// results. Plans with the same filters but different projections therefore will not have
@@ -260,6 +276,10 @@ }
}

plan match {
case LocalLimit(IntegerLiteral(limit),
PhysicalOperation(_, filters, delta @ DeltaTable(fileIndex: TahoeLogFileIndex)))
if limitPushdownEnabled && containsPartitionFiltersOnly(filters, fileIndex) =>
Some((canonicalizePlanForDeltaFileListing(plan), filters, fileIndex, Some(limit), delta))
case PhysicalOperation(
_,
filters,
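
To make the new case concrete: it fires for a LocalLimit sitting on top of a PhysicalOperation whose filters touch only partition columns. A hedged sketch of a query that would roughly take that shape, assuming a Delta table at `/tmp/delta/events` partitioned by `date` and the push down conf enabled:

// Logical plan, roughly:
//   LocalLimit 10
//   +- Project [...]
//      +- Filter (date = '2022-11-22')   // partition-column filter only
//         +- Relation backed by a TahoeLogFileIndex
spark.read.format("delta").load("/tmp/delta/events")
  .where("date = '2022-11-22'")
  .limit(10)
  .collect()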
