Use text file sources in CSV's schema inference for consistency
HyukjinKwon committed Jul 10, 2020
1 parent e6e43cb commit 3250f33
Showing 1 changed file with 14 additions and 12 deletions.
@@ -150,21 +150,23 @@ object TextInputCSVDataSource extends CSVDataSource {
       inputPaths: Seq[FileStatus],
       options: CSVOptions): Dataset[String] = {
     val paths = inputPaths.map(_.getPath.toString)
+    val df = sparkSession.baseRelationToDataFrame(
+      DataSource.apply(
+        sparkSession,
+        paths = paths,
+        className = classOf[TextFileFormat].getName,
+        options = options.parameters
+      ).resolveRelation(checkFilesExist = false))
+      .select("value").as[String](Encoders.STRING)
+
     if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
-      sparkSession.baseRelationToDataFrame(
-        DataSource.apply(
-          sparkSession,
-          paths = paths,
-          className = classOf[TextFileFormat].getName,
-          options = options.parameters
-        ).resolveRelation(checkFilesExist = false))
-        .select("value").as[String](Encoders.STRING)
+      df
     } else {
       val charset = options.charset
-      val rdd = sparkSession.sparkContext
-        .hadoopFile[LongWritable, Text, TextInputFormat](paths.mkString(","))
-        .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
-      sparkSession.createDataset(rdd)(Encoders.STRING)
+      sparkSession.createDataset(df.queryExecution.toRdd.map { row =>
+        val bytes = row.getBinary(0)
+        new String(bytes, 0, bytes.length, charset)
+      })(Encoders.STRING)
     }
   }
 }
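The key change is in the non-UTF-8 branch: instead of building a separate hadoopFile RDD, it reuses the Dataset produced by TextFileFormat and re-decodes each row's raw bytes with the user-specified charset via row.getBinary(0). A minimal sketch of that decoding step on plain JVM strings, assuming an ISO-8859-1 input (the byte values below are illustrative, not from the commit):

    import java.nio.charset.Charset

    // "Été" encoded as ISO-8859-1: 0xC9 'É', 0x74 't', 0xE9 'é'.
    val bytes = Array(0xC9, 0x74, 0xE9).map(_.toByte)

    // Same conversion as the else branch above: reinterpret the raw
    // bytes with the declared charset rather than assuming UTF-8.
    val decoded = new String(bytes, 0, bytes.length, Charset.forName("ISO-8859-1"))
    assert(decoded == "Été")

Decoding those same bytes as UTF-8 would yield replacement characters, which is why the charset-aware branch exists at all.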

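For reference, a hedged end-to-end sketch of a read that exercises this code path; the session, path, and file contents are hypothetical, but with inferSchema enabled, reading a non-UTF-8 CSV drives schema inference through createBaseDataset above:

    import org.apache.spark.sql.SparkSession

    // Hypothetical local session and input path.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("csv-encoding-inference")
      .getOrCreate()

    // inferSchema makes Spark sample the file, so a non-UTF-8
    // encoding now also goes through the text file source.
    val df = spark.read
      .option("encoding", "ISO-8859-1")
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("/tmp/latin1.csv")

    df.printSchema()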