[SPARK-37326][SQL] Support TimestampNTZ in CSV data source
### What changes were proposed in this pull request?

This PR adds support for TimestampNTZ type in the CSV data source.

Most of the functionality is already in place; this patch verifies that writes and reads work for the TimestampNTZ type, and adds schema inference based on the format of the written timestamp values. The following rules apply (a short sketch follows the list):
- If there is a mixture of `TIMESTAMP_NTZ` and `TIMESTAMP_LTZ` values, use `TIMESTAMP_LTZ`.
- If there are only `TIMESTAMP_NTZ` values, resolve using the default timestamp type configured with `spark.sql.timestampType`.
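
For illustration, here is a minimal sketch of the resulting inference behavior (assuming a local `SparkSession` with this patch applied; the paths and values are hypothetical, not code from this PR):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._

// Resolve zone-less timestamps as TIMESTAMP_NTZ (the shipped default is TIMESTAMP_LTZ).
spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")

val dir = java.nio.file.Files.createTempDirectory("csv-ntz").toString

// Only zone-less values: inferred with the configured default timestamp type.
Seq("ts", "2021-11-22T10:54:27").toDS().coalesce(1).write.text(s"$dir/ntz")
spark.read.option("header", "true").option("inferSchema", "true")
  .csv(s"$dir/ntz").printSchema()     // ts: timestamp_ntz

// A mixture of zoned and zone-less values: falls back to TIMESTAMP_LTZ.
Seq("ts", "2021-11-22T10:54:27", "2021-11-23T10:54:27+08:00").toDS()
  .coalesce(1).write.text(s"$dir/mixed")
spark.read.option("header", "true").option("inferSchema", "true")
  .csv(s"$dir/mixed").printSchema()   // ts: timestamp
```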

In addition, I introduced a new CSV option `timestampNTZFormat`, which is similar to `timestampFormat` but configures the read/write pattern for `TIMESTAMP_NTZ` values. It is essentially a copy of the timestamp pattern without the timezone portion.
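
For example (the option name is the one introduced by this PR; the paths and the exact pattern are illustrative):

```scala
// Read zone-less timestamps written as e.g. "2021-11-22 10:54:27".
val df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss")
  .csv("/tmp/ntz-input")    // hypothetical input path

// Write TIMESTAMP_NTZ columns back out with the same zone-free pattern.
df.write
  .option("header", "true")
  .option("timestampNTZFormat", "yyyy-MM-dd HH:mm:ss")
  .csv("/tmp/ntz-output")   // hypothetical output path
```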

The schema inference works in the following way (illustrated right after the list):
1. We test whether the field can be parsed as a timestamp without timezone using `timestampNTZFormat`.
2. If the field has a timezone component, the `parseWithoutTimeZone` method throws `QueryExecutionErrors.cannotParseStringAsDataTypeError`, which is a `RuntimeException`.
3. We move on to parsing the field as a timestamp with timezone (the existing logic).
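
The user-visible effect of step 2 can be reproduced through `from_csv` (a sketch mirroring the `CsvFunctionsSuite` test in the diff below, assuming the session and implicits from the sketch above):

```scala
import org.apache.spark.sql.functions.from_csv
import org.apache.spark.sql.types.{StructField, StructType, TimestampNTZType}

// A zoned value cannot be parsed as TIMESTAMP_NTZ: in permissive parsing the
// field comes back null, and during inference the same failure makes the code
// fall through to the TIMESTAMP_LTZ attempt.
Seq("2021-08-12T15:16:23.000+11:00").toDF("csv")
  .select(from_csv(
    $"csv",
    StructType(StructField("a", TimestampNTZType) :: Nil),
    Map.empty[String, String]) as "value")
  .selectExpr("value.a")
  .show()   // prints a single null row
```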

### Why are the changes needed?

The current CSV source can write values as TimestampNTZ into a file but cannot preserve this type when reading the file back; this PR fixes the issue.

### Does this PR introduce _any_ user-facing change?

Previously, the CSV data source inferred timestamp values as `TimestampType` when reading a CSV file. Now, it infers the timestamp type from the value format (with or without timezone) and from the default timestamp type set by `spark.sql.timestampType`.

A new CSV option `timestampNTZFormat` is added to control how `TIMESTAMP_NTZ` values are formatted during writes and parsed during reads.

Now, if a value cannot be parsed as a timestamp without timezone, e.g. it contains a zone-offset or zone-id component, `parseWithoutTimeZone` throws a `RuntimeException`, signalling the inference code to try the next type.

### How was this patch tested?

I extended `CSVSuite` with a few unit tests to verify that the write-read roundtrip works for `TIMESTAMP_NTZ` and `TIMESTAMP_LTZ` values.
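
A minimal standalone version of that roundtrip looks roughly as follows (assuming the `spark` session and implicits from the sketch above; this is illustrative, not the suite's exact code):

```scala
import java.time.LocalDateTime

// java.time.LocalDateTime columns map to TIMESTAMP_NTZ.
val input = Seq(LocalDateTime.of(2021, 11, 22, 10, 54, 27)).toDF("ts")

val out = java.nio.file.Files.createTempDirectory("csv-roundtrip").toString
input.write.option("header", "true").csv(s"$out/data")

spark.conf.set("spark.sql.timestampType", "TIMESTAMP_NTZ")
val back = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv(s"$out/data")

assert(back.schema("ts").dataType == org.apache.spark.sql.types.TimestampNTZType)
assert(back.as[LocalDateTime].head() == LocalDateTime.of(2021, 11, 22, 10, 54, 27))
```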

Closes #34596 from sadikovi/timestamp-ntz-support-csv.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
sadikovi authored and MaxGekk committed Dec 1, 2021
1 parent f7be024 commit ce1f97f
Showing 11 changed files with 331 additions and 30 deletions.
docs/sql-data-sources-csv.md (9 additions & 3 deletions)
@@ -9,17 +9,17 @@ license: |
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---

Spark SQL provides `spark.read().csv("file_name")` to read a file or directory of files in CSV format into Spark DataFrame, and `dataframe.write().csv("path")` to write to a CSV file. Function `option()` can be used to customize the behavior of reading or writing, such as controlling behavior of the header, delimiter character, character set, and so on.

<div class="codetabs">

@@ -162,6 +162,12 @@ Data source options of CSV can be set via:
<td>Sets the string that indicates a timestamp format. Custom date formats follow the formats at <a href="https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html">Datetime Patterns</a>. This applies to timestamp type.</td>
<td>read/write</td>
</tr>
<tr>
<td><code>timestampNTZFormat</code></td>
<td>yyyy-MM-dd'T'HH:mm:ss[.SSS]</td>
<td>Sets the string that indicates a timestamp without timezone format. Custom date formats follow the formats at <a href="https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html">Datetime Patterns</a>. This applies to timestamp without timezone type, note that zone-offset and time-zone components are not supported when writing or reading this data type.</td>
<td>read/write</td>
</tr>
<tr>
<td><code>maxColumns</code></td>
<td>20480</td>
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVInferSchema.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
import org.apache.spark.sql.catalyst.util.TimestampFormatter
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

class CSVInferSchema(val options: CSVOptions) extends Serializable {
@@ -38,6 +39,13 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
legacyFormat = FAST_DATE_FORMAT,
isParsing = true)

private val timestampNTZFormatter = TimestampFormatter(
options.timestampNTZFormatInRead,
options.zoneId,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true,
forTimestampNTZ = true)

private val decimalParser = if (options.locale == Locale.US) {
// Special handling the default locale for backward compatibility
s: String => new java.math.BigDecimal(s)
@@ -109,6 +117,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
case LongType => tryParseLong(field)
case _: DecimalType => tryParseDecimal(field)
case DoubleType => tryParseDouble(field)
case TimestampNTZType => tryParseTimestampNTZ(field)
case TimestampType => tryParseTimestamp(field)
case BooleanType => tryParseBoolean(field)
case StringType => StringType
@@ -160,6 +169,17 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
private def tryParseDouble(field: String): DataType = {
if ((allCatch opt field.toDouble).isDefined || isInfOrNan(field)) {
DoubleType
} else {
tryParseTimestampNTZ(field)
}
}

private def tryParseTimestampNTZ(field: String): DataType = {
// We can only parse the value as TimestampNTZType if it does not have zone-offset or
// time-zone component and can be parsed with the timestamp formatter.
// Otherwise, it is likely to be a timestamp with timezone.
if ((allCatch opt timestampNTZFormatter.parseWithoutTimeZone(field, true)).isDefined) {
SQLConf.get.timestampType
} else {
tryParseTimestamp(field)
}
@@ -225,6 +245,10 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {
} else {
Some(DecimalType(range + scale, scale))
}

case (TimestampNTZType, TimestampType) | (TimestampType, TimestampNTZType) =>
Some(TimestampType)

case _ => None
}
}
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/CSVOptions.scala
@@ -164,6 +164,10 @@ class CSVOptions(
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS][XXX]"
})

val timestampNTZFormatInRead: Option[String] = parameters.get("timestampNTZFormat")
val timestampNTZFormatInWrite: String = parameters.getOrElse("timestampNTZFormat",
s"${DateFormatter.defaultPattern}'T'HH:mm:ss[.SSS]")

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

val maxColumns = getInt("maxColumns", 20480)
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala
@@ -49,7 +49,7 @@ class UnivocityGenerator(
legacyFormat = FAST_DATE_FORMAT,
isParsing = false)
private val timestampNTZFormatter = TimestampFormatter(
-    options.timestampFormatInWrite,
+    options.timestampNTZFormatInWrite,
options.zoneId,
legacyFormat = FAST_DATE_FORMAT,
isParsing = false,
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala
@@ -94,7 +94,7 @@ class UnivocityParser(
legacyFormat = FAST_DATE_FORMAT,
isParsing = true)
private lazy val timestampNTZFormatter = TimestampFormatter(
-    options.timestampFormatInRead,
+    options.timestampNTZFormatInRead,
options.zoneId,
legacyFormat = FAST_DATE_FORMAT,
isParsing = true,
@@ -204,7 +204,7 @@

case _: TimestampNTZType => (d: String) =>
nullSafeDatum(d, name, nullable, options) { datum =>
-        timestampNTZFormatter.parseWithoutTimeZone(datum)
+        timestampNTZFormatter.parseWithoutTimeZone(datum, true)
}

case _: DateType => (d: String) =>
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -442,17 +442,22 @@ object DateTimeUtils {

/**
* Trims and parses a given UTF8 string to a corresponding [[Long]] value which representing the
-   * number of microseconds since the epoch. The result is independent of time zones,
-   * which means that zone ID in the input string will be ignored.
+   * number of microseconds since the epoch. The result will be independent of time zones.
*
* If the input string contains a component associated with time zone, the method will return
* `None` if `failOnError` is set to `true`. If `failOnError` is set to `false`, the method
* will simply discard the time zone component. Enable the check to detect situations like parsing
* a timestamp with time zone as TimestampNTZType.
*
* The return type is [[Option]] in order to distinguish between 0L and null. Please
* refer to `parseTimestampString` for the allowed formats.
*/
-  def stringToTimestampWithoutTimeZone(s: UTF8String): Option[Long] = {
+  def stringToTimestampWithoutTimeZone(s: UTF8String, failOnError: Boolean): Option[Long] = {
try {
-      val (segments, _, justTime) = parseTimestampString(s)
-      // If the input string can't be parsed as a timestamp, or it contains only the time part of a
-      // timestamp and we can't determine its date, return None.
-      if (segments.isEmpty || justTime) {
+      val (segments, zoneIdOpt, justTime) = parseTimestampString(s)
+      // If the input string can't be parsed as a timestamp without time zone, or it contains only
+      // the time part of a timestamp and we can't determine its date, return None.
+      if (segments.isEmpty || justTime || failOnError && zoneIdOpt.isDefined) {
return None
}
val nanoseconds = MICROSECONDS.toNanos(segments(6))
@@ -465,8 +470,19 @@
}
}

/**
* Trims and parses a given UTF8 string to a corresponding [[Long]] value which representing the
* number of microseconds since the epoch. The result is independent of time zones. Zone id
* component will be discarded and ignored.
* The return type is [[Option]] in order to distinguish between 0L and null. Please
* refer to `parseTimestampString` for the allowed formats.
*/
def stringToTimestampWithoutTimeZone(s: UTF8String): Option[Long] = {
stringToTimestampWithoutTimeZone(s, false)
}

def stringToTimestampWithoutTimeZoneAnsi(s: UTF8String): Long = {
-    stringToTimestampWithoutTimeZone(s).getOrElse {
+    stringToTimestampWithoutTimeZone(s, false).getOrElse {
throw QueryExecutionErrors.cannotCastToDateTimeError(s, TimestampNTZType)
}
}
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -31,9 +31,10 @@ import org.apache.spark.sql.catalyst.util.DateTimeConstants._
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.catalyst.util.LegacyDateFormats.{LegacyDateFormat, LENIENT_SIMPLE_DATE_FORMAT}
import org.apache.spark.sql.catalyst.util.RebaseDateTime._
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._
-import org.apache.spark.sql.types.Decimal
+import org.apache.spark.sql.types.{Decimal, TimestampNTZType}
import org.apache.spark.unsafe.types.UTF8String

sealed trait TimestampFormatter extends Serializable {
@@ -55,6 +56,7 @@
* Parses a timestamp in a string and converts it to microseconds since Unix Epoch in local time.
*
* @param s - string with timestamp to parse
* @param failOnError - indicates strict parsing of timezone
* @return microseconds since epoch.
* @throws ParseException can be thrown by legacy parser
* @throws DateTimeParseException can be thrown by new parser
@@ -66,10 +68,23 @@
@throws(classOf[DateTimeParseException])
@throws(classOf[DateTimeException])
@throws(classOf[IllegalStateException])
-  def parseWithoutTimeZone(s: String): Long =
+  def parseWithoutTimeZone(s: String, failOnError: Boolean): Long =
    throw new IllegalStateException(
-      s"The method `parseWithoutTimeZone(s: String)` should be implemented in the formatter " +
-      "of timestamp without time zone")
+      s"The method `parseWithoutTimeZone(s: String, failOnError: Boolean)` should be " +
+      "implemented in the formatter of timestamp without time zone")

/**
* Parses a timestamp in a string and converts it to microseconds since Unix Epoch in local time.
* Zone-id and zone-offset components are ignored.
*/
@throws(classOf[ParseException])
@throws(classOf[DateTimeParseException])
@throws(classOf[DateTimeException])
@throws(classOf[IllegalStateException])
final def parseWithoutTimeZone(s: String): Long =
// This is implemented to adhere to the original behaviour of `parseWithoutTimeZone` where we
// did not fail if timestamp contained zone-id or zone-offset component and instead ignored it.
parseWithoutTimeZone(s, false)

def format(us: Long): String
def format(ts: Timestamp): String
@@ -118,9 +133,12 @@ class Iso8601TimestampFormatter(
} catch checkParsedDiff(s, legacyFormatter.parse)
}

-  override def parseWithoutTimeZone(s: String): Long = {
+  override def parseWithoutTimeZone(s: String, failOnError: Boolean): Long = {
try {
val parsed = formatter.parse(s)
if (failOnError && parsed.query(TemporalQueries.zone()) != null) {
throw QueryExecutionErrors.cannotParseStringAsDataTypeError(pattern, s, TimestampNTZType)
}
val localDate = toLocalDate(parsed)
val localTime = toLocalTime(parsed)
DateTimeUtils.localDateTimeToMicros(LocalDateTime.of(localDate, localTime))
@@ -186,9 +204,13 @@ class DefaultTimestampFormatter(
} catch checkParsedDiff(s, legacyFormatter.parse)
}

-  override def parseWithoutTimeZone(s: String): Long = {
+  override def parseWithoutTimeZone(s: String, failOnError: Boolean): Long = {
    try {
-      DateTimeUtils.stringToTimestampWithoutTimeZoneAnsi(UTF8String.fromString(s))
+      val utf8Value = UTF8String.fromString(s)
+      DateTimeUtils.stringToTimestampWithoutTimeZone(utf8Value, failOnError).getOrElse {
+        throw QueryExecutionErrors.cannotParseStringAsDataTypeError(
+          TimestampFormatter.defaultPattern(), s, TimestampNTZType)
+      }
} catch checkParsedDiff(s, legacyFormatter.parse)
}
}
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1034,6 +1034,13 @@ object QueryExecutionErrors {
s"[$token] as target spark data type [$dataType].")
}

def cannotParseStringAsDataTypeError(pattern: String, value: String, dataType: DataType)
: Throwable = {
new RuntimeException(
s"Cannot parse field value ${value} for pattern ${pattern} " +
s"as target spark data type [$dataType].")
}

def failToParseEmptyStringForDataTypeError(dataType: DataType): Throwable = {
new RuntimeException(
s"Failed to parse an empty string for data type ${dataType.catalogString}")
@@ -1894,4 +1901,3 @@ object QueryExecutionErrors {
new RuntimeException("Unable to convert timestamp of Orc to data type 'timestamp_ntz'")
}
}

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -357,6 +357,18 @@ class DateTimeUtilsSuite extends SparkFunSuite with Matchers with SQLHelper {
checkStringToTimestamp("2021-01-01T12:30:4294967297+4294967297:30", None)
}

test("SPARK-37326: stringToTimestampWithoutTimeZone with failOnError") {
assert(
stringToTimestampWithoutTimeZone(
UTF8String.fromString("2021-11-22 10:54:27 +08:00"), false) ==
Some(DateTimeUtils.localDateTimeToMicros(LocalDateTime.of(2021, 11, 22, 10, 54, 27))))

assert(
stringToTimestampWithoutTimeZone(
UTF8String.fromString("2021-11-22 10:54:27 +08:00"), true) ==
None)
}

test("SPARK-15379: special invalid date string") {
// Test stringToDate
assert(toDate("2015-02-29 00:00:00").isEmpty)
sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala
@@ -368,4 +368,15 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
.selectExpr("value.a")
checkAnswer(fromCsvDF, Row(localDT))
}

test("SPARK-37326: Handle incorrectly formatted timestamp_ntz values in from_csv") {
val fromCsvDF = Seq("2021-08-12T15:16:23.000+11:00").toDF("csv")
.select(
from_csv(
$"csv",
StructType(StructField("a", TimestampNTZType) :: Nil),
Map.empty[String, String]) as "value")
.selectExpr("value.a")
checkAnswer(fromCsvDF, Row(null))
}
}