# Optimize common case: SELECT COUNT(*) FROM Table (Fix #1192)
## Description

Running the query `SELECT COUNT(*) FROM Table` takes a long time on big tables: Spark scans all the Parquet files just to return the number of rows, even though that information is already available in the Delta log.

Resolves #1192

Created unit tests to validate that the optimization works, including cases that are not covered by this optimization.
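
As a rough illustration of the behavior (the table path is hypothetical; this is a sketch, not test code):

```scala
// With the optimization enabled (the default), a bare COUNT(*) is answered
// from the numRecords statistics stored in the Delta log; no Parquet is read.
spark.sql("SELECT COUNT(*) FROM delta.`/tmp/events`").show()

// Queries with filters, grouping, or column-level counts are NOT rewritten
// and still scan the data files:
spark.sql("SELECT COUNT(*) FROM delta.`/tmp/events` WHERE year = 2022").show()
```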

## Does this PR introduce _any_ user-facing changes?

No, only a performance improvement.

Closes #1377

Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
GitOrigin-RevId: a9116e42a9c805adc967dd3e802f84d502f50a8b
felipepessoto authored and allisonport-db committed Nov 28, 2022
1 parent 1521be5 commit 0c349da
Showing 6 changed files with 412 additions and 6 deletions.
@@ -0,0 +1,75 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.optimizer

import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Count}
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.delta.DeltaTable
import org.apache.spark.sql.delta.files.TahoeLogFileIndex
import org.apache.spark.sql.delta.stats.DeltaScanGenerator
import org.apache.spark.sql.functions.{count, sum}

trait OptimizeMetadataOnlyDeltaQuery {
  def optimizeQueryWithMetadata(plan: LogicalPlan): LogicalPlan = {
    plan.transformUpWithSubqueries {
      case agg@CountStarDeltaTable(countValue) =>
        LocalRelation(agg.output, Seq(InternalRow(countValue)))
    }
  }

  protected def getDeltaScanGenerator(index: TahoeLogFileIndex): DeltaScanGenerator

  object CountStarDeltaTable {
    // Matches a global COUNT(*) (no grouping and no filters) over a Delta
    // table, returning the row count computed from the Delta log.
    def unapply(plan: Aggregate): Option[Long] = {
      plan match {
        case Aggregate(
            Nil,
            Seq(Alias(AggregateExpression(Count(Seq(Literal(1, _))), Complete, false, None, _), _)),
            PhysicalOperation(_, Nil, DeltaTable(tahoeLogFileIndex: TahoeLogFileIndex))) =>
          extractGlobalCount(tahoeLogFileIndex)
        case _ => None
      }
    }

    private def extractGlobalCount(tahoeLogFileIndex: TahoeLogFileIndex): Option[Long] = {
      val row = getDeltaScanGenerator(tahoeLogFileIndex).filesWithStatsForScan(Nil)
        .agg(
          sum("stats.numRecords"),
          count(new Column("*")),
          count(new Column("stats.numRecords")))
        .first

      val numOfFiles = row.getLong(1)
      val numOfFilesWithStats = row.getLong(2)

      if (numOfFiles == numOfFilesWithStats) {
        val numRecords = if (row.isNullAt(0)) {
          0 // It is null if deltaLog.snapshot.allFiles is empty
        } else { row.getLong(0) }

        Some(numRecords)
      } else {
        // If COUNT(*) is greater than COUNT(numRecords), not every AddFile
        // record has stats, so the count cannot be answered from metadata.
        None
      }
    }
  }
}
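
When the rule fires, the aggregate over the Delta scan should collapse into a `LocalRelation` carrying the precomputed count. A sketch of how one might observe this (table name hypothetical; exactly which plan shows the rewrite depends on where `PrepareDeltaScan` runs):

```scala
// Before the rewrite: Aggregate(count(1)) on top of a Delta file scan.
// After the rewrite: a one-row LocalRelation with the count from the log.
val df = spark.sql("SELECT COUNT(*) FROM some_delta_table")
df.queryExecution.optimizedPlan // expect a LocalRelation when the rule applied
```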
@@ -911,6 +911,14 @@ trait DeltaSQLConfBase {
" concurrent queries accessing the table until the history wipe is complete.")
.booleanConf
.createWithDefault(false)

val DELTA_OPTIMIZE_METADATA_QUERY_ENABLED =
buildConf("optimizeMetadataQuery.enabled")
.internal()
.doc("Whether we can use the metadata in the DeltaLog to" +
" optimize queries that can be run purely on metadata.")
.booleanConf
.createWithDefault(true)
}

object DeltaSQLConf extends DeltaSQLConfBase
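
The flag is internal and on by default; to fall back to the scan-based behavior, a user would disable it per session. A sketch, assuming `buildConf` applies Delta's usual `spark.databricks.delta.` key prefix:

```scala
// Disable the metadata-only COUNT(*) rewrite for the current session.
spark.conf.set("spark.databricks.delta.optimizeMetadataQuery.enabled", "false")

// Tests can reference the same key via
// DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key, as the suites below do.
```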
@@ -24,6 +24,7 @@ import org.apache.spark.sql.delta._
 import org.apache.spark.sql.delta.actions.{AddFile, Metadata}
 import org.apache.spark.sql.delta.files.{TahoeFileIndex, TahoeLogFileIndex}
 import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.optimizer.OptimizeMetadataOnlyDeltaQuery
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.hadoop.fs.Path
@@ -49,7 +50,8 @@ import org.apache.spark.sql.types.StructType
  */
 trait PrepareDeltaScanBase extends Rule[LogicalPlan]
   with PredicateHelper
-  with DeltaLogging { self: PrepareDeltaScan =>
+  with DeltaLogging
+  with OptimizeMetadataOnlyDeltaQuery { self: PrepareDeltaScan =>
 
   private val snapshotIsolationEnabled = spark.conf.get(DeltaSQLConf.DELTA_SNAPSHOT_ISOLATION)
 
@@ -199,7 +201,6 @@ trait PrepareDeltaScanBase extends Rule[LogicalPlan]
       spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_STATS_SKIPPING)
     )
     if (shouldPrepareDeltaScan) {
-
       // Should not be applied to subqueries to avoid duplicate delta jobs.
       val isSubquery = plan.isInstanceOf[Subquery] || plan.isInstanceOf[SupportsSubquery]
       // Should not be applied to DataSourceV2 write plans, because they'll be planned later
@@ -209,6 +210,13 @@
         return plan
       }
 
+      val optimizeMetadataQueryEnabled =
+        spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED)
+
+      if (optimizeMetadataQueryEnabled) {
+        plan = optimizeQueryWithMetadata(plan)
+      }
+
       prepareDeltaScan(plan)
     } else {
       // If this query is running inside an active transaction and is touching the same table
@@ -554,8 +554,9 @@ abstract class DeltaHistoryManagerBase extends DeltaTimeTravelTests
       sql(s"optimize $tblName")
 
       withSQLConf(
+        // Disable query rewrite or else the parquet files are not scanned.
+        DeltaSQLConf.DELTA_OPTIMIZE_METADATA_QUERY_ENABLED.key -> "false",
         DeltaSQLConf.DELTA_VACUUM_RETENTION_CHECK_ENABLED.key -> "false") {
-
         sql(s"vacuum $tblName retain 0 hours")
         intercept[SparkException] {
           sql(s"select * from ${versionAsOf(tblName, 0)}").collect()
@@ -1498,7 +1498,7 @@ class DeltaSuite extends QueryTest
     }
 
     val thrown = intercept[SparkException] {
-      data.toDF().count()
+      data.toDF().collect()
     }
     assert(thrown.getMessage.contains("is not a Parquet file"))
   }
@@ -1528,7 +1528,7 @@ class DeltaSuite extends QueryTest
       // We don't have a good way to tell which specific values got deleted, so just check that
       // the right number remain. (Note that this works because there's 1 value per append, which
       // means 1 value per file.)
-      assert(data.toDF().count() == 6)
+      assert(data.toDF().collect().size == 6)
     }
   }
 }
@@ -1553,7 +1553,7 @@
     }
 
     val thrown = intercept[SparkException] {
-      data.toDF().count()
+      data.toDF().collect()
    }
     assert(thrown.getMessage.contains("FileNotFound"))
   }
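
The `count()` to `collect()` changes in these suites follow directly from the optimization: a bare `count()` can now be answered from the Delta log without opening any Parquet file, so tests that expect a read failure need an action that actually materializes rows. A minimal sketch of the distinction:

```scala
// After the underlying Parquet files are corrupted or deleted:
data.toDF().count()   // may succeed: rewritten to a metadata-only plan
data.toDF().collect() // throws SparkException: must actually read the files
```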
