Skip to content

Commit

Permalink
fix: Fallback to Spark if scan has meta columns (#997)
Browse files Browse the repository at this point in the history
  • Loading branch information
viirya authored Oct 7, 2024
1 parent b131cc3 commit 0028f1e
Showing 1 changed file with 14 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, PlanExpression, Remainder}
import org.apache.spark.sql.catalyst.expressions.{Attribute, Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, PlanExpression, Remainder}
import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.catalyst.util.MetadataColumnHelper
import org.apache.spark.sql.comet._
import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec, CometShuffleManager}
import org.apache.spark.sql.comet.util.Utils
Expand Down Expand Up @@ -101,7 +102,19 @@ class CometSparkSessionExtensions
def isDynamicPruningFilter(e: Expression): Boolean =
e.exists(_.isInstanceOf[PlanExpression[_]])

def hasMetadataCol(plan: SparkPlan): Boolean = {
plan.expressions.exists(_.exists {
case a: Attribute =>
a.isMetadataCol
case _ => false
})
}

plan.transform {
case scan if hasMetadataCol(scan) =>
withInfo(scan, "Metadata column is not supported")
scan

case scanExec: FileSourceScanExec
if COMET_DPP_FALLBACK_ENABLED.get() &&
scanExec.partitionFilters.exists(isDynamicPruningFilter) =>
Expand Down

0 comments on commit 0028f1e

Please sign in to comment.