From f636c14ab5b8b07efff12393057bf500eaf21405 Mon Sep 17 00:00:00 2001
From: Nathan Howell
Date: Wed, 29 Apr 2015 19:16:33 -0700
Subject: [PATCH] Enable JsonRDD2 by default, add a flag to switch back to JsonRDD

---
 .../scala/org/apache/spark/sql/SQLConf.scala  |  4 +
 .../org/apache/spark/sql/SQLContext.scala     | 34 ++++----
 .../apache/spark/sql/json/JSONRelation.scala  | 77 ++++++++++++++-----
 .../org/apache/spark/sql/json/JsonSuite.scala | 29 ++++---
 4 files changed, 102 insertions(+), 42 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 2fa602a6082dc..00a485d5f745e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -67,6 +67,8 @@ private[spark] object SQLConf {
 
   val USE_SQL_SERIALIZER2 = "spark.sql.useSerializer2"
 
+  val USE_JSONRDD2 = "spark.sql.json.useJsonRDD2"
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -160,6 +162,8 @@ private[sql] class SQLConf extends Serializable {
 
   private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2, "true").toBoolean
 
+  private[spark] def useJsonRDD2: Boolean = getConf(USE_JSONRDD2, "true").toBoolean
+
   /**
    * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to
    * a broadcast value during the physical executions of join operations. Setting this to -1
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index bd4a55fa132fb..d99ee132f3699 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -615,13 +615,17 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
-    val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
-    val appliedSchema =
-      Option(schema).getOrElse(
-        JsonRDD.nullTypeToStringType(
-          JsonRDD.inferSchema(json, 1.0, columnNameOfCorruptJsonRecord)))
-    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
-    createDataFrame(rowRDD, appliedSchema, needsConversion = false)
+    if (conf.useJsonRDD2) {
+      baseRelationToDataFrame(new JSONRelation(json, None, 1.0, Some(schema))(this))
+    } else {
+      val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
+      val appliedSchema =
+        Option(schema).getOrElse(
+          JsonRDD.nullTypeToStringType(
+            JsonRDD.inferSchema(json, 1.0, columnNameOfCorruptJsonRecord)))
+      val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
+      createDataFrame(rowRDD, appliedSchema, needsConversion = false)
+    }
   }
 
   /**
@@ -645,12 +649,16 @@ class SQLContext(@transient val sparkContext: SparkContext)
    */
   @Experimental
   def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
-    val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
-    val appliedSchema =
-      JsonRDD.nullTypeToStringType(
-        JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
-    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
-    createDataFrame(rowRDD, appliedSchema, needsConversion = false)
+    if (conf.useJsonRDD2) {
+      baseRelationToDataFrame(new JSONRelation(json, None, samplingRatio, None)(this))
+    } else {
+      val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
+      val appliedSchema =
+        JsonRDD.nullTypeToStringType(
+          JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
+      val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
+      createDataFrame(rowRDD, appliedSchema, needsConversion = false)
+    }
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index e3352d02787fd..030bfe7531a97 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -22,14 +22,16 @@ import java.io.IOException
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute, Row}
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 
 private[sql] class DefaultSource
-  extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
+  extends RelationProvider
+  with SchemaRelationProvider
+  with CreatableRelationProvider {
 
   private def checkPath(parameters: Map[String, String]): String = {
     parameters.getOrElse("path", sys.error("'path' must be specified for json data."))
   }
@@ -42,7 +44,7 @@ private[sql] class DefaultSource
     val path = checkPath(parameters)
     val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
 
-    JSONRelation(path, samplingRatio, None)(sqlContext)
+    new JSONRelation(path, samplingRatio, None, sqlContext)
   }
 
   /** Returns a new base relation with the given schema and parameters. */
@@ -53,7 +55,7 @@ private[sql] class DefaultSource
     val path = checkPath(parameters)
     val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
 
-    JSONRelation(path, samplingRatio, Some(schema))(sqlContext)
+    new JSONRelation(path, samplingRatio, Some(schema), sqlContext)
   }
 
   override def createRelation(
@@ -101,32 +103,69 @@ private[sql] class DefaultSource
   }
 }
 
-private[sql] case class JSONRelation(
-    path: String,
-    samplingRatio: Double,
+private[sql] class JSONRelation(
+    baseRDD: => RDD[String],
+    val path: Option[String],
+    val samplingRatio: Double,
     userSpecifiedSchema: Option[StructType])(
     @transient val sqlContext: SQLContext)
   extends BaseRelation
   with TableScan
   with InsertableRelation {
 
-  // TODO: Support partitioned JSON relation.
-  private def baseRDD = sqlContext.sparkContext.textFile(path)
+  def this(
+      path: String,
+      samplingRatio: Double,
+      userSpecifiedSchema: Option[StructType],
+      sqlContext: SQLContext) =
+    this(
+      sqlContext.sparkContext.textFile(path),
+      Some(path),
+      samplingRatio,
+      userSpecifiedSchema)(sqlContext)
+
+  private val useJsonRDD2: Boolean = sqlContext.conf.useJsonRDD2
 
   override val needConversion: Boolean = false
 
-  override val schema = userSpecifiedSchema.getOrElse(
-    JsonRDD.nullTypeToStringType(
-      JsonRDD.inferSchema(
+  override lazy val schema = userSpecifiedSchema.getOrElse {
+    if (useJsonRDD2) {
+      JsonRDD2.nullTypeToStringType(
+        JsonRDD2.inferSchema(
+          baseRDD,
+          samplingRatio,
+          sqlContext.conf.columnNameOfCorruptRecord))
+    } else {
+      JsonRDD.nullTypeToStringType(
+        JsonRDD.inferSchema(
+          baseRDD,
+          samplingRatio,
+          sqlContext.conf.columnNameOfCorruptRecord))
+    }
+  }
+
+  override def buildScan(): RDD[Row] = {
+    if (useJsonRDD2) {
+      JsonRDD2.jsonStringToRow(
+        baseRDD,
+        schema,
+        sqlContext.conf.columnNameOfCorruptRecord)
+    } else {
+      JsonRDD.jsonStringToRow(
         baseRDD,
-        samplingRatio,
-        sqlContext.conf.columnNameOfCorruptRecord)))
+        schema,
+        sqlContext.conf.columnNameOfCorruptRecord)
+    }
+  }
 
-  override def buildScan(): RDD[Row] =
-    JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord)
 
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
-    val filesystemPath = new Path(path)
+    val filesystemPath = path match {
+      case Some(p) => new Path(p)
+      case None =>
+        throw new IOException(s"Cannot INSERT into table with no path defined")
+    }
+
     val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
 
     if (overwrite) {
@@ -147,7 +186,7 @@ private[sql] case class JSONRelation(
         }
       }
       // Write the data.
-      data.toJSON.saveAsTextFile(path)
+      data.toJSON.saveAsTextFile(filesystemPath.toString)
       // Right now, we assume that the schema is not changed. We will not update the schema.
       // schema = data.schema
     } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 5c77e34bfe729..23f00294601be 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -580,19 +580,19 @@ class JsonSuite extends QueryTest {
     val analyzed = jsonDF.queryExecution.analyzed
     assert(
       analyzed.isInstanceOf[LogicalRelation],
-      "The DataFrame returned by jsonFile should be based on JSONRelation.")
+      "The DataFrame returned by jsonFile should be based on LogicalRelation.")
     val relation = analyzed.asInstanceOf[LogicalRelation].relation
     assert(
       relation.isInstanceOf[JSONRelation],
       "The DataFrame returned by jsonFile should be based on JSONRelation.")
-    assert(relation.asInstanceOf[JSONRelation].path === path)
+    assert(relation.asInstanceOf[JSONRelation].path === Some(path))
     assert(relation.asInstanceOf[JSONRelation].samplingRatio === (0.49 +- 0.001))
 
     val schema = StructType(StructField("a", LongType, true) :: Nil)
     val logicalRelation =
       jsonFile(path, schema).queryExecution.analyzed.asInstanceOf[LogicalRelation]
     val relationWithSchema = logicalRelation.relation.asInstanceOf[JSONRelation]
-    assert(relationWithSchema.path === path)
+    assert(relationWithSchema.path === Some(path))
     assert(relationWithSchema.schema === schema)
     assert(relationWithSchema.samplingRatio > 0.99)
   }
@@ -1034,15 +1034,24 @@ class JsonSuite extends QueryTest {
   }
 
   test("JSONRelation equality test") {
-    val relation1 =
-      JSONRelation("path", 1.0, Some(StructType(StructField("a", IntegerType, true) :: Nil)))(null)
+    val context = org.apache.spark.sql.test.TestSQLContext
+    val relation1 = new JSONRelation(
+      "path",
+      1.0,
+      Some(StructType(StructField("a", IntegerType, true) :: Nil)),
+      context)
     val logicalRelation1 = LogicalRelation(relation1)
-    val relation2 =
-      JSONRelation("path", 0.5, Some(StructType(StructField("a", IntegerType, true) :: Nil)))(
-        org.apache.spark.sql.test.TestSQLContext)
+    val relation2 = new JSONRelation(
+      "path",
+      0.5,
+      Some(StructType(StructField("a", IntegerType, true) :: Nil)),
+      context)
     val logicalRelation2 = LogicalRelation(relation2)
-    val relation3 =
-      JSONRelation("path", 1.0, Some(StructType(StructField("b", StringType, true) :: Nil)))(null)
+    val relation3 = new JSONRelation(
+      "path",
+      1.0,
+      Some(StructType(StructField("b", StringType, true) :: Nil)),
+      context)
     val logicalRelation3 = LogicalRelation(relation3)
 
     assert(relation1 === relation2)
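
Usage note: a minimal sketch of how the new flag is meant to be exercised from
user code. It assumes an existing SparkContext `sc` and SQLContext `sqlContext`;
the config key and its "true" default come from the SQLConf change above.

    // Fall back to the old JsonRDD path, e.g. to compare results or chase a regression.
    sqlContext.setConf("spark.sql.json.useJsonRDD2", "false")
    val json = sc.parallelize(Seq("""{"a": 1}""", """{"a": 2}"""))
    val oldPath = sqlContext.jsonRDD(json, 1.0)

    // Restore the new default; jsonRDD then routes through JSONRelation/JsonRDD2.
    sqlContext.setConf("spark.sql.json.useJsonRDD2", "true")
    val newPath = sqlContext.jsonRDD(json, 1.0)
    assert(oldPath.schema == newPath.schema)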
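
Behavioral note: a relation constructed directly from an RDD (the jsonRDD code
path) carries path = None, so the reworked insert() now fails fast instead of
writing to an undefined location. A hedged sketch of the expected failure mode
(the temp table name is hypothetical):

    val df = sqlContext.jsonRDD(sc.parallelize(Seq("""{"a": 1}""")), 1.0)
    df.registerTempTable("json_no_path")
    // An INSERT against this table would be expected to surface the new guard:
    //   java.io.IOException: Cannot INSERT into table with no path defined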