[SPARK-48776] Fix timestamp formatting for json, xml and csv
### What changes were proposed in this pull request?

In this pull request I propose changing the default ISO pattern used for formatting timestamps written to JSON, XML, and CSV, as well as by `to_json`, `to_xml`, and `to_csv`.

Older timestamps sometimes have zone offsets that include a seconds component. The current default format omits the offset seconds and therefore produces wrong results.

For example:
```
sql("SET spark.sql.session.timeZone=America/Los_Angeles")
sql("SELECT to_json(struct(CAST('1800-01-01T00:00:00+00:00' AS TIMESTAMP) AS ts))").show(false)
{"ts":"1799-12-31T16:07:02.000-07:52"}
```
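
With the new default pattern the seconds component of the offset is preserved, so the same query is expected to return (output reconstructed from the values asserted by this PR's new tests):

```
sql("SET spark.sql.session.timeZone=America/Los_Angeles")
sql("SELECT to_json(struct(CAST('1800-01-01T00:00:00+00:00' AS TIMESTAMP) AS ts))").show(false)
{"ts":"1799-12-31T16:07:02.000-07:52:58"}
```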

### Why are the changes needed?

This is a correctness issue: the truncated offset no longer denotes the actual instant.
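
To see the mechanism, here is a minimal standalone sketch in plain `java.time` (an assumption for illustration: Spark's non-legacy timestamp formatter is built on `DateTimeFormatter`, so the old and new default patterns from this PR behave the same way there):

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

object OffsetPatternDemo extends App {
  // Before November 1883, America/Los_Angeles used local mean time: UTC-07:52:58.
  val zdt = Instant.parse("1800-01-01T00:00:00Z").atZone(ZoneId.of("America/Los_Angeles"))

  // Old default: XXX renders the offset as +HH:mm, silently dropping the seconds.
  val oldFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXX]")
  println(oldFmt.format(zdt)) // 1799-12-31T16:07:02.000-07:52

  // New default: XXXXX renders +HH:mm:ss, preserving the full offset.
  val newFmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss[.SSS][XXXXX]")
  println(newFmt.format(zdt)) // 1799-12-31T16:07:02.000-07:52:58
}
```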

### Does this PR introduce _any_ user-facing change?

Yes. Users will now see different (correct) results for older timestamps whose zone offsets include a seconds component.

### How was this patch tested?

New tests in the CSV, JSON, and XML function and datasource suites covering pre-1883 timestamps.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#47177 from milastdbx/dev/milast/fixJsonTimestampHandling.

Authored-by: milastdbx <milan.stefanovic@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
milastdbx authored and cloud-fan committed Jul 8, 2024
1 parent 2c54aa5 commit c4085f1
Showing 14 changed files with 186 additions and 27 deletions.
FileSourceOptions.scala
@@ -17,8 +17,8 @@
package org.apache.spark.sql.catalyst

import org.apache.spark.sql.catalyst.FileSourceOptions.{IGNORE_CORRUPT_FILES, IGNORE_MISSING_FILES}
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter}
import org.apache.spark.sql.internal.{LegacyBehaviorPolicy, SQLConf}

/**
* Common options for the file-based data source.
@@ -29,6 +29,17 @@ class FileSourceOptions(

def this(parameters: Map[String, String]) = this(CaseInsensitiveMap(parameters))

protected def commonTimestampFormat =
  if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
    s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
  } else if (SQLConf.get.supportSecondOffsetFormat) {
    s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXXXX]"
  } else {
    s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
  }

val ignoreCorruptFiles: Boolean = parameters.get(IGNORE_CORRUPT_FILES).map(_.toBoolean)
.getOrElse(SQLConf.get.ignoreCorruptFiles)

CSVOptions.scala
@@ -186,12 +186,8 @@ class CSVOptions(
} else {
parameters.get(TIMESTAMP_FORMAT)
}
val timestampFormatInWrite: String = parameters.getOrElse(TIMESTAMP_FORMAT,
if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
} else {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
})
val timestampFormatInWrite: String =
parameters.getOrElse(TIMESTAMP_FORMAT, commonTimestampFormat)

val timestampNTZFormatInRead: Option[String] = parameters.get(TIMESTAMP_NTZ_FORMAT)
val timestampNTZFormatInWrite: String = parameters.getOrElse(TIMESTAMP_NTZ_FORMAT,
JSONOptions.scala
@@ -122,12 +122,8 @@ class JSONOptions(
} else {
parameters.get(TIMESTAMP_FORMAT)
}
val timestampFormatInWrite: String = parameters.getOrElse(TIMESTAMP_FORMAT,
if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
} else {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
})
val timestampFormatInWrite: String =
parameters.getOrElse(TIMESTAMP_FORMAT, commonTimestampFormat)

val timestampNTZFormatInRead: Option[String] = parameters.get(TIMESTAMP_NTZ_FORMAT)
val timestampNTZFormatInWrite: String =
XmlOptions.scala
@@ -145,12 +145,8 @@ class XmlOptions(
} else {
parameters.get(TIMESTAMP_FORMAT)
}
val timestampFormatInWrite: String = parameters.getOrElse(TIMESTAMP_FORMAT,
if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss.SSSXXX"
} else {
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
})
val timestampFormatInWrite: String =
parameters.getOrElse(TIMESTAMP_FORMAT, commonTimestampFormat)

val timestampNTZFormatInRead: Option[String] = parameters.get(TIMESTAMP_NTZ_FORMAT)
val timestampNTZFormatInWrite: String =
SQLConf.scala
@@ -4975,6 +4975,16 @@
.booleanConf
.createWithDefault(false)

val SUPPORT_SECOND_OFFSET_FORMAT =
  buildConf("spark.sql.files.supportSecondOffsetFormat")
    .internal()
    .doc("When set to true, the datetime formatter used for CSV, JSON and XML " +
      "will support zone offsets that have a seconds component, e.g. the LA time zone " +
      "offset prior to 1883 was -07:52:58. When this flag is disabled, the seconds " +
      "information is lost.")
    .version("4.0.0")
    .booleanConf
    .createWithDefault(true)

// Deprecate "spark.connect.copyFromLocalToFs.allowDestLocal" in favor of this config. This is
// currently optional because we don't want to break existing users who are using the old config.
// If this config is set, then we override the deprecated config.
@@ -5934,6 +5944,8 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

def legacyPathOptionBehavior: Boolean = getConf(SQLConf.LEGACY_PATH_OPTION_BEHAVIOR)

def supportSecondOffsetFormat: Boolean = getConf(SQLConf.SUPPORT_SECOND_OFFSET_FORMAT)

def disabledJdbcConnectionProviders: String = getConf(
StaticSQLConf.DISABLED_JDBC_CONN_PROVIDER_LIST)
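
If the previous minute-precision offsets are ever needed, the flag above can be turned off at runtime; a hedged sketch (the conf is internal, so this is for completeness rather than a documented API):

```scala
// Assumption for illustration: disabling the internal flag added by this PR
// falls back to the old [XXX] offset pattern (minute precision, seconds dropped).
spark.conf.set("spark.sql.files.supportSecondOffsetFormat", "false")
```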

6 changes: 6 additions & 0 deletions sql/core/src/test/resources/test-data/timestamps.csv
@@ -0,0 +1,6 @@
timestamp
01/01/1800 18:00Z
01/01/1885 18:30Z
27/10/2014 18:30
26/08/2015 18:00
28/01/2016 20:00
timestamps.xml (new test resource)
@@ -0,0 +1,8 @@
<book>
<author>John Smith</author>
<time>01/01/1800 18:00Z</time>
<time2>01/01/1885 18:30Z</time2>
<time3>27/10/2014 18:30</time3>
<time4>26/08/2015 18:00</time4>
<time5>28/01/2016 20:00</time5>
</book>
CsvFunctionsSuite.scala
@@ -264,6 +264,12 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
checkAnswer(df.select(to_csv($"a")), Row("1") :: Nil)
}

test("to_csv ISO default - old dates") {
val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("1800-01-01 00:00:00.0")))).toDF("a")

checkAnswer(df.select(to_csv($"a")), Row("1800-01-01T00:00:00.000-07:52:58") :: Nil)
}

test("to_csv with option (timestampFormat)") {
val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("2015-08-26 18:00:00.0")))).toDF("a")
val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm").asJava
JsonFunctionsSuite.scala
@@ -406,6 +406,17 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
Row("""{"_1":"26/08/2015 18:00"}""") :: Nil)
}

test("to_json ISO default - old dates") {
withSQLConf("spark.sql.session.timeZone" -> "America/Los_Angeles") {

val df = Seq(Tuple1(Tuple1(java.sql.Timestamp.valueOf("1800-01-01 00:00:00.0")))).toDF("a")

checkAnswer(
df.select(to_json($"a")),
Row("""{"_1":"1800-01-01T00:00:00.000-07:52:58"}""") :: Nil)
}
}

test("to_json with option (dateFormat)") {
val df = Seq(Tuple1(Tuple1(java.sql.Date.valueOf("2015-08-26")))).toDF("a")
val options = Map("dateFormat" -> "dd/MM/yyyy")
XmlFunctionsSuite.scala
@@ -166,6 +166,23 @@ class XmlFunctionsSuite extends QueryTest with SharedSparkSession {
Row(expected) :: Nil)
}

test("to_xml ISO default - old dates") {
withSQLConf("spark.sql.session.timeZone" -> "America/Los_Angeles") {
val schema = StructType(StructField("a", TimestampType, nullable = false) :: Nil)
val data = Seq(Row(java.sql.Timestamp.valueOf("1800-01-01 00:00:00.0")))
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
.withColumn("a", struct($"a"))

val expected =
s"""|<ROW>
| <a>1800-01-01T00:00:00.000-07:52:58</a>
|</ROW>""".stripMargin
checkAnswer(
df.select(to_xml($"a")),
Row(expected) :: Nil)
}
}

test("to_xml with option (timestampFormat)") {
val options = Map("timestampFormat" -> "dd/MM/yyyy HH:mm")
val schema = StructType(StructField("a", TimestampType, nullable = false) :: Nil)
CSVSuite.scala
@@ -70,6 +70,7 @@ abstract class CSVSuite
private val emptyFile = "test-data/empty.csv"
private val commentsFile = "test-data/comments.csv"
private val disableCommentsFile = "test-data/disable_comments.csv"
private val timestampFile = "test-data/timestamps.csv"
private val boolFile = "test-data/bool.csv"
private val decimalFile = "test-data/decimal.csv"
private val simpleSparseFile = "test-data/simple_sparse.csv"
@@ -968,8 +969,8 @@
.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.option("timestampFormat", "dd/MM/yyyy HH:mm")
.load(testFile(datesFile))
.option("timestampFormat", "dd/MM/yyyy HH:mm[XXX]")
.load(testFile(timestampFile))
timestamps.write
.format("csv")
.option("header", "true")
@@ -983,11 +984,13 @@
.option("header", "true")
.load(iso8601timestampsPath)

val iso8501 = FastDateFormat.getInstance("yyyy-MM-dd'T'HH:mm:ss.SSSXXX", Locale.US)
val expectedTimestamps = timestamps.collect().map { r =>
// This should be ISO8601 formatted string.
Row(iso8501.format(r.toSeq.head))
}
val expectedTimestamps = Seq(
  Row("1800-01-01T10:07:02.000-07:52:58"),
  Row("1885-01-01T10:30:00.000-08:00"),
  Row("2014-10-27T18:30:00.000-07:00"),
  Row("2015-08-26T18:00:00.000-07:00"),
  Row("2016-01-28T20:00:00.000-08:00")
)

checkAnswer(iso8601Timestamps, expectedTimestamps)
}
@@ -3427,6 +3430,10 @@ class CSVv2Suite extends CSVSuite {
}

class CSVLegacyTimeParserSuite extends CSVSuite {

override def excluded: Seq[String] =
  Seq("Write timestamps correctly in ISO8601 format by default")

override protected def sparkConf: SparkConf =
super
.sparkConf
JsonSuite.scala
@@ -1770,6 +1770,39 @@ abstract class JsonSuite
}
}

test("Write timestamps correctly in ISO8601 format by default") {
val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
withTempDir { dir =>
// lets set LA timezone as for old dates LA had seconds offset
withSQLConf("spark.sql.session.timeZone" -> "America/Los_Angeles") {
// With dateFormat option.
val timestampsWithoutFormatPath = s"${dir.getCanonicalPath}/timestampsWithoutFormat.json"
val timestampsWithoutFormat = spark.read
.schema(customSchema)
.option("timestampFormat", "dd/MM/yyyy HH:mm[XXX]")
.json(datesRecords.union(oldDatesRecord))

timestampsWithoutFormat.write
.format("json")
.save(timestampsWithoutFormatPath)

// This will load back the timestamps as string.
val stringSchema = StructType(StructField("date", StringType, true) :: Nil)
val stringTimestampsWithFormat = spark.read
.schema(stringSchema)
.json(timestampsWithoutFormatPath)
val expectedStringDatesWithoutFormat = Seq(
Row("1800-01-01T10:07:02.000-07:52:58"),
Row("1885-01-01T10:30:00.000-08:00"),
Row("2014-10-27T18:30:00.000-07:00"),
Row("2015-08-26T18:00:00.000-07:00"),
Row("2016-01-28T20:00:00.000-08:00"))

checkAnswer(stringTimestampsWithFormat, expectedStringDatesWithoutFormat)
}
}
}

test("Write timestamps correctly with timestampFormat option") {
val customSchema = new StructType(Array(StructField("date", TimestampType, true)))
withTempDir { dir =>
@@ -3968,6 +4001,10 @@ class JsonV2Suite extends JsonSuite {
}

class JsonLegacyTimeParserSuite extends JsonSuite {

override def excluded: Seq[String] =
  Seq("Write timestamps correctly in ISO8601 format by default")

override protected def sparkConf: SparkConf =
super
.sparkConf
TestJsonData.scala
@@ -229,6 +229,11 @@ private[json] trait TestJsonData {
"""{"date": "27/10/2014 18:30"}""" ::
"""{"date": "28/01/2016 20:00"}""" :: Nil))(Encoders.STRING)

def oldDatesRecord: Dataset[String] =
  spark.createDataset(spark.sparkContext.parallelize(
    """{"date": "01/01/1800 18:00Z"}""" ::
    """{"date": "01/01/1885 18:30Z"}""" :: Nil))(Encoders.STRING)

lazy val singleRow: Dataset[String] =
spark.createDataset(spark.sparkContext.parallelize("""{"a":123}""" :: Nil))(Encoders.STRING)

XmlSuite.scala
@@ -1626,6 +1626,57 @@ class XmlSuite
assert(df.collect().head.getAs[Timestamp](2).getTime === 1322936130000L)
}

test("Write timestamps correctly in ISO8601 format by default") {
val originalSchema =
buildSchema(
field("author"),
field("time", TimestampType),
field("time2", TimestampType),
field("time3", TimestampType),
field("time4", TimestampType),
field("time5", TimestampType)
)

val df = spark.read
.option("rowTag", "book")
.option("timestampFormat", "dd/MM/yyyy HH:mm[XXX]")
.schema(originalSchema)
.xml(getTestResourcePath(resDir + "timestamps.xml"))

withTempDir { dir =>
// use los angeles as old dates have wierd offsets
withSQLConf("spark.session.timeZone" -> "America/Los_Angeles") {
df
.write
.option("rowTag", "book")
.xml(dir.getCanonicalPath + "/xml")
val schema =
buildSchema(
field("author"),
field("time", StringType),
field("time2", StringType),
field("time3", StringType),
field("time4", StringType),
field("time5", StringType)
)
val df2 = spark.read
.option("rowTag", "book")
.schema(schema)
.xml(dir.getCanonicalPath + "/xml")

val expectedStringDatesWithoutFormat = Seq(
Row("John Smith",
"1800-01-01T10:07:02.000-07:52:58",
"1885-01-01T10:30:00.000-08:00",
"2014-10-27T18:30:00.000-07:00",
"2015-08-26T18:00:00.000-07:00",
"2016-01-28T20:00:00.000-08:00"))

checkAnswer(df2, expectedStringDatesWithoutFormat)
}
}
}

test("Test custom timestampFormat without timezone") {
val xml = s"""<book>
| <author>John Smith</author>
