diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala
index 6b9826d652e28..22f84a1cad63d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/FileSourceOptions.scala
@@ -17,8 +17,8 @@
 package org.apache.spark.sql.catalyst
 
 import org.apache.spark.sql.catalyst.FileSourceOptions.{IGNORE_CORRUPT_FILES, IGNORE_MISSING_FILES}
-import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
-import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter}
+import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}
 
 /**
  * Common options for the file-based data source.
@@ -29,6 +29,17 @@ class FileSourceOptions(
 
   def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))
 
+  protected def commonTimestampFormat =
+    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
+      s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
+    } else {
+      if (SQLConf.get.supportSecondOffsetFormat) {
+        s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXXXX]"
+      } else {
+        s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
+      }
+    }
+
   val ignoreCorruptFiles: Boolean = parameters.get(IGNORE_CORRUPT_FILES).map(_.toBoolean)
     .getOrElse(SQLConf.get.ignoreCorruptFiles)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
index f4ade722791c9..13b0b8077128e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -186,12 +186,8 @@ class CSVOptions(
     } else {
       parameters.get(TIMESTAMP_FORMAT)
     }
-  val timestampFormatInWrite: String = parameters.getOrElse(TIMESTAMP_FORMAT,
-    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
-      s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
-    } else {
-      s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
-    })
+  val timestampFormatInWrite: String =
+    parameters.getOrElse(TIMESTAMP_FORMAT, commonTimestampFormat)
 
   val timestampNTZFormatInRead: Option[String] = parameters.get(TIMESTAMP_NTZ_FORMAT)
   val timestampNTZFormatInWrite: String = parameters.getOrElse(TIMESTAMP_NTZ_FORMAT,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
index 945b6e7de8b7a..247106a8b8cd5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JSONOptions.scala
@@ -122,12 +122,8 @@ class JSONOptions(
     } else {
       parameters.get(TIMESTAMP_FORMAT)
     }
-  val timestampFormatInWrite: String = parameters.getOrElse(TIMESTAMP_FORMAT,
-    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
-      s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
-    } else {
-      s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
-    })
+  val timestampFormatInWrite: String =
+    parameters.getOrElse(TIMESTAMP_FORMAT, commonTimestampFormat)
 
   val timestampNTZFormatInRead: Option[String] = parameters.get(TIMESTAMP_NTZ_FORMAT)
   val timestampNTZFormatInWrite: String =
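A note on the `[XXXXX]` vs `[XXX]` offset patterns above: `java.time` silently drops the seconds component of a zone offset when formatting with `XXX`, which is why pre-1883 local-mean-time offsets such as Los Angeles' `-07:52:58` were previously written as `-07:52`. A minimal standalone sketch of that behavior (not part of the patch; the pattern literals are the ones from `commonTimestampFormat`, with `DateFormatter.defaultPattern` expanded to `yyyy-MM-dd`):

```scala
import java.time.{LocalDateTime, ZoneId}
import java.time.format.DateTimeFormatter

object OffsetPatternDemo extends App {
  // 1800-01-01 00:00 in America/Los_Angeles, whose tzdb local-mean-time offset is -07:52:58.
  val ts = LocalDateTime.of(1800, 1, 1, 0, 0).atZone(ZoneId.of("America/Los_Angeles"))

  val withSecondOffset = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXXXX]")
  val withoutSecondOffset = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]")

  println(withSecondOffset.format(ts))    // 1800-01-01T00:00:00.000-07:52:58
  println(withoutSecondOffset.format(ts)) // 1800-01-01T00:00:00.000-07:52 (seconds dropped)
}
```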
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
index 336c54e164e82..e2c2d9dbc6d63 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/XmlOptions.scala
@@ -145,12 +145,8 @@ class XmlOptions(
     } else {
       parameters.get(TIMESTAMP_FORMAT)
     }
-  val timestampFormatInWrite: String = parameters.getOrElse(TIMESTAMP_FORMAT,
-    if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
-      s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
-    } else {
-      s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
-    })
+  val timestampFormatInWrite: String =
+    parameters.getOrElse(TIMESTAMP_FORMAT, commonTimestampFormat)
 
   val timestampNTZFormatInRead: Option[String] = parameters.get(TIMESTAMP_NTZ_FORMAT)
   val timestampNTZFormatInWrite: String =
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 15275be00f31e..f0ba5f7d77801 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4975,6 +4975,16 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val SUPPORT_SECOND_OFFSET_FORMAT =
+    buildConf("spark.sql.files.supportSecondOffsetFormat")
+      .internal()
+      .doc("When set to true, the datetime formatters used for CSV, JSON and XML support " +
+        "zone offsets that include a seconds component, e.g. the Los Angeles timezone " +
+        "offset prior to 1883 was -07:52:58. When disabled, the offset's seconds are lost.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(true)
+
   // Deprecate "spark.connect.copyFromLocalToFs.allowDestLocal" in favor of this config. This is
   // currently optional because we don't want to break existing users who are using the old config.
   // If this config is set, then we override the deprecated config.
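For context, a hypothetical spark-shell session showing the effect of the new flag (the conf key and default come from the patch; the paths and DataFrame are illustrative):

```scala
import java.sql.Timestamp
import spark.implicits._

// Session time zone whose pre-1883 local-mean-time offset is -07:52:58.
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")

val df = Seq(Timestamp.valueOf("1800-01-01 00:00:00")).toDF("ts")

// Default (flag = true): the written offset keeps its seconds component.
df.write.json("/tmp/ts-second-offset")   // {"ts":"1800-01-01T00:00:00.000-07:52:58"}

// Flag disabled: falls back to the previous [XXX] pattern and drops the seconds.
spark.conf.set("spark.sql.files.supportSecondOffsetFormat", "false")
df.write.json("/tmp/ts-minute-offset")   // {"ts":"1800-01-01T00:00:00.000-07:52"}
```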
@@ -5934,6 +5944,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
 
   def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)
 
+  def supportSecondOffsetFormat: Boolean = getConf(SQLConf.SUPPORT_SECOND_OFFSET_FORMAT)
+
   def disabledJdbcConnectionProviders: String = getConf(
     StaticSQLConf.DISABLED_JDBC_CONN_PROVIDER_LIST)
 
diff --git a/sql/core/src/test/resources/test-data/timestamps.csv b/sql/core/src/test/resources/test-data/timestamps.csv
new file mode 100644
index 0000000000000..cea37b68a5f97
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/timestamps.csv
@@ -0,0 +1,6 @@
+timestamp
+01/01/1800 18:00Z
+01/01/1885 18:30Z
+27/10/2014 18:30
+26/08/2015 18:00
+28/01/2016 20:00
diff --git a/sql/core/src/test/resources/test-data/xml-resources/timestamps.xml b/sql/core/src/test/resources/test-data/xml-resources/timestamps.xml
new file mode 100644
index 0000000000000..5a58bc54292e1
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/xml-resources/timestamps.xml
@@ -0,0 +1,8 @@
+<book>
+    <author>John Smith</author>
+    <time>01/01/1800 18:00Z</time>
+    <time2>01/01/1885 18:30Z</time2>
+    <time3>27/10/2014 18:30</time3>
+    <time4>26/08/2015 18:00</time4>
+    <time5>28/01/2016 20:00</time5>
+</book>
\ No newline at end of file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
index 0822b0ac8073d..bb3c00d238ca6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -264,6 +264,12 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
     checkAnswer(df.select(to_csv($"a")), Row("1") :: Nil)
   }
 
+  test("to_csv ISO default - old dates") {
+    val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("1800-01-01 00:00:00.0")))).toDF("a")
+
+    checkAnswer(df.select(to_csv($"a")), Row("1800-01-01T00:00:00.000-07:52:58") :: Nil)
+  }
+
   test("to_csv with option (timestampFormat)") {
     val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a")
     val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm").asJava
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
index ea00e02e232c6..9dca1d091d33b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JsonFunctionsSuite.scala
@@ -406,6 +406,17 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
       Row("""{"_1":"26/08/2015 18:00"}""") :: Nil)
   }
 
+  test("to_json ISO default - old dates") {
+    withSQLConf("spark.sql.session.timeZone" -> "America/Los_Angeles") {
+
+      val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("1800-01-01 00:00:00.0")))).toDF("a")
+
+      checkAnswer(
+        df.select(to_json($"a")),
+        Row("""{"_1":"1800-01-01T00:00:00.000-07:52:58"}""") :: Nil)
+    }
+  }
+
   test("to_json with option (dateFormat)") {
     val df = Seq(Tuple1(Tuple1(java.sql.Date.valueOf("2015-08-26")))).toDF("a")
     val options = Map("dateFormat" -> "dd/MM/yyyy")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/XmlFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/XmlFunctionsSuite.scala
index 1364fab3138e3..4169d53e4fc8e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/XmlFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/XmlFunctionsSuite.scala
@@ -166,6 +166,23 @@ class XmlFunctionsSuite extends QueryTest with SharedSparkSession {
       Row(expected) :: Nil)
   }
 
default - old dates") { + withSQLConf("spark.sql.session.timeZone" -> "America/Los_Angeles") { + val schema = StructType(StructField("a", TimestampType, nullable = false) :: Nil) + val data = Seq(Row(java.sql.Timestamp.valueOf("1800-01-01 00:00:00.0"))) + val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) + .withColumn("a", struct($"a")) + + val expected = + s"""| + | 1800-01-01T00:00:00.000-07:52:58 + |""".stripMargin + checkAnswer( + df.select(to_xml($"a")), + Row(expected) :: Nil) + } + } + test("to_xml with option (timestampFormat)") { val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm") val schema = StructType(StructField("a", TimestampType, nullable = false) :: Nil) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala index f7ea8a735068e..cb442c44832bd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/csv/CSVSuite.scala @@ -70,6 +70,7 @@ abstract class CSVSuite private val emptyFile = "test-data/empty.csv" private val commentsFile = "test-data/comments.csv" private val disableCommentsFile = "test-data/disable_comments.csv" + private val timestampFile = "test-data/timestamps.csv" private val boolFile = "test-data/bool.csv" private val decimalFile = "test-data/decimal.csv" private val simpleSparseFile = "test-data/simple_sparse.csv" @@ -968,8 +969,8 @@ abstract class CSVSuite .format("csv") .option("inferSchema", "true") .option("header", "true") - .option("timestampFormat", "dd/MM/yyyy HH:mm") - .load(testFile(datesFile)) + .option("timestampFormat", "dd/MM/yyyy HH:mm[XXX]") + .load(testFile(timestampFile)) timestamps.write .format("csv") .option("header", "true") @@ -983,11 +984,13 @@ abstract class CSVSuite .option("header", "true") .load(iso8601timestampsPath) - val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX", Locale.US) - val expectedTimestamps = timestamps.collect().map { r => - // This should be ISO8601 formatted string. 
-      Row(iso8501.format(r.toSeq.head))
-    }
+    val expectedTimestamps = Seq(
+      Row("1800-01-01T10:07:02.000-07:52:58"),
+      Row("1885-01-01T10:30:00.000-08:00"),
+      Row("2014-10-27T18:30:00.000-07:00"),
+      Row("2015-08-26T18:00:00.000-07:00"),
+      Row("2016-01-28T20:00:00.000-08:00")
+    )
 
     checkAnswer(iso8601Timestamps, expectedTimestamps)
   }
@@ -3427,6 +3430,10 @@ class CSVv2Suite extends CSVSuite {
 }
 
 class CSVLegacyTimeParserSuite extends CSVSuite {
+
+  override def excluded: Seq[String] =
+    Seq("Write timestamps correctly in ISO8601 format by default")
+
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 9e5ecc08e24a2..fda066aa26472 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -1770,6 +1770,39 @@ abstract class JsonSuite
     }
   }
 
+  test("Write timestamps correctly in ISO8601 format by default") {
+    val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
+    withTempDir { dir =>
+      // Set the LA time zone, since for old dates LA had a seconds offset.
+      withSQLConf("spark.sql.session.timeZone" -> "America/Los_Angeles") {
+        // With timestampFormat option.
+        val timestampsWithoutFormatPath = s"${dir.getCanonicalPath}/timestampsWithoutFormat.json"
+        val timestampsWithoutFormat = spark.read
+          .schema(customSchema)
+          .option("timestampFormat", "dd/MM/yyyy HH:mm[XXX]")
+          .json(datesRecords.union(oldDatesRecord))
+
+        timestampsWithoutFormat.write
+          .format("json")
+          .save(timestampsWithoutFormatPath)
+
+        // This will load back the timestamps as strings.
+        val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
+        val stringTimestampsWithFormat = spark.read
+          .schema(stringSchema)
+          .json(timestampsWithoutFormatPath)
+        val expectedStringDatesWithoutFormat = Seq(
+          Row("1800-01-01T10:07:02.000-07:52:58"),
+          Row("1885-01-01T10:30:00.000-08:00"),
+          Row("2014-10-27T18:30:00.000-07:00"),
+          Row("2015-08-26T18:00:00.000-07:00"),
+          Row("2016-01-28T20:00:00.000-08:00"))
+
+        checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithoutFormat)
+      }
+    }
+  }
+
   test("Write timestamps correctly with timestampFormat option") {
     val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
     withTempDir { dir =>
@@ -3968,6 +4001,10 @@ class JsonV2Suite extends JsonSuite {
 }
 
 class JsonLegacyTimeParserSuite extends JsonSuite {
+
+  override def excluded: Seq[String] =
+    Seq("Write timestamps correctly in ISO8601 format by default")
+
   override protected def sparkConf: SparkConf =
     super
       .sparkConf
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
index 5c35ee03fb271..6fa2bdfbfe758 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/TestJsonData.scala
@@ -229,6 +229,11 @@ private[json] trait TestJsonData {
       """{"date": "27/10/2014 18:30"}""" ::
       """{"date": "28/01/2016 20:00"}""" :: Nil))(Encoders.STRING)
 
+  def oldDatesRecord: Dataset[String] =
+    spark.createDataset(spark.sparkContext.parallelize(
+      """{"date": "01/01/1800 18:00Z"}""" ::
+      """{"date": "01/01/1885 18:30Z"}""" :: Nil))(Encoders.STRING)
+
   lazy val singleRow: Dataset[String] =
     spark.createDataset(spark.sparkContext.parallelize("""{"a":123}""" :: Nil))(Encoders.STRING)
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
index 930cc29878108..30588dde965d7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/xml/XmlSuite.scala
@@ -1626,6 +1626,57 @@ class XmlSuite
     assert(df.collect().head.getAs[Timestamp](2).getTime === 1322936130000L)
   }
 
+  test("Write timestamps correctly in ISO8601 format by default") {
+    val originalSchema =
+      buildSchema(
+        field("author"),
+        field("time", TimestampType),
+        field("time2", TimestampType),
+        field("time3", TimestampType),
+        field("time4", TimestampType),
+        field("time5", TimestampType)
+      )
+
+    val df = spark.read
+      .option("rowTag", "book")
+      .option("timestampFormat", "dd/MM/yyyy HH:mm[XXX]")
+      .schema(originalSchema)
+      .xml(getTestResourcePath(resDir + "timestamps.xml"))
+
+    withTempDir { dir =>
+      // Use Los Angeles, since old dates there have weird offsets.
+      withSQLConf("spark.sql.session.timeZone" -> "America/Los_Angeles") {
+        df
+          .write
+          .option("rowTag", "book")
+          .xml(dir.getCanonicalPath + "/xml")
+        val schema =
+          buildSchema(
+            field("author"),
+            field("time", StringType),
+            field("time2", StringType),
+            field("time3", StringType),
+            field("time4", StringType),
+            field("time5", StringType)
+          )
+        val df2 = spark.read
+          .option("rowTag", "book")
+          .schema(schema)
+          .xml(dir.getCanonicalPath + "/xml")
+
+        val expectedStringDatesWithoutFormat = Seq(
+          Row("John Smith",
+            "1800-01-01T10:07:02.000-07:52:58",
"1885-01-01T10:30:00.000-08:00", + "2014-10-27T18:30:00.000-07:00", + "2015-08-26T18:00:00.000-07:00", + "2016-01-28T20:00:00.000-08:00")) + + checkAnswer(df2, expectedStringDatesWithoutFormat) + } + } + } + test("Test custom timestampFormat without timezone") { val xml = s""" | John Smith