diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
index 375cec597166c..cdac9d9c93925 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala
@@ -150,21 +150,23 @@ object TextInputCSVDataSource extends CSVDataSource {
       inputPaths: Seq[FileStatus],
       options: CSVOptions): Dataset[String] = {
     val paths = inputPaths.map(_.getPath.toString)
+    val df = sparkSession.baseRelationToDataFrame(
+      DataSource.apply(
+        sparkSession,
+        paths = paths,
+        className = classOf[TextFileFormat].getName,
+        options = options.parameters
+      ).resolveRelation(checkFilesExist = false))
+      .select("value").as[String](Encoders.STRING)
+
     if (Charset.forName(options.charset) == StandardCharsets.UTF_8) {
-      sparkSession.baseRelationToDataFrame(
-        DataSource.apply(
-          sparkSession,
-          paths = paths,
-          className = classOf[TextFileFormat].getName,
-          options = options.parameters
-        ).resolveRelation(checkFilesExist = false))
-        .select("value").as[String](Encoders.STRING)
+      df
     } else {
       val charset = options.charset
-      val rdd = sparkSession.sparkContext
-        .hadoopFile[LongWritable, Text, TextInputFormat](paths.mkString(","))
-        .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset)))
-      sparkSession.createDataset(rdd)(Encoders.STRING)
+      sparkSession.createDataset(df.queryExecution.toRdd.map { row =>
+        val bytes = row.getBinary(0)
+        new String(bytes, 0, bytes.length, charset)
+      })(Encoders.STRING)
     }
   }
 }
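
Illustrative usage, not part of the patch above: a minimal sketch of how the non-UTF-8 branch would be exercised from the public reader API. The file path, application name, and the assumption that the file is ISO-8859-1 encoded are placeholders for the example; the "charset" reader option (also accepted under the key "encoding") is what populates CSVOptions.charset in the code changed above.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("csv-charset-sketch")
  .getOrCreate()

// "/tmp/people_latin1.csv" is a placeholder path; its lines are assumed to be ISO-8859-1 encoded.
val people = spark.read
  .option("header", "true")
  .option("charset", "ISO-8859-1") // not UTF-8, so the byte-decoding branch above is taken
  .csv("/tmp/people_latin1.csv")

people.show()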