Skip to content

Commit

Permalink
Added eager analysis for error reporting.
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Feb 2, 2015
1 parent e6f00b8 commit b932e86
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.sql.types.StructType

private[sql] object DataFrame {
  /**
   * Builds a [[DataFrame]] from a logical plan. The plan is wrapped in a
   * `QueryExecution` here, at construction time, so that analysis errors are
   * reported eagerly rather than at the first action on the DataFrame.
   */
  def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
    new DataFrameImpl(sqlContext, sqlContext.executePlan(logicalPlan))
  }
}

Expand Down
18 changes: 12 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,28 @@ import org.apache.spark.util.Utils


/**
* Implementation for [[DataFrame]]. Refer to [[DataFrame]] directly for documentation.
* See [[DataFrame]] for documentation.
*/
class DataFrameImpl protected[sql](
private[sql] class DataFrameImpl protected[sql](
override val sqlContext: SQLContext,
private val baseLogicalPlan: LogicalPlan)
val queryExecution: SQLContext#QueryExecution)
extends DataFrame {

@transient override lazy val queryExecution = sqlContext.executePlan(baseLogicalPlan)
/**
 * Convenience constructor from a raw logical plan. Analysis is forced
 * immediately so that an invalid plan fails here, at construction time.
 */
def this(sqlContext: SQLContext, logicalPlan: LogicalPlan) = {
  this(sqlContext, {
    val execution = sqlContext.executePlan(logicalPlan)
    // Touching `analyzed` triggers analysis and throws on any analysis error.
    execution.analyzed
    execution
  })
}

// Note: the diff residue here stacked the pre-change (`baseLogicalPlan`) and
// post-change (`queryExecution.logical`) lines; only the post-change form is kept.
@transient protected[sql] override val logicalPlan: LogicalPlan = queryExecution.logical match {
  // For various commands (like DDL) and queries with side effects, we force query
  // optimization and execution to happen right away so those side effects take
  // place eagerly, at DataFrame construction time.
  case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] | _: WriteToFile =>
    LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
  case _ =>
    queryExecution.logical
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ class ColumnExpressionSuite extends QueryTest {

test("computability check") {
def shouldBeComputable(c: Column): Unit = assert(c.isComputable === true)

// A non-computable column must both report isComputable === false and refuse
// eager execution by throwing UnsupportedOperationException on head().
// (The stale one-line pre-change definition, which duplicated this name in the
// same scope and would not compile, has been removed.)
def shouldNotBeComputable(c: Column): Unit = {
  assert(c.isComputable === false)
  intercept[UnsupportedOperationException] { c.head() }
}

shouldBeComputable(testData2("a"))
shouldBeComputable(testData2("b"))
Expand Down
15 changes: 15 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,21 @@ import scala.language.postfixOps
class DataFrameSuite extends QueryTest {
import org.apache.spark.sql.TestData._

test("analysis error should be eagerly reported") {
  // Constructing a DataFrame from an unresolvable plan must throw immediately
  // (eager analysis), not at the first action on the DataFrame.
  intercept[Exception] { testData.select('nonExistentName) }
  intercept[Exception] {
    testData.groupBy('key).agg(Map("nonExistentName" -> "sum"))
  }
  intercept[Exception] {
    testData.groupBy("nonExistentName").agg(Map("key" -> "sum"))
  }
  // Removed a commented-out intercept that was byte-identical to the live
  // groupBy("nonExistentName") case above; the "uncomment later" note was stale.
}

test("table scan") {
checkAnswer(
testData,
Expand Down

0 comments on commit b932e86

Please sign in to comment.