Skip to content

Commit

Permalink
[SPARK-4508] [SQL] build native date type to conform behavior to Hive
Browse files Browse the repository at this point in the history
Store daysSinceEpoch as an Int value (4 bytes) to represent DateType, instead of using java.sql.Date (8 bytes as Long) in Catalyst rows. This ensures that Hive and Catalyst have the same comparison behavior.
Subsumes apache#3381
I think there are already some tests in JavaSQLSuite, and for Python it will not affect Python's datetime class.

Author: Daoyuan Wang <daoyuan.wang@intel.com>

Closes apache#3732 from adrian-wang/datenative and squashes the following commits:

0ed0fdc [Daoyuan Wang] fix test data
a2fdd4e [Daoyuan Wang] getDate
c37832b [Daoyuan Wang] row to catalyst
f0005b1 [Daoyuan Wang] add date in sql parser and java type conversion
024c9a6 [Daoyuan Wang] clean some import order
d6715fc [Daoyuan Wang] refactoring Date as Primitive Int internally
374abd5 [Daoyuan Wang] spark native date type support
  • Loading branch information
adrian-wang authored and marmbrus committed Feb 2, 2015
1 parent 8309349 commit 1646f89
Show file tree
Hide file tree
Showing 49 changed files with 191 additions and 112 deletions.
2 changes: 1 addition & 1 deletion sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.spark.sql
import scala.util.hashing.MurmurHash3

import org.apache.spark.sql.catalyst.expressions.GenericRow

import org.apache.spark.sql.types.DateUtils

object Row {
/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

package org.apache.spark.sql.catalyst

import java.sql.{Date, Timestamp}
import java.sql.Timestamp

import org.apache.spark.util.Utils
import org.apache.spark.sql.catalyst.expressions.{GenericRow, Attribute, AttributeReference, Row}
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types._


/**
* A default version of ScalaReflection that uses the runtime universe.
*/
Expand Down Expand Up @@ -72,6 +71,7 @@ trait ScalaReflection {
}.toArray)
case (d: BigDecimal, _) => Decimal(d)
case (d: java.math.BigDecimal, _) => Decimal(d)
case (d: java.sql.Date, _) => DateUtils.fromJavaDate(d)
case (other, _) => other
}

Expand All @@ -85,6 +85,7 @@ trait ScalaReflection {
}
case (r: Row, s: StructType) => convertRowToScala(r, s)
case (d: Decimal, _: DecimalType) => d.toJavaBigDecimal
case (i: Int, DateType) => DateUtils.toJavaDate(i)
case (other, _) => other
}

Expand Down Expand Up @@ -159,7 +160,7 @@ trait ScalaReflection {
valueDataType, valueContainsNull = valueNullable), nullable = true)
case t if t <:< typeOf[String] => Schema(StringType, nullable = true)
case t if t <:< typeOf[Timestamp] => Schema(TimestampType, nullable = true)
case t if t <:< typeOf[Date] => Schema(DateType, nullable = true)
case t if t <:< typeOf[java.sql.Date] => Schema(DateType, nullable = true)
case t if t <:< typeOf[BigDecimal] => Schema(DecimalType.Unlimited, nullable = true)
case t if t <:< typeOf[java.math.BigDecimal] => Schema(DecimalType.Unlimited, nullable = true)
case t if t <:< typeOf[Decimal] => Schema(DecimalType.Unlimited, nullable = true)
Expand Down Expand Up @@ -191,7 +192,7 @@ trait ScalaReflection {
case obj: LongType.JvmType => LongType
case obj: FloatType.JvmType => FloatType
case obj: DoubleType.JvmType => DoubleType
case obj: DateType.JvmType => DateType
case obj: java.sql.Date => DateType
case obj: java.math.BigDecimal => DecimalType.Unlimited
case obj: Decimal => DecimalType.Unlimited
case obj: TimestampType.JvmType => TimestampType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class SqlParser extends AbstractSparkSQLParser {
protected val CAST = Keyword("CAST")
protected val COALESCE = Keyword("COALESCE")
protected val COUNT = Keyword("COUNT")
protected val DATE = Keyword("DATE")
protected val DECIMAL = Keyword("DECIMAL")
protected val DESC = Keyword("DESC")
protected val DISTINCT = Keyword("DISTINCT")
Expand Down Expand Up @@ -383,6 +384,7 @@ class SqlParser extends AbstractSparkSQLParser {
| DOUBLE ^^^ DoubleType
| fixedDecimalType
| DECIMAL ^^^ DecimalType.Unlimited
| DATE ^^^ DateType
)

protected lazy val fixedDecimalType: Parser[DataType] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
// UDFToString
private[this] def castToString(from: DataType): Any => Any = from match {
case BinaryType => buildCast[Array[Byte]](_, new String(_, "UTF-8"))
case DateType => buildCast[Date](_, dateToString)
case DateType => buildCast[Int](_, d => DateUtils.toString(d))
case TimestampType => buildCast[Timestamp](_, timestampToString)
case _ => buildCast[Any](_, _.toString)
}
Expand All @@ -131,7 +131,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
buildCast[Timestamp](_, t => t.getTime() != 0 || t.getNanos() != 0)
case DateType =>
// Hive would return null when cast from date to boolean
buildCast[Date](_, d => null)
buildCast[Int](_, d => null)
case LongType =>
buildCast[Long](_, _ != 0)
case IntegerType =>
Expand Down Expand Up @@ -171,7 +171,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case ByteType =>
buildCast[Byte](_, b => new Timestamp(b))
case DateType =>
buildCast[Date](_, d => new Timestamp(d.getTime))
buildCast[Int](_, d => new Timestamp(DateUtils.toJavaDate(d).getTime))
// TimestampWritable.decimalToTimestamp
case DecimalType() =>
buildCast[Decimal](_, d => decimalToTimestamp(d))
Expand Down Expand Up @@ -224,37 +224,24 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
}
}

// Converts Timestamp to string according to Hive TimestampWritable convention
private[this] def timestampToDateString(ts: Timestamp): String = {
Cast.threadLocalDateFormat.get.format(ts)
}

// DateConverter
private[this] def castToDate(from: DataType): Any => Any = from match {
case StringType =>
buildCast[String](_, s =>
try Date.valueOf(s) catch { case _: java.lang.IllegalArgumentException => null })
try DateUtils.fromJavaDate(Date.valueOf(s))
catch { case _: java.lang.IllegalArgumentException => null }
)
case TimestampType =>
// throw valid precision more than seconds, according to Hive.
// Timestamp.nanos is in 0 to 999,999,999, no more than a second.
buildCast[Timestamp](_, t => new Date(Math.floor(t.getTime / 1000.0).toLong * 1000))
buildCast[Timestamp](_, t => DateUtils.millisToDays(t.getTime))
// Hive throws this exception as a Semantic Exception
// It is never possible to compare result when hive return with exception, so we can return null
// It is never possible to compare result when hive return with exception,
// so we can return null
// NULL is more reasonable here, since the query itself obeys the grammar.
case _ => _ => null
}

// Date cannot be cast to long, according to hive
private[this] def dateToLong(d: Date) = null

// Date cannot be cast to double, according to hive
private[this] def dateToDouble(d: Date) = null

// Converts Date to string according to Hive DateWritable convention
private[this] def dateToString(d: Date): String = {
Cast.threadLocalDateFormat.get.format(d)
}

// LongConverter
private[this] def castToLong(from: DataType): Any => Any = from match {
case StringType =>
Expand All @@ -264,7 +251,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case BooleanType =>
buildCast[Boolean](_, b => if (b) 1L else 0L)
case DateType =>
buildCast[Date](_, d => dateToLong(d))
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToLong(t))
case x: NumericType =>
Expand All @@ -280,7 +267,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case BooleanType =>
buildCast[Boolean](_, b => if (b) 1 else 0)
case DateType =>
buildCast[Date](_, d => dateToLong(d))
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToLong(t).toInt)
case x: NumericType =>
Expand All @@ -296,7 +283,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case BooleanType =>
buildCast[Boolean](_, b => if (b) 1.toShort else 0.toShort)
case DateType =>
buildCast[Date](_, d => dateToLong(d))
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToLong(t).toShort)
case x: NumericType =>
Expand All @@ -312,7 +299,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case BooleanType =>
buildCast[Boolean](_, b => if (b) 1.toByte else 0.toByte)
case DateType =>
buildCast[Date](_, d => dateToLong(d))
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToLong(t).toByte)
case x: NumericType =>
Expand Down Expand Up @@ -342,7 +329,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case BooleanType =>
buildCast[Boolean](_, b => changePrecision(if (b) Decimal(1) else Decimal(0), target))
case DateType =>
buildCast[Date](_, d => null) // date can't cast to decimal in Hive
buildCast[Int](_, d => null) // date can't cast to decimal in Hive
case TimestampType =>
// Note that we lose precision here.
buildCast[Timestamp](_, t => changePrecision(Decimal(timestampToDouble(t)), target))
Expand All @@ -367,7 +354,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case BooleanType =>
buildCast[Boolean](_, b => if (b) 1d else 0d)
case DateType =>
buildCast[Date](_, d => dateToDouble(d))
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToDouble(t))
case x: NumericType =>
Expand All @@ -383,7 +370,7 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
case BooleanType =>
buildCast[Boolean](_, b => if (b) 1f else 0f)
case DateType =>
buildCast[Date](_, d => dateToDouble(d))
buildCast[Int](_, d => null)
case TimestampType =>
buildCast[Timestamp](_, t => timestampToDouble(t).toFloat)
case x: NumericType =>
Expand Down Expand Up @@ -442,16 +429,16 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w

object Cast {
// `SimpleDateFormat` is not thread-safe.
private[sql] val threadLocalDateFormat = new ThreadLocal[DateFormat] {
private[sql] val threadLocalTimestampFormat = new ThreadLocal[DateFormat] {
override def initialValue() = {
new SimpleDateFormat("yyyy-MM-dd")
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
}
}

// `SimpleDateFormat` is not thread-safe.
private[sql] val threadLocalTimestampFormat = new ThreadLocal[DateFormat] {
private[sql] val threadLocalDateFormat = new ThreadLocal[DateFormat] {
override def initialValue() = {
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
new SimpleDateFormat("yyyy-MM-dd")
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Loggin
new String(${eval.primitiveTerm}.asInstanceOf[Array[Byte]])
""".children

case Cast(child @ DateType(), StringType) =>
child.castOrNull(c => q"org.apache.spark.sql.types.DateUtils.toString($c)", StringType)

case Cast(child @ NumericType(), IntegerType) =>
child.castOrNull(c => q"$c.toInt", IntegerType)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ object Literal {
case d: java.math.BigDecimal => Literal(Decimal(d), DecimalType.Unlimited)
case d: Decimal => Literal(d, DecimalType.Unlimited)
case t: Timestamp => Literal(t, TimestampType)
case d: Date => Literal(d, DateType)
case d: Date => Literal(DateUtils.fromJavaDate(d), DateType)
case a: Array[Byte] => Literal(a, BinaryType)
case null => Literal(null, NullType)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.types

import java.sql.Date
import java.util.{Calendar, TimeZone}

import org.apache.spark.sql.catalyst.expressions.Cast

/**
 * Helper functions to convert between an Int count of days since 1970-01-01 and
 * java.sql.Date. The day count is taken in the JVM's default time zone: every conversion
 * applies that zone's UTC offset before splitting or joining on day boundaries.
 */
object DateUtils {
  private val MILLIS_PER_DAY = 86400000

  // Java TimeZone has no mention of thread safety. Use thread local instance to be safe.
  private val LOCAL_TIMEZONE = new ThreadLocal[TimeZone] {
    override protected def initialValue: TimeZone = {
      Calendar.getInstance.getTimeZone
    }
  }

  /** Converts a java.sql.Date to its day number by delegating to [[millisToDays]]. */
  private def javaDateToDays(d: Date): Int = {
    millisToDays(d.getTime)
  }

  /**
   * Converts an instant to the number of days since 1970-01-01 in the local time zone.
   *
   * @param millisLocal milliseconds since the epoch, as returned by `java.util.Date.getTime`
   * @return the local-zone day number; negative for days before 1970-01-01
   */
  def millisToDays(millisLocal: Long): Int = {
    val shiftedMillis = millisLocal + LOCAL_TIMEZONE.get().getOffset(millisLocal)
    val truncatedDays = shiftedMillis / MILLIS_PER_DAY
    // Integer division truncates toward zero, which would place any instant before the
    // epoch that is not exactly on a day boundary one day too late (e.g. -1 ms would map
    // to day 0 instead of day -1). Round toward negative infinity instead.
    val flooredDays =
      if (shiftedMillis % MILLIS_PER_DAY < 0) truncatedDays - 1 else truncatedDays
    flooredDays.toInt
  }

  /**
   * Converts a local-zone day number to the epoch milliseconds of that day's start.
   *
   * NOTE(review): the zone offset is looked up at `millisUtc`, not at the resulting local
   * midnight, so days adjacent to an offset (DST) transition may be off by the size of the
   * transition -- confirm whether callers depend on exact midnight.
   */
  private def toMillisSinceEpoch(days: Int): Long = {
    val millisUtc = days.toLong * MILLIS_PER_DAY
    millisUtc - LOCAL_TIMEZONE.get().getOffset(millisUtc)
  }

  /** Returns the number of days since 1970-01-01 (local time zone) for the given date. */
  def fromJavaDate(date: java.sql.Date): Int = {
    javaDateToDays(date)
  }

  /** Builds the java.sql.Date that corresponds to the given local-zone day number. */
  def toJavaDate(daysSinceEpoch: Int): java.sql.Date = {
    new java.sql.Date(toMillisSinceEpoch(daysSinceEpoch))
  }

  /** Renders the day number as text using the date format held by `Cast.threadLocalDateFormat`. */
  def toString(days: Int): String = Cast.threadLocalDateFormat.get.format(toJavaDate(days))
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.types

import java.sql.{Date, Timestamp}
import java.sql.Timestamp

import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral}
import scala.reflect.ClassTag
Expand Down Expand Up @@ -387,18 +387,16 @@ case object TimestampType extends NativeType {
*/
@DeveloperApi
case object DateType extends NativeType {
private[sql] type JvmType = Date
private[sql] type JvmType = Int

@transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }

private[sql] val ordering = new Ordering[JvmType] {
def compare(x: Date, y: Date) = x.compareTo(y)
}
private[sql] val ordering = implicitly[Ordering[JvmType]]

/**
* The default size of a value of the DateType is 8 bytes.
* The default size of a value of the DateType is 4 bytes.
*/
override def defaultSize: Int = 8
override def defaultSize: Int = 4
}


Expand Down
Loading

0 comments on commit 1646f89

Please sign in to comment.