Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-26456][SQL] Cast date/timestamp to string by Date/TimestampFormatter #23391

Closed
wants to merge 26 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
e09c972
Switching to TimestampFormatter
MaxGekk Dec 27, 2018
697688a
Switching to DateFormatter
MaxGekk Dec 27, 2018
fcffc12
Separate parser for PartitioningUtils
MaxGekk Dec 27, 2018
f0a9fe7
Revert unneeded changes
MaxGekk Dec 27, 2018
5f0b0a3
Moving partition timestamp and date parsers to PartitioningUtils
MaxGekk Dec 28, 2018
17a32a3
Add local date and timestamp to Cast
MaxGekk Dec 28, 2018
9d90d3f
Add local date and timestamp formatters to QueryExecution
MaxGekk Dec 28, 2018
038ad80
Add local date and timestamp formatters to JDBC
MaxGekk Dec 28, 2018
f6308f6
Removing unused timestampToString
MaxGekk Dec 28, 2018
9f85ac6
Removing unused dateToString
MaxGekk Dec 28, 2018
d348c07
Fix build error
MaxGekk Dec 28, 2018
36c9f9a
Remove unused threadLocalTimestampFormat
MaxGekk Dec 28, 2018
4580c25
Revert changes in TimestampFormatter
MaxGekk Dec 28, 2018
56bdae4
apply with default locale
MaxGekk Dec 28, 2018
8541602
Merge remote-tracking branch 'origin/master' into thread-local-date-f…
MaxGekk Dec 28, 2018
ecf0e89
Merge remote-tracking branch 'origin/master' into thread-local-date-f…
MaxGekk Dec 28, 2018
c3066e1
Making dateFormatter lazy in cast
MaxGekk Dec 28, 2018
eed40d7
Removing date partition pattern
MaxGekk Dec 30, 2018
94cad6a
Merge branch 'master' into thread-local-date-format
MaxGekk Jan 1, 2019
0de72c8
Merge branch 'master' into thread-local-date-format
MaxGekk Jan 2, 2019
1156291
Merge remote-tracking branch 'origin/master' into thread-local-date-f…
MaxGekk Jan 3, 2019
5ae58a3
Fix merge
MaxGekk Jan 3, 2019
78c3961
Merge remote-tracking branch 'origin/master' into thread-local-date-f…
MaxGekk Jan 7, 2019
2397401
Fix merge
MaxGekk Jan 7, 2019
206c955
Merge remote-tracking branch 'origin/master' into thread-local-date-f…
MaxGekk Jan 11, 2019
483e95b
Remove unused import
MaxGekk Jan 14, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,15 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
// [[func]] assumes the input is no longer null because eval already does the null check.
@inline private[this] def buildCast[T](a: Any, func: T => Any): Any = func(a.asInstanceOf[T])

private lazy val dateFormatter = DateFormatter()
private lazy val timestampFormatter = TimestampFormatter(timeZone)

// UDFToString
private[this] def castToString(from: DataType): Any => Any = from match {
case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes)
case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.dateToString(d)))
case DateType => buildCast[Int](_, d => UTF8String.fromString(dateFormatter.format(d)))
case TimestampType => buildCast[Long](_,
t => UTF8String.fromString(DateTimeUtils.timestampToString(t, timeZone)))
t => UTF8String.fromString(DateTimeUtils.timestampToString(timestampFormatter, t)))
case ArrayType(et, _) =>
buildCast[ArrayData](_, array => {
val builder = new UTF8StringBuilder
Expand Down Expand Up @@ -843,12 +846,16 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
case BinaryType =>
(c, evPrim, evNull) => code"$evPrim = UTF8String.fromBytes($c);"
case DateType =>
(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(
org.apache.spark.sql.catalyst.util.DateTimeUtils.dateToString($c));"""
val df = JavaCode.global(
ctx.addReferenceObj("dateFormatter", dateFormatter),
dateFormatter.getClass)
(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(${df}.format($c));"""
case TimestampType =>
val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), timeZone.getClass)
val tf = JavaCode.global(
ctx.addReferenceObj("timestampFormatter", timestampFormatter),
timestampFormatter.getClass)
(c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(
org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($c, $tz));"""
org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($tf, $c));"""
case ArrayType(et, _) =>
(c, evPrim, evNull) => {
val buffer = ctx.freshVariable("buffer", classOf[UTF8StringBuilder])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti
copy(timeZoneId = Option(timeZoneId))

override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
val df = TimestampFormatter(format.toString, timeZone, Locale.US)
val df = TimestampFormatter(format.toString, timeZone)
UTF8String.fromString(df.format(timestamp.asInstanceOf[Long]))
}

Expand Down Expand Up @@ -667,7 +667,7 @@ abstract class UnixTime
private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
private lazy val formatter: TimestampFormatter =
try {
TimestampFormatter(constFormat.toString, timeZone, Locale.US)
TimestampFormatter(constFormat.toString, timeZone)
} catch {
case NonFatal(_) => null
}
Expand Down Expand Up @@ -700,7 +700,7 @@ abstract class UnixTime
} else {
val formatString = f.asInstanceOf[UTF8String].toString
try {
TimestampFormatter(formatString, timeZone, Locale.US).parse(
TimestampFormatter(formatString, timeZone).parse(
t.asInstanceOf[UTF8String].toString) / MICROS_PER_SECOND
} catch {
case NonFatal(_) => null
Expand Down Expand Up @@ -821,7 +821,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
private lazy val formatter: TimestampFormatter =
try {
TimestampFormatter(constFormat.toString, timeZone, Locale.US)
TimestampFormatter(constFormat.toString, timeZone)
} catch {
case NonFatal(_) => null
}
Expand All @@ -847,7 +847,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
null
} else {
try {
UTF8String.fromString(TimestampFormatter(f.toString, timeZone, Locale.US)
UTF8String.fromString(TimestampFormatter(f.toString, timeZone)
.format(time.asInstanceOf[Long] * MICROS_PER_SECOND))
} catch {
case NonFatal(_) => null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,14 @@ class Iso8601DateFormatter(
}

object DateFormatter {
val defaultPattern: String = "yyyy-MM-dd"
val defaultLocale: Locale = Locale.US

def apply(format: String, locale: Locale): DateFormatter = {
new Iso8601DateFormatter(format, locale)
}

def apply(format: String): DateFormatter = apply(format, defaultLocale)

def apply(): DateFormatter = apply(defaultPattern)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both formatters seems to use thread safe implementations. You could consider just returning cached instances here.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the moment, both formatters are created per partition at least not per row. Do you think it makes sense to cache them?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, lets leave it for now.

}
Original file line number Diff line number Diff line change
Expand Up @@ -92,32 +92,6 @@ object DateTimeUtils {
}
}

// `SimpleDateFormat` is not thread-safe.
private val threadLocalTimestampFormat = new ThreadLocal[DateFormat] {
override def initialValue(): SimpleDateFormat = {
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
}
}

def getThreadLocalTimestampFormat(timeZone: TimeZone): DateFormat = {
val sdf = threadLocalTimestampFormat.get()
sdf.setTimeZone(timeZone)
sdf
}

// `SimpleDateFormat` is not thread-safe.
private val threadLocalDateFormat = new ThreadLocal[DateFormat] {
override def initialValue(): SimpleDateFormat = {
new SimpleDateFormat("yyyy-MM-dd", Locale.US)
}
}

def getThreadLocalDateFormat(timeZone: TimeZone): DateFormat = {
val sdf = threadLocalDateFormat.get()
sdf.setTimeZone(timeZone)
sdf
}

private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]
private val computeTimeZone = new JFunction[String, TimeZone] {
override def apply(timeZoneId: String): TimeZone = TimeZone.getTimeZone(timeZoneId)
Expand Down Expand Up @@ -149,24 +123,11 @@ object DateTimeUtils {
millisLocal - getOffsetFromLocalMillis(millisLocal, timeZone)
}

def dateToString(days: SQLDate): String =
getThreadLocalDateFormat(defaultTimeZone()).format(toJavaDate(days))

def dateToString(days: SQLDate, timeZone: TimeZone): String = {
getThreadLocalDateFormat(timeZone).format(toJavaDate(days))
}

// Converts Timestamp to string according to Hive TimestampWritable convention.
def timestampToString(us: SQLTimestamp): String = {
timestampToString(us, defaultTimeZone())
}

// Converts Timestamp to string according to Hive TimestampWritable convention.
def timestampToString(us: SQLTimestamp, timeZone: TimeZone): String = {
def timestampToString(tf: TimestampFormatter, us: SQLTimestamp): String = {
val ts = toJavaTimestamp(us)
val timestampString = ts.toString
val timestampFormat = getThreadLocalTimestampFormat(timeZone)
val formatted = timestampFormat.format(ts)
val formatted = tf.format(us)

if (timestampString.length > 19 && timestampString.substring(19) != ".0") {
formatted + timestampString.substring(19)
Expand Down Expand Up @@ -1168,7 +1129,5 @@ object DateTimeUtils {
*/
private[util] def resetThreadLocals(): Unit = {
threadLocalGmtCalendar.remove()
threadLocalTimestampFormat.remove()
threadLocalDateFormat.remove()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ sealed trait TimestampFormatter extends Serializable {
@throws(classOf[ParseException])
@throws(classOf[DateTimeParseException])
@throws(classOf[DateTimeException])
def parse(s: String): Long // returns microseconds since epoch
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why remove this comment?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because it duplicates method description above.

def parse(s: String): Long
def format(us: Long): String
}

Expand Down Expand Up @@ -74,7 +74,18 @@ class Iso8601TimestampFormatter(
}

object TimestampFormatter {
val defaultPattern: String = "yyyy-MM-dd HH:mm:ss"
val defaultLocale: Locale = Locale.US

def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
new Iso8601TimestampFormatter(format, timeZone, locale)
}

def apply(format: String, timeZone: TimeZone): TimestampFormatter = {
apply(format, timeZone, defaultLocale)
}

def apply(timeZone: TimeZone): TimestampFormatter = {
apply(defaultPattern, timeZone, defaultLocale)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ class DateTimeUtilsSuite extends SparkFunSuite {
}

test("nanoseconds truncation") {
val tf = TimestampFormatter(DateTimeUtils.defaultTimeZone())
def checkStringToTimestamp(originalTime: String, expectedParsedTime: String) {
val parsedTimestampOp = DateTimeUtils.stringToTimestamp(UTF8String.fromString(originalTime))
assert(parsedTimestampOp.isDefined, "timestamp with nanoseconds was not parsed correctly")
assert(DateTimeUtils.timestampToString(parsedTimestampOp.get) === expectedParsedTime)
assert(DateTimeUtils.timestampToString(tf, parsedTimestampOp.get) === expectedParsedTime)
}

checkStringToTimestamp("2015-01-02 00:00:00.123456789", "2015-01-02 00:00:00.123456")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
test("parsing dates") {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
val formatter = DateFormatter()
val daysSinceEpoch = formatter.parse("2018-12-02")
assert(daysSinceEpoch === 17867)
}
Expand All @@ -39,7 +39,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
test("format dates") {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
val formatter = DateFormatter()
val date = formatter.format(17867)
assert(date === "2018-12-02")
}
Expand All @@ -59,7 +59,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
"5010-11-17").foreach { date =>
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
val formatter = DateFormatter()
val days = formatter.parse(date)
val formatted = formatter.format(days)
assert(date === formatted)
Expand All @@ -82,7 +82,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
1110657).foreach { days =>
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
val formatter = DateFormatter()
val date = formatter.format(days)
val parsed = formatter.parse(date)
assert(days === parsed)
Expand All @@ -92,7 +92,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
}

test("parsing date without explicit day") {
val formatter = DateFormatter("yyyy MMM", Locale.US)
val formatter = DateFormatter("yyyy MMM")
val daysSinceEpoch = formatter.parse("2018 Dec")
assert(daysSinceEpoch === LocalDate.of(2018, 12, 1).toEpochDay)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
val formatter = TimestampFormatter(
"yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
TimeZone.getTimeZone(timeZone),
Locale.US)
TimeZone.getTimeZone(timeZone))
val microsSinceEpoch = formatter.parse(localDate)
assert(microsSinceEpoch === expectedMicros(timeZone))
}
Expand All @@ -62,8 +61,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
val formatter = TimestampFormatter(
"yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
TimeZone.getTimeZone(timeZone),
Locale.US)
TimeZone.getTimeZone(timeZone))
val timestamp = formatter.format(microsSinceEpoch)
assert(timestamp === expectedTimestamp(timeZone))
}
Expand All @@ -82,7 +80,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
2177456523456789L,
11858049903010203L).foreach { micros =>
DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
val formatter = TimestampFormatter(pattern, timeZone, Locale.US)
val formatter = TimestampFormatter(pattern, timeZone)
val timestamp = formatter.format(micros)
val parsed = formatter.parse(timestamp)
assert(micros === parsed)
Expand All @@ -103,7 +101,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
"2039-01-01T01:02:03.456789",
"2345-10-07T22:45:03.010203").foreach { timestamp =>
DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US)
val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone)
val micros = formatter.parse(timestamp)
val formatted = formatter.format(micros)
assert(timestamp === formatted)
Expand All @@ -114,8 +112,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
test(" case insensitive parsing of am and pm") {
val formatter = TimestampFormatter(
"yyyy MMM dd hh:mm:ss a",
TimeZone.getTimeZone("UTC"),
Locale.US)
TimeZone.getTimeZone("UTC"))
val micros = formatter.parse("2009 Mar 20 11:30:01 am")
assert(micros === TimeUnit.SECONDS.toMicros(
LocalDateTime.of(2009, 3, 20, 11, 30, 1).toEpochSecond(ZoneOffset.UTC)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
import java.sql.{Date, Timestamp}

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -77,6 +77,10 @@ object HiveResult {
TimestampType,
BinaryType)

private lazy val dateFormatter = DateFormatter()
private lazy val timestampFormatter = TimestampFormatter(
DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone))

/** Hive outputs fields of structs slightly differently than top level attributes. */
private def toHiveStructString(a: (Any, DataType)): String = a match {
case (struct: Row, StructType(fields)) =>
Expand Down Expand Up @@ -111,11 +115,9 @@ object HiveResult {
toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
}.toSeq.sorted.mkString("{", ",", "}")
case (null, _) => "NULL"
case (d: Date, DateType) =>
DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
case (d: Date, DateType) => dateFormatter.format(DateTimeUtils.fromJavaDate(d))
case (t: Timestamp, TimestampType) =>
val timeZone = DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MaxGekk @cloud-fan by moving getting the timezone from here to a lazy val in the object, it will be initialized only once by the first session that uses it. Another session with a different sessionLocalTimeZone set will get results in wrong timezone.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@juliuszsompolski Thank you for the bug report. I will fix the issue. I think it is ok to create formatters in place because they can be pulled from caches.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the PR #28024

DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t), timeZone)
DateTimeUtils.timestampToString(timestampFormatter, DateTimeUtils.fromJavaTimestamp(t))
case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)
case (interval, CalendarIntervalType) => interval.toString
Expand Down
Loading