From 3250f33ce734091817b25791c03d9d40490fc3e8 Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Fri, 10 Jul 2020 18:08:22 +0900 Subject: [PATCH] Use text file sources in CSV's schema inference for consistency --- .../datasources/csv/CSVDataSource.scala | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 375cec597166c..cdac9d9c93925 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -150,21 +150,23 @@ object TextInputCSVDataSource extends CSVDataSource { inputPaths: Seq[FileStatus], options: CSVOptions): Dataset[String] = { val paths = inputPaths.map(_.getPath.toString) + val df = sparkSession.baseRelationToDataFrame( + DataSource.apply( + sparkSession, + paths = paths, + className = classOf[TextFileFormat].getName, + options = options.parameters + ).resolveRelation(checkFilesExist = false)) + .select("value").as[String](Encoders.STRING) + if (Charset.forName(options.charset) == StandardCharsets.UTF_8) { - sparkSession.baseRelationToDataFrame( - DataSource.apply( - sparkSession, - paths = paths, - className = classOf[TextFileFormat].getName, - options = options.parameters - ).resolveRelation(checkFilesExist = false)) - .select("value").as[String](Encoders.STRING) + df } else { val charset = options.charset - val rdd = sparkSession.sparkContext - .hadoopFile[LongWritable, Text, TextInputFormat](paths.mkString(",")) - .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset))) - sparkSession.createDataset(rdd)(Encoders.STRING) + sparkSession.createDataset(df.queryExecution.toRdd.map { row => + val bytes = row.getBinary(0) + new String(bytes, 0, bytes.length, charset) + })(Encoders.STRING) } } }