From 0028f1e16d0c769129a08843fdf6e6c1484d86f4 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 7 Oct 2024 08:59:22 -0700
Subject: [PATCH] fix: Fallback to Spark if scan has meta columns (#997)

---
 .../comet/CometSparkSessionExtensions.scala       | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
index bf09d6417..0185a15e5 100644
--- a/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
+++ b/spark/src/main/scala/org/apache/comet/CometSparkSessionExtensions.scala
@@ -25,11 +25,12 @@ import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
-import org.apache.spark.sql.catalyst.expressions.{Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, PlanExpression, Remainder}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Divide, DoubleLiteral, EqualNullSafe, EqualTo, Expression, FloatLiteral, GreaterThan, GreaterThanOrEqual, KnownFloatingPointNormalized, LessThan, LessThanOrEqual, NamedExpression, PlanExpression, Remainder}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{Final, Partial}
 import org.apache.spark.sql.catalyst.optimizer.NormalizeNaNAndZero
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.catalyst.util.MetadataColumnHelper
 import org.apache.spark.sql.comet._
 import org.apache.spark.sql.comet.execution.shuffle.{CometColumnarShuffle, CometNativeShuffle, CometShuffleExchangeExec, CometShuffleManager}
 import org.apache.spark.sql.comet.util.Utils
@@ -101,7 +102,19 @@ class CometSparkSessionExtensions
       def isDynamicPruningFilter(e: Expression): Boolean =
         e.exists(_.isInstanceOf[PlanExpression[_]])
 
+      def hasMetadataCol(plan: SparkPlan): Boolean = {
+        plan.expressions.exists(_.exists {
+          case a: Attribute =>
+            a.isMetadataCol
+          case _ => false
+        })
+      }
+
       plan.transform {
+        case scan if hasMetadataCol(scan) =>
+          withInfo(scan, "Metadata column is not supported")
+          scan
+
         case scanExec: FileSourceScanExec
             if COMET_DPP_FALLBACK_ENABLED.get() &&
               scanExec.partitionFilters.exists(isDynamicPruningFilter) =>
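
Editor's note (not part of the patch): one way to exercise the new hasMetadataCol guard is to select Spark's hidden _metadata column from a file source, which puts a metadata attribute in the scan output so Comet now leaves the scan to Spark instead of converting it to a native scan. The sketch below is a hypothetical repro; the object name and the /tmp path are placeholders.

// Editor's sketch, assuming a standard Spark 3.4/3.5 session with Comet enabled.
import org.apache.spark.sql.SparkSession

object MetadataColumnRepro { // hypothetical name, for illustration only
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("metadata-column-fallback").getOrCreate()

    // Any Parquet path works; /tmp/meta_col_demo is a placeholder.
    spark.range(10).write.mode("overwrite").parquet("/tmp/meta_col_demo")

    // Selecting the hidden _metadata column (available for file sources since
    // Spark 3.3) adds a metadata attribute to the scan, so hasMetadataCol(scan)
    // returns true and the scan falls back to Spark.
    spark.read
      .parquet("/tmp/meta_col_demo")
      .select("id", "_metadata.file_path")
      .show(truncate = false)

    spark.stop()
  }
}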