[SPARK-18362][SQL] Use TextFileFormat in implementation of CSVFileFormat
## What changes were proposed in this pull request?

This patch significantly improves the I/O and file-listing performance of schema inference in Spark's built-in CSV data source.

Previously, this data source used the legacy `SparkContext.hadoopFile` and `SparkContext.hadoopRDD` methods to read files during its schema inference step, causing huge file-listing bottlenecks on the driver.
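
For context, here is a minimal sketch of the pre-patch read pattern (the helper name is hypothetical; the exact removed code appears in the diff below). Reading through `SparkContext` means the comma-joined input paths are resolved and listed serially on the driver:

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

// Hypothetical helper illustrating the legacy pattern: lines come back as a plain
// RDD[String], and file listing for the joined paths happens on the driver.
def legacyReadLines(spark: SparkSession, inputPaths: Seq[String]): RDD[String] =
  spark.sparkContext.textFile(inputPaths.mkString(","))
```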

This patch refactors that logic to use Spark SQL's `text` data source to read files during schema inference. The text data source still performs some unnecessary file listing (in theory the table has already been resolved before schema inference, so the scan should not need _any_ extra listing), but that listing is much faster and runs in parallel. In one production workload over tens of thousands of files, this change reduced schema inference time from 7 minutes to 2 minutes.
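
Conceptually, the new read path looks like the sketch below. It uses the public `DataFrameReader.textFile` API for brevity (and a hypothetical helper name); the patch itself wires up a `DataSource` with `TextFileFormat` and calls `resolveRelation(checkFilesExist = false)`, as shown in the diff:

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical helper sketching the post-patch idea: lines arrive as a Dataset[String]
// produced by the text data source, whose file listing is performed in parallel
// rather than serially on the driver.
def readLinesViaTextSource(spark: SparkSession, inputPaths: Seq[String]): Dataset[String] =
  spark.read.textFile(inputPaths: _*)
```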

A similar problem also affects the JSON file format. This patch originally fixed that as well, but I've split that change into a separate patch so it does not conflict with changes in another JSON PR.

## How was this patch tested?

Existing unit tests, plus manual benchmarking on a production workload.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#15813 from JoshRosen/use-text-data-source-in-csv-and-json.
JoshRosen authored and Robert Kruszewski committed Dec 15, 2016
1 parent 66f1886 commit b7dd746
Showing 2 changed files with 28 additions and 36 deletions.

CSVFileFormat.scala:

@@ -27,10 +27,12 @@ import org.apache.hadoop.mapreduce._
 
 import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{Dataset, Encoders, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.util.CompressionCodecs
 import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.functions.{length, trim}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -52,17 +54,21 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
       sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
+    require(files.nonEmpty, "Cannot infer schema from an empty set of files")
     val csvOptions = new CSVOptions(options)
 
     // TODO: Move filtering.
     val paths = files.filterNot(_.getPath.getName startsWith "_").map(_.getPath.toString)
-    val rdd = baseRdd(sparkSession, csvOptions, paths)
-    val firstLine = findFirstLine(csvOptions, rdd)
+    val lines: Dataset[String] = readText(sparkSession, csvOptions, paths)
+    val firstLine: String = findFirstLine(csvOptions, lines)
     val firstRow = new CsvReader(csvOptions).parseLine(firstLine)
     val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     val header = makeSafeHeader(firstRow, csvOptions, caseSensitive)
 
-    val parsedRdd = tokenRdd(sparkSession, csvOptions, header, paths)
+    val parsedRdd: RDD[Array[String]] = CSVRelation.univocityTokenizer(
+      lines,
+      firstLine = if (csvOptions.headerFlag) firstLine else null,
+      params = csvOptions)
     val schema = if (csvOptions.inferSchemaFlag) {
       CSVInferSchema.infer(parsedRdd, header, csvOptions)
     } else {
@@ -173,51 +179,37 @@ class CSVFileFormat extends TextBasedFileFormat with DataSourceRegister {
     }
   }
 
-  private def baseRdd(
-      sparkSession: SparkSession,
-      options: CSVOptions,
-      inputPaths: Seq[String]): RDD[String] = {
-    readText(sparkSession, options, inputPaths.mkString(","))
-  }
-
-  private def tokenRdd(
-      sparkSession: SparkSession,
-      options: CSVOptions,
-      header: Array[String],
-      inputPaths: Seq[String]): RDD[Array[String]] = {
-    val rdd = baseRdd(sparkSession, options, inputPaths)
-    // Make sure firstLine is materialized before sending to executors
-    val firstLine = if (options.headerFlag) findFirstLine(options, rdd) else null
-    CSVRelation.univocityTokenizer(rdd, firstLine, options)
-  }
-
   /**
    * Returns the first line of the first non-empty file in path
   */
-  private def findFirstLine(options: CSVOptions, rdd: RDD[String]): String = {
+  private def findFirstLine(options: CSVOptions, lines: Dataset[String]): String = {
+    import lines.sqlContext.implicits._
+    val nonEmptyLines = lines.filter(length(trim($"value")) > 0)
     if (options.isCommentSet) {
-      val comment = options.comment.toString
-      rdd.filter { line =>
-        line.trim.nonEmpty && !line.startsWith(comment)
-      }.first()
+      nonEmptyLines.filter(!$"value".startsWith(options.comment.toString)).first()
     } else {
-      rdd.filter { line =>
-        line.trim.nonEmpty
-      }.first()
+      nonEmptyLines.first()
     }
   }
 
   private def readText(
       sparkSession: SparkSession,
       options: CSVOptions,
-      location: String): RDD[String] = {
+      inputPaths: Seq[String]): Dataset[String] = {
     if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
-      sparkSession.sparkContext.textFile(location)
+      sparkSession.baseRelationToDataFrame(
+        DataSource.apply(
+          sparkSession,
+          paths = inputPaths,
+          className = classOf[TextFileFormat].getName
+        ).resolveRelation(checkFilesExist = false))
+        .select("value").as[String](Encoders.STRING)
     } else {
       val charset = options.charset
-      sparkSession.sparkContext
-        .hadoopFile[LongWritable, Text, TextInputFormat](location)
+      val rdd = sparkSession.sparkContext
+        .hadoopFile[LongWritable, Text, TextInputFormat](inputPaths.mkString(","))
         .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
+      sparkSession.createDataset(rdd)(Encoders.STRING)
     }
   }
 
CSVRelation.scala:

@@ -34,12 +34,12 @@ import org.apache.spark.sql.types._
 object CSVRelation extends Logging {
 
   def univocityTokenizer(
-      file: RDD[String],
+      file: Dataset[String],
       firstLine: String,
       params: CSVOptions): RDD[Array[String]] = {
     // If header is set, make sure firstLine is materialized before sending to executors.
     val commentPrefix = params.comment.toString
-    file.mapPartitions { iter =>
+    file.rdd.mapPartitions { iter =>
       val parser = new CsvReader(params)
       val filteredIter = iter.filter { line =>
         line.trim.nonEmpty && !line.startsWith(commentPrefix)
