Skip to content

Commit

Permalink
Visualize SQL operators, not low-level Spark primitives
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrew Or committed May 8, 2015
1 parent 569034a commit e61b1ab
Show file tree
Hide file tree
Showing 28 changed files with 58 additions and 52 deletions.
16 changes: 7 additions & 9 deletions core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,19 @@ private[spark] object RDDOperationScope {
val scopeKey = SparkContext.RDD_SCOPE_KEY
val noOverrideKey = SparkContext.RDD_SCOPE_NO_OVERRIDE_KEY
val oldScopeJson = sc.getLocalProperty(scopeKey)
val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
val oldNoOverride = sc.getLocalProperty(noOverrideKey)
try {
if (ignoreParent) {
// Ignore all parent settings and scopes and start afresh with our own root scope
sc.setLocalProperty(scopeKey, new RDDOperationScope(name).toJson)
} else {
} else if (sc.getLocalProperty(noOverrideKey) == null) {
// Otherwise, set the scope only if the higher level caller allows us to do so
if (sc.getLocalProperty(noOverrideKey) == null) {
val oldScope = Option(oldScopeJson).map(RDDOperationScope.fromJson)
sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
}
// Optionally disallow the child body to override our scope
if (!allowNesting) {
sc.setLocalProperty(noOverrideKey, "true")
}
sc.setLocalProperty(scopeKey, new RDDOperationScope(name, oldScope).toJson)
}
// Optionally disallow the child body to override our scope
if (!allowNesting) {
sc.setLocalProperty(noOverrideKey, "true")
}
body
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ private[sql] case class InMemoryColumnarTableScan(

private val inMemoryPartitionPruningEnabled = sqlContext.conf.inMemoryPartitionPruning

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
if (enableAccumulators) {
readPartitions.setValue(0)
readBatches.setValue(0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ case class Aggregate(
}
}

override def execute(): RDD[Row] = attachTree(this, "execute") {
protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
if (groupingExpressions.isEmpty) {
child.execute().mapPartitions { iter =>
val buffer = newAggregateBuffer()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ case class Exchange(
serializer
}

override def execute(): RDD[Row] = attachTree(this , "execute") {
protected override def doExecute(): RDD[Row] = attachTree(this , "execute") {
newPartitioning match {
case HashPartitioning(expressions, numPartitions) =>
// TODO: Eliminate redundant expressions in grouping key and value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ private[sql] case class LogicalRDD(output: Seq[Attribute], rdd: RDD[Row])(sqlCon

/** Physical plan node for scanning data from an RDD. */
private[sql] case class PhysicalRDD(output: Seq[Attribute], rdd: RDD[Row]) extends LeafNode {
override def execute(): RDD[Row] = rdd
protected override def doExecute(): RDD[Row] = rdd
}

/** Logical plan node for scanning data from a local collection. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ case class Expand(
// as UNKNOWN partitioning
override def outputPartitioning: Partitioning = UnknownPartitioning(0)

override def execute(): RDD[Row] = attachTree(this, "execute") {
protected override def doExecute(): RDD[Row] = attachTree(this, "execute") {
child.execute().mapPartitions { iter =>
// TODO Move out projection objects creation and transfer to
// workers via closure. However we can't assume the Projection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ case class Generate(

val boundGenerator = BindReferences.bindReference(generator, child.output)

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
if (join) {
child.execute().mapPartitions { iter =>
val nullValues = Seq.fill(generator.elementTypes.size)(Literal(null))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ case class GeneratedAggregate(

override def output: Seq[Attribute] = aggregateExpressions.map(_.toAttribute)

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val aggregatesToCompute = aggregateExpressions.flatMap { a =>
a.collect { case agg: AggregateExpression => agg}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ private[sql] case class LocalTableScan(output: Seq[Attribute], rows: Seq[Row]) e

private lazy val rdd = sqlContext.sparkContext.parallelize(rows)

override def execute(): RDD[Row] = rdd
protected override def doExecute(): RDD[Row] = rdd


override def executeCollect(): Array[Row] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.{RDD, RDDOperationScope}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, trees}
import org.apache.spark.sql.catalyst.expressions._
Expand Down Expand Up @@ -81,12 +81,20 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
/**
* Runs this query returning the result as an RDD.
*/
def execute(): RDD[Row]
final def execute(): RDD[Row] = {
  // Run doExecute() inside an RDD operation scope named after this plan node.
  //   allowNesting = false: the body is not allowed to override this scope.
  //   ignoreParent = true: parent scope settings are ignored and this node starts
  //     afresh with its own root scope.
  // NOTE(review): the booleans are passed by name; this assumes withScope declares
  // them as `allowNesting` and `ignoreParent` -- confirm against its signature.
  RDDOperationScope.withScope(
      sparkContext, nodeName, allowNesting = false, ignoreParent = true) {
    doExecute()
  }
}

/**
* Runs this query returning the result as an array.
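* Produces the result of this query as an RDD. Implemented by concrete operators;
* callers should invoke execute(), which wraps this call in an RDD operation scope.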
*/
protected def doExecute(): RDD[Row]

/**
* Runs this query returning the result as an array.
*/
def executeCollect(): Array[Row] = {
execute().mapPartitions { iter =>
val converter = CatalystTypeConverters.createToScalaConverter(schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ case class Window(
}
}

def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
child.execute().mapPartitions { iter =>
new Iterator[Row] {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) extends

@transient lazy val buildProjection = newMutableProjection(projectList, child.output)

override def execute(): RDD[Row] = child.execute().mapPartitions { iter =>
protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter =>
val resuableProjection = buildProjection()
iter.map(resuableProjection)
}
Expand All @@ -54,7 +54,7 @@ case class Filter(condition: Expression, child: SparkPlan) extends UnaryNode {

@transient lazy val conditionEvaluator: (Row) => Boolean = newPredicate(condition, child.output)

override def execute(): RDD[Row] = child.execute().mapPartitions { iter =>
protected override def doExecute(): RDD[Row] = child.execute().mapPartitions { iter =>
iter.filter(conditionEvaluator)
}

Expand Down Expand Up @@ -83,7 +83,7 @@ case class Sample(
override def output: Seq[Attribute] = child.output

// TODO: How to pick seed?
override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
if (withReplacement) {
child.execute().map(_.copy()).sample(withReplacement, upperBound - lowerBound, seed)
} else {
Expand All @@ -99,7 +99,7 @@ case class Sample(
case class Union(children: Seq[SparkPlan]) extends SparkPlan {
// TODO: attributes output by union should be distinct for nullability purposes
override def output: Seq[Attribute] = children.head.output
override def execute(): RDD[Row] = sparkContext.union(children.map(_.execute()))
protected override def doExecute(): RDD[Row] = sparkContext.union(children.map(_.execute()))
}

/**
Expand All @@ -124,7 +124,7 @@ case class Limit(limit: Int, child: SparkPlan)

override def executeCollect(): Array[Row] = child.executeTake(limit)

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val rdd: RDD[_ <: Product2[Boolean, Row]] = if (sortBasedShuffleOn) {
child.execute().mapPartitions { iter =>
iter.take(limit).map(row => (false, row.copy()))
Expand Down Expand Up @@ -166,7 +166,7 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan)

// TODO: Terminal split should be implemented differently from non-terminal split.
// TODO: Pick num splits based on |limit|.
override def execute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1)
protected override def doExecute(): RDD[Row] = sparkContext.makeRDD(collectData(), 1)

override def outputOrdering: Seq[SortOrder] = sortOrder
}
Expand All @@ -186,7 +186,7 @@ case class Sort(
override def requiredChildDistribution: Seq[Distribution] =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

override def execute(): RDD[Row] = attachTree(this, "sort") {
protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
child.execute().mapPartitions( { iterator =>
val ordering = newOrdering(sortOrder, child.output)
iterator.map(_.copy()).toArray.sorted(ordering).iterator
Expand Down Expand Up @@ -214,7 +214,7 @@ case class ExternalSort(
override def requiredChildDistribution: Seq[Distribution] =
if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil

override def execute(): RDD[Row] = attachTree(this, "sort") {
protected override def doExecute(): RDD[Row] = attachTree(this, "sort") {
child.execute().mapPartitions( { iterator =>
val ordering = newOrdering(sortOrder, child.output)
val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering))
Expand Down Expand Up @@ -244,7 +244,7 @@ case class Distinct(partial: Boolean, child: SparkPlan) extends UnaryNode {
override def requiredChildDistribution: Seq[Distribution] =
if (partial) UnspecifiedDistribution :: Nil else ClusteredDistribution(child.output) :: Nil

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
child.execute().mapPartitions { iter =>
val hashSet = new scala.collection.mutable.HashSet[Row]()

Expand All @@ -270,7 +270,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
extends UnaryNode {
override def output: Seq[Attribute] = child.output

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
child.execute().map(_.copy()).coalesce(numPartitions, shuffle)
}
}
Expand All @@ -285,7 +285,7 @@ case class Repartition(numPartitions: Int, shuffle: Boolean, child: SparkPlan)
case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
left.execute().map(_.copy()).subtract(right.execute().map(_.copy()))
}
}
Expand All @@ -299,7 +299,7 @@ case class Except(left: SparkPlan, right: SparkPlan) extends BinaryNode {
case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = children.head.output

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
left.execute().map(_.copy()).intersection(right.execute().map(_.copy()))
}
}
Expand All @@ -314,5 +314,5 @@ case class Intersect(left: SparkPlan, right: SparkPlan) extends BinaryNode {
case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPlan {
def children: Seq[SparkPlan] = child :: Nil

def execute(): RDD[Row] = child.execute()
protected override def doExecute(): RDD[Row] = child.execute()
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ private[sql] case class ExecutedCommand(cmd: RunnableCommand) extends SparkPlan

override def executeTake(limit: Int): Array[Row] = sideEffectResult.take(limit).toArray

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val converted = sideEffectResult.map(r =>
CatalystTypeConverters.convertToCatalyst(r, schema).asInstanceOf[Row])
sqlContext.sparkContext.parallelize(converted, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ package object debug {
}
}

def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
child.execute().mapPartitions { iter =>
new Iterator[Row] {
def hasNext: Boolean = iter.hasNext
Expand Down Expand Up @@ -193,7 +193,7 @@ package object debug {

def children: List[SparkPlan] = child :: Nil

def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
child.execute().map { row =>
try typeCheck(row, child.schema) catch {
case e: Exception =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ case class BroadcastHashJoin(
sparkContext.broadcast(hashed)
}

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val broadcastRelation = Await.result(broadcastFuture, timeout)

streamedPlan.execute().mapPartitions { streamedIter =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ case class BroadcastLeftSemiJoinHash(

override def output: Seq[Attribute] = left.output

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val buildIter= buildPlan.execute().map(_.copy()).collect().toIterator
val hashSet = new java.util.HashSet[Row]()
var currentRow: Row = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ case class BroadcastNestedLoopJoin(
@transient private lazy val boundCondition =
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val broadcastedRelation =
sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.{BinaryNode, SparkPlan}
case class CartesianProduct(left: SparkPlan, right: SparkPlan) extends BinaryNode {
override def output: Seq[Attribute] = left.output ++ right.output

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val leftResults = left.execute().map(_.copy())
val rightResults = right.execute().map(_.copy())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ case class HashOuterJoin(
hashTable
}

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val joinedRow = new JoinedRow()
left.execute().zipPartitions(right.execute()) { (leftIter, rightIter) =>
// TODO this probably can be replaced by external sort (sort merged join?)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ case class LeftSemiJoinBNL(
@transient private lazy val boundCondition =
newPredicate(condition.getOrElse(Literal(true)), left.output ++ right.output)

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val broadcastedRelation =
sparkContext.broadcast(broadcast.execute().map(_.copy()).collect().toIndexedSeq)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ case class LeftSemiJoinHash(

override def output: Seq[Attribute] = left.output

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
val hashSet = new java.util.HashSet[Row]()
var currentRow: Row = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ case class ShuffledHashJoin(
override def requiredChildDistribution: Seq[ClusteredDistribution] =
ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) =>
val hashed = HashedRelation(buildIter, buildSideKeyGenerator)
hashJoin(streamIter, hashed)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ case class SortMergeJoin(
private def requiredOrders(keys: Seq[Expression]): Seq[SortOrder] =
keys.map(SortOrder(_, Ascending))

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val leftResults = left.execute().map(_.copy())
val rightResults = right.execute().map(_.copy())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:

def children: Seq[SparkPlan] = child :: Nil

def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
val childResults = child.execute().map(_.copy())

val parent = childResults.mapPartitions { iter =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ private[sql] case class ParquetTableScan(
}
}.toArray

override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
import parquet.filter2.compat.FilterCompat.FilterPredicateCompat

val sc = sqlContext.sparkContext
Expand Down Expand Up @@ -255,7 +255,7 @@ private[sql] case class InsertIntoParquetTable(
/**
* Inserts all rows into the Parquet file.
*/
override def execute(): RDD[Row] = {
protected override def doExecute(): RDD[Row] = {
// TODO: currently we do not check whether the "schema"s are compatible
// That means if one first creates a table and then INSERTs data with
// an incompatible schema the execution will fail. It would be nice
Expand Down
Loading

0 comments on commit e61b1ab

Please sign in to comment.