[VL] Failed to filter partition with udf expression #6914

Closed
ccat3z opened this issue Aug 19, 2024 · 4 comments
Labels
bug (Something isn't working), triage

Comments

ccat3z (Contributor) commented Aug 19, 2024

Backend

VL (Velox)

Bug description

The query fails during plan optimization when a partition is filtered with a UDF expression, e.g. select * from test where udf(dt) = '...'; where dt is a partition key.
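
A minimal reproduction sketch (the table layout, UDF class, and jar path below are assumptions, not from this report; the report only gives the query shape and the datekey2date UDF visible in the stack trace):

    -- Hypothetical setup: dt is a partition key, datekey2date is a Hive UDF.
    CREATE TABLE test (id INT) PARTITIONED BY (dt STRING) STORED AS PARQUET;
    CREATE TEMPORARY FUNCTION datekey2date AS 'com.example.udf.DateKey2Date'
        USING JAR 'hdfs:///path/to/udf.jar';

    -- Fails while optimizing the plan (in PruneFileSourcePartitions) with
    -- java.lang.UnsupportedOperationException: Cannot evaluate expression: udfexpression(...)
    SELECT * FROM test WHERE datekey2date(dt) = '2024-08-19';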

Spark version

None

Spark configurations

No response

System information

No response

Relevant logs

java.lang.UnsupportedOperationException: Cannot evaluate expression: udfexpression(datekey2date, StringType, true, input[0, string, true])
        at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval(Expression.scala:301)
        at org.apache.spark.sql.catalyst.expressions.Unevaluable.eval$(Expression.scala:300)
        at org.apache.spark.sql.expression.UDFExpression.eval(UDFResolver.scala:97)
        at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:544)
        at org.apache.spark.sql.catalyst.expressions.InterpretedPredicate.eval(predicates.scala:51)
        at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.$anonfun$prunePartitionsByFilter$3(ExternalCatalogUtils.scala:157)
        at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.$anonfun$prunePartitionsByFilter$3$adapted(ExternalCatalogUtils.scala:156)
        at scala.collection.immutable.Stream.filterImpl(Stream.scala:506)
        at scala.collection.immutable.Stream.filterImpl(Stream.scala:204)
        at scala.collection.TraversableLike.filter(TraversableLike.scala:347)
        at scala.collection.TraversableLike.filter$(TraversableLike.scala:347)
        at scala.collection.AbstractTraversable.filter(Traversable.scala:108)
        at org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils$.prunePartitionsByFilter(ExternalCatalogUtils.scala:156)
        at org.apache.spark.sql.hive.HiveExternalCatalog.$anonfun$listPartitionsByFilterByView$1(HiveExternalCatalog.scala:1403)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:106)
        at org.apache.spark.sql.hive.HiveExternalCatalog.listPartitionsByFilterByView(HiveExternalCatalog.scala:1392)
        at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.listPartitionsByFilterByView(ExternalCatalogWithListener.scala:308)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listPartitionsByFilterByView(SessionCatalog.scala:1358)
        at org.apache.spark.sql.execution.datasources.CatalogFileIndex.filterPartitions(CatalogFileIndex.scala:83)
        at org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$$anonfun$apply$1.applyOrElse(PruneFileSourcePartitions.scala:185)
        at org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$$anonfun$apply$1.applyOrElse(PruneFileSourcePartitions.scala:164)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:431)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:78)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:431)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:407)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown(AnalysisHelper.scala:162)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDown$(AnalysisHelper.scala:160)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDown(LogicalPlan.scala:29)
        at org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$.apply(PruneFileSourcePartitions.scala:164)
        at org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions$.apply(PruneFileSourcePartitions.scala:51)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:280)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:89)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:277)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:269)
        at scala.collection.immutable.List.foreach(List.scala:392)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:269)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:247)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:247)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:89)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:145)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:145)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:89)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:86)
        at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:92)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:110)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$simpleString$3(QueryExecution.scala:161)
        at org.apache.spark.sql.catalyst.plans.QueryPlan$.append(QueryPlan.scala:646)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:161)
        at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:148)
        at org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:180)
        at org.apache.spark.sql.execution.command.ExplainCommand.run(commands.scala:158)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
        at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:233)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3634)
        at org.apache.spark.sql.execution.SQLExecution$.withNewQueryId(SQLExecution.scala:65)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:185)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:112)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:89)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3632)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:233)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$4(Dataset.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withNewQueryId(SQLExecution.scala:58)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$3(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:610)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:767)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:605)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:63)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:407)
        at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:386)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:486)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:312)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1962)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:1119)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:1142)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:95)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:2351)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:2360)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
ccat3z added the bug (Something isn't working) and triage labels on Aug 19, 2024
ccat3z (Contributor, Author) commented Aug 19, 2024

It may be related to #6829.

ccat3z (Contributor, Author) commented Aug 19, 2024

cc @kecookier

marin-ma (Contributor) commented Aug 19, 2024

@ccat3z The partition filter is evaluated by the PruneFileSourcePartitions rule against the logical plan. This happens during optimization, not execution, so Gluten cannot take over the evaluation of partition filters. Therefore, the UDF in a partition filter must be a Java-based UDF.

With #6829, users can register the native implementation for a Hive UDF while also registering the Java-based UDF via CREATE TEMPORARY FUNCTION. This allows Spark to locate and use the Java-based UDF when calling Expression.eval. With this patch, SQL execution is offloaded if validation passes; otherwise, it falls back to the Java-based implementation. For partition filters, however, the Java-based implementation is always used directly. Here's an example: 41ec4c7
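
A sketch of that dual registration (the UDF class, jar path, and the Gluten configuration key below are assumptions on my part, not taken from the patch):

    -- Register the Java-based Hive UDF so the driver can evaluate it via
    -- Expression.eval during partition pruning (PruneFileSourcePartitions).
    CREATE TEMPORARY FUNCTION datekey2date AS 'com.example.udf.DateKey2Date'
        USING JAR 'hdfs:///path/to/java-udf.jar';

    -- Assumption: the native (Velox) implementation is registered separately
    -- through Gluten's UDF library configuration so offloaded execution can
    -- use it, e.g. something along the lines of:
    --   SET spark.gluten.sql.columnar.backend.velox.udfLibraryPaths=/path/to/libudf.so;

    -- The partition filter below is evaluated on the driver with the Java-based
    -- UDF; the rest of the query is offloaded if validation passes.
    SELECT * FROM test WHERE datekey2date(dt) = '2024-08-19';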

marin-ma changed the title from "Failed to filter partition with udf expression" to "[VL] Failed to filter partition with udf expression" on Aug 20, 2024
ccat3z (Contributor, Author) commented Aug 30, 2024

Closed by #6829

ccat3z closed this as completed on Aug 30, 2024