# Add support for LIMIT push down during query planning to reduce number of files scanned

### Description

This PR adds support for limit pushdown: `LIMIT` clauses are pushed down during query planning so that we scan only the minimum number of files necessary.
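
As a rough usage sketch (the table path, partition column, and limit values below are made up for illustration, and the limit-pushdown config is assumed to be enabled), a bare `LIMIT` over a Delta table can now be planned by selecting only as many files, based on their per-file `numRecords` statistics, as are needed to cover the requested row count:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("limit-pushdown-sketch").getOrCreate()

// Hypothetical table path. With limit pushdown, planning this query should pick
// just enough files (by their numRecords stats) to cover 10 rows, rather than
// listing every file in the table.
spark.read.format("delta").load("/tmp/events").limit(10).collect()

// The same applies to SQL queries whose filters touch only partition columns
// (here, a made-up `date` partition column) combined with a LIMIT.
spark.sql("SELECT * FROM delta.`/tmp/events` WHERE date = '2022-11-29' LIMIT 10").collect()
```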

### How was this patch tested?
New test suite.

### Does this PR introduce _any_ user-facing changes?

No.

Resolves #1495

GitOrigin-RevId: 43d228fb10affbd87a10aabeba240525403c71dd
scottsand-db authored and vkorukanti committed Nov 29, 2022
1 parent 2574deb commit 1a94a58
Showing 6 changed files with 469 additions and 6 deletions.
@@ -517,6 +517,29 @@ trait OptimisticTransactionImpl extends TransactionalWrite
scan
}

/** Returns a [[DeltaScan]] based on the limit clause when there are no filters or projections. */
override def filesForScan(limit: Long): DeltaScan = {
val scan = snapshot.filesForScan(limit)
readFiles ++= scan.files
scan
}

/** Returns a [[DeltaScan]] based on the given partition filters, projections and limits. */
override def filesForScan(
limit: Long,
partitionFilters: Seq[Expression]): DeltaScan = {
partitionFilters.foreach { f =>
assert(
DeltaTableUtils.isPredicatePartitionColumnsOnly(f, metadata.partitionColumns, spark),
s"Only filters on partition columns [${metadata.partitionColumns.mkString(", ")}]" +
s" expected, found $f")
}
val scan = snapshot.filesForScan(limit, partitionFilters)
readPredicates += partitionFilters.reduceLeftOption(And).getOrElse(Literal(true))
readFiles ++= scan.files
scan
}

override def filesWithStatsForScan(partitionFilters: Seq[Expression]): DataFrame = {
val metadata = snapshot.filesWithStatsForScan(partitionFilters)
readPredicates += partitionFilters.reduceLeftOption(And).getOrElse(Literal(true))
@@ -17,8 +17,12 @@
package org.apache.spark.sql.delta.stats

// scalastyle:off import.ordering.noEmptyLine
import java.io.Closeable

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.delta.{DeltaColumnMapping, DeltaLog, DeltaTableUtils}
import org.apache.spark.sql.delta.actions.{AddFile, Metadata, SingleAction}
import org.apache.spark.sql.delta.actions.{AddFile, Metadata}
import org.apache.spark.sql.delta.implicits._
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
@@ -37,6 +41,24 @@ import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{AtomicType, BooleanType, CalendarIntervalType, DataType, DateType, NumericType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

/**
* Used to hold the list of files and scan stats after pruning files using the limit.
*/
case class ScanAfterLimit(
files: Seq[AddFile],
byteSize: Option[Long],
numPhysicalRecords: Option[Long],
numLogicalRecords: Option[Long])

/**
* Used in deduplicateAndFilterRemovedLocally/getFilesAndNumRecords iterator for grouping
* physical and logical number of records.
*
* @param numPhysicalRecords The number of records physically present in the file.
* @param numLogicalRecords The physical number of records minus the Deletion Vector cardinality.
*/
case class NumRecords(numPhysicalRecords: java.lang.Long, numLogicalRecords: java.lang.Long)
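
For example (the row counts are invented purely to illustrate the relationship between the two fields), a file holding 100 physical rows with a deletion vector of cardinality 2 would be described as:

```scala
// Hypothetical counts: 100 rows stored in the file, 2 removed by a deletion
// vector, leaving 98 logical rows visible to queries.
val counts = NumRecords(numPhysicalRecords = 100L, numLogicalRecords = 98L)
```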

/**
* Represents a stats column (MIN, MAX, etc) for a given (nested) user table column name. Used to
* keep track of which stats columns a data skipping query depends on.
@@ -918,6 +940,84 @@ trait DataSkippingReaderBase
}
}

/**
* Gathers files that should be included in a scan based on the limit clause, when there is
* no filter or projection present. Statistics about the amount of data that will be read
* are gathered and returned.
*/
override def filesForScan(limit: Long): DeltaScan =
recordDeltaOperation(deltaLog, "delta.skipping.limit") {
val startTime = System.currentTimeMillis()
val scan = pruneFilesByLimit(withStats, limit)

val totalDataSize = new DataSize(
sizeInBytesOpt,
None,
numOfFilesOpt
)

val scannedDataSize = new DataSize(
scan.byteSize,
scan.numPhysicalRecords,
Some(scan.files.size)
)

DeltaScan(
version = version,
files = scan.files,
total = totalDataSize,
partition = null,
scanned = scannedDataSize)(
scannedSnapshot = snapshotToScan,
partitionFilters = ExpressionSet(Nil),
dataFilters = ExpressionSet(Nil),
unusedFilters = ExpressionSet(Nil),
scanDurationMs = System.currentTimeMillis() - startTime,
dataSkippingType = DeltaDataSkippingType.limit
)
}

/**
* Gathers files that should be included in a scan based on the given predicates and limit.
* This will be called only when all predicates are on partitioning columns.
* Statistics about the amount of data that will be read are gathered and returned.
*/
override def filesForScan(limit: Long, partitionFilters: Seq[Expression]): DeltaScan =
recordDeltaOperation(deltaLog, "delta.skipping.filteredLimit") {
val startTime = System.currentTimeMillis()
val finalPartitionFilters = constructPartitionFilters(partitionFilters)

val scan = {
pruneFilesByLimit(withStats.where(finalPartitionFilters), limit)
}

val totalDataSize = new DataSize(
sizeInBytesOpt,
None,
numOfFilesOpt
)

val scannedDataSize = new DataSize(
scan.byteSize,
scan.numPhysicalRecords,
Some(scan.files.size)
)

DeltaScan(
version = version,
files = scan.files,
total = totalDataSize,
partition = null,
scanned = scannedDataSize)(
scannedSnapshot = snapshotToScan,
partitionFilters = ExpressionSet(partitionFilters),
dataFilters = ExpressionSet(Nil),
unusedFilters = ExpressionSet(Nil),
scanDurationMs = System.currentTimeMillis() - startTime,
dataSkippingType = DeltaDataSkippingType.filteredLimit
)
}

/**
* Get AddFile (with stats) actions corresponding to given set of paths in the Snapshot.
* If a path doesn't exist in snapshot, it will be ignored and no [[AddFile]] will be returned
@@ -933,9 +1033,86 @@
}
}

/** Get the files and number of records within each file, to perform limit pushdown. */
def getFilesAndNumRecords(
df: DataFrame): Iterator[(AddFile, NumRecords)] with Closeable = recordFrameProfile(
"Delta", "DataSkippingReaderEdge.getFilesAndNumRecords") {
import org.apache.spark.sql.delta.implicits._

val numLogicalRecords = col("stats.numRecords")

val result = df.withColumn("numPhysicalRecords", col("stats.numRecords")) // Physical
.withColumn("numLogicalRecords", numLogicalRecords) // Logical
.withColumn("stats", nullStringLiteral)
.select(struct(col("*")).as[AddFile],
col("numPhysicalRecords").as[java.lang.Long], col("numLogicalRecords").as[java.lang.Long])
.collectAsList()

new Iterator[(AddFile, NumRecords)] with Closeable {
private val underlying = result.iterator
override def hasNext: Boolean = underlying.hasNext
override def next(): (AddFile, NumRecords) = {
val next = underlying.next()
(next._1, NumRecords(numPhysicalRecords = next._2, numLogicalRecords = next._3))
}

override def close(): Unit = {
}

}
}

protected def convertDataFrameToAddFiles(df: DataFrame): Array[AddFile] = {
df.as[AddFile].collect()
}

protected def pruneFilesByLimit(df: DataFrame, limit: Long): ScanAfterLimit = {
val withNumRecords = {
getFilesAndNumRecords(df)
}

var logicalRowsToScan = 0L
var physicalRowsToScan = 0L
var bytesToScan = 0L
var bytesToIgnore = 0L
var rowsUnknown = false

val filesAfterLimit = try {
val iter = withNumRecords
val filesToScan = ArrayBuffer[AddFile]()
val filesToIgnore = ArrayBuffer[AddFile]()
while (iter.hasNext && logicalRowsToScan < limit) {
val file = iter.next
if (file._2.numPhysicalRecords == null || file._2.numLogicalRecords == null) {
// this file has no stats, ignore for now
bytesToIgnore += file._1.size
filesToIgnore += file._1
} else {
physicalRowsToScan += file._2.numPhysicalRecords.toLong
logicalRowsToScan += file._2.numLogicalRecords.toLong
bytesToScan += file._1.size
filesToScan += file._1
}
}

// If the files that have stats do not contain enough rows, fall back to reading all files
if (logicalRowsToScan < limit && filesToIgnore.nonEmpty) {
filesToScan ++= filesToIgnore
bytesToScan += bytesToIgnore
rowsUnknown = true
}
filesToScan.toSeq
} finally {
withNumRecords.close()
}

if (rowsUnknown) {
ScanAfterLimit(filesAfterLimit, Some(bytesToScan), None, None)
} else {
ScanAfterLimit(filesAfterLimit, Some(bytesToScan),
Some(physicalRowsToScan), Some(logicalRowsToScan))
}
}
}

trait DataSkippingReader extends DataSkippingReaderBase
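
To make the control flow of `pruneFilesByLimit` easier to follow, here is a self-contained sketch of the same greedy selection using simplified stand-in types (these are not the actual Delta classes):

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-in for AddFile plus its optional per-file record count from stats.
final case class FileStats(path: String, sizeInBytes: Long, numRecords: Option[Long])

def selectFilesForLimit(files: Iterator[FileStats], limit: Long): Seq[FileStats] = {
  var rowsSoFar = 0L
  val selected = ArrayBuffer[FileStats]()
  val withoutStats = ArrayBuffer[FileStats]()

  // Greedily take files until their record counts cover the limit.
  while (files.hasNext && rowsSoFar < limit) {
    val f = files.next()
    f.numRecords match {
      case Some(n) =>
        rowsSoFar += n
        selected += f
      case None =>
        // No stats for this file: set it aside for now.
        withoutStats += f
    }
  }

  // If the files with stats cannot cover the limit, fall back to also reading
  // the files whose row counts are unknown (the row estimate becomes unknown too).
  if (rowsSoFar < limit && withoutStats.nonEmpty) selected ++= withoutStats
  selected.toSeq
}

// Example: with a limit of 150, only the first two files are selected.
selectFilesForLimit(Iterator(
  FileStats("f1.parquet", 1024, Some(100)),
  FileStats("f2.parquet", 2048, Some(100)),
  FileStats("f3.parquet", 4096, Some(100))), limit = 150)
```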
@@ -16,15 +16,13 @@

package org.apache.spark.sql.delta.stats

// scalastyle:off import.ordering.noEmptyLine
import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.stats.DeltaDataSkippingType.DeltaDataSkippingType
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.PartitioningUtils

/**
* DataSize describes following attributes for data that consists of a list of input files
@@ -42,7 +40,7 @@ case class DataSize(
rows: Option[Long] = None,
@JsonDeserialize(contentAs = classOf[java.lang.Long])
files: Option[Long] = None
)
)

object DataSize {
def apply(a: ArrayAccumulator): DataSize = {
@@ -21,7 +21,7 @@ import org.apache.spark.sql.delta.{Snapshot, SnapshotDescriptor}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

/** Trait representing a class that can generate [[DeltaScan]] given filters, etc. */
/** Trait representing a class that can generate [[DeltaScan]] given filters and a limit. */
trait DeltaScanGenerator {
/** The snapshot that the scan is being generated on. */
val snapshotToScan: Snapshot
@@ -34,4 +34,10 @@ trait DeltaScanGenerator {

/** Returns a [[DeltaScan]] based on the given filters. */
def filesForScan(filters: Seq[Expression], keepNumRecords: Boolean = false): DeltaScan

/** Returns a [[DeltaScan]] based on the limit clause when there are no filters or projections. */
def filesForScan(limit: Long): DeltaScan

/** Returns a [[DeltaScan]] based on the given partition filters and limits. */
def filesForScan(limit: Long, partitionFilters: Seq[Expression]): DeltaScan
}
@@ -102,13 +102,22 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan]

/**
* Scan files using the given `filters` and return `DeltaScan`.
*
* Note: when `limitOpt` is non-empty, `filters` must contain only partition filters. Otherwise,
* it can contain arbitrary filters. See `DeltaTableScan` for more details.
*/
protected def filesForScan(
scanGenerator: DeltaScanGenerator,
limitOpt: Option[Int],
filters: Seq[Expression],
delta: LogicalRelation): DeltaScan = {
withStatusCode("DELTA", "Filtering files for query") {
if (limitOpt.nonEmpty) {
// If we trigger limit push down, the filters must be partition filters. Since
// there are no data filters, we don't need to apply Generated Columns
// optimization. See `DeltaTableScan` for more details.
return scanGenerator.filesForScan(limitOpt.get, filters)
}
val filtersForScan =
if (!GeneratedColumn.partitionFilterOptimizationEnabled(spark)) {
filters
@@ -165,6 +174,12 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan]
filters: Seq[Expression],
limit: Option[Int],
delta: LogicalRelation): LogicalPlan = {
if (limit.nonEmpty) {
// If we trigger limit push down, the filters must be partition filters. Since
// there are no data filters, we don't need to apply Generated Columns
// optimization. See `DeltaTableScan` for more details.
return DeltaTableUtils.replaceFileIndex(scan, preparedIndex)
}
if (!GeneratedColumn.partitionFilterOptimizationEnabled(spark)) {
DeltaTableUtils.replaceFileIndex(scan, preparedIndex)
} else {
@@ -242,6 +257,7 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan]
* object `plan` and tries to give back the arguments as a [[DeltaTableScanType]].
*/
def unapply(plan: LogicalPlan): Option[DeltaTableScanType] = {
val limitPushdownEnabled = spark.conf.get(DeltaSQLConf.DELTA_LIMIT_PUSHDOWN_ENABLED)

// Remove projections as a plan differentiator because it does not affect file listing
// results. Plans with the same filters but different projections therefore will not have
Expand All @@ -254,6 +270,10 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan]
}

plan match {
case LocalLimit(IntegerLiteral(limit),
PhysicalOperation(_, filters, delta @ DeltaTable(fileIndex: TahoeLogFileIndex)))
if limitPushdownEnabled && containsPartitionFiltersOnly(filters, fileIndex) =>
Some((canonicalizePlanForDeltaFileListing(plan), filters, fileIndex, Some(limit), delta))
case PhysicalOperation(
_,
filters,

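Finally, for context on when the new `LocalLimit` case in `PrepareDeltaScanBase` fires, here is a hedged usage sketch (the table path, partition column `date`, and data column `value` are made up, and `date` is assumed to be a partition column):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.sources.DeltaSQLConf

val spark = SparkSession.builder().appName("limit-pushdown-matching").getOrCreate()

// Pushdown is gated by this SQL conf (referenced via its config entry key).
spark.conf.set(DeltaSQLConf.DELTA_LIMIT_PUSHDOWN_ENABLED.key, "true")

// Matches the new LocalLimit case: only a partition filter plus a LIMIT,
// so planning goes through filesForScan(limit, partitionFilters).
spark.sql("SELECT * FROM delta.`/tmp/events` WHERE date = '2022-11-29' LIMIT 5")

// Does not match: `value` is a data column, so the filters are not
// partition-only and the regular data-skipping path is used instead.
spark.sql("SELECT * FROM delta.`/tmp/events` WHERE value > 10 LIMIT 5")
```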