Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-4508] [SQL] build native date type to conform behavior to Hive #4325

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql
import scala.util.hashing.MurmurHash3

import org.apache.spark.sql.catalyst.expressions.GenericRow

import org.apache.spark.sql.types.DateUtils

object Row {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

package org.apache.spark.sql.catalyst

import java.sql.{Date, Timestamp}
import java.sql.Timestamp

import org.apache.spark.util.Utils
import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute, AttributeReference, Row}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types._


/**
* A default version of ScalaReflection that uses the runtime universe.
*/
Expand Down Expand Up @@ -67,6 +66,7 @@ trait ScalaReflection {
}.toArray)
case (d: BigDecimal, _) => Decimal(d)
case (d: java.math.BigDecimal, _) => Decimal(d)
case (d: java.sql.Date, _) => DateUtils.fromJavaDate(d)
case (other, _) => other
}

Expand All @@ -80,6 +80,7 @@ trait ScalaReflection {
}
case (r: Row, s: StructType) => convertRowToScala(r, s)
case (d: Decimal, _: DecimalType) => d.toJavaBigDecimal
case (i: Int, DateType) => DateUtils.toJavaDate(i)
case (other, _) => other
}

Expand Down Expand Up @@ -152,7 +153,7 @@ trait ScalaReflection {
valueDataType, valueContainsNull = valueNullable), nullable = true)
case t if t <:< typeOf[String] => Schema(StringType, nullable = true)
case t if t <:< typeOf[Timestamp] => Schema(TimestampType, nullable = true)
case t if t <:< typeOf[Date] => Schema(DateType, nullable = true)
case t if t <:< typeOf[java.sql.Date] => Schema(DateType, nullable = true)
case t if t <:< typeOf[BigDecimal] => Schema(DecimalType.Unlimited, nullable = true)
case t if t <:< typeOf[java.math.BigDecimal] => Schema(DecimalType.Unlimited, nullable = true)
case t if t <:< typeOf[Decimal] => Schema(DecimalType.Unlimited, nullable = true)
Expand Down Expand Up @@ -184,7 +185,7 @@ trait ScalaReflection {
case obj: LongType.JvmType => LongType
case obj: FloatType.JvmType => FloatType
case obj: DoubleType.JvmType => DoubleType
case obj: DateType.JvmType => DateType
case obj: java.sql.Date => DateType
case obj: java.math.BigDecimal => DecimalType.Unlimited
case obj: Decimal => DecimalType.Unlimited
case obj: TimestampType.JvmType => TimestampType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ class SqlParser extends AbstractSparkSQLParser {
protected val CASE = Keyword("CASE")
protected val CAST = Keyword("CAST")
protected val COUNT = Keyword("COUNT")
protected val DATE = Keyword("DATE")
protected val DECIMAL = Keyword("DECIMAL")
protected val DESC = Keyword("DESC")
protected val DISTINCT = Keyword("DISTINCT")
Expand Down Expand Up @@ -381,6 +382,7 @@ class SqlParser extends AbstractSparkSQLParser {
| DOUBLE ^^^ DoubleType
| fixedDecimalType
| DECIMAL ^^^ DecimalType.Unlimited
| DATE ^^^ DateType
)

protected lazy val fixedDecimalType: Parser[DataType] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
// UDFToString
private[this] def castToString(from: DataType): Any => Any = from match {
case BinaryType => buildCast[Array[Byte]](_, new String(_, "UTF-8"))
case DateType => buildCast[Date](_, dateToString)
case DateType => buildCast[Int](_, d => DateUtils.toString(d))
case TimestampType => buildCast[Timestamp](_, timestampToString)
case _ => buildCast[Any](_, _.toString)
}
Expand All @@ -131,7 +131,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
buildCast[Timestamp](_, t => t.getTime() != 0 || t.getNanos() != 0)
case DateType =>
// Hive would return null when cast from date to boolean
buildCast[Date](_, d => null)
buildCast[Int](_, d => null)
case LongType =>
buildCast[Long](_, _ != 0)
case IntegerType =>
Expand Down Expand Up @@ -171,7 +171,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case ByteType =>
buildCast[Byte](_, b => new Timestamp(b))
case DateType =>
buildCast[Date](_, d => new Timestamp(d.getTime))
buildCast[Int](_, d => new Timestamp(DateUtils.toJavaDate(d).getTime))
// TimestampWritable.decimalToTimestamp
case DecimalType() =>
buildCast[Decimal](_, d => decimalToTimestamp(d))
Expand Down Expand Up @@ -224,37 +224,24 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
}
}

// Converts Timestamp to string according to Hive TimestampWritable convention
private[this] def timestampToDateString(ts: Timestamp): String = {
Cast.threadLocalDateFormat.get.format(ts)
}

// DateConverter
private[this] def castToDate(from: DataType): Any => Any = from match {
case StringType =>
buildCast[String](_, s =>
try Date.valueOf(s) catch { case _: java.lang.IllegalArgumentException => null })
try DateUtils.fromJavaDate(Date.valueOf(s))
catch { case _: java.lang.IllegalArgumentException => null }
)
case TimestampType =>
// throw valid precision more than seconds, according to Hive.
// Timestamp.nanos is in 0 to 999,999,999, no more than a second.
buildCast[Timestamp](_, t => new Date(Math.floor(t.getTime / 1000.0).toLong * 1000))
buildCast[Timestamp](_, t => DateUtils.millisToDays(t.getTime))
// Hive throws this exception as a Semantic Exception
// It is never possible to compare result when hive return with exception, so we can return null
// It is never possible to compare result when hive return with exception,
// so we can return null
// NULL is more reasonable here, since the query itself obeys the grammar.
case _ => _ => null
}

// Date cannot be cast to long, according to hive
private[this] def dateToLong(d: Date) = null

// Date cannot be cast to double, according to hive
private[this] def dateToDouble(d: Date) = null

// Converts Date to string according to Hive DateWritable convention
private[this] def dateToString(d: Date): String = {
Cast.threadLocalDateFormat.get.format(d)
}

// LongConverter
private[this] def castToLong(from: DataType): Any => Any = from match {
case StringType =>
Expand All @@ -264,7 +251,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case BooleanType =>
buildCast[Boolean](_, b => if (b) 1L else 0L)
case DateType =>
buildCast[Date](_, d => dateToLong(d))
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToLong(t))
case x: NumericType =>
Expand All @@ -280,7 +267,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case BooleanType =>
buildCast[Boolean](_, b => if (b) 1 else 0)
case DateType =>
buildCast[Date](_, d => dateToLong(d))
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToLong(t).toInt)
case x: NumericType =>
Expand All @@ -296,7 +283,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case BooleanType =>
buildCast[Boolean](_, b => if (b) 1.toShort else 0.toShort)
case DateType =>
buildCast[Date](_, d => dateToLong(d))
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToLong(t).toShort)
case x: NumericType =>
Expand All @@ -312,7 +299,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case BooleanType =>
buildCast[Boolean](_, b => if (b) 1.toByte else 0.toByte)
case DateType =>
buildCast[Date](_, d => dateToLong(d))
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToLong(t).toByte)
case x: NumericType =>
Expand Down Expand Up @@ -342,7 +329,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case BooleanType =>
buildCast[Boolean](_, b => changePrecision(if (b) Decimal(1) else Decimal(0), target))
case DateType =>
buildCast[Date](_, d => null) // date can't cast to decimal in Hive
buildCast[Int](_, d => null) // date can't cast to decimal in Hive
case TimestampType =>
// Note that we lose precision here.
buildCast[Timestamp](_, t => changePrecision(Decimal(timestampToDouble(t)), target))
Expand All @@ -367,7 +354,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case BooleanType =>
buildCast[Boolean](_, b => if (b) 1d else 0d)
case DateType =>
buildCast[Date](_, d => dateToDouble(d))
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToDouble(t))
case x: NumericType =>
Expand All @@ -383,7 +370,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case BooleanType =>
buildCast[Boolean](_, b => if (b) 1f else 0f)
case DateType =>
buildCast[Date](_, d => dateToDouble(d))
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToDouble(t).toFloat)
case x: NumericType =>
Expand Down Expand Up @@ -442,16 +429,16 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w

object Cast {
// `SimpleDateFormat` is not thread-safe.
private[sql] val threadLocalDateFormat = new ThreadLocal[DateFormat] {
private[sql] val threadLocalTimestampFormat = new ThreadLocal[DateFormat] {
override def initialValue() = {
new SimpleDateFormat("yyyy-MM-dd")
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
}
}

// `SimpleDateFormat` is not thread-safe.
private[sql] val threadLocalTimestampFormat = new ThreadLocal[DateFormat] {
private[sql] val threadLocalDateFormat = new ThreadLocal[DateFormat] {
override def initialValue() = {
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
new SimpleDateFormat("yyyy-MM-dd")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
new String(${eval.primitiveTerm}.asInstanceOf[Array[Byte]])
""".children

case Cast(child @ DateType(), StringType) =>
child.castOrNull(c => q"org.apache.spark.sql.types.DateUtils.toString($c)", StringType)

case Cast(child @ NumericType(), IntegerType) =>
child.castOrNull(c => q"$c.toInt", IntegerType)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object Literal {
case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
case d: Decimal => Literal(d, DecimalType.Unlimited)
case t: Timestamp => Literal(t, TimestampType)
case d: Date => Literal(d, DateType)
case d: Date => Literal(DateUtils.fromJavaDate(d), DateType)
case a: Array[Byte] => Literal(a, BinaryType)
case null => Literal(null, NullType)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types

import java.sql.Date
import java.util.{Calendar, TimeZone}

import org.apache.spark.sql.catalyst.expressions.Cast

/**
* helper function to convert between Int value of days since 1970-01-01 and java.sql.Date
*/
object DateUtils {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we mark it private[spark]?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could be useful even outside of spark.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's not necessary, just do not make it public, or we can not change it anymore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code gen need that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That make sense, thanks!

private val MILLIS_PER_DAY = 86400000

// Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
override protected def initialValue: TimeZone = {
Calendar.getInstance.getTimeZone
}
}

private def javaDateToDays(d: Date): Int = {
millisToDays(d.getTime)
}

def millisToDays(millisLocal: Long): Int = {
((millisLocal + LOCAL_TIMEZONE.get().getOffset(millisLocal)) / MILLIS_PER_DAY).toInt
}

private def toMillisSinceEpoch(days: Int): Long = {
val millisUtc = days.toLong * MILLIS_PER_DAY
millisUtc - LOCAL_TIMEZONE.get().getOffset(millisUtc)
}

def fromJavaDate(date: java.sql.Date): Int = {
javaDateToDays(date)
}

def toJavaDate(daysSinceEpoch: Int): java.sql.Date = {
new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch))
}

def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days))
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.types

import java.sql.{Date, Timestamp}
import java.sql.Timestamp

import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral}
import scala.reflect.ClassTag
Expand Down Expand Up @@ -387,18 +387,16 @@ case object TimestampType extends NativeType {
*/
@DeveloperApi
case object DateType extends NativeType {
private[sql] type JvmType = Date
private[sql] type JvmType = Int

@transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }

private[sql] val ordering = new Ordering[JvmType] {
def compare(x: Date, y: Date) = x.compareTo(y)
}
private[sql] val ordering = implicitly[Ordering[JvmType]]

/**
* The default size of a value of the DateType is 8 bytes.
* The default size of a value of the DateType is 4 bytes.
*/
override def defaultSize: Int = 8
override def defaultSize: Int = 4
}


Expand Down
Loading