diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index a2029816c231f..fc879f7e98f15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -85,20 +85,16 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
   )
 
   override def doExecute(): RDD[InternalRow] = {
-    val numOutputRows = longMetric("numOutputRows")
-    val numInputBatches = longMetric("numInputBatches")
-    val evaluatorFactory =
-      new ColumnarToRowEvaluatorFactory(
-        child.output,
-        numOutputRows,
-        numInputBatches)
-
+    val evaluatorFactory = new ColumnarToRowEvaluatorFactory(
+      child.output,
+      longMetric("numOutputRows"),
+      longMetric("numInputBatches"))
     if (conf.usePartitionEvaluator) {
       child.executeColumnar().mapPartitionsWithEvaluator(evaluatorFactory)
     } else {
-      child.executeColumnar().mapPartitionsInternal { batches =>
+      child.executeColumnar().mapPartitionsWithIndexInternal { (index, batches) =>
         val evaluator = evaluatorFactory.createEvaluator()
-        evaluator.eval(0, batches)
+        evaluator.eval(index, batches)
       }
     }
   }
@@ -454,25 +450,20 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
   )
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    val numInputRows = longMetric("numInputRows")
-    val numOutputBatches = longMetric("numOutputBatches")
-    // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
-    // combine with some of the Arrow conversion tools we will need to unify some of the configs.
-    val numRows = conf.columnBatchSize
-    val evaluatorFactory =
-      new RowToColumnarEvaluatorFactory(
-        conf.offHeapColumnVectorEnabled,
-        numRows,
-        schema,
-        numInputRows,
-        numOutputBatches)
-
+    val evaluatorFactory = new RowToColumnarEvaluatorFactory(
+      conf.offHeapColumnVectorEnabled,
+      // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
+      // combine with some of the Arrow conversion tools we will need to unify some of the configs.
+      conf.columnBatchSize,
+      schema,
+      longMetric("numInputRows"),
+      longMetric("numOutputBatches"))
     if (conf.usePartitionEvaluator) {
       child.execute().mapPartitionsWithEvaluator(evaluatorFactory)
     } else {
-      child.execute().mapPartitionsInternal { rowIterator =>
+      child.execute().mapPartitionsWithIndexInternal { (index, rowIterator) =>
         val evaluator = evaluatorFactory.createEvaluator()
-        evaluator.eval(0, rowIterator)
+        evaluator.eval(index, rowIterator)
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
index 043a3b1a7e58f..d4a871c00a14a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala
@@ -279,40 +279,36 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper {
     }
     withSession(extensions) { session =>
      session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
-      Seq(true, false).foreach { enableEvaluator =>
-        withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enableEvaluator.toString) {
-          assert(session.sessionState.columnarRules.contains(
-            MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
-          import session.sqlContext.implicits._
-          // perform a join to inject a broadcast exchange
-          val left = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("l1", "l2")
-          val right = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("r1", "r2")
-          val data = left.join(right, $"l1" === $"r1")
-            // repartitioning avoids having the add operation pushed up into the LocalTableScan
-            .repartition(1)
-          val df = data.selectExpr("l2 + r2")
-          // execute the plan so that the final adaptive plan is available when AQE is on
-          df.collect()
-          val found = collectPlanSteps(df.queryExecution.executedPlan).sum
-          // 1 MyBroadcastExchangeExec
-          // 1 MyShuffleExchangeExec
-          // 1 ColumnarToRowExec
-          // 2 ColumnarProjectExec
-          // 1 ReplacedRowToColumnarExec
-          // so 11121 is expected.
-          assert(found == 11121)
-
-          // Verify that we get back the expected, wrong, result
-          val result = df.collect()
-          assert(result(0).getLong(0) == 101L) // Check that broken columnar Add was used.
-          assert(result(1).getLong(0) == 201L)
-          assert(result(2).getLong(0) == 301L)
-
-          withTempPath { path =>
-            val e = intercept[Exception](df.write.parquet(path.getCanonicalPath))
-            assert(e.getMessage == "columnar write")
-          }
-        }
+      assert(session.sessionState.columnarRules.contains(
+        MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
+      import session.sqlContext.implicits._
+      // perform a join to inject a broadcast exchange
+      val left = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("l1", "l2")
+      val right = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("r1", "r2")
+      val data = left.join(right, $"l1" === $"r1")
+        // repartitioning avoids having the add operation pushed up into the LocalTableScan
+        .repartition(1)
+      val df = data.selectExpr("l2 + r2")
+      // execute the plan so that the final adaptive plan is available when AQE is on
+      df.collect()
+      val found = collectPlanSteps(df.queryExecution.executedPlan).sum
+      // 1 MyBroadcastExchangeExec
+      // 1 MyShuffleExchangeExec
+      // 1 ColumnarToRowExec
+      // 2 ColumnarProjectExec
+      // 1 ReplacedRowToColumnarExec
+      // so 11121 is expected.
+      assert(found == 11121)
+
+      // Verify that we get back the expected, wrong, result
+      val result = df.collect()
+      assert(result(0).getLong(0) == 101L) // Check that broken columnar Add was used.
+      assert(result(1).getLong(0) == 201L)
+      assert(result(2).getLong(0) == 301L)
+
+      withTempPath { path =>
+        val e = intercept[Exception](df.write.parquet(path.getCanonicalPath))
+        assert(e.getMessage == "columnar write")
       }
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
index b97837fb97389..b14f4a405f6c3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala
@@ -127,22 +127,18 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
 
   test("SPARK-37779: ColumnarToRowExec should be canonicalizable after being (de)serialized") {
     withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
-      Seq(true, false).foreach { enable =>
-        withSQLConf(SQLConf.USE_PARTITION_EVALUATOR.key -> enable.toString) {
-          withTempPath { path =>
-            spark.range(1).write.parquet(path.getAbsolutePath)
-            val df = spark.read.parquet(path.getAbsolutePath)
-            val columnarToRowExec =
-              df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
-            try {
-              spark.range(1).foreach { _ =>
-                columnarToRowExec.canonicalized
-                ()
-              }
-            } catch {
-              case e: Throwable => fail("ColumnarToRowExec was not canonicalizable", e)
-            }
+      withTempPath { path =>
+        spark.range(1).write.parquet(path.getAbsolutePath)
+        val df = spark.read.parquet(path.getAbsolutePath)
+        val columnarToRowExec =
+          df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
+        try {
+          spark.range(1).foreach { _ =>
+            columnarToRowExec.canonicalized
+            ()
           }
+        } catch {
+          case e: Throwable => fail("ColumnarToRowExec was not canonicalizable", e)
         }
       }
     }
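
For context: the patch above replaces a hard-coded partition index of 0 with the real index supplied by mapPartitionsWithIndexInternal, so the per-partition evaluator sees the partition it is actually processing. Below is a minimal, self-contained Scala sketch of that evaluator-factory pattern; SimplePartitionEvaluator, SimplePartitionEvaluatorFactory and TaggingEvaluatorFactory are illustrative stand-ins, not Spark's internal PartitionEvaluator API.

// Sketch only: simplified stand-ins for the evaluator-factory pattern,
// not Spark's actual PartitionEvaluator/PartitionEvaluatorFactory traits.
trait SimplePartitionEvaluator[T, U] {
  def eval(partitionIndex: Int, input: Iterator[T]): Iterator[U]
}

trait SimplePartitionEvaluatorFactory[T, U] extends Serializable {
  def createEvaluator(): SimplePartitionEvaluator[T, U]
}

// Example factory: tags each element with the index of the partition it came from,
// which only works if the caller passes the true partition index.
class TaggingEvaluatorFactory extends SimplePartitionEvaluatorFactory[Int, (Int, Int)] {
  override def createEvaluator(): SimplePartitionEvaluator[Int, (Int, Int)] =
    new SimplePartitionEvaluator[Int, (Int, Int)] {
      override def eval(partitionIndex: Int, input: Iterator[Int]): Iterator[(Int, Int)] =
        input.map(x => (partitionIndex, x))
    }
}

object PartitionEvaluatorSketch {
  def main(args: Array[String]): Unit = {
    val factory = new TaggingEvaluatorFactory
    // Stand-in for mapPartitionsWithIndexInternal: each partition gets a fresh
    // evaluator plus its real index, rather than a hard-coded 0.
    val partitions = Seq(Seq(1, 2), Seq(3, 4))
    val tagged = partitions.zipWithIndex.flatMap { case (part, index) =>
      val evaluator = factory.createEvaluator()
      evaluator.eval(index, part.iterator)
    }
    println(tagged) // List((0,1), (0,2), (1,3), (1,4))
  }
}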