From e09c972d72df3f21486cc18aec46287ce087df41 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 27 Dec 2018 15:28:04 +0100
Subject: [PATCH 01/19] Switching to TimestampFormatter

---
 .../sql/catalyst/util/DateTimeUtils.scala     | 15 +++++++--------
 .../catalyst/util/TimestampFormatter.scala    | 19 ++++++++++++++++---
 2 files changed, 23 insertions(+), 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 3e5e1fbc2b368..2e0677700e4e5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -77,16 +77,15 @@ object DateTimeUtils {
   }
 
   // `SimpleDateFormat` is not thread-safe.
-  private val threadLocalTimestampFormat = new ThreadLocal[DateFormat] {
-    override def initialValue(): SimpleDateFormat = {
-      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US)
+  private val threadLocalTimestampFormat = new ThreadLocal[TimestampFormatter] {
+    override def initialValue(): TimestampFormatter = {
+      TimestampFormatter("yyyy-MM-dd HH:mm:ss", TimeZoneUTC, Locale.US)
     }
   }
 
-  def getThreadLocalTimestampFormat(timeZone: TimeZone): DateFormat = {
-    val sdf = threadLocalTimestampFormat.get()
-    sdf.setTimeZone(timeZone)
-    sdf
+  def getThreadLocalTimestampFormat(timeZone: TimeZone): TimestampFormatter = {
+    val timestampFormatter = threadLocalTimestampFormat.get()
+    timestampFormatter.withTimeZone(timeZone)
   }
 
   // `SimpleDateFormat` is not thread-safe.
@@ -150,7 +149,7 @@ object DateTimeUtils {
     val ts = toJavaTimestamp(us)
     val timestampString = ts.toString
     val timestampFormat = getThreadLocalTimestampFormat(timeZone)
-    val formatted = timestampFormat.format(ts)
+    val formatted = timestampFormat.format(us)
 
     if (timestampString.length > 19 && timestampString.substring(19) != ".0") {
       formatted + timestampString.substring(19)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index b67b2d7cc3c51..2f754ea49b4e7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -42,11 +42,12 @@ sealed trait TimestampFormatter extends Serializable {
   @throws(classOf[ParseException])
   @throws(classOf[DateTimeParseException])
   @throws(classOf[DateTimeException])
-  def parse(s: String): Long // returns microseconds since epoch
+  def parse(s: String): Long
   def format(us: Long): String
+  def withTimeZone(tz: TimeZone): TimestampFormatter
 }
 
-class Iso8601TimestampFormatter(
+case class Iso8601TimestampFormatter(
     pattern: String,
     timeZone: TimeZone,
     locale: Locale) extends TimestampFormatter with DateTimeFormatterHelper {
@@ -77,6 +78,10 @@ class Iso8601TimestampFormatter(
 
     formatter.withZone(timeZone.toZoneId).format(instant)
   }
+
+  override def withTimeZone(tz: TimeZone): TimestampFormatter = {
+    this.copy(timeZone = tz)
+  }
 }
 
 class LegacyTimestampFormatter(
@@ -93,15 +98,23 @@ class LegacyTimestampFormatter(
   override def format(us: Long): String = {
     format.format(DateTimeUtils.toJavaTimestamp(us))
   }
+
+  override def withTimeZone(tz: TimeZone): TimestampFormatter = {
+    new LegacyTimestampFormatter(pattern, tz, locale)
+  }
 }
 
-class LegacyFallbackTimestampFormatter(
+case class LegacyFallbackTimestampFormatter(
     pattern: String,
     timeZone: TimeZone,
     locale: Locale) extends LegacyTimestampFormatter(pattern, timeZone, locale) {
   override def toMillis(s: String): Long = {
     Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime)
   }
+
+  override def withTimeZone(tz: TimeZone): TimestampFormatter = {
+    this.copy(timeZone = tz)
+  }
 }
 
 object TimestampFormatter {

From 697688a9caaad40a1bf6a8340e720999dc6207c7 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 27 Dec 2018 15:47:45 +0100
Subject: [PATCH 02/19] Switching to DateFormatter

---
 .../sql/catalyst/util/DateTimeUtils.scala     | 20 +++++--------------
 .../datasources/PartitioningUtils.scala       |  2 +-
 .../datasources/jdbc/JDBCRelation.scala       |  2 +-
 3 files changed, 7 insertions(+), 17 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 2e0677700e4e5..4bec87e552efb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -76,7 +76,6 @@ object DateTimeUtils {
     }
   }
 
-  // `SimpleDateFormat` is not thread-safe.
   private val threadLocalTimestampFormat = new ThreadLocal[TimestampFormatter] {
     override def initialValue(): TimestampFormatter = {
       TimestampFormatter("yyyy-MM-dd HH:mm:ss", TimeZoneUTC, Locale.US)
@@ -88,18 +87,13 @@ object DateTimeUtils {
     timestampFormatter.withTimeZone(timeZone)
   }
 
-  // `SimpleDateFormat` is not thread-safe.
-  private val threadLocalDateFormat = new ThreadLocal[DateFormat] {
-    override def initialValue(): SimpleDateFormat = {
-      new SimpleDateFormat("yyyy-MM-dd", Locale.US)
+  private val threadLocalDateFormat = new ThreadLocal[DateFormatter] {
+    override def initialValue(): DateFormatter = {
+      DateFormatter("yyyy-MM-dd", Locale.US)
     }
   }
 
-  def getThreadLocalDateFormat(timeZone: TimeZone): DateFormat = {
-    val sdf = threadLocalDateFormat.get()
-    sdf.setTimeZone(timeZone)
-    sdf
-  }
+  def getThreadLocalDateFormat(): DateFormatter = threadLocalDateFormat.get()
 
   private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]
   private val computeTimeZone = new JFunction[String, TimeZone] {
@@ -133,11 +127,7 @@ object DateTimeUtils {
   }
 
   def dateToString(days: SQLDate): String =
-    getThreadLocalDateFormat(defaultTimeZone()).format(toJavaDate(days))
-
-  def dateToString(days: SQLDate, timeZone: TimeZone): String = {
-    getThreadLocalDateFormat(timeZone).format(toJavaDate(days))
-  }
+    getThreadLocalDateFormat().format(days)
 
   // Converts Timestamp to string according to Hive TimestampWritable convention.
   def timestampToString(us: SQLTimestamp): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 6458b65466fb5..9d3d5272ac421 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -457,7 +457,7 @@ object PartitioningUtils {
     val dateTry = Try {
       // try and parse the date, if no exception occurs this is a candidate to be resolved as
       // DateType
-      DateTimeUtils.getThreadLocalDateFormat(DateTimeUtils.defaultTimeZone()).parse(raw)
+      DateTimeUtils.getThreadLocalDateFormat().parse(raw)
       // SPARK-23436: Casting the string to date may still return null if a bad Date is provided.
       // This can happen since DateFormat.parse may not use the entire text of the given string:
       // so if there are extra-characters after the date, it returns correctly.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index 51c385e25bee3..db7ba9c74c4f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -187,7 +187,7 @@ private[sql] object JDBCRelation extends Logging {
     def dateTimeToString(): String = {
       val timeZone = DateTimeUtils.getTimeZone(timeZoneId)
       val dateTimeStr = columnType match {
-        case DateType => DateTimeUtils.dateToString(value.toInt, timeZone)
+        case DateType => DateTimeUtils.dateToString(value.toInt)
         case TimestampType => DateTimeUtils.timestampToString(value, timeZone)
       }
       s"'$dateTimeStr'"

From fcffc12d3e8099d7193b31f4e3181ee28403aabb Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 27 Dec 2018 22:17:44 +0100
Subject: [PATCH 03/19] Separate parser for PartitioningUtils

---
 .../sql/catalyst/util/DateTimeUtils.scala     | 21 ++++++++++++++-----
 .../datasources/PartitioningUtils.scala       |  5 ++---
 .../ParquetPartitionDiscoverySuite.scala      |  1 -
 3 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 4bec87e552efb..f5f6adc287c48 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -76,14 +76,25 @@ object DateTimeUtils {
     }
   }
 
-  private val threadLocalTimestampFormat = new ThreadLocal[TimestampFormatter] {
+  private val threadLocalTimestampFormatter = new ThreadLocal[TimestampFormatter] {
     override def initialValue(): TimestampFormatter = {
       TimestampFormatter("yyyy-MM-dd HH:mm:ss", TimeZoneUTC, Locale.US)
     }
   }
 
-  def getThreadLocalTimestampFormat(timeZone: TimeZone): TimestampFormatter = {
-    val timestampFormatter = threadLocalTimestampFormat.get()
+  def getThreadLocalTimestampParser(timeZone: TimeZone): TimestampFormatter = {
+    val timestampFormatter = threadLocalTimestampParser.get()
+    timestampFormatter.withTimeZone(timeZone)
+  }
+
+  private val threadLocalTimestampParser = new ThreadLocal[TimestampFormatter] {
+    override def initialValue(): TimestampFormatter = {
+      TimestampFormatter("yyyy-MM-dd HH:mm:ss[.S]", TimeZoneUTC, Locale.US)
+    }
+  }
+
+  def getThreadLocalTimestampFormatter(timeZone: TimeZone): TimestampFormatter = {
+    val timestampFormatter = threadLocalTimestampFormatter.get()
     timestampFormatter.withTimeZone(timeZone)
   }
 
@@ -138,7 +149,7 @@ object DateTimeUtils {
   def timestampToString(us: SQLTimestamp, timeZone: TimeZone): String = {
     val ts = toJavaTimestamp(us)
     val timestampString = ts.toString
-    val timestampFormat = getThreadLocalTimestampFormat(timeZone)
+    val timestampFormat = getThreadLocalTimestampFormatter(timeZone)
     val formatted = timestampFormat.format(us)
 
     if (timestampString.length > 19 && timestampString.substring(19) != ".0") {
@@ -1123,7 +1134,7 @@ object DateTimeUtils {
    */
   private[util] def resetThreadLocals(): Unit = {
     threadLocalGmtCalendar.remove()
-    threadLocalTimestampFormat.remove()
+    threadLocalTimestampFormatter.remove()
     threadLocalDateFormat.remove()
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 9d3d5272ac421..320583876690e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -26,13 +26,12 @@ import scala.util.Try
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -474,7 +473,7 @@ object PartitioningUtils {
       val unescapedRaw = unescapePathName(raw)
       // try and parse the date, if no exception occurs this is a candidate to be resolved as
       // TimestampType
-      DateTimeUtils.getThreadLocalTimestampFormat(timeZone).parse(unescapedRaw)
+      DateTimeUtils.getThreadLocalTimestampParser(timeZone).parse(unescapedRaw)
       // SPARK-23436: see comment for date
       val timestampValue = Cast(Literal(unescapedRaw), TimestampType, Some(timeZone.getID)).eval()
       // Disallow TimestampType if the cast returned null
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 88067358667c6..14cb30c1ff39e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -36,7 +36,6 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
 import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._

From f0a9fe726223dd9c202e6154b431df8beb63e28f Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 27 Dec 2018 22:19:00 +0100
Subject: [PATCH 04/19] Revert unneeded changes

---
 .../datasources/parquet/ParquetPartitionDiscoverySuite.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 14cb30c1ff39e..88067358667c6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
 import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._

From 5f0b0a3973e6db3163148d587cda9871760f9b29 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 28 Dec 2018 10:59:58 +0100
Subject: [PATCH 05/19] Moving partition timestamp and date parsers to
 PartitioningUtils

---
 .../sql/catalyst/util/DateTimeUtils.scala     | 21 +++-------
 .../datasources/PartitioningUtils.scala       | 41 ++++++++++++++-----
 .../ParquetPartitionDiscoverySuite.scala      | 18 +++++---
 3 files changed, 48 insertions(+), 32 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index f5f6adc287c48..4bec87e552efb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -76,25 +76,14 @@ object DateTimeUtils {
     }
   }
 
-  private val threadLocalTimestampFormatter = new ThreadLocal[TimestampFormatter] {
+  private val threadLocalTimestampFormat = new ThreadLocal[TimestampFormatter] {
     override def initialValue(): TimestampFormatter = {
       TimestampFormatter("yyyy-MM-dd HH:mm:ss", TimeZoneUTC, Locale.US)
     }
   }
 
-  def getThreadLocalTimestampParser(timeZone: TimeZone): TimestampFormatter = {
-    val timestampFormatter = threadLocalTimestampParser.get()
-    timestampFormatter.withTimeZone(timeZone)
-  }
-
-  private val threadLocalTimestampParser = new ThreadLocal[TimestampFormatter] {
-    override def initialValue(): TimestampFormatter = {
-      TimestampFormatter("yyyy-MM-dd HH:mm:ss[.S]", TimeZoneUTC, Locale.US)
-    }
-  }
-
-  def getThreadLocalTimestampFormatter(timeZone: TimeZone): TimestampFormatter = {
-    val timestampFormatter = threadLocalTimestampFormatter.get()
+  def getThreadLocalTimestampFormat(timeZone: TimeZone): TimestampFormatter = {
+    val timestampFormatter = threadLocalTimestampFormat.get()
     timestampFormatter.withTimeZone(timeZone)
   }
 
@@ -138,7 +149,7 @@ object DateTimeUtils {
   def timestampToString(us: SQLTimestamp, timeZone: TimeZone): String = {
     val ts = toJavaTimestamp(us)
     val timestampString = ts.toString
-    val timestampFormat = getThreadLocalTimestampFormatter(timeZone)
+    val timestampFormat = getThreadLocalTimestampFormat(timeZone)
     val formatted = timestampFormat.format(us)
 
     if (timestampString.length > 19 && timestampString.substring(19) != ".0") {
@@ -1123,7 +1134,7 @@ object DateTimeUtils {
    */
   private[util] def resetThreadLocals(): Unit = {
     threadLocalGmtCalendar.remove()
-    threadLocalTimestampFormatter.remove()
+    threadLocalTimestampFormat.remove()
     threadLocalDateFormat.remove()
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 320583876690e..47f63e53725e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{Resolver, TypeCoercion}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
-import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
+import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -58,6 +58,9 @@ object PartitionSpec {
 
 object PartitioningUtils {
 
+  val datePartitionPattern = "yyyy-MM-dd"
+  val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
+
   private[datasources] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal])
   {
     require(columnNames.size == literals.size)
@@ -121,10 +124,12 @@ object PartitioningUtils {
       Map.empty[String, DataType]
     }
 
+    val dateFormatter = DateFormatter(datePartitionPattern, Locale.US)
+    val timestampFormatter = TimestampFormatter(timestampPartitionPattern, timeZone, Locale.US)
     // First, we need to parse every partition's path and see if we can find partition values.
     val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
       parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes,
-        validatePartitionColumns, timeZone)
+        validatePartitionColumns, timeZone, dateFormatter, timestampFormatter)
     }.unzip
 
     // We create pairs of (path -> path's partition value) here
@@ -207,7 +212,9 @@ object PartitioningUtils {
       basePaths: Set[Path],
       userSpecifiedDataTypes: Map[String, DataType],
      validatePartitionColumns: Boolean,
-      timeZone: TimeZone): (Option[PartitionValues], Option[Path]) = {
+      timeZone: TimeZone,
+      dateFormatter: DateFormatter,
+      timestampFormatter: TimestampFormatter): (Option[PartitionValues], Option[Path]) = {
     val columns = ArrayBuffer.empty[(String, Literal)]
     // Old Hadoop versions don't have `Path.isRoot`
     var finished = path.getParent == null
@@ -229,7 +236,7 @@ object PartitioningUtils {
         // Once we get the string, we try to parse it and find the partition column and value.
         val maybeColumn =
           parsePartitionColumn(currentPath.getName, typeInference, userSpecifiedDataTypes,
-            validatePartitionColumns, timeZone)
+            validatePartitionColumns, timeZone, dateFormatter, timestampFormatter)
         maybeColumn.foreach(columns += _)
 
         // Now, we determine if we should stop.
@@ -264,7 +271,9 @@ object PartitioningUtils {
       typeInference: Boolean,
       userSpecifiedDataTypes: Map[String, DataType],
       validatePartitionColumns: Boolean,
-      timeZone: TimeZone): Option[(String, Literal)] = {
+      timeZone: TimeZone,
+      dateFormatter: DateFormatter,
+      timestampFormatter: TimestampFormatter): Option[(String, Literal)] = {
     val equalSignIndex = columnSpec.indexOf('=')
     if (equalSignIndex == -1) {
       None
@@ -279,7 +288,12 @@ object PartitioningUtils {
         // SPARK-26188: if user provides corresponding column schema, get the column value without
         //              inference, and then cast it as user specified data type.
         val dataType = userSpecifiedDataTypes(columnName)
-        val columnValueLiteral = inferPartitionColumnValue(rawColumnValue, false, timeZone)
+        val columnValueLiteral = inferPartitionColumnValue(
+          rawColumnValue,
+          false,
+          timeZone,
+          dateFormatter,
+          timestampFormatter)
         val columnValue = columnValueLiteral.eval()
         val castedValue = Cast(columnValueLiteral, dataType, Option(timeZone.getID)).eval()
         if (validatePartitionColumns && columnValue != null && castedValue == null) {
@@ -288,7 +302,12 @@ object PartitioningUtils {
         }
         Literal.create(castedValue, dataType)
       } else {
-        inferPartitionColumnValue(rawColumnValue, typeInference, timeZone)
+        inferPartitionColumnValue(
+          rawColumnValue,
+          typeInference,
+          timeZone,
+          dateFormatter,
+          timestampFormatter)
       }
       Some(columnName -> literal)
     }
@@ -441,7 +460,9 @@ object PartitioningUtils {
  private[datasources] def inferPartitionColumnValue(
      raw: String,
      typeInference: Boolean,
-      timeZone: TimeZone): Literal = {
+      timeZone: TimeZone,
+      dateFormatter: DateFormatter,
+      timestampFormatter: TimestampFormatter): Literal = {
    val decimalTry = Try {
      // `BigDecimal` conversion can fail when the `field` is not a form of number.
      val bigDecimal = new JBigDecimal(raw)
@@ -456,7 +477,7 @@ object PartitioningUtils {
    val dateTry = Try {
      // try and parse the date, if no exception occurs this is a candidate to be resolved as
      // DateType
-      DateTimeUtils.getThreadLocalDateFormat().parse(raw)
+      dateFormatter.parse(raw)
      // SPARK-23436: Casting the string to date may still return null if a bad Date is provided.
      // This can happen since DateFormat.parse may not use the entire text of the given string:
      // so if there are extra-characters after the date, it returns correctly.
@@ -473,7 +494,7 @@ object PartitioningUtils {
      val unescapedRaw = unescapePathName(raw)
      // try and parse the date, if no exception occurs this is a candidate to be resolved as
      // TimestampType
-      DateTimeUtils.getThreadLocalTimestampParser(timeZone).parse(unescapedRaw)
+      timestampFormatter.parse(unescapedRaw)
      // SPARK-23436: see comment for date
      val timestampValue = Cast(Literal(unescapedRaw), TimestampType, Some(timeZone.getID)).eval()
      // Disallow TimestampType if the cast returned null
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index 88067358667c6..ad2325b01f524 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.{PartitionPath => Partition}
 import org.apache.spark.sql.execution.streaming.MemoryStream
@@ -56,6 +56,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
 
   val timeZone = TimeZone.getDefault()
   val timeZoneId = timeZone.getID
+  val df = DateFormatter(datePartitionPattern, Locale.US)
+  val tf = TimestampFormatter(timestampPartitionPattern, timeZone, Locale.US)
 
   protected override def beforeAll(): Unit = {
     super.beforeAll()
@@ -69,7 +71,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
 
   test("column type inference") {
     def check(raw: String, literal: Literal, timeZone: TimeZone = timeZone): Unit = {
-      assert(inferPartitionColumnValue(raw, true, timeZone) === literal)
+      assert(inferPartitionColumnValue(raw, true, timeZone, df, tf) === literal)
     }
 
     check("10", Literal.create(10, IntegerType))
@@ -197,13 +199,13 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
  test("parse partition") {
    def check(path: String, expected: Option[PartitionValues]): Unit = {
      val actual = parsePartition(new Path(path), true, Set.empty[Path],
-        Map.empty, true, timeZone)._1
+        Map.empty, true, timeZone, df, tf)._1
      assert(expected === actual)
    }
 
    def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
      val message = intercept[T] {
-        parsePartition(new Path(path), true, Set.empty[Path], Map.empty, true, timeZone)
+        parsePartition(new Path(path), true, Set.empty[Path], Map.empty, true, timeZone, df, tf)
      }.getMessage
 
      assert(message.contains(expected))
@@ -249,7 +251,9 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
        basePaths = Set(new Path("file://path/a=10")),
        Map.empty,
        true,
-        timeZone = timeZone)._1
+        timeZone = timeZone,
+        df,
+        tf)._1
 
    assert(partitionSpec1.isEmpty)
 
@@ -260,7 +264,9 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
        basePaths = Set(new Path("file://path")),
        Map.empty,
        true,
-        timeZone = timeZone)._1
+        timeZone = timeZone,
+        df,
+        tf)._1
 
    assert(partitionSpec2 == Option(PartitionValues(
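After patch 05, a single `DateFormatter` and `TimestampFormatter` are built once per partition-discovery run and threaded through the parsing calls. A minimal usage sketch of the new calling convention (illustrative only: `inferPartitionColumnValue` is `private[datasources]`, and the input value here is a hypothetical example):

    import java.util.{Locale, TimeZone}
    import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}

    val timeZone = TimeZone.getTimeZone("UTC")
    val dateFormatter = DateFormatter(datePartitionPattern, Locale.US)
    val timestampFormatter = TimestampFormatter(timestampPartitionPattern, timeZone, Locale.US)
    // "2018-12-28" parses via dateFormatter, so it is inferred as DateType;
    // a value that fails every parser falls through the Try chain to StringType.
    val literal = inferPartitionColumnValue(
      "2018-12-28", true, timeZone, dateFormatter, timestampFormatter)
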
From 17a32a3911a380f2880e13d4cfc658104ada38d2 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 28 Dec 2018 11:56:38 +0100
Subject: [PATCH 06/19] Add local date and timestamp to Cast

---
 .../spark/sql/catalyst/expressions/Cast.scala | 19 +++++++++++++------
 .../sql/catalyst/util/DateFormatter.scala     |  4 ++++
 .../sql/catalyst/util/DateTimeUtils.scala     | 12 ++++++++++++
 .../catalyst/util/TimestampFormatter.scala    |  4 ++++
 4 files changed, 33 insertions(+), 6 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index ee463bf5eb6ac..4e8b9447b923c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -230,12 +230,15 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
   // [[func]] assumes the input is no longer null because eval already does the null check.
   @inline private[this] def buildCast[T](a: Any, func: T => Any): Any = func(a.asInstanceOf[T])
 
+  private val dateFormatter = DateFormatter()
+  private lazy val timestampFormatter = TimestampFormatter(timeZone)
+
   // UDFToString
   private[this] def castToString(from: DataType): Any => Any = from match {
     case BinaryType => buildCast[Array[Byte]](_, UTF8String.fromBytes)
-    case DateType => buildCast[Int](_, d => UTF8String.fromString(DateTimeUtils.dateToString(d)))
+    case DateType => buildCast[Int](_, d => UTF8String.fromString(dateFormatter.format(d)))
     case TimestampType => buildCast[Long](_,
-      t => UTF8String.fromString(DateTimeUtils.timestampToString(t, timeZone)))
+      t => UTF8String.fromString(DateTimeUtils.timestampToString(timestampFormatter, t)))
     case ArrayType(et, _) =>
       buildCast[ArrayData](_, array => {
         val builder = new UTF8StringBuilder
@@ -843,12 +846,16 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
     case BinaryType =>
       (c, evPrim, evNull) => code"$evPrim = UTF8String.fromBytes($c);"
     case DateType =>
-      (c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(
-        org.apache.spark.sql.catalyst.util.DateTimeUtils.dateToString($c));"""
+      val df = JavaCode.global(
+        ctx.addReferenceObj("dateFormatter", dateFormatter),
+        dateFormatter.getClass)
+      (c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(${df}.format($c));"""
     case TimestampType =>
-      val tz = JavaCode.global(ctx.addReferenceObj("timeZone", timeZone), timeZone.getClass)
+      val tf = JavaCode.global(
+        ctx.addReferenceObj("timestampFormatter", timestampFormatter),
+        timestampFormatter.getClass)
       (c, evPrim, evNull) => code"""$evPrim = UTF8String.fromString(
-        org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($c, $tz));"""
+        org.apache.spark.sql.catalyst.util.DateTimeUtils.timestampToString($tf, $c));"""
     case ArrayType(et, _) =>
       (c, evPrim, evNull) => {
         val buffer = ctx.freshVariable("buffer", classOf[UTF8StringBuilder])
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
index b4c99674fc1cd..af65443cfdb02 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
@@ -88,6 +88,8 @@ class LegacyFallbackDateFormatter(
 }
 
 object DateFormatter {
+  val defaultPattern = "yyyy-MM-dd"
+
   def apply(format: String, locale: Locale): DateFormatter = {
     if (SQLConf.get.legacyTimeParserEnabled) {
       new LegacyFallbackDateFormatter(format, locale)
@@ -95,4 +97,6 @@ object DateFormatter {
       new Iso8601DateFormatter(format, locale)
     }
   }
+
+  def apply(): DateFormatter = apply(defaultPattern, Locale.US)
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 4bec87e552efb..e61ed1b2898d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -148,6 +148,18 @@ object DateTimeUtils {
     }
   }
 
+  def timestampToString(tf: TimestampFormatter, us: SQLTimestamp): String = {
+    val ts = toJavaTimestamp(us)
+    val timestampString = ts.toString
+    val formatted = tf.format(us)
+
+    if (timestampString.length > 19 && timestampString.substring(19) != ".0") {
+      formatted + timestampString.substring(19)
+    } else {
+      formatted
+    }
+  }
+
   @tailrec
   def stringToTime(s: String): java.util.Date = {
     val indexOfGMT = s.indexOf("GMT")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 2f754ea49b4e7..014d6c8bb8731 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -118,6 +118,8 @@ case class LegacyFallbackTimestampFormatter(
 }
 
 object TimestampFormatter {
+  val defaultPattern = "yyyy-MM-dd HH:mm:ss"
+
   def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
     if (SQLConf.get.legacyTimeParserEnabled) {
       new LegacyFallbackTimestampFormatter(format, timeZone, locale)
@@ -125,4 +127,6 @@ object TimestampFormatter {
       new Iso8601TimestampFormatter(format, timeZone, locale)
     }
   }
+
+  def apply(timeZone: TimeZone): TimestampFormatter = apply(defaultPattern, timeZone, Locale.US)
 }

From 9d90d3f055a219df2dd8cf242bb93979a1f80176 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 28 Dec 2018 12:24:15 +0100
Subject: [PATCH 07/19] Add local date and timestamp formatters to
 QueryExecution

---
 .../spark/sql/execution/QueryExecution.scala       | 12 ++++++------
 .../spark/sql/execution/QueryExecutionSuite.scala  | 14 ++++++++++++++
 2 files changed, 20 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index eef5a3f899f55..0e9ef42d8fa0a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -30,8 +30,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, QueryPlanningTracker}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.catalyst.util.truncatedString
+import org.apache.spark.sql.catalyst.util.{truncatedString, DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange}
 import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _}
@@ -110,6 +109,9 @@ class QueryExecution(
   protected def stringOrError[A](f: => A): String =
     try f.toString catch { case e: AnalysisException => e.toString }
 
+  private val dateFormatter = DateFormatter()
+  private val timestampFormatter = TimestampFormatter(
+    DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
 
   /**
    * Returns the result as a hive compatible sequence of strings. This is used in tests and
@@ -183,11 +185,9 @@ class QueryExecution(
           toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
         }.toSeq.sorted.mkString("{", ",", "}")
       case (null, _) => "NULL"
-      case (d: Date, DateType) =>
-        DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
+      case (d: Date, DateType) => dateFormatter.format(DateTimeUtils.fromJavaDate(d))
       case (t: Timestamp, TimestampType) =>
-        DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t),
-          DateTimeUtils.getTimeZone(sparkSession.sessionState.conf.sessionLocalTimeZone))
+        DateTimeUtils.timestampToString(timestampFormatter, DateTimeUtils.fromJavaTimestamp(t))
       case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
       case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)
       case (interval, CalendarIntervalType) => interval.toString
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
index 0c47a2040f171..834485ab4bc63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.spark.sql.execution
 
+import java.sql.{Date, Timestamp}
+
 import scala.io.Source
 
 import org.apache.spark.sql.AnalysisException
@@ -106,6 +108,18 @@ class QueryExecutionSuite extends SharedSQLContext {
     }
   }
 
+  test("date formatting in hive result") {
+    val date = "2018-12-28"
+    val result = Seq(Date.valueOf(date)).toDS().queryExecution.hiveResultString()
+    assert(result.head == date)
+  }
+
+  test("timestamp formatting in hive result") {
+    val timestamp = "2018-12-28 01:02:03"
+    val result = Seq(Timestamp.valueOf(timestamp)).toDS().queryExecution.hiveResultString()
+    assert(result.head == timestamp)
+  }
+
   test("toString() exception/error handling") {
     spark.experimental.extraStrategies = Seq(
       new SparkStrategy {

From 038ad80e643ae110e2f1f6419ebb055b547b21e3 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 28 Dec 2018 12:32:51 +0100
Subject: [PATCH 08/19] Add local date and timestamp formatters to JDBC

---
 .../sql/execution/datasources/jdbc/JDBCRelation.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
index db7ba9c74c4f9..13ed105004d70 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala
@@ -26,7 +26,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.jdbc.JdbcDialects
 import org.apache.spark.sql.sources._
@@ -185,10 +185,11 @@ private[sql] object JDBCRelation extends Logging {
       columnType: DataType,
       timeZoneId: String): String = {
     def dateTimeToString(): String = {
-      val timeZone = DateTimeUtils.getTimeZone(timeZoneId)
       val dateTimeStr = columnType match {
-        case DateType => DateTimeUtils.dateToString(value.toInt)
-        case TimestampType => DateTimeUtils.timestampToString(value, timeZone)
+        case DateType => DateFormatter().format(value.toInt)
+        case TimestampType =>
+          val timestampFormatter = TimestampFormatter(DateTimeUtils.getTimeZone(timeZoneId))
+          DateTimeUtils.timestampToString(timestampFormatter, value)
       }
       s"'$dateTimeStr'"
     }

From f6308f62229d8ecb701a79fc743cf5c6bf188c40 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 28 Dec 2018 12:36:40 +0100
Subject: [PATCH 09/19] Removing unused timestampToString

---
 .../sql/catalyst/util/DateTimeUtils.scala       | 18 ------------------
 .../sql/catalyst/util/DateTimeUtilsSuite.scala  |  3 ++-
 2 files changed, 2 insertions(+), 19 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index e61ed1b2898d0..e0499108e76e3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -130,24 +130,6 @@ object DateTimeUtils {
 
   // Converts Timestamp to string according to Hive TimestampWritable convention.
-  def timestampToString(us: SQLTimestamp): String = {
-    timestampToString(us, defaultTimeZone())
-  }
-
-  // Converts Timestamp to string according to Hive TimestampWritable convention.
-  def timestampToString(us: SQLTimestamp, timeZone: TimeZone): String = {
-    val ts = toJavaTimestamp(us)
-    val timestampString = ts.toString
-    val timestampFormat = getThreadLocalTimestampFormat(timeZone)
-    val formatted = timestampFormat.format(us)
-
-    if (timestampString.length > 19 && timestampString.substring(19) != ".0") {
-      formatted + timestampString.substring(19)
-    } else {
-      formatted
-    }
-  }
-
   def timestampToString(tf: TimestampFormatter, us: SQLTimestamp): String = {
     val ts = toJavaTimestamp(us)
     val timestampString = ts.toString
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
index 0182eeb171215..7b1ba5dbd2441 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/util/DateTimeUtilsSuite.scala
@@ -35,10 +35,11 @@ class DateTimeUtilsSuite extends SparkFunSuite {
   }
 
   test("nanoseconds truncation") {
+    val tf = TimestampFormatter(DateTimeUtils.defaultTimeZone())
     def checkStringToTimestamp(originalTime: String, expectedParsedTime: String) {
       val parsedTimestampOp = DateTimeUtils.stringToTimestamp(UTF8String.fromString(originalTime))
       assert(parsedTimestampOp.isDefined, "timestamp with nanoseconds was not parsed correctly")
-      assert(DateTimeUtils.timestampToString(parsedTimestampOp.get) === expectedParsedTime)
+      assert(DateTimeUtils.timestampToString(tf, parsedTimestampOp.get) === expectedParsedTime)
     }
 
     checkStringToTimestamp("2015-01-02 00:00:00.123456789", "2015-01-02 00:00:00.123456")

From 9f85ac6572d8839984efefb67bd5f842f2d177b9 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 28 Dec 2018 12:38:16 +0100
Subject: [PATCH 10/19] Removing unused dateToString

---
 .../spark/sql/catalyst/util/DateTimeUtils.scala | 11 -----------
 1 file changed, 11 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index e0499108e76e3..2ba908c617878 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -87,14 +87,6 @@ object DateTimeUtils {
     timestampFormatter.withTimeZone(timeZone)
   }
 
-  private val threadLocalDateFormat = new ThreadLocal[DateFormatter] {
-    override def initialValue(): DateFormatter = {
-      DateFormatter("yyyy-MM-dd", Locale.US)
-    }
-  }
-
-  def getThreadLocalDateFormat(): DateFormatter = threadLocalDateFormat.get()
-
   private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]
   private val computeTimeZone = new JFunction[String, TimeZone] {
     override def apply(timeZoneId: String): TimeZone = TimeZone.getTimeZone(timeZoneId)
@@ -126,9 +118,6 @@ object DateTimeUtils {
     millisLocal - getOffsetFromLocalMillis(millisLocal, timeZone)
   }
 
-  def dateToString(days: SQLDate): String =
-    getThreadLocalDateFormat().format(days)
-
   // Converts Timestamp to string according to Hive TimestampWritable convention.
   def timestampToString(tf: TimestampFormatter, us: SQLTimestamp): String = {
     val ts = toJavaTimestamp(us)

From d348c07126dd6b28df131baf40f9160b98644ec4 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 28 Dec 2018 12:38:40 +0100
Subject: [PATCH 11/19] Fix build error

---
 .../scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 2ba908c617878..7371c872f3980 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -1107,6 +1107,5 @@ object DateTimeUtils {
   private[util] def resetThreadLocals(): Unit = {
     threadLocalGmtCalendar.remove()
     threadLocalTimestampFormat.remove()
-    threadLocalDateFormat.remove()
   }
 }

From 36c9f9a2675aa4d7da794850e38ce40eb29bbe11 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 28 Dec 2018 12:39:52 +0100
Subject: [PATCH 12/19] Remove unused threadLocalTimestampFormat

---
 .../spark/sql/catalyst/util/DateTimeUtils.scala | 12 ------------
 1 file changed, 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index 7371c872f3980..127e60405ea73 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -76,17 +76,6 @@ object DateTimeUtils {
     }
   }
 
-  private val threadLocalTimestampFormat = new ThreadLocal[TimestampFormatter] {
-    override def initialValue(): TimestampFormatter = {
-      TimestampFormatter("yyyy-MM-dd HH:mm:ss", TimeZoneUTC, Locale.US)
-    }
-  }
-
-  def getThreadLocalTimestampFormat(timeZone: TimeZone): TimestampFormatter = {
-    val timestampFormatter = threadLocalTimestampFormat.get()
-    timestampFormatter.withTimeZone(timeZone)
-  }
-
   private val computedTimeZones = new ConcurrentHashMap[String, TimeZone]
   private val computeTimeZone = new JFunction[String, TimeZone] {
     override def apply(timeZoneId: String): TimeZone = TimeZone.getTimeZone(timeZoneId)
@@ -1106,6 +1095,5 @@ object DateTimeUtils {
    */
   private[util] def resetThreadLocals(): Unit = {
     threadLocalGmtCalendar.remove()
   }
 }

From 4580c254122bb8f39b94c6b9a10ffe3c69b7423d Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 28 Dec 2018 12:42:27 +0100
Subject: [PATCH 13/19] Revert changes in TimestampFormatter

---
 .../sql/catalyst/util/TimestampFormatter.scala | 17 ++---------------
 1 file changed, 2 insertions(+), 15 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 014d6c8bb8731..35b6298d0e4a0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -44,10 +44,9 @@ sealed trait TimestampFormatter extends Serializable {
   @throws(classOf[DateTimeException])
   def parse(s: String): Long
   def format(us: Long): String
-  def withTimeZone(tz: TimeZone): TimestampFormatter
 }
 
-case class Iso8601TimestampFormatter(
+class Iso8601TimestampFormatter(
     pattern: String,
     timeZone: TimeZone,
     locale: Locale) extends TimestampFormatter with DateTimeFormatterHelper {
@@ -78,10 +77,6 @@ case class Iso8601TimestampFormatter(
 
     formatter.withZone(timeZone.toZoneId).format(instant)
   }
-
-  override def withTimeZone(tz: TimeZone): TimestampFormatter = {
-    this.copy(timeZone = tz)
-  }
 }
 
 class LegacyTimestampFormatter(
@@ -98,23 +93,15 @@ class LegacyTimestampFormatter(
   override def format(us: Long): String = {
     format.format(DateTimeUtils.toJavaTimestamp(us))
   }
-
-  override def withTimeZone(tz: TimeZone): TimestampFormatter = {
-    new LegacyTimestampFormatter(pattern, tz, locale)
-  }
 }
 
-case class LegacyFallbackTimestampFormatter(
+class LegacyFallbackTimestampFormatter(
     pattern: String,
     timeZone: TimeZone,
     locale: Locale) extends LegacyTimestampFormatter(pattern, timeZone, locale) {
   override def toMillis(s: String): Long = {
     Try {super.toMillis(s)}.getOrElse(DateTimeUtils.stringToTime(s).getTime)
   }
-
-  override def withTimeZone(tz: TimeZone): TimestampFormatter = {
-    this.copy(timeZone = tz)
-  }
 }
 
 object TimestampFormatter {

From 56bdae417366931346970f69bc1384f772660041 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Fri, 28 Dec 2018 12:55:36 +0100
Subject: [PATCH 14/19] apply with default locale

---
 .../catalyst/expressions/datetimeExpressions.scala | 10 +++++-----
 .../spark/sql/catalyst/util/DateFormatter.scala    |  7 +++++--
 .../sql/catalyst/util/TimestampFormatter.scala     | 11 +++++++++--
 .../apache/spark/sql/util/DateFormatterSuite.scala | 10 +++++-----
 .../spark/sql/util/TimestampFormatterSuite.scala   | 13 +++++--------
 .../execution/datasources/PartitioningUtils.scala  |  4 ++--
 .../parquet/ParquetPartitionDiscoverySuite.scala   |  4 ++--
 7 files changed, 33 insertions(+), 26 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
index 73af0a3c5c2ee..38367c47ff212 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala
@@ -562,7 +562,7 @@ case class DateFormatClass(left: Expression, right: Expression, timeZoneId: Opti
     copy(timeZoneId = Option(timeZoneId))
 
   override protected def nullSafeEval(timestamp: Any, format: Any): Any = {
-    val df = TimestampFormatter(format.toString, timeZone, Locale.US)
+    val df = TimestampFormatter(format.toString, timeZone)
     UTF8String.fromString(df.format(timestamp.asInstanceOf[Long]))
   }
 
@@ -667,7 +667,7 @@ abstract class UnixTime
   private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
   private lazy val formatter: TimestampFormatter =
     try {
-      TimestampFormatter(constFormat.toString, timeZone, Locale.US)
+      TimestampFormatter(constFormat.toString, timeZone)
     } catch {
       case NonFatal(_) => null
     }
@@ -700,7 +700,7 @@ abstract class UnixTime
             } else {
               val formatString = f.asInstanceOf[UTF8String].toString
               try {
-                TimestampFormatter(formatString, timeZone, Locale.US).parse(
+                TimestampFormatter(formatString, timeZone).parse(
                   t.asInstanceOf[UTF8String].toString) / MICROS_PER_SECOND
               } catch {
                 case NonFatal(_) => null
@@ -821,7 +821,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
   private lazy val constFormat: UTF8String = right.eval().asInstanceOf[UTF8String]
   private lazy val formatter: TimestampFormatter =
     try {
-      TimestampFormatter(constFormat.toString, timeZone, Locale.US)
+      TimestampFormatter(constFormat.toString, timeZone)
     } catch {
       case NonFatal(_) => null
     }
@@ -847,7 +847,7 @@ case class FromUnixTime(sec: Expression, format: Expression, timeZoneId: Option[
           null
         } else {
           try {
-            UTF8String.fromString(TimestampFormatter(f.toString, timeZone, Locale.US)
+            UTF8String.fromString(TimestampFormatter(f.toString, timeZone)
               .format(time.asInstanceOf[Long] * MICROS_PER_SECOND))
           } catch {
             case NonFatal(_) => null
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
index af65443cfdb02..7a4e6b29b9682 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala
@@ -88,7 +88,8 @@ class LegacyFallbackDateFormatter(
 }
 
 object DateFormatter {
-  val defaultPattern = "yyyy-MM-dd"
+  val defaultPattern: String = "yyyy-MM-dd"
+  val defaultLocale: Locale = Locale.US
 
   def apply(format: String, locale: Locale): DateFormatter = {
     if (SQLConf.get.legacyTimeParserEnabled) {
@@ -95,4 +97,6 @@ object DateFormatter {
       new Iso8601DateFormatter(format, locale)
     }
   }
-  def apply(): DateFormatter = apply(defaultPattern, Locale.US)
+  def apply(format: String): DateFormatter = apply(format, defaultLocale)
+
+  def apply(): DateFormatter = apply(defaultPattern)
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
index 35b6298d0e4a0..649cde1d21da4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala
@@ -105,7 +105,8 @@ class LegacyFallbackTimestampFormatter(
 }
 
 object TimestampFormatter {
-  val defaultPattern = "yyyy-MM-dd HH:mm:ss"
+  val defaultPattern: String = "yyyy-MM-dd HH:mm:ss"
+  val defaultLocale: Locale = Locale.US
 
   def apply(format: String, timeZone: TimeZone, locale: Locale): TimestampFormatter = {
     if (SQLConf.get.legacyTimeParserEnabled) {
@@ -115,5 +116,11 @@ object TimestampFormatter {
     }
   }
 
-  def apply(timeZone: TimeZone): TimestampFormatter = apply(defaultPattern, timeZone, Locale.US)
+  def apply(format: String, timeZone: TimeZone): TimestampFormatter = {
+    apply(format, timeZone, defaultLocale)
+  }
+
+  def apply(timeZone: TimeZone): TimestampFormatter = {
+    apply(defaultPattern, timeZone, defaultLocale)
+  }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala
index 2dc55e0e1f633..4d5872c92f5a7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/DateFormatterSuite.scala
@@ -29,7 +29,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
   test("parsing dates") {
     DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
       withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
-        val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+        val formatter = DateFormatter()
         val daysSinceEpoch = formatter.parse("2018-12-02")
         assert(daysSinceEpoch === 17867)
       }
@@ -39,7 +39,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
   test("format dates") {
     DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
       withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
-        val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+        val formatter = DateFormatter()
         val date = formatter.format(17867)
         assert(date === "2018-12-02")
       }
@@ -59,7 +59,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
       "5010-11-17").foreach { date =>
       DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
         withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
-          val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+          val formatter = DateFormatter()
           val days = formatter.parse(date)
           val formatted = formatter.format(days)
           assert(date === formatted)
@@ -82,7 +82,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
       1110657).foreach { days =>
       DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
         withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) {
-          val formatter = DateFormatter("yyyy-MM-dd", Locale.US)
+          val formatter = DateFormatter()
           val date = formatter.format(days)
           val parsed = formatter.parse(date)
           assert(days === parsed)
@@ -92,7 +92,7 @@ class DateFormatterSuite extends SparkFunSuite with SQLHelper {
   }
 
   test("parsing date without explicit day") {
-    val formatter = DateFormatter("yyyy MMM", Locale.US)
+    val formatter = DateFormatter("yyyy MMM")
     val daysSinceEpoch = formatter.parse("2018 Dec")
     assert(daysSinceEpoch === LocalDate.of(2018, 12, 1).toEpochDay)
   }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala
index edccbb2a7f5db..34790984b9344 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/TimestampFormatterSuite.scala
@@ -41,8 +41,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
     DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
       val formatter = TimestampFormatter(
         "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
-        TimeZone.getTimeZone(timeZone),
-        Locale.US)
+        TimeZone.getTimeZone(timeZone))
       val microsSinceEpoch = formatter.parse(localDate)
       assert(microsSinceEpoch === expectedMicros(timeZone))
     }
@@ -62,8 +61,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
     DateTimeTestUtils.outstandingTimezonesIds.foreach { timeZone =>
       val formatter = TimestampFormatter(
         "yyyy-MM-dd'T'HH:mm:ss.SSSSSS",
-        TimeZone.getTimeZone(timeZone),
-        Locale.US)
+        TimeZone.getTimeZone(timeZone))
       val timestamp = formatter.format(microsSinceEpoch)
       assert(timestamp === expectedTimestamp(timeZone))
     }
@@ -81,7 +79,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
       2177456523456789L,
       11858049903010203L).foreach { micros =>
       DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
-        val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US)
+        val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone)
         val timestamp = formatter.format(micros)
         val parsed = formatter.parse(timestamp)
         assert(micros === parsed)
@@ -101,7 +99,7 @@ class TimestampFormatterSuite extends SparkFunSuite with SQLHelper {
       "2039-01-01T01:02:03.456789",
       "2345-10-07T22:45:03.010203").foreach { timestamp =>
       DateTimeTestUtils.outstandingTimezones.foreach { timeZone =>
-        val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone, Locale.US)
+        val formatter = TimestampFormatter("yyyy-MM-dd'T'HH:mm:ss.SSSSSS", timeZone)
         val micros = formatter.parse(timestamp)
        val formatted = formatter.format(micros)
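        // A hedged note on the invariant this test pins down (editor's sketch, not
        // part of the original commit): with the microsecond pattern above, the
        // string -> micros -> string round trip is lossless, so the formatted
        // result must equal the original input character for character.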
        assert(timestamp === formatted)
       }
     }
   }
 
   test(" case insensitive parsing of am and pm") {
     val formatter = TimestampFormatter(
       "yyyy MMM dd hh:mm:ss a",
-      TimeZone.getTimeZone("UTC"),
-      Locale.US)
+      TimeZone.getTimeZone("UTC"))
     val micros = formatter.parse("2009 Mar 20 11:30:01 am")
     assert(micros === TimeUnit.SECONDS.toMicros(
       LocalDateTime.of(2009, 3, 20, 11, 30, 1).toEpochSecond(ZoneOffset.UTC)))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 47f63e53725e7..b4ac973e2a35c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -124,8 +124,8 @@ object PartitioningUtils {
       Map.empty[String, DataType]
     }
 
-    val dateFormatter = DateFormatter(datePartitionPattern, Locale.US)
-    val timestampFormatter = TimestampFormatter(timestampPartitionPattern, timeZone, Locale.US)
+    val dateFormatter = DateFormatter(datePartitionPattern)
+    val timestampFormatter = TimestampFormatter(timestampPartitionPattern, timeZone)
     // First, we need to parse every partition's path and see if we can find partition values.
     val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
       parsePartition(path, typeInference, basePaths, userSpecifiedDataTypes,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index ad2325b01f524..c2e1a81ba3a93 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -56,8 +56,8 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
 
   val timeZone = TimeZone.getDefault()
   val timeZoneId = timeZone.getID
-  val df = DateFormatter(datePartitionPattern, Locale.US)
-  val tf = TimestampFormatter(timestampPartitionPattern, timeZone, Locale.US)
+  val df = DateFormatter(datePartitionPattern)
+  val tf = TimestampFormatter(timestampPartitionPattern, timeZone)
 
   protected override def beforeAll(): Unit = {
     super.beforeAll()

From c3066e1a919ddb3c1942a247d14143b08e9422a2 Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sat, 29 Dec 2018 00:08:22 +0100
Subject: [PATCH 15/19] Making dateFormatter lazy in cast

---
 .../scala/org/apache/spark/sql/catalyst/expressions/Cast.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 4e8b9447b923c..ff6a68b290206 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -230,7 +230,7 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
   // [[func]] assumes the input is no longer null because eval already does the null check.
   @inline private[this] def buildCast[T](a: Any, func: T => Any): Any = func(a.asInstanceOf[T])
 
-  private val dateFormatter = DateFormatter()
+  private lazy val dateFormatter = DateFormatter()
   private lazy val timestampFormatter = TimestampFormatter(timeZone)
 
   // UDFToString

From eed40d7357fbe1390abe455d0b66a59f27f62cef Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Sun, 30 Dec 2018 12:08:20 +0100
Subject: [PATCH 16/19] Removing date partition pattern

---
 .../spark/sql/execution/datasources/PartitioningUtils.scala  | 3 +--
 .../datasources/parquet/ParquetPartitionDiscoverySuite.scala | 2 +-
 2 files changed, 2 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index b4ac973e2a35c..ee770426e61f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -58,7 +58,6 @@ object PartitionSpec {
 
 object PartitioningUtils {
 
-  val datePartitionPattern = "yyyy-MM-dd"
   val timestampPartitionPattern = "yyyy-MM-dd HH:mm:ss[.S]"
 
   private[datasources] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal])
@@ -124,7 +123,7 @@ object PartitioningUtils {
       Map.empty[String, DataType]
     }
 
-    val dateFormatter = DateFormatter(datePartitionPattern)
+    val dateFormatter = DateFormatter()
     val timestampFormatter = TimestampFormatter(timestampPartitionPattern, timeZone)
     // First, we need to parse every partition's path and see if we can find partition values.
     val (partitionValues, optDiscoveredBasePaths) = paths.map { path =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
index c2e1a81ba3a93..864c1e99fbfb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetPartitionDiscoverySuite.scala
@@ -56,7 +56,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest with Sha
 
   val timeZone = TimeZone.getDefault()
   val timeZoneId = timeZone.getID
-  val df = DateFormatter(datePartitionPattern)
+  val df = DateFormatter()
   val tf = TimestampFormatter(timestampPartitionPattern, timeZone)
 
   protected override def beforeAll(): Unit = {

From 5ae58a340fab764a0f117fee448ffda87b3feece Mon Sep 17 00:00:00 2001
From: Maxim Gekk
Date: Thu, 3 Jan 2019 13:56:36 +0100
Subject: [PATCH 17/19] Fix merge

---
 .../spark/sql/execution/HiveResult.scala      | 11 +++--
 .../spark/sql/execution/HiveResultSuite.scala | 40 +++++++++++++++++++
 .../sql/execution/QueryExecutionSuite.scala   | 12 ------
 3 files changed, 47 insertions(+), 16 deletions(-)
 create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
index 22d3ca958a210..93287ddd8b0c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/HiveResult.scala
@@ -21,7 +21,7 @@ import java.nio.charset.StandardCharsets
 import java.sql.{Date, Timestamp}
 
 import org.apache.spark.sql.Row
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -56,6 +56,10 @@ object HiveResult {
     result.map(_.zip(types).map(toHiveString)).map(_.mkString("\t"))
   }
 
+  private lazy val dateFormatter = DateFormatter()
+  private lazy val timestampFormatter = TimestampFormatter(
+    DateTimeUtils.getTimeZone(SQLConf.get.sessionLocalTimeZone))
+
   /** Formats a datum (based on the given data type) and returns the string representation. */
   private def toHiveString(a: (Any, DataType)): String = {
     val primitiveTypes = Seq(StringType, IntegerType, LongType, DoubleType, FloatType,
@@ -103,10 +107,9 @@ object HiveResult {
           toHiveStructString((key, kType)) + ":" + toHiveStructString((value, vType))
         }.toSeq.sorted.mkString("{", ",", "}")
       case (null, _) => "NULL"
-      case (d: Date, DateType) =>
-        DateTimeUtils.dateToString(DateTimeUtils.fromJavaDate(d))
+      case (d: Date, DateType) => dateFormatter.format(DateTimeUtils.fromJavaDate(d))
       case (t: Timestamp, TimestampType) =>
-        DateTimeUtils.timestampToString(DateTimeUtils.fromJavaTimestamp(t), timeZone)
+        DateTimeUtils.timestampToString(timestampFormatter, DateTimeUtils.fromJavaTimestamp(t))
       case (bin: Array[Byte], BinaryType) => new String(bin, StandardCharsets.UTF_8)
       case (decimal: java.math.BigDecimal, DecimalType()) => formatDecimal(decimal)
      case (interval, CalendarIntervalType) => interval.toString
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
new file mode 100644
index 0000000000000..f46bad38fe9df
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.sql.execution + +import java.sql.{Date, Timestamp} + +import org.apache.spark.sql.test.SharedSQLContext + +class HiveResultSuite extends SharedSQLContext { + import testImplicits._ + + test("date formatting in hive result") { + val date = "2018-12-28" + val executedPlan = Seq(Date.valueOf(date)).toDS().queryExecution.executedPlan + val result = HiveResult.hiveResultString(executedPlan) + assert(result.head == date) + } + + test("timestamp formatting in hive result") { + val timestamp = "2018-12-28 01:02:03" + val executedPlan = Seq(Timestamp.valueOf(timestamp)).toDS().queryExecution.executedPlan + val result = HiveResult.hiveResultString(executedPlan) + assert(result.head == timestamp) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index 35b2d0390411c..e7585203ea978 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -121,18 +121,6 @@ class QueryExecutionSuite extends SharedSQLContext { } } - test("date formatting in hive result") { - val date = "2018-12-28" - val result = Seq(Date.valueOf(date)).toDS().queryExecution.hiveResultString() - assert(result.head == date) - } - - test("timestamp formatting in hive result") { - val timestamp = "2018-12-28 01:02:03" - val result = Seq(Timestamp.valueOf(timestamp)).toDS().queryExecution.hiveResultString() - assert(result.head == timestamp) - } - test("toString() exception/error handling") { spark.experimental.extraStrategies = Seq( new SparkStrategy { From 2397401536ba03aa52e6a29122d9337f00726197 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 7 Jan 2019 12:10:49 +0100 Subject: [PATCH 18/19] Fix merge --- .../spark/sql/execution/HiveResultSuite.scala | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala index 4205b3f79a972..bbce4705871df 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/HiveResultSuite.scala @@ -17,10 +17,27 @@ package org.apache.spark.sql.execution +import java.sql.{Date, Timestamp} + import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT} +import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContext} + +class HiveResultSuite extends SparkFunSuite with SharedSQLContext { + import testImplicits._ -class HiveResultSuite extends SparkFunSuite { + test("date formatting in hive result") { + val date = "2018-12-28" + val executedPlan = Seq(Date.valueOf(date)).toDS().queryExecution.executedPlan + val result = HiveResult.hiveResultString(executedPlan) + assert(result.head == date) + } + + test("timestamp formatting in hive result") { + val timestamp = "2018-12-28 01:02:03" + val executedPlan = Seq(Timestamp.valueOf(timestamp)).toDS().queryExecution.executedPlan + val result = HiveResult.hiveResultString(executedPlan) + assert(result.head == timestamp) + } test("toHiveString correctly handles UDTs") { val point = new ExamplePoint(50.0, 50.0) From 483e95b837c4e40d6ed00e714a811cfe5481ead5 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Mon, 14 Jan 2019 09:00:43 +0100 Subject: [PATCH 
19/19] Remove unused import --- .../org/apache/spark/sql/execution/QueryExecutionSuite.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index e7585203ea978..3cc97c995702a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -16,8 +16,6 @@ */ package org.apache.spark.sql.execution -import java.sql.{Date, Timestamp} - import scala.io.Source import org.apache.spark.sql.AnalysisException
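
The suites above drive DateFormatter purely through days since the epoch: format(17867) must yield "2018-12-02", and parse must invert it. As a self-contained illustration of that contract, here is a minimal sketch in plain java.time terms. The class and object names are hypothetical, and the default pattern ("yyyy-MM-dd") and locale (Locale.US) are assumptions inferred from the hunks that drop those explicit arguments without changing the asserted output; this is not Spark's implementation.

    import java.time.LocalDate
    import java.time.format.DateTimeFormatter
    import java.util.Locale

    // Hypothetical stand-in for the real DateFormatter: days since
    // 1970-01-01 in, formatted string out, and back. Pattern and locale
    // defaults are assumptions inferred from the test changes above.
    final class SketchDateFormatter(pattern: String, locale: Locale) {
      private val formatter = DateTimeFormatter.ofPattern(pattern, locale)

      def format(daysSinceEpoch: Int): String =
        LocalDate.ofEpochDay(daysSinceEpoch.toLong).format(formatter)

      def parse(s: String): Int =
        LocalDate.parse(s, formatter).toEpochDay.toInt
    }

    object SketchDateFormatter {
      def apply(pattern: String = "yyyy-MM-dd",
                locale: Locale = Locale.US): SketchDateFormatter =
        new SketchDateFormatter(pattern, locale)
    }

    object SketchDateFormatterDemo extends App {
      val formatter = SketchDateFormatter()
      // Mirrors the round trip asserted in DateFormatterSuite.
      assert(formatter.format(17867) == "2018-12-02")
      assert(formatter.parse("2018-12-02") == 17867)
    }

Unlike this sketch, the real formatter is exercised under withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> timeZone) across DateTimeTestUtils.outstandingTimezonesIds, since the days-to-string conversion is expected to be stable in every session time zone.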
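
The TimestampFormatter call sites keep an explicit time zone but drop Locale.US, so the two-argument apply presumably defaults the locale the same way. Below is a minimal round-trip sketch of the contract the suites assert (microseconds since the epoch to a pattern-formatted string and back), again with hypothetical names and not Spark's code:

    import java.time.{Instant, LocalDateTime}
    import java.time.format.DateTimeFormatter
    import java.time.temporal.ChronoUnit
    import java.util.{Locale, TimeZone}
    import java.util.concurrent.TimeUnit

    // Hypothetical stand-in for the real TimestampFormatter. Formats
    // microseconds since the epoch in the given time zone and parses the
    // result back to the same micros, as the suites above assert.
    final class SketchTimestampFormatter(
        pattern: String,
        timeZone: TimeZone,
        locale: Locale = Locale.US) {
      private val formatter = DateTimeFormatter.ofPattern(pattern, locale)
      private val zoneId = timeZone.toZoneId

      def format(us: Long): String =
        formatter.withZone(zoneId).format(Instant.EPOCH.plus(us, ChronoUnit.MICROS))

      def parse(s: String): Long = {
        val instant = LocalDateTime.parse(s, formatter).atZone(zoneId).toInstant
        TimeUnit.SECONDS.toMicros(instant.getEpochSecond) + instant.getNano / 1000
      }
    }

    object SketchTimestampFormatterDemo extends App {
      val formatter = new SketchTimestampFormatter(
        "yyyy-MM-dd'T'HH:mm:ss.SSSSSS", TimeZone.getTimeZone("UTC"))
      val micros = formatter.parse("2018-12-02T10:11:12.001234")
      assert(formatter.format(micros) == "2018-12-02T10:11:12.001234")
    }

The explicit time zone parameter mirrors how the suites iterate DateTimeTestUtils.outstandingTimezones, checking that micros === parse(format(micros)) holds in each zone.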
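
Finally, [PATCH 15/19] and the HiveResult change in [PATCH 17/19] declare the formatters as lazy val rather than val, deferring construction until the first format or parse call; presumably this avoids building formatters for Cast instances whose date and timestamp branches are never taken. A minimal illustration of that behavior, using a hypothetical Expensive class rather than Spark code:

    // Sketch of the lazy-val behavior: the field is built on first
    // access, not when the enclosing instance is constructed.
    final class Expensive {
      println("Expensive constructed")
    }

    final class Holder {
      lazy val formatter: Expensive = new Expensive
    }

    object LazyValDemo extends App {
      val holder = new Holder  // prints nothing yet
      holder.formatter         // prints "Expensive constructed"
      holder.formatter         // cached: prints nothing
    }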