[SPARK-26159] Codegen for LocalTableScanExec and RDDScanExec
## What changes were proposed in this pull request?

Implement codegen for `LocalTableScanExec` and `RDDScanExec`. Refactor to share code between `LocalTableScanExec`, `RDDScanExec`, `InputAdapter` and `RowDataSourceScanExec`.

The difference in `doProduce` among these four was that `RDDScanExec` and `RowDataSourceScanExec` triggered the addition of an `UnsafeProjection`, while `InputAdapter` and `LocalTableScanExec` did not.

In the new trait `InputRDDCodegen` I added a flag, `createUnsafeProjection`, which each operator sets accordingly; a condensed sketch follows.
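
For orientation, a condensed sketch of the shared trait; the `doProduce` body is elided here and appears in full in the diff below, alongside `CodegenSupport` and `InputAdapter`:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

// Condensed from the diff below; not a complete listing.
trait InputRDDCodegen extends CodegenSupport {

  // The single RDD this leaf operator reads from.
  def inputRDD: RDD[InternalRow]

  // If the input can be InternalRows, an UnsafeProjection needs to be created.
  protected val createUnsafeProjection: Boolean

  override def inputRDDs(): Seq[RDD[InternalRow]] = inputRDD :: Nil

  // doProduce(ctx) generates the shared while-loop over inputRDD's iterator,
  // optionally making the parent's consume add an UnsafeProjection - see the
  // full diff below.
}
```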

Note: `LocalTableScanExec` explicitly creates its input as `UnsafeRows`, so it clearly needs no `UnsafeProjection`. An `InputAdapter`, however, may take input that is `InternalRows` but not `UnsafeRows`; I think it still needs no unsafe projection, because whatever operator is its parent would add one. That assumes any parent operator always results in some `UnsafeProjection` eventually being added, and hence that the output of the `WholeStageCodegen` unit is `UnsafeRows`. If these assumptions hold, `createUnsafeProjection` could simply be set to `(parent == null)`, as sketched below.
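
Under those assumptions, the per-operator flag would collapse to a single default in the trait. A hypothetical one-liner, not part of this change:

```scala
// Hypothetical: only the outermost input of a WholeStageCodegen unit would
// need to emit UnsafeRows; inner inputs would rely on their parents.
override protected val createUnsafeProjection: Boolean = (parent == null)
```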

Note: `LocalTableScanExec` is not codegenned when it is the only operator in the plan. It has optimized driver-only `executeCollect` and `executeTake` code paths that are used to return `Command` results without starting Spark jobs, and those paths can no longer be used once the `LocalTableScanExec` is wrapped in `WholeStageCodegen`. The sketch below illustrates the fast path being preserved.
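
A minimal illustration of that fast path; the `SHOW TABLES` command and the session setup are illustrative assumptions, not taken from this patch:

```scala
import org.apache.spark.sql.SparkSession

object DriverLocalCollect {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("driver-local collect")
      .getOrCreate()

    // A Command's result comes back through a LocalTableScanExec with no
    // parent operator. Since such a plan opts out of codegen, collect() is
    // answered by the driver-only executeCollect() without running a Spark job.
    spark.sql("SHOW TABLES").collect().foreach(println)

    spark.stop()
  }
}
```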

## How was this patch tested?

Covered and used in existing tests.

Closes apache#23127 from juliuszsompolski/SPARK-26159.

Authored-by: Juliusz Sompolski <julek@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
juliuszsompolski authored and jackylee-ch committed Feb 18, 2019
1 parent 584af02 commit 8b7a50d
Showing 6 changed files with 86 additions and 51 deletions.
2 changes: 1 addition & 1 deletion python/pyspark/sql/dataframe.py
@@ -257,7 +257,7 @@ def explain(self, extended=False):
         >>> df.explain()
         == Physical Plan ==
-        Scan ExistingRDD[age#0,name#1]
+        *(1) Scan ExistingRDD[age#0,name#1]
         >>> df.explain(True)
         == Parsed Logical Plan ==
@@ -84,7 +84,7 @@ case class RowDataSourceScanExec(
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     override val tableIdentifier: Option[TableIdentifier])
-  extends DataSourceScanExec {
+  extends DataSourceScanExec with InputRDDCodegen {

   def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)

@@ -104,30 +104,10 @@ case class RowDataSourceScanExec(
     }
   }

-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    rdd :: Nil
-  }
+  // Input can be InternalRow, has to be turned into UnsafeRows.
+  override protected val createUnsafeProjection: Boolean = true

-  override protected def doProduce(ctx: CodegenContext): String = {
-    val numOutputRows = metricTerm(ctx, "numOutputRows")
-    // PhysicalRDD always just has one input
-    val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];")
-    val exprRows = output.zipWithIndex.map{ case (a, i) =>
-      BoundReference(i, a.dataType, a.nullable)
-    }
-    val row = ctx.freshName("row")
-    ctx.INPUT_ROW = row
-    ctx.currentVars = null
-    val columnsRowInput = exprRows.map(_.genCode(ctx))
-    s"""
-       |while ($input.hasNext()) {
-       |  InternalRow $row = (InternalRow) $input.next();
-       |  $numOutputRows.add(1);
-       |  ${consume(ctx, columnsRowInput).trim}
-       |  if (shouldStop()) return;
-       |}
-     """.stripMargin
-  }
+  override def inputRDD: RDD[InternalRow] = rdd

   override val metadata: Map[String, String] = {
     val markedFilters = for (filter <- filters) yield {
@@ -175,7 +175,7 @@ case class RDDScanExec(
     rdd: RDD[InternalRow],
     name: String,
     override val outputPartitioning: Partitioning = UnknownPartitioning(0),
-    override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode {
+    override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode with InputRDDCodegen {

   private def rddName: String = Option(rdd.name).map(n => s" $n").getOrElse("")

@@ -199,4 +199,9 @@
   override def simpleString: String = {
     s"$nodeName${truncatedString(output, "[", ",", "]")}"
   }
+
+  // Input can be InternalRow, has to be turned into UnsafeRows.
+  override protected val createUnsafeProjection: Boolean = true
+
+  override def inputRDD: RDD[InternalRow] = rdd
 }
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
  */
 case class LocalTableScanExec(
     output: Seq[Attribute],
-    @transient rows: Seq[InternalRow]) extends LeafExecNode {
+    @transient rows: Seq[InternalRow]) extends LeafExecNode with InputRDDCodegen {

   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

@@ -76,4 +76,12 @@ case class LocalTableScanExec(
     longMetric("numOutputRows").add(taken.size)
     taken
   }
+
+  // Input is already UnsafeRows.
+  override protected val createUnsafeProjection: Boolean = false
+
+  // Do not codegen when there is no parent - to support the fast driver-local collect/take paths.
+  override def supportCodegen: Boolean = (parent != null)
+
+  override def inputRDD: RDD[InternalRow] = rdd
 }
@@ -350,6 +350,15 @@ trait CodegenSupport extends SparkPlan
    */
   def needStopCheck: Boolean = parent.needStopCheck

+  /**
+   * Helper default should stop check code.
+   */
+  def shouldStopCheckCode: String = if (needStopCheck) {
+    "if (shouldStop()) return;"
+  } else {
+    "// shouldStop check is eliminated"
+  }
+
   /**
    * A sequence of checks which evaluate to true if the downstream Limit operators have not received
    * enough records and reached the limit. If current node is a data producing node, it can leverage

@@ -406,14 +415,61 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
   override def limitNotReachedChecks: Seq[String] = Nil
 }

+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input can be InternalRows, an UnsafeProjection needs to be created.
+  protected val createUnsafeProjection: Boolean
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
+    // Inline mutable state since an InputRDDCodegen is used once in a task for WholeStageCodegen
+    val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];",
+      forceInline = true)
+    val row = ctx.freshName("row")
+
+    val outputVars = if (createUnsafeProjection) {
+      // creating the vars will make the parent consume add an unsafe projection.
+      ctx.INPUT_ROW = row
+      ctx.currentVars = null
+      output.zipWithIndex.map { case (a, i) =>
+        BoundReference(i, a.dataType, a.nullable).genCode(ctx)
+      }
+    } else {
+      null
+    }
+
+    val updateNumOutputRowsMetrics = if (metrics.contains("numOutputRows")) {
+      val numOutputRows = metricTerm(ctx, "numOutputRows")
+      s"$numOutputRows.add(1);"
+    } else {
+      ""
+    }
+    s"""
+       | while ($limitNotReachedCond $input.hasNext()) {
+       |   InternalRow $row = (InternalRow) $input.next();
+       |   ${updateNumOutputRowsMetrics}
+       |   ${consume(ctx, outputVars, if (createUnsafeProjection) null else row).trim}
+       |   ${shouldStopCheckCode}
+       | }
+     """.stripMargin
+  }
+}
+
 /**
  * InputAdapter is used to hide a SparkPlan from a subtree that supports codegen.
  *
  * This is the leaf node of a tree with WholeStageCodegen that is used to generate code
  * that consumes an RDD iterator of InternalRow.
  */
-case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
+case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCodegen {

   override def output: Seq[Attribute] = child.output

@@ -429,24 +485,10 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupp
     child.doExecuteBroadcast()
   }

-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    child.execute() :: Nil
-  }
+  override def inputRDD: RDD[InternalRow] = child.execute()

-  override def doProduce(ctx: CodegenContext): String = {
-    // Right now, InputAdapter is only used when there is one input RDD.
-    // Inline mutable state since an InputAdapter is used once in a task for WholeStageCodegen
-    val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];",
-      forceInline = true)
-    val row = ctx.freshName("row")
-    s"""
-       | while ($limitNotReachedCond $input.hasNext()) {
-       |   InternalRow $row = (InternalRow) $input.next();
-       |   ${consume(ctx, null, row).trim}
-       |   if (shouldStop()) return;
-       | }
-     """.stripMargin
-  }
+  // InputAdapter does not need UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = false

   override def generateTreeString(
     depth: Int,
12 changes: 6 additions & 6 deletions sql/core/src/test/resources/sql-tests/results/operators.sql.out
@@ -201,7 +201,7 @@ struct<plan:string>
 -- !query 24 output
 == Physical Plan ==
 *Project [null AS (CAST(concat(a, CAST(1 AS STRING)) AS DOUBLE) + CAST(2 AS DOUBLE))#x]
-+- Scan OneRowRelation[]
++- *Scan OneRowRelation[]


 -- !query 25
@@ -211,7 +211,7 @@ struct<plan:string>
 -- !query 25 output
 == Physical Plan ==
 *Project [-1b AS concat(CAST((1 - 2) AS STRING), b)#x]
-+- Scan OneRowRelation[]
++- *Scan OneRowRelation[]


 -- !query 26
@@ -221,7 +221,7 @@ struct<plan:string>
 -- !query 26 output
 == Physical Plan ==
 *Project [11b AS concat(CAST(((2 * 4) + 3) AS STRING), b)#x]
-+- Scan OneRowRelation[]
++- *Scan OneRowRelation[]


 -- !query 27
@@ -231,7 +231,7 @@ struct<plan:string>
 -- !query 27 output
 == Physical Plan ==
 *Project [4a2.0 AS concat(concat(CAST((3 + 1) AS STRING), a), CAST((CAST(4 AS DOUBLE) / CAST(2 AS DOUBLE)) AS STRING))#x]
-+- Scan OneRowRelation[]
++- *Scan OneRowRelation[]


 -- !query 28
@@ -241,7 +241,7 @@ struct<plan:string>
 -- !query 28 output
 == Physical Plan ==
 *Project [true AS ((1 = 1) OR (concat(a, b) = ab))#x]
-+- Scan OneRowRelation[]
++- *Scan OneRowRelation[]


 -- !query 29
@@ -251,7 +251,7 @@ struct<plan:string>
 -- !query 29 output
 == Physical Plan ==
 *Project [false AS ((concat(a, c) = ac) AND (2 = 3))#x]
-+- Scan OneRowRelation[]
++- *Scan OneRowRelation[]


 -- !query 30
