[BUG] NDS query 5 fails with AdaptiveSparkPlanExec assertion #4625

Closed
jlowe opened this issue Jan 25, 2022 · 5 comments · Fixed by #4630
Labels
bug Something isn't working P0 Must have for release

Comments

jlowe commented Jan 25, 2022

Running NDS query 5 with Spark 3.2.0 on partitioned input results in the following exception:

org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.sql.execution.SubqueryBroadcastExec.executeCollect(SubqueryBroadcastExec.scala:113)
	at org.apache.spark.sql.execution.ReusedSubqueryExec.executeCollect(basicPhysicalOperators.scala:902)
	at org.apache.spark.sql.execution.InSubqueryExec.updateResult(subquery.scala:127)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$waitForSubqueries$1(SparkPlan.scala:252)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$waitForSubqueries$1$adapted(SparkPlan.scala:251)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.execution.SparkPlan.waitForSubqueries(SparkPlan.scala:251)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:221)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:207)
	at com.nvidia.spark.rapids.GpuFilterExec.doExecuteColumnar(basicPhysicalOperators.scala:362)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:211)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:207)
	at com.nvidia.spark.rapids.GpuCoalesceBatches.doExecuteColumnar(GpuCoalesceBatches.scala:584)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:211)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:207)
	at com.nvidia.spark.rapids.GpuProjectExec.doExecuteColumnar(basicPhysicalOperators.scala:145)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:211)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:207)
	at com.nvidia.spark.rapids.GpuUnionExec.$anonfun$doExecuteColumnar$16(basicPhysicalOperators.scala:716)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.AbstractTraversable.map(Traversable.scala:108)
	at com.nvidia.spark.rapids.GpuUnionExec.doExecuteColumnar(basicPhysicalOperators.scala:716)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:211)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:207)
	at com.nvidia.spark.rapids.GpuBroadcastHashJoinExec.doExecuteColumnar(GpuBroadcastHashJoinExec.scala:166)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:211)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:207)
	at com.nvidia.spark.rapids.GpuProjectExec.doExecuteColumnar(basicPhysicalOperators.scala:145)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:211)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:207)
	at com.nvidia.spark.rapids.GpuBroadcastHashJoinExec.doExecuteColumnar(GpuBroadcastHashJoinExec.scala:166)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:211)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:207)
	at com.nvidia.spark.rapids.GpuProjectExec.doExecuteColumnar(basicPhysicalOperators.scala:145)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:211)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:207)
	at com.nvidia.spark.rapids.GpuHashAggregateExec.doExecuteColumnar(aggregate.scala:1403)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeColumnar$1(SparkPlan.scala:211)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
	at org.apache.spark.sql.execution.SparkPlan.executeColumnar(SparkPlan.scala:207)
	at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.inputBatchRDD$lzycompute(GpuShuffleExchangeExecBase.scala:192)
	at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBase.inputBatchRDD(GpuShuffleExchangeExecBase.scala:192)
	at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBaseWithMetrics.mapOutputStatisticsFuture$lzycompute(GpuShuffleExchangeExecBase.scala:135)
	at org.apache.spark.sql.rapids.execution.GpuShuffleExchangeExecBaseWithMetrics.mapOutputStatisticsFuture(GpuShuffleExchangeExecBase.scala:134)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.$anonfun$submitShuffleJob$1(ShuffleExchangeExec.scala:68)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob(ShuffleExchangeExec.scala:68)
	at org.apache.spark.sql.execution.exchange.ShuffleExchangeLike.submitShuffleJob$(ShuffleExchangeExec.scala:67)
	at org.apache.spark.rapids.shims.v2.GpuShuffleExchangeExec.submitShuffleJob(GpuShuffleExchangeExec.scala:28)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture$lzycompute(QueryStageExec.scala:170)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.shuffleFuture(QueryStageExec.scala:170)
	at org.apache.spark.sql.execution.adaptive.ShuffleQueryStageExec.cancel(QueryStageExec.scala:183)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$cleanUpAndThrowException$1(AdaptiveSparkPlanExec.scala:723)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$cleanUpAndThrowException$1$adapted(AdaptiveSparkPlanExec.scala:718)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:253)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1(TreeNode.scala:254)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1$adapted(TreeNode.scala:254)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:254)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1(TreeNode.scala:254)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1$adapted(TreeNode.scala:254)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:254)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1(TreeNode.scala:254)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1$adapted(TreeNode.scala:254)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:254)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1(TreeNode.scala:254)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1$adapted(TreeNode.scala:254)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:254)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1(TreeNode.scala:254)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1$adapted(TreeNode.scala:254)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:254)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1(TreeNode.scala:254)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1$adapted(TreeNode.scala:254)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:254)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1(TreeNode.scala:254)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$foreach$1$adapted(TreeNode.scala:254)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreach(TreeNode.scala:254)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.cleanUpAndThrowException(AdaptiveSparkPlanExec.scala:718)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5(AdaptiveSparkPlanExec.scala:265)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$5$adapted(AdaptiveSparkPlanExec.scala:254)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$getFinalPhysicalPlan$1(AdaptiveSparkPlanExec.scala:254)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.getFinalPhysicalPlan(AdaptiveSparkPlanExec.scala:226)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:365)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715)
	at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2971)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
	at org.apache.spark.sql.Dataset.collect(Dataset.scala:2971)
	at $line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.$anonfun$res1$1(<console>:24)
	at org.apache.spark.sql.SparkSession.time(SparkSession.scala:679)
	at $line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:24)
	at $line16.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:28)
	at $line16.$read$$iw$$iw$$iw$$iw$$iw$$iw.<init>(<console>:30)
	at $line16.$read$$iw$$iw$$iw$$iw$$iw.<init>(<console>:32)
	at $line16.$read$$iw$$iw$$iw$$iw.<init>(<console>:34)
	at $line16.$read$$iw$$iw$$iw.<init>(<console>:36)
	at $line16.$read$$iw$$iw.<init>(<console>:38)
	at $line16.$read$$iw.<init>(<console>:40)
	at $line16.$read.<init>(<console>:42)
	at $line16.$read$.<init>(<console>:46)
	at $line16.$read$.<clinit>(<console>)
	at $line16.$eval$.$print$lzycompute(<console>:7)
	at $line16.$eval$.$print(<console>:6)
	at $line16.$eval.$print(<console>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:747)
	at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1020)
	at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:568)
	at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:36)
	at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:116)
	at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
	at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:567)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:594)
	at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:564)
	at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:865)
	at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:733)
	at scala.tools.nsc.interpreter.ILoop.processLine(ILoop.scala:435)
	at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:456)
	at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:239)
	at org.apache.spark.repl.Main$.doMain(Main.scala:78)
	at org.apache.spark.repl.Main$.main(Main.scala:58)
	at org.apache.spark.repl.Main.main(Main.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.util.concurrent.ExecutionException: Boxed Error
	at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
	at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
	at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
	at scala.concurrent.Promise.complete(Promise.scala:53)
	at scala.concurrent.Promise.complete$(Promise.scala:52)
	at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:187)
	at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
	at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.AssertionError: assertion failed
	at scala.Predef$.assert(Predef.scala:208)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$doExecuteBroadcast$1(AdaptiveSparkPlanExec.scala:359)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:366)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.doExecuteBroadcast(AdaptiveSparkPlanExec.scala:358)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeBroadcast$1(SparkPlan.scala:197)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:222)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:219)
	at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:193)
	at org.apache.spark.sql.execution.SubqueryBroadcastExec.$anonfun$relationFuture$2(SubqueryBroadcastExec.scala:81)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:139)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:137)
	at org.apache.spark.sql.execution.SubqueryBroadcastExec.$anonfun$relationFuture$1(SubqueryBroadcastExec.scala:78)
	at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
	at scala.util.Success.$anonfun$map$1(Try.scala:255)
	at scala.util.Success.map(Try.scala:213)
	at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
	at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
	... 5 more
@jlowe added the bug (Something isn't working), ? - Needs Triage (Need team to review and classify), and P0 (Must have for release) labels on Jan 25, 2022
jlowe commented Jan 25, 2022

The assertion expects the child of AdaptiveSparkPlanExec to be a BroadcastQueryStageExec, but it instead finds a WholeStageCodegenExec as the child, with this plan:

*(1) ColumnarToRow
+- BroadcastQueryStage 1
   +- ReusedExchange [d_date_sk#1422], GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#3848]
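
For reference, the failing check in Spark 3.2's AdaptiveSparkPlanExec.doExecuteBroadcast looks roughly like the following (paraphrased from the Spark source; exact details may differ):

    override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
      val broadcastPlan = withFinalPlanUpdate { finalPlan =>
        // The final plan of a broadcast subtree is expected to be a
        // BroadcastQueryStageExec; the ColumnarToRow wrapper above trips this.
        assert(finalPlan.isInstanceOf[BroadcastQueryStageExec])
        finalPlan
      }
      broadcastPlan.doExecuteBroadcast()
    }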

jlowe commented Jan 25, 2022

@sperlingxx and @andygrove can you take a look at this? I think it may be related to #4440. It looks like Spark tries to reuse a GpuBroadcastExchange for a context that isn't going to execute on the GPU (or at least it doesn't know yet whether it will), so a ColumnarToRow transition gets inserted, which then breaks AdaptiveSparkPlanExec's assumptions about its child node. That seems like a bug in either Spark's planning of columnar plans or AdaptiveSparkPlanExec's handling of them, since Spark's own rules are injecting the plan node that makes AdaptiveSparkPlanExec upset.

I suspect we may be able to fix this specific instance by peeling off the ColumnarToRowExec, as the plan after the AdaptiveSparkPlanExec may ultimately be planned on the GPU later, but I do wonder about the case where the GpuBroadcastExchange really is trying to be reused by a non-GPU plan.
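
A minimal sketch of what peeling off the transition could look like (illustrative only; the helper name and placement are my assumptions, and as andygrove's comment below shows, this alone is not sufficient):

    import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
    import org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec

    // Hypothetical rewrite: drop a ColumnarToRowExec that directly wraps a
    // broadcast query stage so AdaptiveSparkPlanExec sees the stage itself.
    def peelColumnarToRow(plan: SparkPlan): SparkPlan = plan.transformUp {
      case ColumnarToRowExec(stage: BroadcastQueryStageExec) => stage
    }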

@sameerz removed the ? - Needs Triage (Need team to review and classify) label on Jan 25, 2022
@sameerz added this to the Jan 10 - Jan 28 milestone on Jan 25, 2022
andygrove commented

I reproduced the issue and tried simply removing the ColumnarToRowExec, but that resulted in a different problem:

Caused by: java.lang.ClassCastException: org.apache.spark.sql.rapids.execution.SerializeConcatHostBuffersDeserializeBatch cannot be cast to org.apache.spark.sql.execution.joins.HashedRelation
        at org.apache.spark.sql.execution.SubqueryBroadcastExec.$anonfun$relationFuture$2(SubqueryBroadcastExec.scala:81)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withExecutionId$1(SQLExecution.scala:139)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.withExecutionId(SQLExecution.scala:137)
        at org.apache.spark.sql.execution.SubqueryBroadcastExec.$anonfun$relationFuture$1(SubqueryBroadcastExec.scala:78)

This was called from:

        at org.apache.spark.sql.execution.SubqueryBroadcastExec.executeCollect(SubqueryBroadcastExec.scala:113)
        at org.apache.spark.sql.execution.ReusedSubqueryExec.executeCollect(basicPhysicalOperators.scala:902)
        at org.apache.spark.sql.execution.InSubqueryExec.updateResult(subquery.scala:127)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$waitForSubqueries$1(SparkPlan.scala:252)

This code path assumes row-based results and I am not sure if we can override this behavior here.
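
The row-based assumption comes from how SubqueryBroadcastExec consumes the broadcast; paraphrasing the relevant lines from Spark 3.2 (exact code may differ):

    // Inside SubqueryBroadcastExec.relationFuture (paraphrased): the broadcast
    // value is unconditionally cast to a CPU HashedRelation, so a GPU-produced
    // SerializeConcatHostBuffersDeserializeBatch fails with a ClassCastException.
    val broadcastRelation = child.executeBroadcast[HashedRelation]().value
    // ...the relation's keys are then projected and collected as rows.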

I then tried another approach, pushing the ColumnarToRow inside the BroadcastQueryStage (which possibly defeats part of the point of reusing GPU broadcasts in the first place?), using this logic:

      case ColumnarToRowExec(qs: BroadcastQueryStageExec) =>
        qs.copy(plan = ColumnarToRowExec(qs.plan))

This failed with:

java.lang.IllegalStateException: wrong plan for broadcast stage:
 ColumnarToRow
+- ReusedExchange [d_date_sk#972], GpuBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#1776]

        at org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec.<init>(QueryStageExec.scala:218)
        at org.apache.spark.sql.execution.adaptive.BroadcastQueryStageExec.copy(QueryStageExec.scala:212)
        at com.nvidia.spark.rapids.GpuTransitionOverrides.optimizeAdaptiveTransitions(GpuTransitionOverrides.scala:81)

caused by this logic in BroadcastQueryStageExec:

@transient val broadcast = plan match {
  case b: BroadcastExchangeLike => b
  case ReusedExchangeExec(_, b: BroadcastExchangeLike) => b
  case _ =>
    throw new IllegalStateException(s"wrong plan for broadcast stage:\n ${plan.treeString}")
}

I do not see an obvious solution to this issue yet but will keep looking.

sperlingxx commented Jan 26, 2022

Hi @jlowe @andygrove, I am really sorry for causing this bug.

I think I found the root cause: when we replace SubqueryBroadcast with GPU overrides, we missed a potential case while searching for SubqueryBroadcast through pattern matching: code link.

          private lazy val partitionFilters = wrapped.partitionFilters.map { filter =>
            filter.transformDown {
              case dpe @ DynamicPruningExpression(inSub: InSubqueryExec)
                if inSub.plan.isInstanceOf[SubqueryBroadcastExec] =>

                val subBcMeta = GpuOverrides.wrapAndTagPlan(inSub.plan, conf)
                subBcMeta.tagForExplain()
                val gpuSubBroadcast = subBcMeta.convertIfNeeded().asInstanceOf[BaseSubqueryExec]
                dpe.copy(inSub.copy(plan = gpuSubBroadcast))
            }
          }

There exists another possible pattern: the SubqueryBroadcastExec may be reused. If it is reused before the GPU override rules are applied, the pattern becomes DynamicPruningExpression(InSubqueryExec(ReusedSubqueryExec(SubqueryBroadcastExec))).

If I am not wrong, this pattern only appears when AQE is on. When AQE is off, no rule other than ReuseExchangeAndSubquery will try to reuse subqueries. When AQE is on, however, another rule, ReuseAdaptiveSubquery, will try to reuse subqueries as well. What's more, as one of the queryStageOptimizerRules, ReuseAdaptiveSubquery is applied before the GPU overrides during the build of each query stage: link.

Under this circumstance, GpuSubqueryBroadcastMeta cannot do its job of replacing AdaptiveSparkPlanExec with a columnar copy. However, the AdaptiveSparkPlanExec of the subquery will still transform the BroadcastExchange into a GpuBroadcastExchange. Then a ColumnarToRow is unexpectedly inserted between the BroadcastQueryStage and the AdaptiveSparkPlanExec by the post-columnar transitions.
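
As an illustration (an assumption on my part, not necessarily the shape of the actual fix in #4630), the match above could be extended to also cover the reused form:

    filter.transformDown {
      case dpe @ DynamicPruningExpression(inSub: InSubqueryExec) =>
        inSub.plan match {
          // Existing pattern: the broadcast subquery is used directly.
          case _: SubqueryBroadcastExec =>
            val subBcMeta = GpuOverrides.wrapAndTagPlan(inSub.plan, conf)
            subBcMeta.tagForExplain()
            dpe.copy(inSub.copy(plan =
              subBcMeta.convertIfNeeded().asInstanceOf[BaseSubqueryExec]))
          // Missed pattern: ReuseAdaptiveSubquery already wrapped it in a
          // ReusedSubqueryExec, so convert the inner plan and re-wrap it.
          case ReusedSubqueryExec(sb: SubqueryBroadcastExec) =>
            val subBcMeta = GpuOverrides.wrapAndTagPlan(sb, conf)
            subBcMeta.tagForExplain()
            dpe.copy(inSub.copy(plan = ReusedSubqueryExec(
              subBcMeta.convertIfNeeded().asInstanceOf[BaseSubqueryExec])))
          case _ => dpe
        }
    }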

Perhaps it was a little bit reckless to simply remove GpuBroadcastToCpuExec in #4440; it could have prevented the plugin from crashing outright in unexpected conditions like this one.

revans2 commented Jan 26, 2022

Once this is fixed, can we try to do some fuzz testing to see if there is anything else that we might have missed? I know it is a large task, but it would really help me feel more comfortable that we have both DPP and AQE covered.
