Merge branch 'master' of https://github.com/apache/spark
phatak-dev committed Apr 23, 2015
2 parents (2a802c6 + f60bece), commit 00bc819
Showing 24 changed files with 159 additions and 153 deletions.
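The catalyst hunks below make three related changes: `NativeType` is renamed to `AtomicType`, pattern matches on the old `JvmType` type aliases are replaced with matches on concrete JVM classes (`Boolean`, `Array[Byte]`, `java.sql.Timestamp`, and so on), and the code generator gains an explicit `nativeTypes` whitelist in place of the removed `NativeType()` extractor.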
@@ -17,8 +17,6 @@

 package org.apache.spark.sql.catalyst
 
-import java.sql.Timestamp
-
 import org.apache.spark.util.Utils
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
@@ -110,7 +108,7 @@ trait ScalaReflection {
             StructField(p.name.toString, dataType, nullable)
           }), nullable = true)
       case t if t <:< typeOf[String] => Schema(StringType, nullable = true)
-      case t if t <:< typeOf[Timestamp] => Schema(TimestampType, nullable = true)
+      case t if t <:< typeOf[java.sql.Timestamp] => Schema(TimestampType, nullable = true)
       case t if t <:< typeOf[java.sql.Date] => Schema(DateType, nullable = true)
       case t if t <:< typeOf[BigDecimal] => Schema(DecimalType.Unlimited, nullable = true)
       case t if t <:< typeOf[java.math.BigDecimal] => Schema(DecimalType.Unlimited, nullable = true)
@@ -136,20 +134,20 @@ trait ScalaReflection {

   def typeOfObject: PartialFunction[Any, DataType] = {
     // The data type can be determined without ambiguity.
-    case obj: BooleanType.JvmType => BooleanType
-    case obj: BinaryType.JvmType => BinaryType
+    case obj: Boolean => BooleanType
+    case obj: Array[Byte] => BinaryType
+    case obj: String => StringType
-    case obj: StringType.JvmType => StringType
-    case obj: ByteType.JvmType => ByteType
-    case obj: ShortType.JvmType => ShortType
-    case obj: IntegerType.JvmType => IntegerType
-    case obj: LongType.JvmType => LongType
-    case obj: FloatType.JvmType => FloatType
-    case obj: DoubleType.JvmType => DoubleType
+    case obj: UTF8String => StringType
+    case obj: Byte => ByteType
+    case obj: Short => ShortType
+    case obj: Int => IntegerType
+    case obj: Long => LongType
+    case obj: Float => FloatType
+    case obj: Double => DoubleType
     case obj: java.sql.Date => DateType
     case obj: java.math.BigDecimal => DecimalType.Unlimited
     case obj: Decimal => DecimalType.Unlimited
-    case obj: TimestampType.JvmType => TimestampType
+    case obj: java.sql.Timestamp => TimestampType
     case null => NullType
     // For other cases, there is no obvious mapping from the type of the given object to a
     // Catalyst data type. A user should provide his/her specific rules
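The reworked `typeOfObject` matches on concrete JVM classes instead of the removed `JvmType` aliases. A minimal sketch of calling it, assuming a Spark 1.x-era spark-catalyst on the classpath (where `ScalaReflection` is the companion object extending this trait):

import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.types.NullType

object TypeOfObjectDemo extends App {
  val infer = ScalaReflection.typeOfObject

  println(infer(42))       // IntegerType
  println(infer("hello"))  // StringType
  println(infer(java.sql.Timestamp.valueOf("2015-04-23 00:00:00"))) // TimestampType

  // Unlisted types have no obvious Catalyst mapping; as the comment in the
  // diff says, callers supply their own rule, e.g. via a fallback:
  println(infer.applyOrElse(Seq(1, 2, 3), (_: Any) => NullType))    // NullType
}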
@@ -346,7 +346,7 @@ case class MaxOf(left: Expression, right: Expression) extends Expression {
   }
 
   lazy val ordering = left.dataType match {
-    case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
+    case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
     case other => sys.error(s"Type $other does not support ordered operations")
   }

@@ -391,7 +391,7 @@ case class MinOf(left: Expression, right: Expression) extends Expression {
   }
 
   lazy val ordering = left.dataType match {
-    case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
+    case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
     case other => sys.error(s"Type $other does not support ordered operations")
   }

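`MaxOf`, `MinOf`, and the comparison predicates further down all lean on the same trick: an `AtomicType` carries an `Ordering` for its JVM representation, and an erased cast to `Ordering[Any]` lets values pulled out of an untyped `Row` be compared. A self-contained, illustrative sketch of the pattern (toy types, not the Spark classes):

sealed trait ToyType
case object ToyInt extends ToyType { val ordering: Ordering[Int] = Ordering.Int }
case object ToyString extends ToyType { val ordering: Ordering[String] = Ordering.String }

// The erased cast is safe in practice because values are only ever compared
// against other values of the same column's type.
def orderingFor(dt: ToyType): Ordering[Any] = dt match {
  case ToyInt    => ToyInt.ordering.asInstanceOf[Ordering[Any]]
  case ToyString => ToyString.ordering.asInstanceOf[Ordering[Any]]
}

assert(orderingFor(ToyInt).compare(10, 3) > 0)
assert(orderingFor(ToyString).compare("a", "b") < 0)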
@@ -623,7 +623,7 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
   protected def getColumn(inputRow: TermName, dataType: DataType, ordinal: Int) = {
     dataType match {
       case StringType => q"$inputRow($ordinal).asInstanceOf[org.apache.spark.sql.types.UTF8String]"
-      case dt @ NativeType() => q"$inputRow.${accessorForType(dt)}($ordinal)"
+      case dt: DataType if isNativeType(dt) => q"$inputRow.${accessorForType(dt)}($ordinal)"
       case _ => q"$inputRow.apply($ordinal).asInstanceOf[${termForType(dataType)}]"
     }
   }
@@ -635,7 +635,8 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
       value: TermName) = {
     dataType match {
       case StringType => q"$destinationRow.update($ordinal, $value)"
-      case dt @ NativeType() => q"$destinationRow.${mutatorForType(dt)}($ordinal, $value)"
+      case dt: DataType if isNativeType(dt) =>
+        q"$destinationRow.${mutatorForType(dt)}($ordinal, $value)"
       case _ => q"$destinationRow.update($ordinal, $value)"
     }
   }
@@ -675,7 +676,18 @@ abstract class CodeGenerator[InType <: AnyRef, OutType <: AnyRef] extends Logging {
   }
 
   protected def termForType(dt: DataType) = dt match {
-    case n: NativeType => n.tag
+    case n: AtomicType => n.tag
     case _ => typeTag[Any]
   }
 
+  /**
+   * List of data types that have special accessors and setters in [[Row]].
+   */
+  protected val nativeTypes =
+    Seq(IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)
+
+  /**
+   * Returns true if the data type has a special accessor and setter in [[Row]].
+   */
+  protected def isNativeType(dt: DataType) = nativeTypes.contains(dt)
 }
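The guard `isNativeType(dt)` replaces the old `case dt @ NativeType()` extractor: with `NativeType` gone, the generator keeps an explicit whitelist of the types that have dedicated unboxed accessors and mutators on `Row`, and every other type falls back to the generic boxed path. A small sketch of the membership check, assuming spark-catalyst on the classpath:

import org.apache.spark.sql.types._

// Mirrors the whitelist added above.
val nativeTypes: Seq[DataType] =
  Seq(IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)

def isNativeType(dt: DataType): Boolean = nativeTypes.contains(dt)

assert(isNativeType(LongType))                // specialized accessor path
assert(!isNativeType(ArrayType(IntegerType))) // generic boxed path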
@@ -109,7 +109,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
q"override def update(i: Int, value: Any): Unit = { ..$cases; $accessorFailure }"
}

val specificAccessorFunctions = NativeType.all.map { dataType =>
val specificAccessorFunctions = nativeTypes.map { dataType =>
val ifStatements = expressions.zipWithIndex.flatMap {
// getString() is not used by expressions
case (e, i) if e.dataType == dataType && dataType != StringType =>
@@ -135,7 +135,7 @@ object GenerateProjection extends CodeGenerator[Seq[Expression], Projection] {
         }
       }
 
-    val specificMutatorFunctions = NativeType.all.map { dataType =>
+    val specificMutatorFunctions = nativeTypes.map { dataType =>
       val ifStatements = expressions.zipWithIndex.flatMap {
         // setString() is not used by expressions
         case (e, i) if e.dataType == dataType && dataType != StringType =>
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.analysis.UnresolvedException
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.types.{DataType, BinaryType, BooleanType, NativeType}
+import org.apache.spark.sql.types.{DataType, BinaryType, BooleanType, AtomicType}
 
 object InterpretedPredicate {
   def create(expression: Expression, inputSchema: Seq[Attribute]): (Row => Boolean) =
@@ -211,7 +211,7 @@ case class LessThan(left: Expression, right: Expression) extends BinaryComparison {
s"Types do not match ${left.dataType} != ${right.dataType}")
}
left.dataType match {
case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
case other => sys.error(s"Type $other does not support ordered operations")
}
}
@@ -240,7 +240,7 @@ case class LessThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
s"Types do not match ${left.dataType} != ${right.dataType}")
}
left.dataType match {
case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
case other => sys.error(s"Type $other does not support ordered operations")
}
}
@@ -269,7 +269,7 @@ case class GreaterThan(left: Expression, right: Expression) extends BinaryComparison {
s"Types do not match ${left.dataType} != ${right.dataType}")
}
left.dataType match {
case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
case other => sys.error(s"Type $other does not support ordered operations")
}
}
@@ -298,7 +298,7 @@ case class GreaterThanOrEqual(left: Expression, right: Expression) extends BinaryComparison {
s"Types do not match ${left.dataType} != ${right.dataType}")
}
left.dataType match {
case i: NativeType => i.ordering.asInstanceOf[Ordering[Any]]
case i: AtomicType => i.ordering.asInstanceOf[Ordering[Any]]
case other => sys.error(s"Type $other does not support ordered operations")
}
}
@@ -17,7 +17,7 @@

 package org.apache.spark.sql.catalyst.expressions
 
-import org.apache.spark.sql.types.{UTF8String, DataType, StructType, NativeType}
+import org.apache.spark.sql.types.{UTF8String, DataType, StructType, AtomicType}

 /**
  * An extended interface to [[Row]] that allows the values for each column to be updated. Setting
@@ -227,9 +227,9 @@ class RowOrdering(ordering: Seq[SortOrder]) extends Ordering[Row] {
         return if (order.direction == Ascending) 1 else -1
       } else {
         val comparison = order.dataType match {
-          case n: NativeType if order.direction == Ascending =>
+          case n: AtomicType if order.direction == Ascending =>
             n.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
-          case n: NativeType if order.direction == Descending =>
+          case n: AtomicType if order.direction == Descending =>
             n.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
           case other => sys.error(s"Type $other does not support ordered operations")
         }
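`RowOrdering` handles descending sort directions by reusing the ascending `Ordering` through `.reverse` rather than carrying a second comparator per type; the same erased-cast idea as above, in plain Scala:

val asc: Ordering[Any] = Ordering.Int.asInstanceOf[Ordering[Any]]

assert(asc.compare(1, 2) < 0)          // Ascending: 1 sorts before 2
assert(asc.reverse.compare(1, 2) > 0)  // Descending: 2 sorts before 1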