[SPARK-26159] Codegen for LocalTableScanExec and RDDScanExec #23127

Status: Closed. Wants to merge 9 commits.
2 changes: 1 addition & 1 deletion python/pyspark/sql/dataframe.py
@@ -257,7 +257,7 @@ def explain(self, extended=False):

>>> df.explain()
== Physical Plan ==
- Scan ExistingRDD[age#0,name#1]
+ *(1) Scan ExistingRDD[age#0,name#1]

>>> df.explain(True)
== Parsed Logical Plan ==
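
For context, the `*(N)` prefix in explain() output marks operators compiled into whole-stage-codegen stage N; with this change the RDD scan joins the stage instead of sitting outside it. A minimal sketch of observing this, assuming a running SparkSession named `spark`:

// Sketch: an RDD-backed DataFrame, whose scan is planned as RDDScanExec
// ("Scan ExistingRDD"). Assumes `spark` is a live session.
import spark.implicits._

val df = spark.sparkContext
  .parallelize(Seq((2, "Alice"), (5, "Bob")))
  .toDF("age", "name")

df.explain()
// Before this PR:  Scan ExistingRDD[age#0,name#1]
// After this PR:   *(1) Scan ExistingRDD[age#0,name#1]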
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -84,7 +84,7 @@ case class RowDataSourceScanExec(
rdd: RDD[InternalRow],
@transient relation: BaseRelation,
override val tableIdentifier: Option[TableIdentifier])
-   extends DataSourceScanExec {
+   extends DataSourceScanExec with InputRDDCodegen {

def output: Seq[Attribute] = requiredColumnsIndex.map(fullOutput)

@@ -104,30 +104,10 @@ case class RowDataSourceScanExec(
}
}

-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    rdd :: Nil
-  }
+  // Input can be InternalRow, has to be turned into UnsafeRows.
+  override protected val createUnsafeProjection: Boolean = true

-  override protected def doProduce(ctx: CodegenContext): String = {
-    val numOutputRows = metricTerm(ctx, "numOutputRows")
-    // PhysicalRDD always just has one input
-    val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];")
-    val exprRows = output.zipWithIndex.map { case (a, i) =>
-      BoundReference(i, a.dataType, a.nullable)
-    }
-    val row = ctx.freshName("row")
-    ctx.INPUT_ROW = row
-    ctx.currentVars = null
-    val columnsRowInput = exprRows.map(_.genCode(ctx))
-    s"""
-       |while ($input.hasNext()) {
-       |  InternalRow $row = (InternalRow) $input.next();
-       |  $numOutputRows.add(1);
-       |  ${consume(ctx, columnsRowInput).trim}
-       |  if (shouldStop()) return;
-       |}
-     """.stripMargin
-  }
+  override def inputRDD: RDD[InternalRow] = rdd

override val metadata: Map[String, String] = {
val markedFilters = for (filter <- filters) yield {
sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -175,7 +175,7 @@ case class RDDScanExec(
rdd: RDD[InternalRow],
name: String,
override val outputPartitioning: Partitioning = UnknownPartitioning(0),
-   override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode {
+   override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode with InputRDDCodegen {

private def rddName: String = Option(rdd.name).map(n => s" $n").getOrElse("")

@@ -199,4 +199,9 @@ case class RDDScanExec(
override def simpleString: String = {
s"$nodeName${truncatedString(output, "[", ",", "]")}"
}

+  // Input can be InternalRow, has to be turned into UnsafeRows.
+  override protected val createUnsafeProjection: Boolean = true
+
+  override def inputRDD: RDD[InternalRow] = rdd
}
sql/core/src/main/scala/org/apache/spark/sql/execution/LocalTableScanExec.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics
*/
case class LocalTableScanExec(
output: Seq[Attribute],
-   @transient rows: Seq[InternalRow]) extends LeafExecNode {
+   @transient rows: Seq[InternalRow]) extends LeafExecNode with InputRDDCodegen {

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -76,4 +76,12 @@ case class LocalTableScanExec(
longMetric("numOutputRows").add(taken.size)
taken
}

+  // Input is already UnsafeRows.
+  override protected val createUnsafeProjection: Boolean = false
+
+  // Do not codegen when there is no parent - to support the fast driver-local collect/take paths.
+  override def supportCodegen: Boolean = (parent != null)
+
+  override def inputRDD: RDD[InternalRow] = rdd
}
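
To illustrate the supportCodegen guard above: with no parent operator the rows are collected driver-locally without generating code, while a parent consumer pulls the scan into whole-stage codegen. A hedged sketch (assumes `spark` with implicits imported; exact physical plans can vary with optimizer rules):

import spark.implicits._

val local = Seq((1, "a"), (2, "b")).toDF("id", "name")  // plans a LocalTableScanExec

// No parent operator: collect()/take() use the fast driver-local path, codegen is skipped.
local.collect()

// With a parent operator consuming the scan, LocalTableScanExec feeds
// WholeStageCodegenExec through the loop generated by InputRDDCodegen.
local.filter($"id" > 1).collect()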
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -350,6 +350,15 @@ trait CodegenSupport extends SparkPlan {
*/
def needStopCheck: Boolean = parent.needStopCheck

+  /**
+   * Helper that generates the default shouldStop() check code.
+   */
+  def shouldStopCheckCode: String = if (needStopCheck) {
+    "if (shouldStop()) return;"
+  } else {
+    "// shouldStop check is eliminated"
+  }

Review comment from @cloud-fan (Contributor, Nov 26, 2018) on shouldStopCheckCode: we can use it in more places. This can be done in a follow-up.

/**
* A sequence of checks which evaluate to true if the downstream Limit operators have not received
* enough records and reached the limit. If current node is a data producing node, it can leverage
@@ -406,14 +415,61 @@ trait BlockingOperatorWithCodegen extends CodegenSupport {
override def limitNotReachedChecks: Seq[String] = Nil
}

+/**
+ * Leaf codegen node reading from a single RDD.
+ */
+trait InputRDDCodegen extends CodegenSupport {
+
+  def inputRDD: RDD[InternalRow]
+
+  // If the input can be InternalRows, an UnsafeProjection needs to be created.
+  protected val createUnsafeProjection: Boolean
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
+
+  override def doProduce(ctx: CodegenContext): String = {
+    // Inline mutable state since an InputRDDCodegen is used once in a task for WholeStageCodegen.
+    val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];",
+      forceInline = true)
+    val row = ctx.freshName("row")
+
+    val outputVars = if (createUnsafeProjection) {
+      // Creating the vars will make the parent's consume() add an UnsafeProjection.
+      ctx.INPUT_ROW = row
+      ctx.currentVars = null
+      output.zipWithIndex.map { case (a, i) =>
+        BoundReference(i, a.dataType, a.nullable).genCode(ctx)
+      }
+    } else {
+      null
+    }
+
+    val updateNumOutputRowsMetrics = if (metrics.contains("numOutputRows")) {
+      val numOutputRows = metricTerm(ctx, "numOutputRows")
+      s"$numOutputRows.add(1);"
+    } else {
+      ""
+    }
+    s"""
+       | while ($limitNotReachedCond $input.hasNext()) {
+       |   InternalRow $row = (InternalRow) $input.next();
+       |   ${updateNumOutputRowsMetrics}
+       |   ${consume(ctx, outputVars, if (createUnsafeProjection) null else row).trim}
+       |   ${shouldStopCheckCode}
+       | }
+     """.stripMargin
+  }
+}
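
To give a sense of the contract: an implementor supplies inputRDD and declares whether rows still need conversion to UnsafeRows. The following leaf node is a hypothetical sketch, not part of this PR (it assumes the surrounding file's imports; MyRowsScanExec and its single-partition RDD are illustrative names):

// Hypothetical leaf operator built on the new trait; rows are already UnsafeRows.
case class MyRowsScanExec(
    output: Seq[Attribute],
    @transient rows: Seq[InternalRow]) extends LeafExecNode with InputRDDCodegen {

  // No UnsafeProjection needed: the parent can consume the rows as-is.
  override protected val createUnsafeProjection: Boolean = false

  override def inputRDD: RDD[InternalRow] = sparkContext.parallelize(rows, 1)

  // Fallback path when whole-stage codegen is disabled or unsupported.
  override protected def doExecute(): RDD[InternalRow] = inputRDD
}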

/**
* InputAdapter is used to hide a SparkPlan from a subtree that supports codegen.
*
* This is the leaf node of a tree with WholeStageCodegen that is used to generate code
* that consumes an RDD iterator of InternalRow.
*/
- case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
+ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCodegen {

override def output: Seq[Attribute] = child.output

@@ -429,24 +485,10 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with CodegenSupport {
child.doExecuteBroadcast()
}

-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    child.execute() :: Nil
-  }
+  override def inputRDD: RDD[InternalRow] = child.execute()

-  override def doProduce(ctx: CodegenContext): String = {
-    // Right now, InputAdapter is only used when there is one input RDD.
-    // Inline mutable state since an InputAdapter is used once in a task for WholeStageCodegen
-    val input = ctx.addMutableState("scala.collection.Iterator", "input", v => s"$v = inputs[0];",
-      forceInline = true)
-    val row = ctx.freshName("row")
-    s"""
-       | while ($limitNotReachedCond $input.hasNext()) {
-       |   InternalRow $row = (InternalRow) $input.next();
-       |   ${consume(ctx, null, row).trim}
-       |   if (shouldStop()) return;
-       | }
-     """.stripMargin
-  }
+  // InputAdapter does not need UnsafeProjection.
+  protected val createUnsafeProjection: Boolean = false

override def generateTreeString(
depth: Int,
12 changes: 6 additions & 6 deletions sql/core/src/test/resources/sql-tests/results/operators.sql.out
@@ -201,7 +201,7 @@ struct<plan:string>
-- !query 24 output
== Physical Plan ==
*Project [null AS (CAST(concat(a, CAST(1 AS STRING)) AS DOUBLE) + CAST(2 AS DOUBLE))#x]
- +- Scan OneRowRelation[]
+ +- *Scan OneRowRelation[]


-- !query 25
@@ -211,7 +211,7 @@ struct<plan:string>
-- !query 25 output
== Physical Plan ==
*Project [-1b AS concat(CAST((1 - 2) AS STRING), b)#x]
- +- Scan OneRowRelation[]
+ +- *Scan OneRowRelation[]


-- !query 26
@@ -221,7 +221,7 @@ struct<plan:string>
-- !query 26 output
== Physical Plan ==
*Project [11b AS concat(CAST(((2 * 4) + 3) AS STRING), b)#x]
- +- Scan OneRowRelation[]
+ +- *Scan OneRowRelation[]


-- !query 27
@@ -231,7 +231,7 @@ struct<plan:string>
-- !query 27 output
== Physical Plan ==
*Project [4a2.0 AS concat(concat(CAST((3 + 1) AS STRING), a), CAST((CAST(4 AS DOUBLE) / CAST(2 AS DOUBLE)) AS STRING))#x]
- +- Scan OneRowRelation[]
+ +- *Scan OneRowRelation[]


-- !query 28
@@ -241,7 +241,7 @@ struct<plan:string>
-- !query 28 output
== Physical Plan ==
*Project [true AS ((1 = 1) OR (concat(a, b) = ab))#x]
- +- Scan OneRowRelation[]
+ +- *Scan OneRowRelation[]


-- !query 29
@@ -251,7 +251,7 @@ struct<plan:string>
-- !query 29 output
== Physical Plan ==
*Project [false AS ((concat(a, c) = ac) AND (2 = 3))#x]
- +- Scan OneRowRelation[]
+ +- *Scan OneRowRelation[]


-- !query 30