From 23c2d9111f1cff9059746bb7b48bb8ef7ad7027b Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Tue, 13 Nov 2018 10:19:09 +0100 Subject: [PATCH 1/9] localtablescanexec codegen --- .../spark/sql/execution/ExistingRDD.scala | 4 +- .../sql/execution/LocalTableScanExec.scala | 4 +- .../sql/execution/WholeStageCodegenExec.scala | 55 ++++++++++++------- .../execution/metric/SQLMetricsSuite.scala | 28 +++++----- 4 files changed, 57 insertions(+), 34 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 9f67d556af362..4e41d313a66fe 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -175,7 +175,7 @@ case class RDDScanExec( rdd: RDD[InternalRow], name: String, override val outputPartitioning: Partitioning = UnknownPartitioning(0), - override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode { + override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode with InputRDDCodegen { private def rddName: String = Option(rdd.name).map(n => s" $n").getOrElse("") @@ -199,4 +199,6 @@ case class RDDScanExec( override def simpleString: String = { s"$nodeName${truncatedString(output, "[", ",", "]")}" } + + override def inputRDD: RDD[InternalRow] = rdd } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala index 448eb703eacde..0a732ecaa44aa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics */ case class LocalTableScanExec( output: Seq[Attribute], - @transient rows: Seq[InternalRow]) extends LeafExecNode { + @transient rows: Seq[InternalRow]) extends LeafExecNode with InputRDDCodegen { override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) @@ -76,4 +76,6 @@ case class LocalTableScanExec( longMetric("numOutputRows").add(taken.size) taken } + + override def inputRDD: RDD[InternalRow] = rdd } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 29bcbcae366c5..7cf25651917f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -350,6 +350,15 @@ trait CodegenSupport extends SparkPlan { */ def needStopCheck: Boolean = parent.needStopCheck + /** + * Helper default should stop check code. + */ + def shouldStopCheckCode: String = if (needStopCheck) { + "if (shouldStop()) return;" + } else { + "// shouldStop check is eliminated" + } + /** * A sequence of checks which evaluate to true if the downstream Limit operators have not received * enough records and reached the limit. If current node is a data producing node, it can leverage @@ -406,6 +415,31 @@ trait BlockingOperatorWithCodegen extends CodegenSupport { override def limitNotReachedChecks: Seq[String] = Nil } +/** + * Leaf codegen node reading from a single RDD. 
+ */ +trait InputRDDCodegen extends CodegenSupport { + + def inputRDD: RDD[InternalRow] + + override def inputRDDs(): Seq[RDD[InternalRow]] = { + inputRDD :: Nil + } + + override def doProduce(ctx: CodegenContext): String = { + // Inline mutable state since an InputRDDCodegen is used once in a task for WholeStageCodegen + val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];", + forceInline = true) + val row = ctx.freshName("row") + s""" + | while ($limitNotReachedCond $input.hasNext()) { + | InternalRow $row = (InternalRow) $input.next(); + | ${consume(ctx, null, row).trim} + | ${shouldStopCheckCode} + | } + """.stripMargin + } +} /** * InputAdapter is used to hide a SparkPlan from a subtree that supports codegen. @@ -413,7 +447,7 @@ trait BlockingOperatorWithCodegen extends CodegenSupport { * This is the leaf node of a tree with WholeStageCodegen that is used to generate code * that consumes an RDD iterator of InternalRow. */ -case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport { +case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCodegen { override def output: Seq[Attribute] = child.output @@ -429,24 +463,7 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp child.doExecuteBroadcast() } - override def inputRDDs(): Seq[RDD[InternalRow]] = { - child.execute() :: Nil - } - - override def doProduce(ctx: CodegenContext): String = { - // Right now, InputAdapter is only used when there is one input RDD. - // Inline mutable state since an InputAdapter is used once in a task for WholeStageCodegen - val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];", - forceInline = true) - val row = ctx.freshName("row") - s""" - | while ($limitNotReachedCond $input.hasNext()) { - | InternalRow $row = (InternalRow) $input.next(); - | ${consume(ctx, null, row).trim} - | if (shouldStop()) return; - | } - """.stripMargin - } + override def inputRDD: RDD[InternalRow] = child.execute() override def generateTreeString( depth: Int, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index b955c157a620e..dec11fbb6efd3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -51,19 +51,21 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared } test("LocalTableScanExec computes metrics in collect and take") { - val df1 = spark.createDataset(Seq(1, 2, 3)) - val logical = df1.queryExecution.logical - require(logical.isInstanceOf[LocalRelation]) - df1.collect() - val metrics1 = df1.queryExecution.executedPlan.collectLeaves().head.metrics - assert(metrics1.contains("numOutputRows")) - assert(metrics1("numOutputRows").value === 3) - - val df2 = spark.createDataset(Seq(1, 2, 3)).limit(2) - df2.collect() - val metrics2 = df2.queryExecution.executedPlan.collectLeaves().head.metrics - assert(metrics2.contains("numOutputRows")) - assert(metrics2("numOutputRows").value === 2) + withSQLConf("spark.sql.codegen.wholeStage" -> "false") { + val df1 = spark.createDataset(Seq(1, 2, 3)) + val logical = df1.queryExecution.logical + require(logical.isInstanceOf[LocalRelation]) + df1.collect() + val metrics1 = df1.queryExecution.executedPlan.collectLeaves().head.metrics + 
assert(metrics1.contains("numOutputRows")) + assert(metrics1("numOutputRows").value === 3) + + val df2 = spark.createDataset(Seq(1, 2, 3)).limit(2) + df2.collect() + val metrics2 = df2.queryExecution.executedPlan.collectLeaves().head.metrics + assert(metrics2.contains("numOutputRows")) + assert(metrics2("numOutputRows").value === 2) + } } test("Filter metrics") { From ea93f10346204591a64ef92c8c757ae09841f220 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Mon, 26 Nov 2018 13:08:09 +0100 Subject: [PATCH 2/9] add RowDataSourceScanExec --- .../sql/execution/DataSourceScanExec.scala | 27 ++----------------- 1 file changed, 2 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 77e381ef6e6b4..4a5fe034d8870 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -84,7 +84,7 @@ case class RowDataSourceScanExec( rdd: RDD[InternalRow], @transient relation: BaseRelation, override val tableIdentifier: Option[TableIdentifier]) - extends DataSourceScanExec { + extends DataSourceScanExec with InputRDDCodegen { def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput) @@ -104,30 +104,7 @@ case class RowDataSourceScanExec( } } - override def inputRDDs(): Seq[RDD[InternalRow]] = { - rdd :: Nil - } - - override protected def doProduce(ctx: CodegenContext): String = { - val numOutputRows = metricTerm(ctx, "numOutputRows") - // PhysicalRDD always just has one input - val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];") - val exprRows = output.zipWithIndex.map{ case (a, i) => - BoundReference(i, a.dataType, a.nullable) - } - val row = ctx.freshName("row") - ctx.INPUT_ROW = row - ctx.currentVars = null - val columnsRowInput = exprRows.map(_.genCode(ctx)) - s""" - |while ($input.hasNext()) { - | InternalRow $row = (InternalRow) $input.next(); - | $numOutputRows.add(1); - | ${consume(ctx, columnsRowInput).trim} - | if (shouldStop()) return; - |} - """.stripMargin - } + override def inputRDD: RDD[InternalRow] = rdd override val metadata: Map[String, String] = { val markedFilters = for (filter <- filters) yield { From 170073efd4d752a436bc18c4c0d2c0fc755d2d37 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Mon, 26 Nov 2018 16:01:06 +0100 Subject: [PATCH 3/9] fix --- .../sql/execution/LocalTableScanExec.scala | 3 +++ .../sql/execution/WholeStageCodegenExec.scala | 25 ++++++++++++++++++- 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala index 0a732ecaa44aa..2988730dd6db9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala @@ -77,5 +77,8 @@ case class LocalTableScanExec( taken } + // Does not need to create an UnsafeProjection - input is already always UnsafeRows + override protected val createUnsafeProjection: Boolean = false + override def inputRDD: RDD[InternalRow] = rdd } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 
7cf25651917f6..86ce2988a52e4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -422,6 +422,10 @@ trait InputRDDCodegen extends CodegenSupport { def inputRDD: RDD[InternalRow] + // If the input is an RDD of InternalRow which are potentially not UnsafeRow, + // and there is no parent to consume it, it needs an UnsafeProjection. + protected val createUnsafeProjection: Boolean = (parent == null) + override def inputRDDs(): Seq[RDD[InternalRow]] = { inputRDD :: Nil } @@ -431,10 +435,29 @@ trait InputRDDCodegen extends CodegenSupport { val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];", forceInline = true) val row = ctx.freshName("row") + + val outputVars = if (createUnsafeProjection) { + // creating the vars will make the parent consume add an unsafe projection. + ctx.INPUT_ROW = row + ctx.currentVars = null + output.zipWithIndex.map { case (a, i) => + BoundReference(i, a.dataType, a.nullable).genCode(ctx) + } + } else { + null + } + + val numOutputRowsCode = if (metrics.contains("numOutputRows")) { + val numOutputRows = metricTerm(ctx, "numOutputRows") + s"$numOutputRows.add(1);" + } else { + "" + } s""" | while ($limitNotReachedCond $input.hasNext()) { | InternalRow $row = (InternalRow) $input.next(); - | ${consume(ctx, null, row).trim} + | ${numOutputRowsCode} + | ${consume(ctx, outputVars, if (createUnsafeProjection) null else row).trim} | ${shouldStopCheckCode} | } """.stripMargin From 2868b118178926609e7776f176633f06c90af964 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Mon, 26 Nov 2018 20:28:48 +0100 Subject: [PATCH 4/9] updateNumOutputRowsMetrics --- .../apache/spark/sql/execution/WholeStageCodegenExec.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 86ce2988a52e4..7d8b9e1957c83 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -447,7 +447,7 @@ trait InputRDDCodegen extends CodegenSupport { null } - val numOutputRowsCode = if (metrics.contains("numOutputRows")) { + val updateNumOutputRowsMetrics = if (metrics.contains("numOutputRows")) { val numOutputRows = metricTerm(ctx, "numOutputRows") s"$numOutputRows.add(1);" } else { @@ -456,7 +456,7 @@ trait InputRDDCodegen extends CodegenSupport { s""" | while ($limitNotReachedCond $input.hasNext()) { | InternalRow $row = (InternalRow) $input.next(); - | ${numOutputRowsCode} + | ${updateNumOutputRowsMetrics} | ${consume(ctx, outputVars, if (createUnsafeProjection) null else row).trim} | ${shouldStopCheckCode} | } From fce0512a58accff4824cb75fc32971050a172dd6 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Mon, 26 Nov 2018 20:44:41 +0100 Subject: [PATCH 5/9] update tests --- .../resources/sql-tests/results/group-by.sql.out | 2 +- .../resources/sql-tests/results/inline-table.sql.out | 4 ++-- .../resources/sql-tests/results/operators.sql.out | 12 ++++++------ 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out index 9a8d025331b67..d8448b3a9552c 100644 --- 
a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out @@ -486,4 +486,4 @@ Aggregate [k#x], [k#x, min(v#x) AS every(v)#x, max(v#x) AS some(v)#x, max(v#x) A *HashAggregate(keys=[k#x], functions=[min(v#x), max(v#x)], output=[k#x, every(v)#x, some(v)#x, any(v)#x]) +- Exchange hashpartitioning(k#x, 200) +- *HashAggregate(keys=[k#x], functions=[partial_min(v#x), partial_max(v#x)], output=[k#x, min#x, max#x]) - +- LocalTableScan [k#x, v#x] + +- *LocalTableScan [k#x, v#x] diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out index c065ce5012929..f76e134863e11 100644 --- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out @@ -178,6 +178,6 @@ Join Cross == Physical Plan == BroadcastNestedLoopJoin BuildRight, Cross -:- LocalTableScan [col1#x, col2#x] +:- *LocalTableScan [col1#x, col2#x] +- BroadcastExchange IdentityBroadcastMode - +- LocalTableScan [col1#x, col2#x] + +- *LocalTableScan [col1#x, col2#x] diff --git a/sql/core/src/test/resources/sql-tests/results/operators.sql.out b/sql/core/src/test/resources/sql-tests/results/operators.sql.out index fd1d0db9e3f78..570b281353f3d 100644 --- a/sql/core/src/test/resources/sql-tests/results/operators.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/operators.sql.out @@ -201,7 +201,7 @@ struct -- !query 24 output == Physical Plan == *Project [null AS (CAST(concat(a, CAST(1 AS STRING)) AS DOUBLE) + CAST(2 AS DOUBLE))#x] -+- Scan OneRowRelation[] ++- *Scan OneRowRelation[] -- !query 25 @@ -211,7 +211,7 @@ struct -- !query 25 output == Physical Plan == *Project [-1b AS concat(CAST((1 - 2) AS STRING), b)#x] -+- Scan OneRowRelation[] ++- *Scan OneRowRelation[] -- !query 26 @@ -221,7 +221,7 @@ struct -- !query 26 output == Physical Plan == *Project [11b AS concat(CAST(((2 * 4) + 3) AS STRING), b)#x] -+- Scan OneRowRelation[] ++- *Scan OneRowRelation[] -- !query 27 @@ -231,7 +231,7 @@ struct -- !query 27 output == Physical Plan == *Project [4a2.0 AS concat(concat(CAST((3 + 1) AS STRING), a), CAST((CAST(4 AS DOUBLE) / CAST(2 AS DOUBLE)) AS STRING))#x] -+- Scan OneRowRelation[] ++- *Scan OneRowRelation[] -- !query 28 @@ -241,7 +241,7 @@ struct -- !query 28 output == Physical Plan == *Project [true AS ((1 = 1) OR (concat(a, b) = ab))#x] -+- Scan OneRowRelation[] ++- *Scan OneRowRelation[] -- !query 29 @@ -251,7 +251,7 @@ struct -- !query 29 output == Physical Plan == *Project [false AS ((concat(a, c) = ac) AND (2 = 3))#x] -+- Scan OneRowRelation[] ++- *Scan OneRowRelation[] -- !query 30 From 5d94c4274d468f3873db0827c88e1461f0125b58 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Mon, 26 Nov 2018 20:49:59 +0100 Subject: [PATCH 6/9] don't codegen LocalTableScanExec when it's all alone --- .../sql/execution/LocalTableScanExec.scala | 3 ++ .../execution/metric/SQLMetricsSuite.scala | 28 +++++++++---------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala index 2988730dd6db9..01ed271e5397b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala @@ -80,5 +80,8 @@ case class LocalTableScanExec( // Does 
not need to create an UnsafeProjection - input is already always UnsafeRows override protected val createUnsafeProjection: Boolean = false + // Do not codegen when there is no parent - to support the fast driver-local collect/take paths. + override def supportCodegen: Boolean = (parent != null) + override def inputRDD: RDD[InternalRow] = rdd } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala index dec11fbb6efd3..b955c157a620e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala @@ -51,21 +51,19 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared } test("LocalTableScanExec computes metrics in collect and take") { - withSQLConf("spark.sql.codegen.wholeStage" -> "false") { - val df1 = spark.createDataset(Seq(1, 2, 3)) - val logical = df1.queryExecution.logical - require(logical.isInstanceOf[LocalRelation]) - df1.collect() - val metrics1 = df1.queryExecution.executedPlan.collectLeaves().head.metrics - assert(metrics1.contains("numOutputRows")) - assert(metrics1("numOutputRows").value === 3) - - val df2 = spark.createDataset(Seq(1, 2, 3)).limit(2) - df2.collect() - val metrics2 = df2.queryExecution.executedPlan.collectLeaves().head.metrics - assert(metrics2.contains("numOutputRows")) - assert(metrics2("numOutputRows").value === 2) - } + val df1 = spark.createDataset(Seq(1, 2, 3)) + val logical = df1.queryExecution.logical + require(logical.isInstanceOf[LocalRelation]) + df1.collect() + val metrics1 = df1.queryExecution.executedPlan.collectLeaves().head.metrics + assert(metrics1.contains("numOutputRows")) + assert(metrics1("numOutputRows").value === 3) + + val df2 = spark.createDataset(Seq(1, 2, 3)).limit(2) + df2.collect() + val metrics2 = df2.queryExecution.executedPlan.collectLeaves().head.metrics + assert(metrics2.contains("numOutputRows")) + assert(metrics2("numOutputRows").value === 2) } test("Filter metrics") { From c345e2fa4283ab97bafeef07b7bd260cece48983 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Tue, 27 Nov 2018 11:15:32 +0100 Subject: [PATCH 7/9] revert test changes --- .../src/test/resources/sql-tests/results/group-by.sql.out | 2 +- .../src/test/resources/sql-tests/results/inline-table.sql.out | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out index d8448b3a9552c..9a8d025331b67 100644 --- a/sql/core/src/test/resources/sql-tests/results/group-by.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/group-by.sql.out @@ -486,4 +486,4 @@ Aggregate [k#x], [k#x, min(v#x) AS every(v)#x, max(v#x) AS some(v)#x, max(v#x) A *HashAggregate(keys=[k#x], functions=[min(v#x), max(v#x)], output=[k#x, every(v)#x, some(v)#x, any(v)#x]) +- Exchange hashpartitioning(k#x, 200) +- *HashAggregate(keys=[k#x], functions=[partial_min(v#x), partial_max(v#x)], output=[k#x, min#x, max#x]) - +- *LocalTableScan [k#x, v#x] + +- LocalTableScan [k#x, v#x] diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out index f76e134863e11..c065ce5012929 100644 --- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out +++ 
b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out @@ -178,6 +178,6 @@ Join Cross == Physical Plan == BroadcastNestedLoopJoin BuildRight, Cross -:- *LocalTableScan [col1#x, col2#x] +:- LocalTableScan [col1#x, col2#x] +- BroadcastExchange IdentityBroadcastMode - +- *LocalTableScan [col1#x, col2#x] + +- LocalTableScan [col1#x, col2#x] From bdb71d76828ba00c8f86685401aa3e558e2351d4 Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Tue, 27 Nov 2018 11:39:54 +0100 Subject: [PATCH 8/9] always do UnsafeProjection in RDDScanExec and RowDataSourceScanExec --- .../apache/spark/sql/execution/DataSourceScanExec.scala | 3 +++ .../org/apache/spark/sql/execution/ExistingRDD.scala | 3 +++ .../apache/spark/sql/execution/LocalTableScanExec.scala | 2 +- .../spark/sql/execution/WholeStageCodegenExec.scala | 8 +++++--- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 4a5fe034d8870..4faa27c2c1e23 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -104,6 +104,9 @@ case class RowDataSourceScanExec( } } + // Input can be InternalRow, has to be turned into UnsafeRows. + override protected val createUnsafeProjection: Boolean = true + override def inputRDD: RDD[InternalRow] = rdd override val metadata: Map[String, String] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 4e41d313a66fe..e214bfd050410 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -200,5 +200,8 @@ case class RDDScanExec( s"$nodeName${truncatedString(output, "[", ",", "]")}" } + // Input can be InternalRow, has to be turned into UnsafeRows. + override protected val createUnsafeProjection: Boolean = true + override def inputRDD: RDD[InternalRow] = rdd } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala index 01ed271e5397b..31640db3722ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala @@ -77,7 +77,7 @@ case class LocalTableScanExec( taken } - // Does not need to create an UnsafeProjection - input is already always UnsafeRows + // Input is already UnsafeRows. override protected val createUnsafeProjection: Boolean = false // Do not codegen when there is no parent - to support the fast driver-local collect/take paths. 
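
For illustration of the contract that the WholeStageCodegenExec change below turns into an abstract member: a minimal sketch of a hypothetical leaf scan mixing in InputRDDCodegen. MyRowScanExec and its fields are assumptions, not part of this series; the point is that an implementor only supplies inputRDD, a createUnsafeProjection decision, and the usual LeafExecNode members.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.execution.{InputRDDCodegen, LeafExecNode}

    // Hypothetical leaf operator scanning generic InternalRows from an RDD.
    case class MyRowScanExec(
        output: Seq[Attribute],
        rdd: RDD[InternalRow]) extends LeafExecNode with InputRDDCodegen {

      // The rows are not guaranteed to be UnsafeRows, so the generated code
      // must add an UnsafeProjection - the same choice RDDScanExec and
      // RowDataSourceScanExec make in this patch. LocalTableScanExec can
      // declare false because its input rows are already UnsafeRows.
      override protected val createUnsafeProjection: Boolean = true

      override def inputRDD: RDD[InternalRow] = rdd

      override protected def doExecute(): RDD[InternalRow] = rdd
    }
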
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index 7d8b9e1957c83..fbda0d87a175f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -422,9 +422,8 @@ trait InputRDDCodegen extends CodegenSupport { def inputRDD: RDD[InternalRow] - // If the input is an RDD of InternalRow which are potentially not UnsafeRow, - // and there is no parent to consume it, it needs an UnsafeProjection. - protected val createUnsafeProjection: Boolean = (parent == null) + // If the input can be InternalRows, an UnsafeProjection needs to be created. + protected val createUnsafeProjection: Boolean override def inputRDDs(): Seq[RDD[InternalRow]] = { inputRDD :: Nil @@ -488,6 +487,9 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod override def inputRDD: RDD[InternalRow] = child.execute() + // InputAdapter does not need UnsafeProjection. + protected val createUnsafeProjection: Boolean = false + override def generateTreeString( depth: Int, lastChildren: Seq[Boolean], From 3668f97b8848d68d3c85a2478783f7b33d2134cc Mon Sep 17 00:00:00 2001 From: Juliusz Sompolski Date: Tue, 27 Nov 2018 21:10:01 +0100 Subject: [PATCH 9/9] fix python test --- python/pyspark/sql/dataframe.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index c4f4d81999544..97d63fa20a886 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -257,7 +257,7 @@ def explain(self, extended=False): >>> df.explain() == Physical Plan == - Scan ExistingRDD[age#0,name#1] + *(1) Scan ExistingRDD[age#0,name#1] >>> df.explain(True) == Parsed Logical Plan ==
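
To see the end-to-end effect of the series, a usage sketch follows. Illustration only, not part of any patch; it assumes a local SparkSession and the behavior established in patches 6, 8 and 9.

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

    val spark = SparkSession.builder().master("local[2]").getOrCreate()
    import spark.implicits._

    // A Seq-backed Dataset plans a LocalTableScanExec; the supportCodegen
    // guard from patch 6 keeps it out of WholeStageCodegen when it is the
    // root, preserving the driver-local fast paths.
    val ds = spark.createDataset(Seq(1, 2, 3))
    ds.collect()           // executeCollect(): no Spark job expected
    ds.limit(2).collect()  // executeTake() path; numOutputRows should read 2

    // An RDD-backed DataFrame plans an RDDScanExec, which now participates
    // in whole-stage codegen (starred in explain output, cf. the patch 9
    // doctest showing "*(1) Scan ExistingRDD[age#0,name#1]").
    val schema = StructType(Seq(StructField("age", IntegerType)))
    val rowRdd = spark.sparkContext.parallelize(Seq(Row(1), Row(2)))
    spark.createDataFrame(rowRdd, schema).explain()
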