From c4b0639f830cb5184328473db65e17b3fd0e74fc Mon Sep 17 00:00:00 2001 From: HyukjinKwon Date: Sun, 12 Jul 2020 09:44:27 -0700 Subject: [PATCH] [SPARK-32270][SQL] Use TextFileFormat in CSV's schema inference with a different encoding ### What changes were proposed in this pull request? This PR proposes to use text datasource in CSV's schema inference. This shares the same reasons of SPARK-18362, SPARK-19885 and SPARK-19918 - we're currently using Hadoop RDD when the encoding is different, which is unnecessary. This PR completes SPARK-18362, and address the comment at https://github.com/apache/spark/pull/15813#discussion_r90751405. We should better keep the code paths consistent with existing CSV and JSON datasources as well, but this CSV schema inference with the encoding specified is different from UTF-8 alone. There can be another story that this PR might lead to a bug fix: Spark session configurations, say Hadoop configurations, are not respected during CSV schema inference when the encoding is different (but it has to be set to Spark context for schema inference when the encoding is different). ### Why are the changes needed? For consistency, potentially better performance, and fixing a potentially very corner case bug. ### Does this PR introduce _any_ user-facing change? Virtually no. ### How was this patch tested? Existing tests should cover. Closes #29063 from HyukjinKwon/SPARK-32270. Authored-by: HyukjinKwon Signed-off-by: Dongjoon Hyun --- .../datasources/csv/CSVDataSource.scala | 26 ++++++++++--------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala index 375cec597166c..cdac9d9c93925 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/CSVDataSource.scala @@ -150,21 +150,23 @@ object TextInputCSVDataSource extends CSVDataSource { inputPaths: Seq[FileStatus], options: CSVOptions): Dataset[String] = { val paths = inputPaths.map(_.getPath.toString) + val df = sparkSession.baseRelationToDataFrame( + DataSource.apply( + sparkSession, + paths = paths, + className = classOf[TextFileFormat].getName, + options = options.parameters + ).resolveRelation(checkFilesExist = false)) + .select("value").as[String](Encoders.STRING) + if (Charset.forName(options.charset) == StandardCharsets.UTF_8) { - sparkSession.baseRelationToDataFrame( - DataSource.apply( - sparkSession, - paths = paths, - className = classOf[TextFileFormat].getName, - options = options.parameters - ).resolveRelation(checkFilesExist = false)) - .select("value").as[String](Encoders.STRING) + df } else { val charset = options.charset - val rdd = sparkSession.sparkContext - .hadoopFile[LongWritable, Text, TextInputFormat](paths.mkString(",")) - .mapPartitions(_.map(pair => new String(pair._2.getBytes, 0, pair._2.getLength, charset))) - sparkSession.createDataset(rdd)(Encoders.STRING) + sparkSession.createDataset(df.queryExecution.toRdd.map { row => + val bytes = row.getBinary(0) + new String(bytes, 0, bytes.length, charset) + })(Encoders.STRING) } } }