diff --git a/docs/sql-data-sources-json.md b/docs/sql-data-sources-json.md
index 5e3bd2bc42dfc..b5f27aacf4150 100644
--- a/docs/sql-data-sources-json.md
+++ b/docs/sql-data-sources-json.md
@@ -9,9 +9,9 @@ license: |
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License. You may obtain a copy of the License at
-
+
      http://www.apache.org/licenses/LICENSE-2.0
-
+
   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -196,6 +196,12 @@ Data source options of JSON can be set via:
     Sets the string that indicates a timestamp format. Custom date formats follow the formats at datetime pattern. This applies to timestamp type.
     read/write
   
+  
+    timestampNTZFormat
+    yyyy-MM-dd'T'HH:mm:ss[.SSS]
+    Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at Datetime Patterns. This applies to timestamp without timezone type. Note that zone-offset and time-zone components are not supported when writing or reading this data type.
+    read/write
+  
   
     multiLine
     false
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 029c014fedc90..7d0f6d732f6d8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -106,6 +106,20 @@ private[sql] class JSONOptions(
       s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
     })
 
+  val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat").orElse {
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+      Some(s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSS")
+    } else {
+      None
+    }
+  }
+  val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat",
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+      s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSS"
+    } else {
+      s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]"
+    })
+
   val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)
 
   /**
@@ -138,8 +152,9 @@ private[sql] class JSONOptions(
   val pretty: Boolean = parameters.get("pretty").map(_.toBoolean).getOrElse(false)
 
   /**
-   * Enables inferring of TimestampType from strings matched to the timestamp pattern
-   * defined by the timestampFormat option.
+   * Enables inferring of TimestampType and TimestampNTZType from strings matched to the
+   * corresponding timestamp pattern defined by the timestampFormat and timestampNTZFormat options
+   * respectively.
    */
   val inferTimestamp: Boolean = parameters.get("inferTimestamp").map(_.toBoolean).getOrElse(false)
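For orientation, here is a minimal sketch of how the new option is exercised through the public DataFrame API. It assumes an active SparkSession named `spark`; the output path and values are illustrative.

```scala
// Write TIMESTAMP_NTZ values with a custom zone-less pattern, overriding the
// default write pattern yyyy-MM-dd'T'HH:mm:ss[.SSS] added above.
val df = spark.sql("select timestamp_ntz'2021-11-15 09:30:00' as ts")
df.write
  .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss")
  .json("/tmp/ntz-json")

// Read it back with the same pattern; an explicit DDL schema avoids any
// dependence on the inferTimestamp option.
val back = spark.read
  .schema("ts TIMESTAMP_NTZ")
  .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss")
  .json("/tmp/ntz-json")
```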
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
index d00065b19c6a8..336c0ceecc99d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonGenerator.scala
@@ -91,7 +91,7 @@ private[sql] class JacksonGenerator(
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false)
   private val timestampNTZFormatter = TimestampFormatter(
-    options.timestampFormatInWrite,
+    options.timestampNTZFormatInWrite,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = false,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index cb6a079aacc8a..77e22c919c624 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -66,7 +66,7 @@ class JacksonParser(
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
   private lazy val timestampNTZFormatter = TimestampFormatter(
-    options.timestampFormatInRead,
+    options.timestampNTZFormatInRead,
     options.zoneId,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 3b62b16335a8d..da76c8e5faa05 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -46,6 +46,12 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
     options.locale,
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
+  private val timestampNTZFormatter = TimestampFormatter(
+    options.timestampNTZFormatInRead,
+    options.zoneId,
+    legacyFormat = FAST_DATE_FORMAT,
+    isParsing = true,
+    forTimestampNTZ = true)
 
   private def handleJsonErrorsByParseMode(parseMode: ParseMode,
       columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = {
@@ -144,6 +150,10 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
         }
         if (options.prefersDecimal && decimalTry.isDefined) {
           decimalTry.get
+        } else if (options.inferTimestamp &&
+            (allCatch opt !timestampNTZFormatter.isTimeZoneSet(field)).getOrElse(false) &&
+            (allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field)).isDefined) {
+          SQLConf.get.timestampType
         } else if (options.inferTimestamp &&
             (allCatch opt timestampFormatter.parse(field)).isDefined) {
           TimestampType
@@ -393,6 +403,9 @@ object JsonInferSchema {
       case (t1: DecimalType, t2: IntegralType) =>
         compatibleType(t1, DecimalType.forType(t2))
 
+      case (TimestampNTZType, TimestampType) | (TimestampType, TimestampNTZType) =>
+        TimestampType
+
       // strings and every string is a Json object.
       case (_, _) => StringType
     }
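Taken together, the wiring above means that during schema inference a zone-less string is tried against the NTZ formatter first and infers as `SQLConf.get.timestampType`, while a value carrying a zone component keeps inferring as TIMESTAMP_LTZ; when both appear in one column, the new `compatibleType` case widens the pair to TimestampType. A hedged sketch of the observable behavior, assuming a SparkSession `spark` and an illustrative path:

```scala
import spark.implicits._

// One zone-less value and one value with an explicit offset in the same column.
Seq(
  """{"c0":"2020-12-12T12:12:12.000"}""",       // no zone component
  """{"c0":"2020-12-12T17:12:12.000+05:00"}"""  // zone offset present
).toDF("data").write.text("/tmp/mixed-ts")

// Even with the session preferring TIMESTAMP_NTZ, the mixed column widens to
// TIMESTAMP_LTZ through the (TimestampNTZType, TimestampType) case above.
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
spark.read.option("inferTimestamp", "true").json("/tmp/mixed-ts").printSchema()
```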
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 8a9104ae9eef9..0fd2287c4224b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -71,6 +71,19 @@ sealed trait TimestampFormatter extends Serializable {
     s"The method `parseWithoutTimeZone(s: String)` should be implemented in the formatter " +
       "of timestamp without time zone")
 
+  /**
+   * Returns true if the parsed timestamp contains the time zone component, false otherwise.
+   * Used to determine if the timestamp can be inferred as timestamp without time zone.
+   *
+   * @param s - string with timestamp to inspect
+   * @return whether the timestamp string has the time zone component defined.
+   */
+  @throws(classOf[IllegalStateException])
+  def isTimeZoneSet(s: String): Boolean =
+    throw new IllegalStateException(
+      s"The method `isTimeZoneSet(s: String)` should be implemented in the formatter " +
+        "of timestamp without time zone")
+
   def format(us: Long): String
   def format(ts: Timestamp): String
   def format(instant: Instant): String
@@ -127,6 +140,14 @@ class Iso8601TimestampFormatter(
     } catch checkParsedDiff(s, legacyFormatter.parse)
   }
 
+  override def isTimeZoneSet(s: String): Boolean = {
+    try {
+      val parsed = formatter.parse(s)
+      val parsedZoneId = parsed.query(TemporalQueries.zone())
+      parsedZoneId != null
+    } catch checkParsedDiff(s, legacyFormatter.isTimeZoneSet)
+  }
+
   override def format(instant: Instant): String = {
     try {
       formatter.withZone(zoneId).format(instant)
@@ -191,6 +212,13 @@ class DefaultTimestampFormatter(
       DateTimeUtils.stringToTimestampWithoutTimeZoneAnsi(UTF8String.fromString(s))
     } catch checkParsedDiff(s, legacyFormatter.parse)
   }
+
+  override def isTimeZoneSet(s: String): Boolean = {
+    try {
+      val (_, zoneIdOpt, _) = parseTimestampString(UTF8String.fromString(s))
+      zoneIdOpt.isDefined
+    } catch checkParsedDiff(s, legacyFormatter.isTimeZoneSet)
+  }
 }
 
 /**
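The `Iso8601TimestampFormatter` override relies on `TemporalQueries.zone()` returning null when the parsed text carried neither a zone id nor an offset. The same idea can be shown in isolation with a plain `java.time` formatter; the pattern and inputs below are illustrative assumptions, not Spark's internal formatter:

```scala
import java.time.format.DateTimeFormatter
import java.time.temporal.TemporalQueries

// An optional-offset pattern: [XXX] matches an offset like +05:00 if present.
val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[XXX]")

// TemporalQueries.zone() yields a ZoneId (or ZoneOffset as fallback) only if
// the parsed fields included one; otherwise it returns null.
def hasZone(s: String): Boolean =
  fmt.parse(s).query(TemporalQueries.zone()) != null

println(hasZone("2020-12-12T17:12:12+05:00")) // true  -> stays TIMESTAMP_LTZ
println(hasZone("2020-12-12T12:12:12"))       // false -> TIMESTAMP_NTZ candidate
```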
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 075d6e9156597..a67718e60965c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2746,6 +2746,141 @@ abstract class JsonSuite
     }
   }
 
+  test("SPARK-37326: Use different pattern to write and infer TIMESTAMP_NTZ values") {
+    withTempDir { dir =>
+      val path = s"${dir.getCanonicalPath}/json"
+
+      val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0")
+      exp.write.option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss").json(path)
+
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
+        val res = spark.read
+          .option("inferTimestamp", "true")
+          .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss")
+          .json(path)
+
+        checkAnswer(res, exp)
+      }
+    }
+  }
+
+  test("SPARK-37326: Use different pattern to write and infer TIMESTAMP_LTZ values") {
+    withTempDir { dir =>
+      val path = s"${dir.getCanonicalPath}/json"
+
+      val exp = spark.sql("select timestamp_ltz'2020-12-12 12:12:12' as col0")
+      exp.write.option("timestampFormat", "yyyy-MM-dd HH:mm:ss").json(path)
+
+      withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) {
+        val res = spark.read
+          .option("inferTimestamp", "true")
+          .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
+          .json(path)
+
+        checkAnswer(res, exp)
+      }
+    }
+  }
+
+  test("SPARK-37326: Roundtrip in reading and writing TIMESTAMP_NTZ values with custom schema") {
+    withTempDir { dir =>
+      val path = s"${dir.getCanonicalPath}/json"
+
+      val exp = spark.sql("""
+        select
+          timestamp_ntz'2020-12-12 12:12:12' as col1,
+          timestamp_ltz'2020-12-12 12:12:12' as col2
+        """)
+
+      exp.write.json(path)
+
+      val res = spark.read
+        .schema("col1 TIMESTAMP_NTZ, col2 TIMESTAMP_LTZ")
+        .json(path)
+
+      checkAnswer(res, exp)
+    }
+  }
+
+  test("SPARK-37326: Timestamp type inference for a column with TIMESTAMP_NTZ values") {
+    withTempDir { dir =>
+      val path = s"${dir.getCanonicalPath}/json"
+
+      val exp = spark.sql("""
+        select timestamp_ntz'2020-12-12 12:12:12' as col0 union all
+        select timestamp_ntz'2020-12-12 12:12:12' as col0
+        """)
+
+      exp.write.json(path)
+
+      val timestampTypes = Seq(
+        SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
+        SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)
+
+      for (timestampType <- timestampTypes) {
+        withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
+          val res = spark.read.option("inferTimestamp", "true").json(path)
+
+          if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
+            checkAnswer(res, exp)
+          } else {
+            checkAnswer(
+              res,
+              spark.sql("""
+                select timestamp_ltz'2020-12-12 12:12:12' as col0 union all
+                select timestamp_ltz'2020-12-12 12:12:12' as col0
+                """)
+            )
+          }
+        }
+      }
+    }
+  }
+
+  test("SPARK-37326: Timestamp type inference for a mix of TIMESTAMP_NTZ and TIMESTAMP_LTZ") {
+    withTempDir { dir =>
+      val path = s"${dir.getCanonicalPath}/json"
+
+      Seq(
+        """{"c0":"2020-12-12T12:12:12.000"}""",
+        """{"c0":"2020-12-12T17:12:12.000Z"}""",
+        """{"c0":"2020-12-12T17:12:12.000+05:00"}""",
+        """{"c0":"2020-12-12T12:12:12.000"}"""
+      ).toDF("data")
+        .coalesce(1)
+        .write.text(path)
+
+      val res = spark.read.option("inferTimestamp", "true").json(path)
+
+      val exp = spark.sql("""
+        select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all
+        select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all
+        select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all
+        select timestamp_ltz'2020-12-12T12:12:12.000' as col0
+        """)
+
+      checkAnswer(res, exp)
+    }
+  }
+
+  test("SPARK-37326: Fail to write TIMESTAMP_NTZ if timestampNTZFormat contains zone offset") {
+    val patterns = Seq(
+      "yyyy-MM-dd HH:mm:ss XXX",
+      "yyyy-MM-dd HH:mm:ss Z",
+      "yyyy-MM-dd HH:mm:ss z")
+
+    val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0")
+    for (pattern <- patterns) {
+      withTempDir { dir =>
+        val path = s"${dir.getCanonicalPath}/json"
+        val err = intercept[SparkException] {
+          exp.write.option("timestampNTZFormat", pattern).json(path)
+        }
+        assert(
+          err.getCause.getMessage.contains("Unsupported field: OffsetSeconds") ||
+            err.getCause.getMessage.contains("Unable to extract value"))
+      }
+    }
+  }
+
   test("filters push down") {
     withTempPath { path =>
       val t = "2019-12-17 00:01:02"