From ce1f97fe7521645f22504f42e1b5df08749cfaac Mon Sep 17 00:00:00 2001 From: Ivan Sadikov Date: Wed, 1 Dec 2021 10:14:32 +0300 Subject: [PATCH] [SPARK-37326][SQL] Support TimestampNTZ in CSV data source ### What changes were proposed in this pull request? This PR adds support for the TimestampNTZ type in the CSV data source. Most of the functionality has already been added; this patch verifies that writes and reads work for the TimestampNTZ type and adds schema inference depending on the timestamp value format written. The following applies: - If there is a mixture of `TIMESTAMP_NTZ` and `TIMESTAMP_LTZ` values, use `TIMESTAMP_LTZ`. - If there are only `TIMESTAMP_NTZ` values, resolve using the default timestamp type configured with `spark.sql.timestampType`. In addition, I introduced a new CSV option `timestampNTZFormat` which is similar to `timestampFormat` but allows configuring the read/write pattern for `TIMESTAMP_NTZ` types. It is basically a copy of the timestamp pattern but without the timezone component. The schema inference works in the following way: 1. We test if the field can be parsed as a timestamp without timezone using `timestampNTZFormat`. 2. If the field has a timezone component, the `parseWithoutTimeZone` method throws `QueryExecutionErrors.cannotParseStringAsDataTypeError`, which is a `RuntimeException`. 3. Move on to parsing the field as a timestamp with timezone (the existing logic). ### Why are the changes needed? The current CSV source could write values as TimestampNTZ into a file but could not preserve this type when reading the file back; this PR fixes the issue. ### Does this PR introduce _any_ user-facing change? Previously, the CSV data source would infer timestamp values as `TimestampType` when reading a CSV file. Now, the data source infers the timestamp type based on the value format (with or without timezone) and the default timestamp type configured by `spark.sql.timestampType`. A new CSV option `timestampNTZFormat` is added to control the way values are formatted during writes or parsed during reads. Now, if the timestamp cannot be parsed as a timestamp without timezone, e.g. it contains a zone-offset or zone-id component, `parseWithoutTimeZone` throws a `RuntimeException`, signalling the inference code to try the next type. ### How was this patch tested? I extended `CSVSuite` with a few unit tests to verify that the write-read roundtrip works for `TIMESTAMP_NTZ` and `TIMESTAMP_LTZ` values. Closes #34596 from sadikovi/timestamp-ntz-support-csv. Authored-by: Ivan Sadikov Signed-off-by: Max Gekk --- docs/sql-data-sources-csv.md | 12 +- .../sql/catalyst/csv/CSVInferSchema.scala | 24 ++ .../spark/sql/catalyst/csv/CSVOptions.scala | 4 + .../sql/catalyst/csv/UnivocityGenerator.scala | 2 +- .../sql/catalyst/csv/UnivocityParser.scala | 4 +- .../sql/catalyst/util/DateTimeUtils.scala | 32 ++- .../catalyst/util/TimestampFormatter.scala | 36 ++- .../sql/errors/QueryExecutionErrors.scala | 8 +- .../catalyst/util/DateTimeUtilsSuite.scala | 12 + .../apache/spark/sql/CsvFunctionsSuite.scala | 11 + .../execution/datasources/csv/CSVSuite.scala | 216 +++++++++++++++++- 11 files changed, 331 insertions(+), 30 deletions(-) diff --git a/docs/sql-data-sources-csv.md b/docs/sql-data-sources-csv.md index 82cfa352a5751..1dfe8568f9afb 100644 --- a/docs/sql-data-sources-csv.md +++ b/docs/sql-data-sources-csv.md @@ -9,9 +9,9 @@ license: | The ASF licenses this file to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at - + http://www.apache.org/licenses/LICENSE-2.0 - + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,7 +19,7 @@ license: | limitations under the License. --- -Spark SQL provides `spark.read().csv("file_name")` to read a file or directory of files in CSV format into Spark DataFrame, and `dataframe.write().csv("path")` to write to a CSV file. Function `option()` can be used to customize the behavior of reading or writing, such as controlling behavior of the header, delimiter character, character set, and so on. +Spark SQL provides `spark.read().csv("file_name")` to read a file or directory of files in CSV format into Spark DataFrame, and `dataframe.write().csv("path")` to write to a CSV file. Function `option()` can be used to customize the behavior of reading or writing, such as controlling behavior of the header, delimiter character, character set, and so on.
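As an illustration of the new `timestampNTZFormat` option and the inference rules described above, here is a minimal usage sketch (not part of the diff). The output path, the example literal, and setting `spark.sql.timestampType` at runtime are assumptions made for the example; it presumes an active `spark` session on a Spark build that contains this change.

```scala
// Write a TIMESTAMP_NTZ column with a custom, zone-free pattern.
val path = "/tmp/csv-ntz-example"  // hypothetical output location

spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0")
  .write
  .option("header", "true")
  .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
  .csv(path)

// With only zone-free values in the file, schema inference resolves the column to the
// default timestamp type configured by spark.sql.timestampType.
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")

val res = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
  .csv(path)

res.printSchema()  // col0: timestamp_ntz
```

With the default `TIMESTAMP_LTZ` setting, or when the file mixes values with and without zone offsets, the same column is inferred as a regular timestamp instead.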
@@ -162,6 +162,12 @@ Data source options of CSV can be set via: Sets the string that indicates a timestamp format. Custom date formats follow the formats at Datetime Patterns. This applies to timestamp type. read/write + + timestampNTZFormat + yyyy-MM-dd'T'HH:mm:ss[.SSS] + Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at Datetime Patterns. This applies to timestamp without timezone type. Note that zone-offset and time-zone components are not supported when writing or reading this data type. + read/write + maxColumns 20480 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala index 696d25f8ed484..b4ec1645ed35b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT import org.apache.spark.sql.catalyst.util.TimestampFormatter import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ class CSVInferSchema(val options: CSVOptions) extends Serializable { @@ -38,6 +39,13 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { legacyFormat = FAST_DATE_FORMAT, isParsing = true) + private val timestampNTZFormatter = TimestampFormatter( + options.timestampNTZFormatInRead, + options.zoneId, + legacyFormat = FAST_DATE_FORMAT, + isParsing = true, + forTimestampNTZ = true) + private val decimalParser = if (options.locale == Locale.US) { // Special handling the default locale for backward compatibility s: String => new java.math.BigDecimal(s) @@ -109,6 +117,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { case LongType => tryParseLong(field) case _: DecimalType => tryParseDecimal(field) case DoubleType => tryParseDouble(field) + case TimestampNTZType => tryParseTimestampNTZ(field) case TimestampType => tryParseTimestamp(field) case BooleanType => tryParseBoolean(field) case StringType => StringType @@ -160,6 +169,17 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { private def tryParseDouble(field: String): DataType = { if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) { DoubleType + } else { + tryParseTimestampNTZ(field) + } + } + + private def tryParseTimestampNTZ(field: String): DataType = { + // We can only parse the value as TimestampNTZType if it does not have a zone-offset or + // time-zone component and can be parsed with the timestamp formatter. + // Otherwise, it is likely to be a timestamp with timezone.
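+ // For example, "2020-12-12T12:12:12.000" has no zone component, so it is inferred as
+ // SQLConf.get.timestampType, whereas "2020-12-12T17:12:12.000+05:00" fails to parse here
+ // and falls through to tryParseTimestamp below (sample values from the CSVSuite tests).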
+ if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, true)).isDefined) { + SQLConf.get.timestampType } else { tryParseTimestamp(field) } @@ -225,6 +245,10 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable { } else { Some(DecimalType(range + scale, scale)) } + + case (TimestampNTZType, TimestampType) | (TimestampType, TimestampNTZType) => + Some(TimestampType) + case _ => None } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala index 79624b9a608a4..2a404b14bfd89 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala @@ -164,6 +164,10 @@ class CSVOptions( s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]" }) + val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat") + val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat", + s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]") + val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false) val maxColumns = getInt("maxColumns", 20480) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala index 8a04e4ca56c5d..10cccd57117e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala @@ -49,7 +49,7 @@ class UnivocityGenerator( legacyFormat = FAST_DATE_FORMAT, isParsing = false) private val timestampNTZFormatter = TimestampFormatter( - options.timestampFormatInWrite, + options.timestampNTZFormatInWrite, options.zoneId, legacyFormat = FAST_DATE_FORMAT, isParsing = false, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala index cd5621bbb7856..eb827aea73f0b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala @@ -94,7 +94,7 @@ class UnivocityParser( legacyFormat = FAST_DATE_FORMAT, isParsing = true) private lazy val timestampNTZFormatter = TimestampFormatter( - options.timestampFormatInRead, + options.timestampNTZFormatInRead, options.zoneId, legacyFormat = FAST_DATE_FORMAT, isParsing = true, @@ -204,7 +204,7 @@ class UnivocityParser( case _: TimestampNTZType => (d: String) => nullSafeDatum(d, name, nullable, options) { datum => - timestampNTZFormatter.parseWithoutTimeZone(datum) + timestampNTZFormatter.parseWithoutTimeZone(datum, true) } case _: DateType => (d: String) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 3d9598cd0c23a..ebe5153099f43 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -442,17 +442,22 @@ object DateTimeUtils { /** * Trims and parses a given UTF8 string to a corresponding [[Long]] value which representing the - * number of microseconds since the epoch. 
The result is independent of time zones, - which means that zone ID in the input string will be ignored. + * number of microseconds since the epoch. The result will be independent of time zones. + * + * If the input string contains a component associated with time zone, the method will return + * `None` if `failOnError` is set to `true`. If `failOnError` is set to `false`, the method + * will simply discard the time zone component. Enable the check to detect situations like parsing + * a timestamp with time zone as TimestampNTZType. + * + * The return type is [[Option]] in order to distinguish between 0L and null. Please * refer to `parseTimestampString` for the allowed formats. */ - def stringToTimestampWithoutTimeZone(s: UTF8String): Option[Long] = { + def stringToTimestampWithoutTimeZone(s: UTF8String, failOnError: Boolean): Option[Long] = { try { - val (segments, _, justTime) = parseTimestampString(s) - // If the input string can't be parsed as a timestamp, or it contains only the time part of a - // timestamp and we can't determine its date, return None. - if (segments.isEmpty || justTime) { + val (segments, zoneIdOpt, justTime) = parseTimestampString(s) + // If the input string can't be parsed as a timestamp without time zone, or it contains only + // the time part of a timestamp and we can't determine its date, return None. + if (segments.isEmpty || justTime || failOnError && zoneIdOpt.isDefined) { return None } val nanoseconds = MICROSECONDS.toNanos(segments(6)) @@ -465,8 +470,19 @@ object DateTimeUtils { } } + /** + * Trims and parses a given UTF8 string to a corresponding [[Long]] value representing the + * number of microseconds since the epoch. The result is independent of time zones. Zone id + * component will be discarded and ignored. + * The return type is [[Option]] in order to distinguish between 0L and null. Please + * refer to `parseTimestampString` for the allowed formats.
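+ * For example, "2021-11-22 10:54:27 +08:00" parses to the local timestamp 2021-11-22T10:54:27
+ * with the zone offset discarded, whereas the two-argument overload above returns `None` for
+ * the same input when `failOnError` is true (see the SPARK-37326 test in `DateTimeUtilsSuite`).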
+ */ + def stringToTimestampWithoutTimeZone(s: UTF8String): Option[Long] = { + stringToTimestampWithoutTimeZone(s, false) + } + def stringToTimestampWithoutTimeZoneAnsi(s: UTF8String): Long = { - stringToTimestampWithoutTimeZone(s).getOrElse { + stringToTimestampWithoutTimeZone(s, false).getOrElse { throw QueryExecutionErrors.cannotCastToDateTimeError(s, TimestampNTZType) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 8a9104ae9eef9..21fd0860ef449 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -31,9 +31,10 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants._ import org.apache.spark.sql.catalyst.util.DateTimeUtils._ import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{LegacyDateFormat, LENIENT_SIMPLE_DATE_FORMAT} import org.apache.spark.sql.catalyst.util.RebaseDateTime._ +import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._ -import org.apache.spark.sql.types.Decimal +import org.apache.spark.sql.types.{Decimal, TimestampNTZType} import org.apache.spark.unsafe.types.UTF8String sealed trait TimestampFormatter extends Serializable { @@ -55,6 +56,7 @@ sealed trait TimestampFormatter extends Serializable { * Parses a timestamp in a string and converts it to microseconds since Unix Epoch in local time. * * @param s - string with timestamp to parse + * @param failOnError - indicates strict parsing of timezone * @return microseconds since epoch. * @throws ParseException can be thrown by legacy parser * @throws DateTimeParseException can be thrown by new parser @@ -66,10 +68,23 @@ sealed trait TimestampFormatter extends Serializable { @throws(classOf[DateTimeParseException]) @throws(classOf[DateTimeException]) @throws(classOf[IllegalStateException]) - def parseWithoutTimeZone(s: String): Long = + def parseWithoutTimeZone(s: String, failOnError: Boolean): Long = throw new IllegalStateException( - s"The method `parseWithoutTimeZone(s: String)` should be implemented in the formatter " + - "of timestamp without time zone") + s"The method `parseWithoutTimeZone(s: String, failOnError: Boolean)` should be " + + "implemented in the formatter of timestamp without time zone") + + /** + * Parses a timestamp in a string and converts it to microseconds since Unix Epoch in local time. + * Zone-id and zone-offset components are ignored. + */ + @throws(classOf[ParseException]) + @throws(classOf[DateTimeParseException]) + @throws(classOf[DateTimeException]) + @throws(classOf[IllegalStateException]) + final def parseWithoutTimeZone(s: String): Long = + // This is implemented to adhere to the original behaviour of `parseWithoutTimeZone` where we + // did not fail if timestamp contained zone-id or zone-offset component and instead ignored it. 
+ parseWithoutTimeZone(s, false) def format(us: Long): String def format(ts: Timestamp): String @@ -118,9 +133,12 @@ class Iso8601TimestampFormatter( } catch checkParsedDiff(s, legacyFormatter.parse) } - override def parseWithoutTimeZone(s: String): Long = { + override def parseWithoutTimeZone(s: String, failOnError: Boolean): Long = { try { val parsed = formatter.parse(s) + if (failOnError && parsed.query(TemporalQueries.zone()) != null) { + throw QueryExecutionErrors.cannotParseStringAsDataTypeError(pattern, s, TimestampNTZType) + } val localDate = toLocalDate(parsed) val localTime = toLocalTime(parsed) DateTimeUtils.localDateTimeToMicros(LocalDateTime.of(localDate, localTime)) @@ -186,9 +204,13 @@ class DefaultTimestampFormatter( } catch checkParsedDiff(s, legacyFormatter.parse) } - override def parseWithoutTimeZone(s: String): Long = { + override def parseWithoutTimeZone(s: String, failOnError: Boolean): Long = { try { - DateTimeUtils.stringToTimestampWithoutTimeZoneAnsi(UTF8String.fromString(s)) + val utf8Value = UTF8String.fromString(s) + DateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, failOnError).getOrElse { + throw QueryExecutionErrors.cannotParseStringAsDataTypeError( + TimestampFormatter.defaultPattern(), s, TimestampNTZType) + } } catch checkParsedDiff(s, legacyFormatter.parse) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index dd8707c66cbe3..ef809b78d7fbf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -1034,6 +1034,13 @@ object QueryExecutionErrors { s"[$token] as target spark data type [$dataType].") } + def cannotParseStringAsDataTypeError(pattern: String, value: String, dataType: DataType) + : Throwable = { + new RuntimeException( + s"Cannot parse field value ${value} for pattern ${pattern} " + + s"as target spark data type [$dataType].") + } + def failToParseEmptyStringForDataTypeError(dataType: DataType): Throwable = { new RuntimeException( s"Failed to parse an empty string for data type ${dataType.catalogString}") @@ -1894,4 +1901,3 @@ object QueryExecutionErrors { new RuntimeException("Unable to convert timestamp of Orc to data type 'timestamp_ntz'") } } - diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala index 69bb6c141ae8f..422a6cdeda2eb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala @@ -357,6 +357,18 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper { checkStringToTimestamp("2021-01-01T12:30:4294967297+4294967297:30", None) } + test("SPARK-37326: stringToTimestampWithoutTimeZone with failOnError") { + assert( + stringToTimestampWithoutTimeZone( + UTF8String.fromString("2021-11-22 10:54:27 +08:00"), false) == + Some(DateTimeUtils.localDateTimeToMicros(LocalDateTime.of(2021, 11, 22, 10, 54, 27)))) + + assert( + stringToTimestampWithoutTimeZone( + UTF8String.fromString("2021-11-22 10:54:27 +08:00"), true) == + None) + } + test("SPARK-15379: special invalid date string") { // Test stringToDate assert(toDate("2015-02-29 00:00:00").isEmpty) diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala index c87314386f38b..2808652f2998d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala @@ -368,4 +368,15 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession { .selectExpr("value.a") checkAnswer(fromCsvDF, Row(localDT)) } + + test("SPARK-37326: Handle incorrectly formatted timestamp_ntz values in from_csv") { + val fromCsvDF = Seq("2021-08-12T15:16:23.000+11:00").toDF("csv") + .select( + from_csv( + $"csv", + StructType(StructField("a", TimestampNTZType) :: Nil), + Map.empty[String, String]) as "value") + .selectExpr("value.a") + checkAnswer(fromCsvDF, Row(null)) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index 466c1bea9d37b..6b496b714c4c7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -1012,6 +1012,210 @@ abstract class CSVSuite } } + test("SPARK-37326: Use different pattern to write and infer TIMESTAMP_NTZ values") { + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/csv" + + val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0") + exp.write + .format("csv") + .option("header", "true") + .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") + .save(path) + + withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) { + val res = spark.read + .format("csv") + .option("inferSchema", "true") + .option("header", "true") + .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") + .load(path) + + assert(res.dtypes === exp.dtypes) + checkAnswer(res, exp) + } + } + } + + test("SPARK-37326: Use different pattern to write and infer TIMESTAMP_LTZ values") { + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/csv" + + val exp = spark.sql("select timestamp_ltz'2020-12-12 12:12:12' as col0") + exp.write + .format("csv") + .option("header", "true") + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") + .save(path) + + withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) { + val res = spark.read + .format("csv") + .option("inferSchema", "true") + .option("header", "true") + .option("timestampFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS") + .load(path) + + assert(res.dtypes === exp.dtypes) + checkAnswer(res, exp) + } + } + } + + test("SPARK-37326: Roundtrip in reading and writing TIMESTAMP_NTZ values with custom schema") { + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/csv" + + val exp = spark.sql(""" + select + timestamp_ntz'2020-12-12 12:12:12' as col1, + timestamp_ltz'2020-12-12 12:12:12' as col2 + """) + + exp.write.format("csv").option("header", "true").save(path) + + val res = spark.read + .format("csv") + .schema("col1 TIMESTAMP_NTZ, col2 TIMESTAMP_LTZ") + .option("header", "true") + .load(path) + + checkAnswer(res, exp) + } + } + + test("SPARK-37326: Timestamp type inference for a column with TIMESTAMP_NTZ values") { + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/csv" + + val exp = spark.sql(""" + select timestamp_ntz'2020-12-12 12:12:12' as col0 union all + select timestamp_ntz'2020-12-12 12:12:12' as 
col0 + """) + + exp.write.format("csv").option("header", "true").save(path) + + val timestampTypes = Seq( + SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString, + SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) + + for (timestampType <- timestampTypes) { + withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) { + val res = spark.read + .format("csv") + .option("inferSchema", "true") + .option("header", "true") + .load(path) + + if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) { + checkAnswer(res, exp) + } else { + checkAnswer( + res, + spark.sql(""" + select timestamp_ltz'2020-12-12 12:12:12' as col0 union all + select timestamp_ltz'2020-12-12 12:12:12' as col0 + """) + ) + } + } + } + } + } + + test("SPARK-37326: Timestamp type inference for a mix of TIMESTAMP_NTZ and TIMESTAMP_LTZ") { + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/csv" + + Seq( + "col0", + "2020-12-12T12:12:12.000", + "2020-12-12T17:12:12.000Z", + "2020-12-12T17:12:12.000+05:00", + "2020-12-12T12:12:12.000" + ).toDF("data") + .coalesce(1) + .write.text(path) + + for (policy <- Seq("exception", "corrected", "legacy")) { + withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> policy) { + val res = spark.read.format("csv") + .option("inferSchema", "true") + .option("header", "true") + .load(path) + + if (policy == "legacy") { + // Timestamps without timezone are parsed as strings, so the col0 type would be + // StringType which is similar to reading without schema inference. + val exp = spark.read.format("csv").option("header", "true").load(path) + checkAnswer(res, exp) + } else { + val exp = spark.sql(""" + select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all + select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all + select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all + select timestamp_ltz'2020-12-12T12:12:12.000' as col0 + """) + checkAnswer(res, exp) + } + } + } + } + } + + test("SPARK-37326: Malformed records when reading TIMESTAMP_LTZ as TIMESTAMP_NTZ") { + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/csv" + + Seq( + "2020-12-12T12:12:12.000", + "2020-12-12T17:12:12.000Z", + "2020-12-12T17:12:12.000+05:00", + "2020-12-12T12:12:12.000" + ).toDF("data") + .coalesce(1) + .write.text(path) + + for (timestampNTZFormat <- Seq(None, Some("yyyy-MM-dd'T'HH:mm:ss[.SSS]"))) { + val reader = spark.read.format("csv").schema("col0 TIMESTAMP_NTZ") + val res = timestampNTZFormat match { + case Some(format) => reader.option("timestampNTZFormat", format).load(path) + case None => reader.load(path) + } + + checkAnswer( + res, + Seq( + Row(LocalDateTime.of(2020, 12, 12, 12, 12, 12)), + Row(null), + Row(null), + Row(LocalDateTime.of(2020, 12, 12, 12, 12, 12)) + ) + ) + } + } + } + + test("SPARK-37326: Fail to write TIMESTAMP_NTZ if timestampNTZFormat contains zone offset") { + val patterns = Seq( + "yyyy-MM-dd HH:mm:ss XXX", + "yyyy-MM-dd HH:mm:ss Z", + "yyyy-MM-dd HH:mm:ss z") + + val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0") + for (pattern <- patterns) { + withTempDir { dir => + val path = s"${dir.getCanonicalPath}/csv" + val err = intercept[SparkException] { + exp.write.format("csv").option("timestampNTZFormat", pattern).save(path) + } + assert( + err.getCause.getMessage.contains("Unsupported field: OffsetSeconds") || + err.getCause.getMessage.contains("Unable to extract value")) + } + } + } + test("Write dates correctly with dateFormat option") { val customSchema = new StructType(Array(StructField("date", DateType, true))) 
withTempDir { dir => @@ -2489,10 +2693,6 @@ abstract class CSVSuite } test("SPARK-36536: use casting when datetime pattern is not set") { - def isLegacy: Boolean = { - spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY).toUpperCase(Locale.ROOT) == - SQLConf.LegacyBehaviorPolicy.LEGACY.toString - } withSQLConf( SQLConf.DATETIME_JAVA8API_ENABLED.key -> "true", SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.UTC.getId) { @@ -2511,13 +2711,13 @@ abstract class CSVSuite readback, Seq( Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"), - if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)), + LocalDateTime.of(2021, 1, 1, 0, 0, 0)), Row(LocalDate.of(2021, 1, 1), Instant.parse("2021-01-01T00:00:00Z"), - if (isLegacy) null else LocalDateTime.of(2021, 1, 1, 0, 0, 0)), + LocalDateTime.of(2021, 1, 1, 0, 0, 0)), Row(LocalDate.of(2021, 2, 1), Instant.parse("2021-03-02T00:00:00Z"), - if (isLegacy) null else LocalDateTime.of(2021, 10, 1, 0, 0, 0)), + LocalDateTime.of(2021, 10, 1, 0, 0, 0)), Row(LocalDate.of(2021, 8, 18), Instant.parse("2021-08-18T21:44:30Z"), - if (isLegacy) null else LocalDateTime.of(2021, 8, 18, 21, 44, 30, 123000000)))) + LocalDateTime.of(2021, 8, 18, 21, 44, 30, 123000000)))) } } }