[SPARK-26243][SQL] Use java.time API for parsing timestamps and dates from JSON #23196

Closed
32 commits
fb10b91
Adding DateTimeFormatter
MaxGekk Dec 1, 2018
a9b39ec
Support DateTimeFormatter by JacksonParser and JacksonGenerator
MaxGekk Dec 1, 2018
ff589f5
Make test independent from current time zone
MaxGekk Dec 1, 2018
4646ded
Fix a test by new fallback
MaxGekk Dec 1, 2018
1c838e0
Set time zone explicitly
MaxGekk Dec 1, 2018
142f301
Updating the migration guide
MaxGekk Dec 1, 2018
606da21
Fix the migration guide by replacing CSV by JSON
MaxGekk Dec 1, 2018
f326042
Inlining method's arguments
MaxGekk Dec 1, 2018
4120228
A test for roundtrip timestamp parsing
MaxGekk Dec 2, 2018
6689747
Merge remote-tracking branch 'origin/master' into json-time-parser
MaxGekk Dec 2, 2018
e575162
Set time zone to GMT to eliminate of situation when time zone offset …
MaxGekk Dec 2, 2018
a35d5bf
UTC -> GMT
MaxGekk Dec 2, 2018
2a2085d
Using floorDiv to take days from seconds
MaxGekk Dec 2, 2018
55f2eac
Removing unnecessary time zone settings
MaxGekk Dec 2, 2018
57600e2
Merge remote-tracking branch 'origin/master' into json-time-parser
MaxGekk Dec 4, 2018
07fcf46
Using legacy parser in HiveCompatibilitySuite
MaxGekk Dec 5, 2018
6b6ea8a
Enable new parser in HiveCompatibilitySuit
MaxGekk Dec 7, 2018
244654b
Remove saving legacy parser settings
MaxGekk Dec 7, 2018
015fdce
Updating migration guide
MaxGekk Dec 8, 2018
96529f5
Making date parser independent from time zones
MaxGekk Dec 12, 2018
07d6031
Test refactoring
MaxGekk Dec 13, 2018
d761dee
protected is added
MaxGekk Dec 13, 2018
24b1e3d
toInstant -> toInstantWithZoneId
MaxGekk Dec 13, 2018
9a11515
Set time zone in the test
MaxGekk Dec 13, 2018
4b01d05
GMT -> UTC
MaxGekk Dec 13, 2018
0c7b96b
DateTimeFormatter -> TimestampFormatter
MaxGekk Dec 13, 2018
bbaff09
timeParser -> timestampParser
MaxGekk Dec 13, 2018
8af9df9
Round trip tests
MaxGekk Dec 14, 2018
363482e
Renaming test suite
MaxGekk Dec 14, 2018
07e0bf8
Added withClue
MaxGekk Dec 14, 2018
c12da1f
Put test under legacy time parser
MaxGekk Dec 14, 2018
60ab5b1
TODO
MaxGekk Dec 15, 2018
2 changes: 1 addition & 1 deletion docs/sql-migration-guide-upgrade.md
@@ -33,7 +33,7 @@ displayTitle: Spark SQL Upgrading Guide

- Spark applications which are built with Spark version 2.4 and prior, and call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, as they are not binary compatible with Spark 3.0.

- Since Spark 3.0, CSV datasource uses java.time API for parsing and generating CSV content. New formatting implementation supports date/timestamp patterns conformed to ISO 8601. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
- Since Spark 3.0, the CSV and JSON datasources use the java.time API for parsing and generating CSV/JSON content. In Spark version 2.4 and earlier, java.text.SimpleDateFormat is used for the same purpose, with fallbacks to the parsing mechanisms of Spark 2.0 and 1.x. For example, `2018-12-08 10:39:21.123` with the pattern `yyyy-MM-dd'T'HH:mm:ss.SSS` cannot be parsed since Spark 3.0 because the timestamp does not match the pattern, but it can be parsed by earlier Spark versions due to a fallback to `Timestamp.valueOf`. To parse the same timestamp since Spark 3.0, the pattern should be `yyyy-MM-dd HH:mm:ss.SSS`. To switch back to the implementation used in Spark 2.4 and earlier, set `spark.sql.legacy.timeParser.enabled` to `true`.
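
The pattern mismatch described in this note can be reproduced outside Spark with plain `java.time`. The sketch below is only an illustration: `StrictPatternSketch` and `parseStrict` are hypothetical names, and it calls `java.time.format.DateTimeFormatter` and `java.sql.Timestamp.valueOf` directly rather than going through Spark's own formatter classes.

```scala
import java.sql.Timestamp
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import scala.util.Try

object StrictPatternSketch {
  val input = "2018-12-08 10:39:21.123"

  // Hypothetical helper: strict java.time parsing with no fallback.
  def parseStrict(s: String, pattern: String): Try[LocalDateTime] =
    Try(LocalDateTime.parse(s, DateTimeFormatter.ofPattern(pattern)))

  def main(args: Array[String]): Unit = {
    // The literal 'T' in the pattern does not match the space in the input,
    // so this attempt is a Failure.
    println(parseStrict(input, "yyyy-MM-dd'T'HH:mm:ss.SSS"))

    // With a space in the pattern the same input parses successfully.
    println(parseStrict(input, "yyyy-MM-dd HH:mm:ss.SSS"))

    // Timestamp.valueOf ignores the pattern and accepts the JDBC escape
    // format, which is why a fallback to it hid the mismatch.
    println(Try(Timestamp.valueOf(input)))
  }
}
```

The point of the comparison is that a fallback parser accepts input regardless of the configured pattern, so removing it turns a silently tolerated mismatch into a parse failure.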

## Upgrading From Spark SQL 2.3 to 2.4

@@ -22,13 +22,13 @@ import scala.util.control.Exception.allCatch
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.analysis.TypeCoercion
import org.apache.spark.sql.catalyst.expressions.ExprUtils
import org.apache.spark.sql.catalyst.util.DateTimeFormatter
import org.apache.spark.sql.catalyst.util.TimestampFormatter
import org.apache.spark.sql.types._

class CSVInferSchema(val options: CSVOptions) extends Serializable {

@transient
private lazy val timeParser = DateTimeFormatter(
private lazy val timestampParser = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
@@ -160,7 +160,7 @@ class CSVInferSchema(val options: CSVOptions) extends Serializable {

private def tryParseTimestamp(field: String): DataType = {
// This case infers a custom `dataFormat` is set.
if ((allCatch opt timeParser.parse(field)).isDefined) {
if ((allCatch opt timestampParser.parse(field)).isDefined) {
TimestampType
} else {
tryParseBoolean(field)
@@ -22,7 +22,7 @@ import java.io.Writer
import com.univocity.parsers.csv.CsvWriter

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeFormatter}
import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
import org.apache.spark.sql.types._

class UnivocityGenerator(
@@ -41,18 +41,18 @@ class UnivocityGenerator(
private val valueConverters: Array[ValueConverter] =
schema.map(_.dataType).map(makeConverter).toArray

private val timeFormatter = DateTimeFormatter(
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.locale)

private def makeConverter(dataType: DataType): ValueConverter = dataType match {
case DateType =>
(row: InternalRow, ordinal: Int) => dateFormatter.format(row.getInt(ordinal))

case TimestampType =>
(row: InternalRow, ordinal: Int) => timeFormatter.format(row.getLong(ordinal))
(row: InternalRow, ordinal: Int) => timestampFormatter.format(row.getLong(ordinal))

case udt: UserDefinedType[_] => makeConverter(udt.sqlType)

@@ -74,11 +74,11 @@ class UnivocityParser(

private val row = new GenericInternalRow(requiredSchema.length)

private val timeFormatter = DateTimeFormatter(
private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.timeZone, options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.locale)

// Retrieve the raw record string.
private def getCurrentInput: UTF8String = {
@@ -158,7 +158,7 @@ class UnivocityParser(
}

case _: TimestampType => (d: String) =>
nullSafeDatum(d, name, nullable, options)(timeFormatter.parse)
nullSafeDatum(d, name, nullable, options)(timestampFormatter.parse)

case _: DateType => (d: String) =>
nullSafeDatum(d, name, nullable, options)(dateFormatter.parse)
@@ -21,7 +21,6 @@ import java.nio.charset.{Charset, StandardCharsets}
import java.util.{Locale, TimeZone}

import com.fasterxml.jackson.core.{JsonFactory, JsonParser}
import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util._
@@ -82,13 +81,10 @@ private[sql] class JSONOptions(
val timeZone: TimeZone = DateTimeUtils.getTimeZone(
parameters.getOrElse(DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId))

// Uses `FastDateFormat` which can be direct replacement for `SimpleDateFormat` and thread-safe.
val dateFormat: FastDateFormat =
FastDateFormat.getInstance(parameters.getOrElse("dateFormat", "yyyy-MM-dd"), locale)
val dateFormat: String = parameters.getOrElse("dateFormat", "yyyy-MM-dd")

val timestampFormat: FastDateFormat =
FastDateFormat.getInstance(
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"), timeZone, locale)
val timestampFormat: String =
parameters.getOrElse("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX")

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

@@ -23,7 +23,7 @@ import com.fasterxml.jackson.core._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils, MapData}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.types._

/**
@@ -77,6 +77,12 @@ private[sql] class JacksonGenerator(

private val lineSeparator: String = options.lineSeparatorInWrite

private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.locale)

private def makeWriter(dataType: DataType): ValueWriter = dataType match {
case NullType =>
(row: SpecializedGetters, ordinal: Int) =>
@@ -116,14 +122,12 @@ private[sql] class JacksonGenerator(

case TimestampType =>
(row: SpecializedGetters, ordinal: Int) =>
val timestampString =
options.timestampFormat.format(DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
val timestampString = timestampFormatter.format(row.getLong(ordinal))
gen.writeString(timestampString)

case DateType =>
(row: SpecializedGetters, ordinal: Int) =>
val dateString =
options.dateFormat.format(DateTimeUtils.toJavaDate(row.getInt(ordinal)))
val dateString = dateFormatter.format(row.getInt(ordinal))
gen.writeString(dateString)

case BinaryType =>
@@ -55,6 +55,12 @@ class JacksonParser(
private val factory = new JsonFactory()
options.setJacksonOptions(factory)

private val timestampFormatter = TimestampFormatter(
options.timestampFormat,
options.timeZone,
options.locale)
private val dateFormatter = DateFormatter(options.dateFormat, options.locale)

/**
* Create a converter which converts the JSON documents held by the `JsonParser`
* to a value according to a desired schema. This is a wrapper for the method
@@ -218,17 +224,7 @@ class JacksonParser(
case TimestampType =>
(parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
case VALUE_STRING if parser.getTextLength >= 1 =>
val stringValue = parser.getText
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.
Long.box {
Try(options.timestampFormat.parse(stringValue).getTime * 1000L)
.getOrElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
DateTimeUtils.stringToTime(stringValue).getTime * 1000L
}
}
timestampFormatter.parse(parser.getText)

case VALUE_NUMBER_INT =>
parser.getLongValue * 1000000L
@@ -237,22 +233,7 @@ class JacksonParser(
case DateType =>
(parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
case VALUE_STRING if parser.getTextLength >= 1 =>
val stringValue = parser.getText
// This one will lose microseconds parts.
// See https://issues.apache.org/jira/browse/SPARK-10681.x
Int.box {
Try(DateTimeUtils.millisToDays(options.dateFormat.parse(stringValue).getTime))
.orElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
// compatibility.
Try(DateTimeUtils.millisToDays(DateTimeUtils.stringToTime(stringValue).getTime))
}
.getOrElse {
// In Spark 1.5.0, we store the data as number of days since epoch in string.
// So, we just convert it to Int.
stringValue.toInt
}
}
dateFormatter.parse(parser.getText)
}

case BinaryType =>
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.util

import java.time._
import java.time.format.DateTimeFormatterBuilder
import java.time.temporal.{ChronoField, TemporalQueries}
import java.time.temporal.{ChronoField, TemporalAccessor, TemporalQueries}
import java.util.{Locale, TimeZone}

import scala.util.Try
@@ -28,31 +28,44 @@ import org.apache.commons.lang3.time.FastDateFormat

import org.apache.spark.sql.internal.SQLConf

sealed trait DateTimeFormatter {
sealed trait TimestampFormatter {
Member

Why did we name it TimestampFormatter? It has DateFormatter as well.

Contributor

we have another trait: DateFormatter

Member

Eh, sorry I mean the file name @cloud-fan.

Contributor

will fix it in #23329

def parse(s: String): Long // returns microseconds since epoch
def format(us: Long): String
}

class Iso8601DateTimeFormatter(
trait FormatterUtils {
protected def zoneId: ZoneId
protected def buildFormatter(
pattern: String,
locale: Locale): java.time.format.DateTimeFormatter = {
new DateTimeFormatterBuilder()
.appendPattern(pattern)
.parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
.parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
.parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
.toFormatter(locale)
}
protected def toInstantWithZoneId(temporalAccessor: TemporalAccessor): java.time.Instant = {
val localDateTime = LocalDateTime.from(temporalAccessor)
val zonedDateTime = ZonedDateTime.of(localDateTime, zoneId)
Instant.from(zonedDateTime)
}
}
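
To see what the `parseDefaulting` calls and the zone fallback in `FormatterUtils` buy, here is a minimal standalone sketch. `DefaultingSketch`, `build` and `toInstant` are hypothetical names that imitate the shape of the code in this diff; the locale is fixed to `Locale.US` for the illustration, and nothing here is the Spark class itself.

```scala
import java.time.{Instant, LocalDateTime, ZoneId, ZonedDateTime}
import java.time.format.DateTimeFormatterBuilder
import java.time.temporal.{ChronoField, TemporalQueries}
import java.util.Locale

object DefaultingSketch {
  // Same builder chain as in the diff: fields missing from the pattern
  // get a default, so a partial pattern still yields a full date-time.
  def build(pattern: String): java.time.format.DateTimeFormatter =
    new DateTimeFormatterBuilder()
      .appendPattern(pattern)
      .parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
      .parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
      .parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
      .parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
      .parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
      .parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
      .toFormatter(Locale.US)

  // If the input carries its own offset, use it; otherwise interpret the
  // local date-time in the supplied zone.
  def toInstant(s: String, pattern: String, zone: ZoneId): Instant = {
    val parsed = build(pattern).parse(s)
    if (parsed.query(TemporalQueries.offset()) == null) {
      ZonedDateTime.of(LocalDateTime.from(parsed), zone).toInstant
    } else {
      Instant.from(parsed)
    }
  }

  def main(args: Array[String]): Unit = {
    val utc = ZoneId.of("UTC")
    // Offset present in the text: the zone argument is not consulted.
    println(toInstant("2018-12-02T10:11:12.001+01:00", "yyyy-MM-dd'T'HH:mm:ss.SSSXXX", utc))
    // No offset: the zone argument decides the instant.
    println(toInstant("2018-12-02 10:11:12", "yyyy-MM-dd HH:mm:ss", ZoneId.of("Europe/Berlin")))
    // Day and time fields are absent from the pattern and are defaulted.
    println(toInstant("2018-12", "yyyy-MM", utc))
  }
}
```

Without the defaults, `LocalDateTime.from` would throw for a pattern such as `yyyy-MM`, because the parsed result would not contain enough fields to build a date and a time.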

class Iso8601TimestampFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends DateTimeFormatter {
val formatter = new DateTimeFormatterBuilder()
.appendPattern(pattern)
.parseDefaulting(ChronoField.YEAR_OF_ERA, 1970)
.parseDefaulting(ChronoField.MONTH_OF_YEAR, 1)
.parseDefaulting(ChronoField.DAY_OF_MONTH, 1)
.parseDefaulting(ChronoField.HOUR_OF_DAY, 0)
.parseDefaulting(ChronoField.MINUTE_OF_HOUR, 0)
.parseDefaulting(ChronoField.SECOND_OF_MINUTE, 0)
.toFormatter(locale)
locale: Locale) extends TimestampFormatter with FormatterUtils {
val zoneId = timeZone.toZoneId
val formatter = buildFormatter(pattern, locale)

def toInstant(s: String): Instant = {
val temporalAccessor = formatter.parse(s)
if (temporalAccessor.query(TemporalQueries.offset()) == null) {
val localDateTime = LocalDateTime.from(temporalAccessor)
val zonedDateTime = ZonedDateTime.of(localDateTime, timeZone.toZoneId)
Instant.from(zonedDateTime)
toInstantWithZoneId(temporalAccessor)
} else {
Instant.from(temporalAccessor)
}
@@ -75,10 +88,10 @@ class Iso8601DateTimeFormatter(
}
}

class LegacyDateTimeFormatter(
class LegacyTimestampFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends DateTimeFormatter {
locale: Locale) extends TimestampFormatter {
val format = FastDateFormat.getInstance(pattern, timeZone, locale)

protected def toMillis(s: String): Long = format.parse(s).getTime
@@ -90,21 +103,21 @@ class LegacyDateTimeFormatter(
}
}

class LegacyFallbackDateTimeFormatter(
class LegacyFallbackTimestampFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends LegacyDateTimeFormatter(pattern, timeZone, locale) {
locale: Locale) extends LegacyTimestampFormatter(pattern, timeZone, locale) {
override def toMillis(s: String): Long = {
Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime)
}
}

object DateTimeFormatter {
def apply(format: String, timeZone: TimeZone, locale: Locale): DateTimeFormatter = {
object TimestampFormatter {
def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
if (SQLConf.get.legacyTimeParserEnabled) {
new LegacyFallbackDateTimeFormatter(format, timeZone, locale)
new LegacyFallbackTimestampFormatter(format, timeZone, locale)
} else {
new Iso8601DateTimeFormatter(format, timeZone, locale)
new Iso8601TimestampFormatter(format, timeZone, locale)
}
}
}
@@ -116,29 +129,32 @@ sealed trait DateFormatter {

class Iso8601DateFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends DateFormatter {
locale: Locale) extends DateFormatter with FormatterUtils {

val zoneId = ZoneId.of("UTC")

val formatter = buildFormatter(pattern, locale)

val dateTimeFormatter = new Iso8601DateTimeFormatter(pattern, timeZone, locale)
def toInstant(s: String): Instant = {
val temporalAccessor = formatter.parse(s)
toInstantWithZoneId(temporalAccessor)
}

override def parse(s: String): Int = {
val seconds = dateTimeFormatter.toInstant(s).getEpochSecond
val seconds = toInstant(s).getEpochSecond
val days = Math.floorDiv(seconds, DateTimeUtils.SECONDS_PER_DAY)

days.toInt
}

override def format(days: Int): String = {
val instant = Instant.ofEpochSecond(days * DateTimeUtils.SECONDS_PER_DAY)
dateTimeFormatter.formatter.withZone(timeZone.toZoneId).format(instant)
formatter.withZone(zoneId).format(instant)
}
}
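
The choice of `Math.floorDiv` over plain `/` in `parse` matters for instants before the epoch. The sketch below illustrates the difference; `EpochDaysSketch` and its members are hypothetical, and `SecondsPerDay` is a stand-in for `DateTimeUtils.SECONDS_PER_DAY`, assumed here to be 86400.

```scala
import java.time.{Instant, LocalDate}

object EpochDaysSketch {
  // Stand-in for DateTimeUtils.SECONDS_PER_DAY (assumed value: 86400).
  val SecondsPerDay: Long = 24L * 60 * 60

  // Rounds toward negative infinity, as in the diff.
  def daysFloor(seconds: Long): Int =
    Math.floorDiv(seconds, SecondsPerDay).toInt

  // Plain integer division rounds toward zero instead.
  def daysTruncated(seconds: Long): Int =
    (seconds / SecondsPerDay).toInt

  def main(args: Array[String]): Unit = {
    // Noon on the last day of 1969, i.e. 43200 seconds before the epoch.
    val seconds = Instant.parse("1969-12-31T12:00:00Z").getEpochSecond

    // floorDiv gives day -1, which is 1969-12-31.
    println(LocalDate.ofEpochDay(daysFloor(seconds).toLong))

    // Truncation gives day 0, which is 1970-01-01: one day off.
    println(LocalDate.ofEpochDay(daysTruncated(seconds).toLong))
  }
}
```

For non-negative second counts, and for negative ones that are exact multiples of a day, the two divisions agree; they differ only for pre-1970 instants that are not aligned to midnight UTC.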

class LegacyDateFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends DateFormatter {
val format = FastDateFormat.getInstance(pattern, timeZone, locale)
class LegacyDateFormatter(pattern: String, locale: Locale) extends DateFormatter {
val format = FastDateFormat.getInstance(pattern, locale)

def parse(s: String): Int = {
val milliseconds = format.parse(s).getTime
@@ -153,8 +169,7 @@ class LegacyDateFormatter(

class LegacyFallbackDateFormatter(
pattern: String,
timeZone: TimeZone,
locale: Locale) extends LegacyDateFormatter(pattern, timeZone, locale) {
locale: Locale) extends LegacyDateFormatter(pattern, locale) {
override def parse(s: String): Int = {
Try(super.parse(s)).orElse {
// If it fails to parse, then tries the way used in 2.0 and 1.x for backwards
@@ -169,11 +184,11 @@ class LegacyFallbackDateFormatter(
}

object DateFormatter {
def apply(format: String, timeZone: TimeZone, locale: Locale): DateFormatter = {
def apply(format: String, locale: Locale): DateFormatter = {
if (SQLConf.get.legacyTimeParserEnabled) {
new LegacyFallbackDateFormatter(format, timeZone, locale)
new LegacyFallbackDateFormatter(format, locale)
} else {
new Iso8601DateFormatter(format, timeZone, locale)
new Iso8601DateFormatter(format, locale)
}
}
}