Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-31762][SQL] Fix perf regression of date/timestamp formatting in toHiveString #28582

Closed
wants to merge 12 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy._

sealed trait DateFormatter extends Serializable {
def parse(s: String): Int // returns days since epoch

def format(days: Int): String
def format(date: Date): String
def format(localDate: LocalDate): String
}

class Iso8601DateFormatter(
Expand All @@ -56,37 +59,47 @@ class Iso8601DateFormatter(
}
}

override def format(localDate: LocalDate): String = {
localDate.format(formatter)
}

override def format(days: Int): String = {
LocalDate.ofEpochDay(days).format(formatter)
format(LocalDate.ofEpochDay(days))
}

override def format(date: Date): String = {
legacyFormatter.format(date)
}
}

trait LegacyDateFormatter extends DateFormatter {
def parseToDate(s: String): Date
def formatDate(d: Date): String

override def parse(s: String): Int = {
fromJavaDate(new java.sql.Date(parseToDate(s).getTime))
}

override def format(days: Int): String = {
val date = DateTimeUtils.toJavaDate(days)
formatDate(date)
format(DateTimeUtils.toJavaDate(days))
}

override def format(localDate: LocalDate): String = {
format(localDateToDays(localDate))
}
}

class LegacyFastDateFormatter(pattern: String, locale: Locale) extends LegacyDateFormatter {
@transient
private lazy val fdf = FastDateFormat.getInstance(pattern, locale)
override def parseToDate(s: String): Date = fdf.parse(s)
override def formatDate(d: Date): String = fdf.format(d)
override def format(d: Date): String = fdf.format(d)
}

class LegacySimpleDateFormatter(pattern: String, locale: Locale) extends LegacyDateFormatter {
@transient
private lazy val sdf = new SimpleDateFormat(pattern, locale)
override def parseToDate(s: String): Date = sdf.parse(s)
override def formatDate(d: Date): String = sdf.format(d)
override def format(d: Date): String = sdf.format(d)
}

object DateFormatter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ sealed trait TimestampFormatter extends Serializable {
@throws(classOf[DateTimeParseException])
@throws(classOf[DateTimeException])
def parse(s: String): Long

def format(us: Long): String
def format(ts: Timestamp): String
def format(instant: Instant): String
}

class Iso8601TimestampFormatter(
Expand Down Expand Up @@ -84,9 +87,17 @@ class Iso8601TimestampFormatter(
}
}

override def format(instant: Instant): String = {
formatter.withZone(zoneId).format(instant)
}

override def format(us: Long): String = {
val instant = DateTimeUtils.microsToInstant(us)
formatter.withZone(zoneId).format(instant)
format(instant)
}

override def format(ts: Timestamp): String = {
legacyFormatter.format(ts)
}
}

Expand All @@ -100,10 +111,26 @@ class Iso8601TimestampFormatter(
*/
class FractionTimestampFormatter(zoneId: ZoneId)
extends Iso8601TimestampFormatter(
"", zoneId, TimestampFormatter.defaultLocale, needVarLengthSecondFraction = false) {
TimestampFormatter.defaultPattern,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's wrong if we still pass the empty string as the pattern?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty result

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, it's used by the legacy formatter.

zoneId,
TimestampFormatter.defaultLocale,
needVarLengthSecondFraction = false) {

@transient
override protected lazy val formatter = DateTimeFormatterHelper.fractionFormatter

// Converts Timestamp to string according to Hive TimestampWritable convention.
// The code is borrowed from Spark 2.4 DateTimeUtils.timestampToString
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it really needed? DateTimeFormatterHelper.fractionFormatter should omit tailing 0 already.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should omit tailing 0 already.

The reason of making this PR is to pass java.sql.Timestamp to a legacy formatter which can accept the type but the legacy formatter of fractionFormatter is SimpleDateFormat which cannot omit tailing 0.

is it really needed?

I think so. What do you propose instead of it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make the comment more clear:

// The new formatter will omit the trailing 0 in the timestamp string, but the legacy formatter can't.
// Here we borrow the code from Spark 2.4 DateTimeUtils.timestampToString to omit the
// trailing 0 for the legacy formatter as well.

We don't need to mention hive at all. This is just for internal consistency.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I updated the comment.

override def format(ts: Timestamp): String = {
val timestampString = ts.toString
val formatted = legacyFormatter.format(ts)

if (timestampString.length > 19 && timestampString.substring(19) != ".0") {
formatted + timestampString.substring(19)
} else {
formatted
}
}
}

/**
Expand Down Expand Up @@ -149,7 +176,7 @@ class LegacyFastTimestampFormatter(
fastDateFormat.getTimeZone,
fastDateFormat.getPattern.count(_ == 'S'))

def parse(s: String): SQLTimestamp = {
override def parse(s: String): SQLTimestamp = {
cal.clear() // Clear the calendar because it can be re-used many times
if (!fastDateFormat.parse(s, new ParsePosition(0), cal)) {
throw new IllegalArgumentException(s"'$s' is an invalid timestamp")
Expand All @@ -160,12 +187,20 @@ class LegacyFastTimestampFormatter(
rebaseJulianToGregorianMicros(julianMicros)
}

def format(timestamp: SQLTimestamp): String = {
override def format(timestamp: SQLTimestamp): String = {
val julianMicros = rebaseGregorianToJulianMicros(timestamp)
cal.setTimeInMillis(Math.floorDiv(julianMicros, MICROS_PER_SECOND) * MILLIS_PER_SECOND)
cal.setMicros(Math.floorMod(julianMicros, MICROS_PER_SECOND))
fastDateFormat.format(cal)
}

override def format(ts: Timestamp): String = {
format(fromJavaTimestamp(ts))
}

override def format(instant: Instant): String = {
format(instantToMicros(instant))
}
}

class LegacySimpleTimestampFormatter(
Expand All @@ -187,6 +222,14 @@ class LegacySimpleTimestampFormatter(
override def format(us: Long): String = {
sdf.format(toJavaTimestamp(us))
}

override def format(ts: Timestamp): String = {
sdf.format(ts)
}

override def format(instant: Instant): String = {
format(instantToMicros(instant))
}
}

object LegacyDateFormats extends Enumeration {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.time.{DateTimeException, LocalDate, ZoneOffset}
import org.apache.spark.{SparkFunSuite, SparkUpgradeException}
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, localDateToDays}
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy

Expand All @@ -41,8 +41,11 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
val formatter = DateFormatter(getZoneId(timeZone))
val date = formatter.format(17867)
assert(date === "2018-12-02")
val (days, expected) = (17867, "2018-12-02")
val date = formatter.format(days)
assert(date === expected)
assert(formatter.format(daysToLocalDate(days)) === expected)
assert(formatter.format(toJavaDate(days)) === expected)
}
}
}
Expand Down Expand Up @@ -70,8 +73,9 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
DateFormatter.defaultLocale,
legacyFormat)
val days = formatter.parse(date)
val formatted = formatter.format(days)
assert(date === formatted)
assert(date === formatter.format(days))
assert(date === formatter.format(daysToLocalDate(days)))
assert(date === formatter.format(toJavaDate(days)))
}
}
}
Expand Down Expand Up @@ -170,7 +174,9 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
DateFormatter.defaultLocale,
legacyFormat)
assert(LocalDate.ofEpochDay(formatter.parse("1000-01-01")) === LocalDate.of(1000, 1, 1))
assert(formatter.format(LocalDate.of(1000, 1, 1)) === "1000-01-01")
assert(formatter.format(localDateToDays(LocalDate.of(1000, 1, 1))) === "1000-01-01")
assert(formatter.format(java.sql.Date.valueOf("1000-01-01")) === "1000-01-01")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,20 +57,29 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper with Matchers
test("format timestamps using time zones") {
val microsSinceEpoch = 1543745472001234L
val expectedTimestamp = Map(
"UTC" -> "2018-12-02T10:11:12.001234",
PST.getId -> "2018-12-02T02:11:12.001234",
CET.getId -> "2018-12-02T11:11:12.001234",
"Africa/Dakar" -> "2018-12-02T10:11:12.001234",
"America/Los_Angeles" -> "2018-12-02T02:11:12.001234",
"Antarctica/Vostok" -> "2018-12-02T16:11:12.001234",
"Asia/Hong_Kong" -> "2018-12-02T18:11:12.001234",
"Europe/Amsterdam" -> "2018-12-02T11:11:12.001234")
"UTC" -> "2018-12-02 10:11:12.001234",
PST.getId -> "2018-12-02 02:11:12.001234",
CET.getId -> "2018-12-02 11:11:12.001234",
"Africa/Dakar" -> "2018-12-02 10:11:12.001234",
"America/Los_Angeles" -> "2018-12-02 02:11:12.001234",
"Antarctica/Vostok" -> "2018-12-02 16:11:12.001234",
"Asia/Hong_Kong" -> "2018-12-02 18:11:12.001234",
"Europe/Amsterdam" -> "2018-12-02 11:11:12.001234")
DateTimeTestUtils.outstandingTimezonesIds.foreach { zoneId =>
val formatter = TimestampFormatter(
"yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
DateTimeUtils.getZoneId(zoneId))
val timestamp = formatter.format(microsSinceEpoch)
assert(timestamp === expectedTimestamp(zoneId))
Seq(
TimestampFormatter(
"yyyy-MM-dd HH:mm:ss.SSSSSS",
getZoneId(zoneId),
// Test only FAST_DATE_FORMAT because other legacy formats don't support formatting
// in microsecond precision.
LegacyDateFormats.FAST_DATE_FORMAT,
needVarLengthSecondFraction = false),
TimestampFormatter.getFractionFormatter(getZoneId(zoneId))).foreach { formatter =>
val timestamp = formatter.format(microsSinceEpoch)
assert(timestamp === expectedTimestamp(zoneId))
assert(formatter.format(microsToInstant(microsSinceEpoch)) === expectedTimestamp(zoneId))
assert(formatter.format(toJavaTimestamp(microsSinceEpoch)) === expectedTimestamp(zoneId))
}
}
}

Expand Down Expand Up @@ -125,20 +134,30 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper with Matchers
}

test("format fraction of second") {
val formatter = TimestampFormatter.getFractionFormatter(ZoneOffset.UTC)
assert(formatter.format(0) === "1970-01-01 00:00:00")
assert(formatter.format(1) === "1970-01-01 00:00:00.000001")
assert(formatter.format(1000) === "1970-01-01 00:00:00.001")
assert(formatter.format(900000) === "1970-01-01 00:00:00.9")
assert(formatter.format(1000000) === "1970-01-01 00:00:01")
val formatter = TimestampFormatter.getFractionFormatter(UTC)
Seq(
0 -> "1970-01-01 00:00:00",
1 -> "1970-01-01 00:00:00.000001",
1000 -> "1970-01-01 00:00:00.001",
900000 -> "1970-01-01 00:00:00.9",
1000000 -> "1970-01-01 00:00:01").foreach { case (micros, tsStr) =>
assert(formatter.format(micros) === tsStr)
assert(formatter.format(microsToInstant(micros)) === tsStr)
DateTimeTestUtils.withDefaultTimeZone(UTC) {
assert(formatter.format(toJavaTimestamp(micros)) === tsStr)
}
}
}

test("formatting negative years with default pattern") {
val instant = LocalDateTime.of(-99, 1, 1, 0, 0, 0)
.atZone(ZoneOffset.UTC)
.toInstant
val instant = LocalDateTime.of(-99, 1, 1, 0, 0, 0).atZone(UTC).toInstant
val micros = DateTimeUtils.instantToMicros(instant)
assert(TimestampFormatter(ZoneOffset.UTC).format(micros) === "-0099-01-01 00:00:00")
assert(TimestampFormatter(UTC).format(micros) === "-0099-01-01 00:00:00")
assert(TimestampFormatter(UTC).format(instant) === "-0099-01-01 00:00:00")
DateTimeTestUtils.withDefaultTimeZone(UTC) { // toJavaTimestamp depends on the default time zone
assert(TimestampFormatter("yyyy-MM-dd HH:mm:SS G", UTC).format(toJavaTimestamp(micros))
=== "0100-01-01 00:00:00 BC")
}
}

test("special timestamp values") {
Expand Down Expand Up @@ -266,24 +285,31 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper with Matchers

test("SPARK-31557: rebasing in legacy formatters/parsers") {
withSQLConf(SQLConf.LEGACY_TIME_PARSER_POLICY.key -> LegacyBehaviorPolicy.LEGACY.toString) {
LegacyDateFormats.values.foreach { legacyFormat =>
DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> zoneId.getId) {
DateTimeTestUtils.withDefaultTimeZone(zoneId) {
withClue(s"${zoneId.getId} legacyFormat = $legacyFormat") {
val formatter = TimestampFormatter(
DateTimeTestUtils.outstandingZoneIds.foreach { zoneId =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> zoneId.getId) {
DateTimeTestUtils.withDefaultTimeZone(zoneId) {
withClue(s"zoneId = ${zoneId.getId}") {
val formatters = LegacyDateFormats.values.map { legacyFormat =>
TimestampFormatter(
TimestampFormatter.defaultPattern,
zoneId,
TimestampFormatter.defaultLocale,
legacyFormat,
needVarLengthSecondFraction = false)
}.toSeq :+ TimestampFormatter.getFractionFormatter(zoneId)
formatters.foreach { formatter =>
assert(microsToInstant(formatter.parse("1000-01-01 01:02:03"))
.atZone(zoneId)
.toLocalDateTime === LocalDateTime.of(1000, 1, 1, 1, 2, 3))

assert(formatter.format(
LocalDateTime.of(1000, 1, 1, 1, 2, 3).atZone(zoneId).toInstant) ===
"1000-01-01 01:02:03")
assert(formatter.format(instantToMicros(
LocalDateTime.of(1000, 1, 1, 1, 2, 3)
.atZone(zoneId).toInstant)) === "1000-01-01 01:02:03")
assert(formatter.format(java.sql.Timestamp.valueOf("1000-01-01 01:02:03")) ===
"1000-01-01 01:02:03")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,10 @@ object HiveResult {
def toHiveString(a: (Any, DataType), nested: Boolean = false): String = a match {
case (null, _) => if (nested) "null" else "NULL"
case (b, BooleanType) => b.toString
case (d: Date, DateType) => dateFormatter.format(DateTimeUtils.fromJavaDate(d))
case (ld: LocalDate, DateType) =>
dateFormatter.format(DateTimeUtils.localDateToDays(ld))
case (t: Timestamp, TimestampType) =>
timestampFormatter.format(DateTimeUtils.fromJavaTimestamp(t))
case (i: Instant, TimestampType) =>
timestampFormatter.format(DateTimeUtils.instantToMicros(i))
case (d: Date, DateType) => dateFormatter.format(d)
case (ld: LocalDate, DateType) => dateFormatter.format(ld)
case (t: Timestamp, TimestampType) => timestampFormatter.format(t)
case (i: Instant, TimestampType) => timestampFormatter.format(i)
case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
case (decimal: java.math.BigDecimal, DecimalType()) => decimal.toPlainString
case (n, _: NumericType) => n.toString
Expand Down