Skip to content

Commit

Permalink
update code to handle legacy time parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
sadikovi committed Nov 16, 2021
1 parent 6bfc1b1 commit ea47b94
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,13 @@ class Iso8601TimestampFormatter(
}

override def format(localDateTime: LocalDateTime): String = {
localDateTime.format(formatter)
// If the legacy time parser policy is selected, we can only write timestamp with timezone,
// we will use the default time zone for it.
if (SQLConf.get.legacyTimeParserPolicy == LEGACY) {
format(toJavaTimestamp(instantToMicros(localDateTime.atZone(zoneId).toInstant)))
} else {
localDateTime.format(formatter)
}
}

override def validatePatternString(checkLegacy: Boolean): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1012,6 +1012,30 @@ abstract class CSVSuite
}
}

test("SPARK-37326: Write TIMESTAMP_NTZ in legacy time parser policy") {
withTempDir { dir =>
val path = s"${dir.getCanonicalPath}/csv"

val exp = spark.sql("select timestamp_ltz'2020-12-12 12:12:12' as col1").coalesce(1)

withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> "legacy") {
exp.write.format("csv")
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
.option("header", "true")
.save(path)
}

val res = spark.read
.format("csv")
.option("inferSchema", "true")
.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
.option("header", "true")
.load(path)

checkAnswer(res, exp)
}
}

test("SPARK-37326: Roundtrip in reading and writing TIMESTAMP_NTZ values with custom schema") {
withTempDir { dir =>
val path = s"${dir.getCanonicalPath}/csv"
Expand Down Expand Up @@ -1039,37 +1063,38 @@ abstract class CSVSuite
val path = s"${dir.getCanonicalPath}/csv"

val exp = spark.sql("""
select
timestamp_ntz'2020-12-12 12:12:12' as col1,
timestamp_ltz'2020-12-12 12:12:12' as col2
select timestamp_ntz'2020-12-12 12:12:12' as col0 union all
select timestamp_ntz'2020-12-12 12:12:12' as col0
""")

exp.write.format("csv").option("header", "true").save(path)

withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString) {
val res = spark.read
.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(path)
val timestampTypes = Seq(
SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString,
SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString)

checkAnswer(res, exp)
}

withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> SQLConf.TimestampTypes.TIMESTAMP_LTZ.toString) {
val res = spark.read
.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(path)

val exp = spark.sql("""
select
timestamp_ltz'2020-12-12 12:12:12' as col1,
timestamp_ltz'2020-12-12 12:12:12' as col2
""")

checkAnswer(res, exp)
for (timestampType <- timestampTypes) {
withSQLConf(SQLConf.TIMESTAMP_TYPE.key -> timestampType) {
val res = spark.read
.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(path)

if (timestampType == SQLConf.TimestampTypes.TIMESTAMP_NTZ.toString &&
spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY.key) != "legacy") {
checkAnswer(res, exp)
} else {
// Timestamps are written as timestamp with timezone in the legacy mode.
checkAnswer(
res,
spark.sql("""
select timestamp_ltz'2020-12-12 12:12:12' as col0 union all
select timestamp_ltz'2020-12-12 12:12:12' as col0
""")
)
}
}
}
}
}
Expand All @@ -1079,27 +1104,35 @@ abstract class CSVSuite
val path = s"${dir.getCanonicalPath}/csv"

Seq(
"col0",
"2020-12-12T12:12:12.000",
"2020-12-12T17:12:12.000Z",
"2020-12-12T17:12:12.000+05:00",
"2020-12-12T12:12:12.000"
).toDF("col0")
).toDF("data")
.coalesce(1)
.write.text(path)

val res = spark.read
.format("csv")
.option("inferSchema", "true")
.option("header", "true")
.load(path)

val exp = spark.sql("""
select timestamp'2020-12-12T12:12:12.000' as col0 union all
select timestamp'2020-12-12T17:12:12.000Z' as col0 union all
select timestamp'2020-12-12T17:12:12.000+05:00' as col0 union all
select timestamp'2020-12-12T12:12:12.000' as col0
""")

checkAnswer(res, exp)
if (spark.conf.get(SQLConf.LEGACY_TIME_PARSER_POLICY.key) == "legacy") {
// Timestamps without timezone are parsed as strings, so the col0 type would be StringType
// which is similar to reading without schema inference.
val exp = spark.read.format("csv").option("header", "true").load(path)
checkAnswer(res, exp)
} else {
val exp = spark.sql("""
select timestamp_ltz'2020-12-12T12:12:12.000' as col0 union all
select timestamp_ltz'2020-12-12T17:12:12.000Z' as col0 union all
select timestamp_ltz'2020-12-12T17:12:12.000+05:00' as col0 union all
select timestamp_ltz'2020-12-12T12:12:12.000' as col0
""")
checkAnswer(res, exp)
}
}
}

Expand Down

0 comments on commit ea47b94

Please sign in to comment.