Skip to content

Commit

Permalink
[SPARK-32270][SQL] Use TextFileFormat in CSV's schema inference with …
Browse files Browse the repository at this point in the history
…a different encoding

### What changes were proposed in this pull request?

This PR proposes to use text datasource in CSV's schema inference. This shares the same reasons of SPARK-18362, SPARK-19885 and SPARK-19918 - we're currently using Hadoop RDD when the encoding is different, which is unnecessary. This PR completes SPARK-18362, and address the comment at #15813 (comment).

We should better keep the code paths consistent with existing CSV and JSON datasources as well, but this CSV schema inference with the encoding specified is different from UTF-8 alone.

There can be another story that this PR might lead to a bug fix: Spark session configurations, say Hadoop configurations, are not respected during CSV schema inference when the encoding is different (but it has to be set to Spark context for schema inference when the encoding is different).

### Why are the changes needed?

For consistency, potentially better performance, and fixing a potentially very corner case bug.

### Does this PR introduce _any_ user-facing change?

Virtually no.

### How was this patch tested?

Existing tests should cover.

Closes #29063 from HyukjinKwon/SPARK-32270.

Authored-by: HyukjinKwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
HyukjinKwon authored and dongjoon-hyun committed Jul 12, 2020
1 parent c56c84a commit c4b0639
Showing 1 changed file with 14 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -150,21 +150,23 @@ object TextInputCSVDataSource extends CSVDataSource {
inputPaths: Seq[FileStatus],
options: CSVOptions): Dataset[String] = {
val paths = inputPaths.map(_.getPath.toString)
val df = sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
className = classOf[TextFileFormat].getName,
options = options.parameters
).resolveRelation(checkFilesExist = false))
.select("value").as[String](Encoders.STRING)

if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
paths = paths,
className = classOf[TextFileFormat].getName,
options = options.parameters
).resolveRelation(checkFilesExist = false))
.select("value").as[String](Encoders.STRING)
df
} else {
val charset = options.charset
val rdd = sparkSession.sparkContext
.hadoopFile[LongWritable, Text, TextInputFormat](paths.mkString(","))
.mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
sparkSession.createDataset(rdd)(Encoders.STRING)
sparkSession.createDataset(df.queryExecution.toRdd.map { row =>
val bytes = row.getBinary(0)
new String(bytes, 0, bytes.length, charset)
})(Encoders.STRING)
}
}
}
Expand Down

0 comments on commit c4b0639

Please sign in to comment.