
[SPARK-37326][SQL] Support TimestampNTZ in CSV data source #34596

Closed (wants to merge 14 commits)

Conversation

@sadikovi (Contributor) commented Nov 15, 2021

What changes were proposed in this pull request?

This PR adds support for TimestampNTZ type in the CSV data source.

Most of the functionality was already in place; this patch verifies that writes and reads work for the TimestampNTZ type and adds schema inference based on the format of the written timestamp values. The following rules apply:

  • If there is a mixture of TIMESTAMP_NTZ and TIMESTAMP_LTZ values, use TIMESTAMP_LTZ.
  • If there are only TIMESTAMP_NTZ values, resolve them using the default timestamp type configured with spark.sql.timestampType.

In addition, I introduced a new CSV option timestampNTZFormat, similar to timestampFormat, which configures the read/write pattern for TIMESTAMP_NTZ values. It is essentially a copy of the timestamp pattern but without the timezone component.

The schema inference works in the following way (a sketch follows the list):

  1. We test whether the field can be parsed as a timestamp without timezone using timestampNTZFormat.
  2. If the field has a timezone component, the parseWithoutTimeZone method throws QueryExecutionErrors.cannotParseStringAsDataTypeError, which is a RuntimeException.
  3. In that case, we move on to parsing the field as a timestamp with timezone (the existing logic).
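A minimal sketch of that flow, not the exact patch: `ntzFormatter` stands in for the formatter built from timestampNTZFormat, and StringType is used here as the "not a timestamp" fallback where the real code tries the timestamp-with-timezone parser next.

```scala
import scala.util.control.Exception.allCatch
import org.apache.spark.sql.catalyst.util.TimestampFormatter
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StringType}

def tryParseTimestampNTZ(field: String, ntzFormatter: TimestampFormatter): DataType = {
  // Steps 1-2: parseWithoutTimeZone throws a RuntimeException when the field
  // carries a zone-offset or zone-id, so `allCatch opt` turns that into None.
  if ((allCatch opt ntzFormatter.parseWithoutTimeZone(field)).isDefined) {
    // Zone-less value: resolve via the default type from spark.sql.timestampType.
    SQLConf.get.timestampType
  } else {
    StringType // step 3 in the real code moves on to timestamp-with-timezone parsing
  }
}
```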

Why are the changes needed?

The current CSV source could write TimestampNTZ values into a file but could not preserve the type when reading the file back; this PR fixes that.

Does this PR introduce any user-facing change?

Previously, the CSV data source inferred timestamp values as TimestampType when reading a CSV file. Now, it infers the timestamp value type based on the format (with or without timezone) and the default timestamp type configured with spark.sql.timestampType.

A new CSV option timestampNTZFormat is added to control how values are formatted during writes and parsed during reads (see the usage sketch below).

Now, if the timestamp cannot be parsed as a timestamp without timezone, e.g. it contains a zone-offset or zone-id component, parseWithoutTimeZone throws a RuntimeException, signalling the inference code to try the next type.
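A minimal usage sketch of the new option, assuming a Spark build with this patch and an active `spark` session (the path, literal and pattern are illustrative):

```scala
// Prefer TIMESTAMP_NTZ when inference finds zone-less timestamp strings.
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")

val df = spark.sql("SELECT timestamp_ntz'2021-11-15 12:00:00' AS ts")
df.write
  .option("timestampNTZFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .csv("/tmp/ntz-example")

val back = spark.read
  .option("inferSchema", "true")
  .option("timestampNTZFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .csv("/tmp/ntz-example")
back.printSchema() // the column should come back as timestamp_ntz
```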

How was this patch tested?

I extended CSVSuite with a few unit tests to verify that the write-read roundtrip works for TIMESTAMP_NTZ and TIMESTAMP_LTZ values.

@github-actions github-actions bot added the SQL label Nov 15, 2021
@sadikovi (Contributor, Author):

cc @gengliangwang @MaxGekk


SparkQA commented Nov 15, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49690/


SparkQA commented Nov 15, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49690/


SparkQA commented Nov 15, 2021

Test build #145220 has finished for PR 34596 at commit 6bfc1b1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sadikovi (Contributor, Author):

Let me fix the tests first.

@sadikovi (Contributor, Author):

@gengliangwang @MaxGekk @cloud-fan could you review this PR? Thanks.


SparkQA commented Nov 16, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49732/


SparkQA commented Nov 16, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49732/


SparkQA commented Nov 16, 2021

Test build #145262 has finished for PR 34596 at commit ea47b94.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sadikovi (Contributor, Author):

I would like to discuss how to handle the following case:

  • User writes a TIMESTAMP_NTZ value to CSV.
  • User sets spark.sql.legacy.timeParserPolicy to "legacy".

This case is ambiguous: the default timestamp pattern for legacy parsing requires a timezone, but the value does not have one. However, the code works fine when the pattern has no timezone component; in that case we would write the value successfully.

To resolve the ambiguity, I see two options:

  • Throw an exception saying that TIMESTAMP_NTZ values are not supported with "legacy" time parsing, regardless of whether the timestamp format has a timezone component.
  • Write the timestamp value with the default timezone, essentially converting LocalDateTime into java.sql.Timestamp (currently implemented in this PR; see the sketch below).

@gengliangwang Do you have any preferences here? Thanks.
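For context, a hedged sketch of what the second option boils down to (the helper name is mine, not from the patch):

```scala
import java.sql.Timestamp
import java.time.LocalDateTime

// Timestamp.valueOf interprets the LocalDateTime in the JVM default time zone,
// which is effectively "write the value with the default timezone".
def toLegacyTimestamp(ldt: LocalDateTime): Timestamp = Timestamp.valueOf(ldt)

// toLegacyTimestamp(LocalDateTime.parse("2021-11-15T12:00:00"))
```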

@@ -38,6 +39,13 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
legacyFormat = FAST_DATE_FORMAT,
isParsing = true)

private val timestampNTZFormatter = TimestampFormatter(
options.timestampFormatInRead,
Contributor:

I think we should fail earlier if the pattern string contains a timezone? It doesn't make sense to have a timezone in an NTZ string.

Contributor:

Moreover, maybe we should add a new CSV option to define the pattern for timestamp NTZ.

Contributor (Author):

Updated.

@github-actions github-actions bot added the DOCS label Nov 17, 2021
@sadikovi (Contributor, Author):

cc @MaxGekk @cloud-fan @gengliangwang for review.


SparkQA commented Nov 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49823/


SparkQA commented Nov 18, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49823/

}

private def tryParseTimestampNTZ(field: String): DataType = {
if ((allCatch opt !timestampNTZFormatter.isTimeZoneSet(field)).getOrElse(false) &&
Member:

nit: let's add a one-line comment here.

Contributor (Author):

Added, thanks.

@@ -164,6 +164,20 @@ class CSVOptions(
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
})

val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat").orElse {
if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
Member:

Let's ignore the legacy behavior in TimestampNTZ.
For the casting between string and TimestampNTZ, we didn't respect it either.

}
}
val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat",
if (SQLConf.get.legacyTimeParserPolicy == LegacyBehaviorPolicy.LEGACY) {
Member:

ditto


SparkQA commented Nov 18, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49834/

val parsed = formatter.parse(s)
val parsedZoneId = parsed.query(TemporalQueries.zone())
parsedZoneId != null
} catch checkParsedDiff(s, legacyFormatter.isTimeZoneSet)
Member:

ditto

try {
val (_, zoneIdOpt, _) = parseTimestampString(UTF8String.fromString(s))
zoneIdOpt.isDefined
} catch checkParsedDiff(s, legacyFormatter.isTimeZoneSet)
Member:

ditto

@gengliangwang (Member) left a comment:

Thanks for the work

* refer to `parseTimestampString` for the allowed formats.
*/
def stringToTimestampWithoutTimeZone(s: UTF8String): Option[Long] = {
stringToTimestampWithoutTimeZone(s, false)
@sadikovi (Contributor, Author) commented Nov 28, 2021:

I used false to preserve the original behaviour in this method and the one below.

@sadikovi (Contributor, Author):

@gengliangwang @cloud-fan I updated the code, could you do the penultimate review? Thanks.


SparkQA commented Nov 28, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50156/


SparkQA commented Nov 28, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50156/


SparkQA commented Nov 29, 2021

Test build #145686 has finished for PR 34596 at commit 043edb6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Nov 29, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50201/


SparkQA commented Nov 29, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50201/


SparkQA commented Nov 30, 2021

Test build #145731 has finished for PR 34596 at commit 1edef2d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// We can only parse the value as TimestampNTZType if it does not have zone-offset or
// time-zone component and can be parsed with the timestamp formatter.
// Otherwise, it is likely to be a timestamp with timezone.
if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, true)).isDefined) {
Member:

Maybe we should skip the parsing if SQLConf.get.timestampType is set to TIMESTAMP_LTZ, since parsing is a non-trivial op?

Contributor (Author):

Could you elaborate a bit more? Thanks.

My understanding was that the config indicated whether the output of parsing should be treated as TimestampNTZ or TimestampLTZ.

options.zoneId,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true,
forTimestampNTZ = true)
Member:

This part I'd defer to @MaxGekk to review.

@HyukjinKwon (Member) left a comment:

Looks good otherwise

@@ -164,6 +164,10 @@ class CSVOptions(
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
})

val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat")
val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat",
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]")
Member:

I wonder what the reason is to have the optional field [.SSS] in write. How should the CSV writer decide whether to write milliseconds or not?

Another question: why is the precision in milliseconds and not in microseconds?

Contributor (Author):

It is the same reason as for timestampFormat above; I just copy-pasted it for timestampNTZFormat and removed the zone component.

Member:

It is the same reason as for timestampFormat above ...

The option was added when Spark's timestamp type had millisecond precision. Now the precision is microseconds, and I don't see any reason to lose info in write by default.

Contributor:

I think we can change it for both LTZ and NTZ, if the precision loss here is a problem.

Contributor (Author):

I would rather change it separately; it sounds more like a general problem with timestamps.

StructType(StructField("a", TimestampNTZType) :: Nil),
Map.empty[String, String]) as "value")
.selectExpr("value.a")
checkAnswer(fromCsvDF, Row(null))
Member:

How about a positive test for the functions from_csv/to_csv and schema_of_csv?

Contributor (Author):

There is a test right above that verifies the happy path for those functions. I only added a new test because it was missing from the original patch.
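For reference, a hedged sketch of such a positive-path check, modeled on the negative test above (the literal and expected value are illustrative, and spark.implicits._ is assumed in scope, as in CSVSuite):

```scala
import java.time.LocalDateTime
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types.{StructField, StructType, TimestampNTZType}

val schema = StructType(StructField("a", TimestampNTZType) :: Nil)
val fromCsvDF = Seq("2021-11-15T12:00:00").toDF("csv")
  .select(from_csv($"csv", schema, Map.empty[String, String]) as "value")
  .selectExpr("value.a")
// The zone-less string should parse into a LocalDateTime instead of null.
checkAnswer(fromCsvDF, Row(LocalDateTime.parse("2021-11-15T12:00:00")))
```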

val path = s"${dir.getCanonicalPath}/csv"

val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12' as col0")
exp.write.format("csv").option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss").save(path)
Member:

Could you test max precision with the pattern .SSSSSS, please?

Contributor (Author):

Updated.
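The updated check presumably resembles this hedged sketch (value and options are illustrative; withTempPath is the test helper quoted later in this thread):

```scala
// Roundtrip a microsecond-precision TIMESTAMP_NTZ value through CSV using the
// full-precision pattern .SSSSSS for both write and read.
withTempPath { path =>
  val exp = spark.sql("select timestamp_ntz'2020-12-12 12:12:12.123456' as col0")
  exp.write
    .format("csv")
    .option("header", "true")
    .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
    .save(path.getAbsolutePath)

  val res = spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss.SSSSSS")
    .load(path.getAbsolutePath)

  checkAnswer(res, exp)
}
```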

@@ -1012,6 +1012,196 @@ abstract class CSVSuite
}
}

test("SPARK-37326: Use different pattern to write and infer TIMESTAMP_NTZ values") {
withTempDir { dir =>
val path = s"${dir.getCanonicalPath}/csv"
Member:

Just write directly to dir

Contributor (Author):

Oh, I could not, because the directory already exists by the time the method is called. withTempDir creates the directory, but it needs to be non-existent for Spark to write into it.

Member:

There is withTempPath, which doesn't create any dirs:

/**
 * Generates a temporary path without creating the actual file/directory, then passes it to `f`.
 * If a file/directory is created there by `f`, it will be deleted after `f` returns.
 */
protected def withTempPath(f: File => Unit): Unit = {
Contributor:

+1 to use withTempPath

Contributor (Author):
It seems there is a mixture of withTempDir and withTempPath in this file, arguably used for the same purpose. I will open a follow-up PR to fix the tests I added, but I am not going to update the other occurrences.

.option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss")
.load(path)

checkAnswer(res, exp)
Member:

checkAnswer doesn't check the type. Could you check that the inferred type is correct?

Contributor (Author):

Wouldn't that fail the answer since the values would be different? Yes, I can check the type explicitly, thanks.
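The explicit type check is a one-liner next to checkAnswer, something like this sketch (the field name col0 follows the snippet above):

```scala
// Assert the inferred type, which checkAnswer alone does not verify.
assert(res.schema("col0").dataType === TimestampNTZType)
```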

@@ -1012,6 +1012,196 @@ abstract class CSVSuite
}
}

test("SPARK-37326: Use different pattern to write and infer TIMESTAMP_NTZ values") {
Member:

The test title talks about different patterns, but the patterns in write and in inference are in fact the same.

Contributor (Author):

Could you elaborate, please? The code does use a different timestamp format compared to the default one.

Member:

The test title is confusing because it can be read as saying that the write and inference patterns differ within the test.

Contributor (Author):

Let me rewrite this.

.option("header", "true")
.load(path)

for (policy <- Seq("exception", "corrected", "legacy")) {
Member:

Where is policy used?

Contributor (Author):

Oh, good point. I forgot to change it. Now it is updated.

@sadikovi (Contributor, Author):

I addressed the comments; can you review again?


SparkQA commented Dec 1, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50251/


SparkQA commented Dec 1, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/50251/


SparkQA commented Dec 1, 2021

Test build #145778 has finished for PR 34596 at commit feb3715.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@MaxGekk (Member) commented Dec 1, 2021

+1, LGTM. Merging to master.
Thank you, @sadikovi, and thanks to @gengliangwang, @HyukjinKwon and @cloud-fan for the review.

@MaxGekk MaxGekk closed this in ce1f97f Dec 1, 2021
* The return type is [[Option]] in order to distinguish between 0L and null. Please
* refer to `parseTimestampString` for the allowed formats.
*/
def stringToTimestampWithoutTimeZone(s: UTF8String): Option[Long] = {
def stringToTimestampWithoutTimeZone(s: UTF8String, failOnError: Boolean): Option[Long] = {
Contributor:

The name failOnError is misleading, as this method never fails. How about allowTimeZone?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Respectfully, I have changed this exact code and variable 3 times by now. Yes, I will update 😞.

@@ -66,10 +68,23 @@ sealed trait TimestampFormatter extends Serializable {
@throws(classOf[DateTimeParseException])
@throws(classOf[DateTimeException])
@throws(classOf[IllegalStateException])
def parseWithoutTimeZone(s: String): Long =
def parseWithoutTimeZone(s: String, failOnError: Boolean): Long =
Contributor:

ditto, allowTimeZone?

Contributor:

Another question: instead of adding a new parameter, can the caller side pick one of parse and parseWithoutTimeZone?

@sadikovi (Contributor, Author) commented Dec 2, 2021:

I am not sure we can, because the parseWithoutTimeZone method can be called with different options depending on the context, and in fact it is. Unfortunately, this is an artifact of the problem we discussed earlier: some functions are not supposed to fail while parsing the value and others are.

@@ -2489,10 +2693,6 @@ abstract class CSVSuite
}

test("SPARK-36536: use casting when datetime pattern is not set") {
def isLegacy: Boolean = {
Contributor:

I think we should still test the legacy behavior for LTZ?

Contributor (Author):

Oh, this is for the TimestampNTZ change: the isLegacy flag was only applied to TimestampNTZ; the TimestampLTZ code is unchanged.

dongjoon-hyun pushed a commit that referenced this pull request Dec 1, 2021
### What changes were proposed in this pull request?

This PR fixes an issue where the test added in SPARK-37326 (#34596) fails with Java 11.
https://github.com/apache/spark/runs/4381645820?check_suite_focus=true#step:9:11681

The reason is that the error message of `DateTimeFormatter` was changed as of Java 9.
https://bugs.openjdk.java.net/browse/JDK-8085887
http://hg.openjdk.java.net/jdk9/jdk9/jdk/rev/28df1af8e872

### Why are the changes needed?

To keep the build stable.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tests with the following command succeed.
```
JAVA_HOME=/path/to/java11/ build/sbt -Phive -Phive-thriftserver "testOnly org.apache.spark.sql.execution.datasources.csv.CSVv*Suite"
```

Closes #34771 from sarutak/followup-SPARK-37326.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
cloud-fan pushed a commit that referenced this pull request Dec 2, 2021
…support in CSV data source

### What changes were proposed in this pull request?

This is a follow-up PR to #34596. There were a few comments and suggestions raised after the PR was merged, so I addressed them in this follow-up:
- Instead of using `failOnError`, which was confusing as no error was thrown in the method, we use `allowTimeZone`, which has the opposite meaning of `failOnError` and is far more descriptive.
- I updated a few test names to resolve ambiguity.
- I changed the tests to use `withTempPath` as was suggested in the original PR.

### Why are the changes needed?

Code cleanup and clarifications.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing unit and integration tests.

Closes #34777 from sadikovi/timestamp-ntz-csv-follow-up.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>