Enable JsonRDD2 by default, add a flag to switch back to JsonRDD

Nathan Howell committed May 1, 2015
1 parent 0bbc445 commit f636c14
Showing 4 changed files with 102 additions and 42 deletions.
4 changes: 4 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -67,6 +67,8 @@ private[spark] object SQLConf {

val USE_SQL_SERIALIZER2 = "spark.sql.useSerializer2"

val USE_JSONRDD2 = "spark.sql.json.useJsonRDD2"

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
@@ -160,6 +162,8 @@ private[sql] class SQLConf extends Serializable {

private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2, "true").toBoolean

private[spark] def useJsonRDD2: Boolean = getConf(USE_JSONRDD2, "true").toBoolean

/**
* Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
* a broadcast value during the physical executions of join operations. Setting this to -1
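The flag defaults to the new JsonRDD2 path; setting it to false restores the legacy JsonRDD behavior. A minimal sketch of toggling it at runtime, assuming an existing SQLContext named sqlContext:

    // Fall back to the legacy JsonRDD parser for this session.
    sqlContext.setConf("spark.sql.json.useJsonRDD2", "false")

    // Return to the new default (JsonRDD2).
    sqlContext.setConf("spark.sql.json.useJsonRDD2", "true")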
34 changes: 21 additions & 13 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -615,13 +615,17 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
@Experimental
def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
val appliedSchema =
Option(schema).getOrElse(
JsonRDD.nullTypeToStringType(
JsonRDD.inferSchema(json, 1.0, columnNameOfCorruptJsonRecord)))
val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
createDataFrame(rowRDD, appliedSchema, needsConversion = false)
if (conf.useJsonRDD2) {
baseRelationToDataFrame(new JSONRelation(json, None, 1.0, Some(schema))(this))
} else {
val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
val appliedSchema =
Option(schema).getOrElse(
JsonRDD.nullTypeToStringType(
JsonRDD.inferSchema(json, 1.0, columnNameOfCorruptJsonRecord)))
val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
createDataFrame(rowRDD, appliedSchema, needsConversion = false)
}
}

/**
@@ -645,12 +649,16 @@ class SQLContext(@transient val sparkContext: SparkContext)
*/
@Experimental
def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
val appliedSchema =
JsonRDD.nullTypeToStringType(
JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
createDataFrame(rowRDD, appliedSchema, needsConversion = false)
if (conf.useJsonRDD2) {
baseRelationToDataFrame(new JSONRelation(json, None, samplingRatio, None)(this))
} else {
val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
val appliedSchema =
JsonRDD.nullTypeToStringType(
JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
createDataFrame(rowRDD, appliedSchema, needsConversion = false)
}
}

/**
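With the flag enabled, both jsonRDD overloads now build the DataFrame through JSONRelation and baseRelationToDataFrame instead of calling JsonRDD directly. A usage sketch, assuming a Spark shell session with sc and sqlContext in scope:

    import org.apache.spark.sql.types._

    val json = sc.parallelize(
      """{"a": 1, "b": "x"}""" :: """{"a": 2, "b": "y"}""" :: Nil)

    // Explicit schema: no inference pass runs on either code path.
    val schema = StructType(
      StructField("a", LongType, true) ::
      StructField("b", StringType, true) :: Nil)
    val withSchema = sqlContext.jsonRDD(json, schema)

    // Schema inferred from a 50% sample of the records.
    val inferred = sqlContext.jsonRDD(json, 0.5)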
77 changes: 58 additions & 19 deletions sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -22,14 +22,16 @@ import java.io.IOException
import org.apache.hadoop.fs.Path

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute, Row}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}


private[sql] class DefaultSource
extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
extends RelationProvider
with SchemaRelationProvider
with CreatableRelationProvider {

private def checkPath(parameters: Map[String, String]): String = {
parameters.getOrElse("path", sys.error("'path' must be specified for json data."))
@@ -42,7 +44,7 @@ private[sql] class DefaultSource
val path = checkPath(parameters)
val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)

JSONRelation(path, samplingRatio, None)(sqlContext)
new JSONRelation(path, samplingRatio, None, sqlContext)
}

/** Returns a new base relation with the given schema and parameters. */
@@ -53,7 +55,7 @@ private[sql] class DefaultSource
val path = checkPath(parameters)
val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)

JSONRelation(path, samplingRatio, Some(schema))(sqlContext)
new JSONRelation(path, samplingRatio, Some(schema), sqlContext)
}

override def createRelation(
@@ -101,32 +103,69 @@ private[sql] class DefaultSource
}
}
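
The path and samplingRatio parameters handled above also arrive through the generic data sources API. A sketch using the 1.3-era load call, with a hypothetical /tmp/events.json path:

    // samplingRatio is passed as a string option and parsed by DefaultSource.
    val events = sqlContext.load(
      "org.apache.spark.sql.json",
      Map("path" -> "/tmp/events.json", "samplingRatio" -> "0.1"))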

private[sql] case class JSONRelation(
path: String,
samplingRatio: Double,
private[sql] class JSONRelation(
baseRDD: => RDD[String],
val path: Option[String],
val samplingRatio: Double,
userSpecifiedSchema: Option[StructType])(
@transient val sqlContext: SQLContext)
extends BaseRelation
with TableScan
with InsertableRelation {

// TODO: Support partitioned JSON relation.
private def baseRDD = sqlContext.sparkContext.textFile(path)
def this(
path: String,
samplingRatio: Double,
userSpecifiedSchema: Option[StructType],
sqlContext: SQLContext) =
this(
sqlContext.sparkContext.textFile(path),
Some(path),
samplingRatio,
userSpecifiedSchema)(sqlContext)

private val useJsonRDD2: Boolean = sqlContext.conf.useJsonRDD2

override val needConversion: Boolean = false

override val schema = userSpecifiedSchema.getOrElse(
  JsonRDD.nullTypeToStringType(
    JsonRDD.inferSchema(
      baseRDD,
      samplingRatio,
      sqlContext.conf.columnNameOfCorruptRecord)))
override lazy val schema = userSpecifiedSchema.getOrElse {
  if (useJsonRDD2) {
    JsonRDD2.nullTypeToStringType(
      JsonRDD2.inferSchema(
        baseRDD,
        samplingRatio,
        sqlContext.conf.columnNameOfCorruptRecord))
  } else {
    JsonRDD.nullTypeToStringType(
      JsonRDD.inferSchema(
        baseRDD,
        samplingRatio,
        sqlContext.conf.columnNameOfCorruptRecord))
  }
}

override def buildScan(): RDD[Row] =
  JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord)
override def buildScan(): RDD[Row] = {
  if (useJsonRDD2) {
    JsonRDD2.jsonStringToRow(
      baseRDD,
      schema,
      sqlContext.conf.columnNameOfCorruptRecord)
  } else {
    JsonRDD.jsonStringToRow(
      baseRDD,
      schema,
      sqlContext.conf.columnNameOfCorruptRecord)
  }
}

override def insert(data: DataFrame, overwrite: Boolean): Unit = {
val filesystemPath = new Path(path)
val filesystemPath = path match {
case Some(p) => new Path(p)
case None =>
throw new IOException(s"Cannot INSERT into table with no path defined")
}

val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)

if (overwrite) {
@@ -147,7 +186,7 @@ private[sql] case class JSONRelation(
}
}
// Write the data.
data.toJSON.saveAsTextFile(path)
data.toJSON.saveAsTextFile(filesystemPath.toString)
// Right now, we assume that the schema is not changed. We will not update the schema.
// schema = data.schema
} else {
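Note the primary constructor takes baseRDD as a by-name parameter (=> RDD[String]), so the textFile call in the auxiliary constructor runs only when schema inference or a scan actually needs the data, not at construction time. A standalone sketch of the idiom (not Spark code):

    class Source(data: => Seq[String]) {
      // `data` is re-evaluated on every access, never at construction.
      def scan(): Seq[String] = data
    }

    val s = new Source({ println("evaluated"); Seq("""{"a": 1}""") })
    println("constructed") // prints before any "evaluated"
    s.scan()               // prints "evaluated"
    s.scan()               // prints "evaluated" again

Each reference re-evaluates the expression, which is cheap here: textFile only defines an RDD, it does not read the files.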
29 changes: 19 additions & 10 deletions sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -580,19 +580,19 @@ class JsonSuite extends QueryTest {
val analyzed = jsonDF.queryExecution.analyzed
assert(
analyzed.isInstanceOf[LogicalRelation],
"The DataFrame returned by jsonFile should be based on JSONRelation.")
"The DataFrame returned by jsonFile should be based on LogicalRelation.")
val relation = analyzed.asInstanceOf[LogicalRelation].relation
assert(
relation.isInstanceOf[JSONRelation],
"The DataFrame returned by jsonFile should be based on JSONRelation.")
assert(relation.asInstanceOf[JSONRelation].path === path)
assert(relation.asInstanceOf[JSONRelation].path === Some(path))
assert(relation.asInstanceOf[JSONRelation].samplingRatio === (0.49 +- 0.001))

val schema = StructType(StructField("a", LongType, true) :: Nil)
val logicalRelation =
jsonFile(path, schema).queryExecution.analyzed.asInstanceOf[LogicalRelation]
val relationWithSchema = logicalRelation.relation.asInstanceOf[JSONRelation]
assert(relationWithSchema.path === path)
assert(relationWithSchema.path === Some(path))
assert(relationWithSchema.schema === schema)
assert(relationWithSchema.samplingRatio > 0.99)
}
@@ -1034,15 +1034,24 @@ class JsonSuite extends QueryTest {
}

test("JSONRelation equality test") {
val relation1 =
JSONRelation("path", 1.0, Some(StructType(StructField("a", IntegerType, true) :: Nil)))(null)
val context = org.apache.spark.sql.test.TestSQLContext
val relation1 = new JSONRelation(
"path",
1.0,
Some(StructType(StructField("a", IntegerType, true) :: Nil)),
context)
val logicalRelation1 = LogicalRelation(relation1)
val relation2 =
JSONRelation("path", 0.5, Some(StructType(StructField("a", IntegerType, true) :: Nil)))(
org.apache.spark.sql.test.TestSQLContext)
val relation2 = new JSONRelation(
"path",
0.5,
Some(StructType(StructField("a", IntegerType, true) :: Nil)),
context)
val logicalRelation2 = LogicalRelation(relation2)
val relation3 =
JSONRelation("path", 1.0, Some(StructType(StructField("b", StringType, true) :: Nil)))(null)
val relation3 = new JSONRelation(
"path",
1.0,
Some(StructType(StructField("b", StringType, true) :: Nil)),
context)
val logicalRelation3 = LogicalRelation(relation3)

assert(relation1 === relation2)
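As a plain class, JSONRelation no longer gets case-class structural equality for free; the assertions above compare relations that differ only in samplingRatio, so equality must hinge on path and schema. A sketch of an override consistent with that, an assumption rather than code shown in this diff:

    // Hypothetical equals/hashCode keyed on path and schema only.
    override def equals(other: Any): Boolean = other match {
      case that: JSONRelation => this.path == that.path && this.schema == that.schema
      case _ => false
    }

    override def hashCode(): Int = 41 * (41 + path.hashCode) + schema.hashCode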
