Undo JSON changes.
JoshRosen committed Nov 30, 2016
1 parent d688f2d commit b01a307
Showing 4 changed files with 37 additions and 23 deletions.
File 1 of 4: class DataFrameReader

@@ -326,7 +326,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
       .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
     val schema = userSpecifiedSchema.getOrElse {
       InferSchema.infer(
-        sparkSession.createDataset(jsonRDD)(Encoders.STRING),
+        jsonRDD,
         columnNameOfCorruptRecord,
         parsedOptions)
     }
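
Note: this hunk is in the RDD-based json() path (the variable is jsonRDD), and schema inference now receives the RDD[String] directly rather than a Dataset[String] wrapper. A minimal usage sketch of that entry point, with a hypothetical local session and sample records:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Sketch only: session name, master, and records are illustrative.
val spark = SparkSession.builder().master("local[*]").appName("json-rdd-sketch").getOrCreate()
val jsonRDD: RDD[String] =
  spark.sparkContext.parallelize(Seq("""{"a": 1, "b": "x"}""", """{"a": 2}"""))
val df = spark.read.json(jsonRDD)  // schema is inferred from the RDD's records
df.printSchema()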
File 2 of 4: object InferSchema

@@ -21,7 +21,7 @@ import java.util.Comparator

 import com.fasterxml.jackson.core._

-import org.apache.spark.sql.Dataset
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.catalyst.json.JSONOptions
@@ -37,7 +37,7 @@ private[sql] object InferSchema {
    * 3. Replace any remaining null fields with string, the top type
    */
   def infer(
-      json: Dataset[String],
+      json: RDD[String],
       columnNameOfCorruptRecord: String,
       configOptions: JSONOptions): StructType = {
     require(configOptions.samplingRatio > 0,
@@ -50,7 +50,7 @@
     }

     // perform schema inference on each row and merge afterwards
-    val rootType = schemaData.rdd.mapPartitions { iter =>
+    val rootType = schemaData.mapPartitions { iter =>
       val factory = new JsonFactory()
       configOptions.setJacksonOptions(factory)
       iter.flatMap { row =>
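
Note: with json back as an RDD[String], the per-partition work runs on the RDD itself, with no .rdd hop through a Dataset. The overall shape is: infer a type per record inside each partition, then merge the per-record types into a single root type. A hedged sketch of that shape follows; parse and mergeTypes are hypothetical stand-ins for the Jackson-based per-record inference and the compatible-type merge, and the real merge strategy may differ:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DataType, NullType}

// Sketch: per-record type inference inside each partition, then a
// distributed merge of the inferred types into one root type.
def inferSketch(
    json: RDD[String],
    parse: String => DataType,                     // hypothetical per-record inference
    mergeTypes: (DataType, DataType) => DataType   // hypothetical compatible-type merge
  ): DataType = {
  json.mapPartitions { iter =>
    iter.map(parse)                      // one inferred type per JSON record
  }.fold(NullType: DataType)(mergeTypes) // combine within and across partitions
}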
File 3 of 4: class JsonFileFormat

@@ -21,18 +21,21 @@ import java.io.CharArrayWriter

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.hadoop.io.{NullWritable, Text}
+import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
+import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
 import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat

 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, Encoders, Row, SparkSession}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.json.{JacksonGenerator, JacksonParser, JSONOptions}
 import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.text.{TextFileFormat, TextOutputWriter}
+import org.apache.spark.sql.execution.datasources.text.TextOutputWriter
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
@@ -52,21 +55,13 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
     val columnNameOfCorruptRecord =
       parsedOptions.columnNameOfCorruptRecord
         .getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
-    val jsonFiles: Seq[String] = files.filterNot { status =>
+    val jsonFiles = files.filterNot { status =>
       val name = status.getPath.getName
       (name.startsWith("_") && !name.contains("=")) || name.startsWith(".")
-    }.map(_.getPath.toString)
-
-    val lines = sparkSession.baseRelationToDataFrame(
-      DataSource.apply(
-        sparkSession,
-        paths = jsonFiles,
-        className = classOf[TextFileFormat].getName
-      ).resolveRelation(checkFilesExist = false))
-      .select("value").as[String](Encoders.STRING)
+    }.toArray

     val jsonSchema = InferSchema.infer(
-      lines,
+      createBaseRdd(sparkSession, jsonFiles),
       columnNameOfCorruptRecord,
       parsedOptions)
     checkConstraints(jsonSchema)
@@ -124,6 +119,25 @@
     }
   }

+  private def createBaseRdd(
+      sparkSession: SparkSession,
+      inputPaths: Seq[FileStatus]): RDD[String] = {
+    val job = Job.getInstance(sparkSession.sessionState.newHadoopConf())
+    val conf = job.getConfiguration
+
+    val paths = inputPaths.map(_.getPath)
+
+    if (paths.nonEmpty) {
+      FileInputFormat.setInputPaths(job, paths: _*)
+    }
+
+    sparkSession.sparkContext.hadoopRDD(
+      conf.asInstanceOf[JobConf],
+      classOf[TextInputFormat],
+      classOf[LongWritable],
+      classOf[Text]).map(_._2.toString) // get the text line
+  }
+
   /** Constraints to be imposed on schema to be stored. */
   private def checkConstraints(schema: StructType): Unit = {
     if (schema.fieldNames.length != schema.fieldNames.distinct.length) {
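
Note: createBaseRdd mixes the two Hadoop APIs: input paths are set through the new-API Job and FileInputFormat, while the RDD is built with the old-API TextInputFormat, whose records are (byte offset, line) pairs; only the line text is kept. A self-contained sketch of the same pattern that works outside the sql package (it substitutes sparkContext.hadoopConfiguration for the session-internal newHadoopConf() and takes plain path strings):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{JobConf, TextInputFormat}
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Sketch: read the given paths line by line via the old-API TextInputFormat
// and keep only the line text, one JSON document per RDD element.
def readJsonLines(spark: SparkSession, paths: Seq[String]): RDD[String] = {
  val job = Job.getInstance(spark.sparkContext.hadoopConfiguration)
  if (paths.nonEmpty) {
    FileInputFormat.setInputPaths(job, paths.mkString(","))
  }
  spark.sparkContext.hadoopRDD(
    job.getConfiguration.asInstanceOf[JobConf],  // Job wraps its Configuration in a JobConf
    classOf[TextInputFormat],
    classOf[LongWritable],
    classOf[Text]).map(_._2.toString)            // drop the byte-offset key
}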
File 4 of 4: trait TestJsonData

@@ -18,7 +18,7 @@
 package org.apache.spark.sql.execution.datasources.json

 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
+import org.apache.spark.sql.SparkSession

 private[json] trait TestJsonData {
   protected def spark: SparkSession
@@ -196,14 +196,14 @@ private[json] trait TestJsonData {
       """42""" ::
       """ ","ian":"test"}""" :: Nil)

-  def emptyRecords: Dataset[String] =
-    spark.createDataset(
+  def emptyRecords: RDD[String] =
+    spark.sparkContext.parallelize(
       """{""" ::
         """""" ::
         """{"a": {}}""" ::
         """{"a": {"b": {}}}""" ::
         """{"b": [{"c": {}}]}""" ::
-        """]""" :: Nil)(Encoders.STRING)
+        """]""" :: Nil)

   def timestampAsLong: RDD[String] =
     spark.sparkContext.parallelize(
@@ -230,5 +230,5 @@

   lazy val singleRow: RDD[String] = spark.sparkContext.parallelize("""{"a":123}""" :: Nil)

-  def empty: Dataset[String] = spark.createDataset(Seq[String]())(Encoders.STRING)
+  def empty: RDD[String] = spark.sparkContext.parallelize(Seq[String]())
 }
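
Note: with the fixtures returned as RDD[String] again, tests feed them to the RDD-based reader. A hypothetical snippet, inside a suite that mixes in TestJsonData and provides spark:

// Hedged example: the expected schema depends on the fixture being read.
val df = spark.read.json(emptyRecords)
df.printSchema()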
