-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-4205][SQL] Timestamp and Date classes which work in the catalyst DSL. #3066
Changes from 2 commits
495eacd
0c389a7
d6e4c59
001acc4
76386e1
7c8b2c0
2aca97c
3cca196
75690de
df607da
2b6e1ce
24544fb
c238fb4
e83f13e
25bef7e
2812815
15b58a2
97a466e
4f035dd
c5912ec
04450d1
39b8ad1
3dd0da9
1a9c6cd
9bdc841
b671ce0
e4f4263
bcecd73
f90ad5d
5e73138
515abb9
c8abddc
5f13759
73d8017
a911240
5b3b6f6
3a9e31b
4c42986
a46497e
f37817b
61a5cce
868cd4c
f7ac8c2
cb0eae3
c315d13
3d2b5bc
db45f5a
5f27ae1
b41a39e
6e87d72
a5205b5
76a18dc
b6a4374
0dc0ff0
23eaf0e
d15c6e9
0d2e389
470881b
96136f2
6e9ef10
f165b2b
48a19a6
3abdb1b
c3f9ce1
908fc6a
dc2ed72
b003619
a006ddb
1935289
d59e5d9
43406fe
c304b16
32df474
d52e6d7
45f9478
f126042
2ec6f6b
3a14915
d0a27ab
098bb5d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -125,9 +125,9 @@ package object dsl { | |
implicit def floatToLiteral(f: Float) = Literal(f) | ||
implicit def doubleToLiteral(d: Double) = Literal(d) | ||
implicit def stringToLiteral(s: String) = Literal(s) | ||
implicit def dateToLiteral(d: Date) = Literal(d) | ||
implicit def bigDecimalToLiteral(d: BigDecimal) = Literal(d) | ||
implicit def decimalToLiteral(d: Decimal) = Literal(d) | ||
implicit def dateToLiteral(d: Date) = Literal(d) | ||
implicit def timestampToLiteral(t: Timestamp) = Literal(t) | ||
implicit def binaryToLiteral(a: Array[Byte]) = Literal(a) | ||
|
||
|
@@ -146,6 +146,31 @@ package object dsl { | |
def upper(e: Expression) = Upper(e) | ||
def lower(e: Expression) = Lower(e) | ||
|
||
/* | ||
* Conversions to provide the standard operators in the special case | ||
* where a literal is being combined with a symbol. Without these an | ||
* expression such as 0 < 'x is not recognized. | ||
*/ | ||
implicit class InitialLiteral(x: Any) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about case class LhsLiteral(x: Any) {
...
}
// Similar to all the xxToLiteral implicit conversions:
implicit booleanToLhsLiteral(b: Boolean) = new LhsLiteral(b)
implicit byteToLhsLiteral(b: Byte) = new LhsLiteral(b)
... |
||
val literal = Literal(x) | ||
def + (other: Symbol):Expression = {literal + other} | ||
def - (other: Symbol):Expression = {literal - other} | ||
def * (other: Symbol):Expression = {literal * other} | ||
def / (other: Symbol):Expression = {literal / other} | ||
def % (other: Symbol):Expression = {literal % other} | ||
|
||
def && (other: Symbol):Expression = {literal && other} | ||
def || (other: Symbol):Expression = {literal || other} | ||
|
||
def < (other: Symbol):Expression = {literal < other} | ||
def <= (other: Symbol):Expression = {literal <= other} | ||
def > (other: Symbol):Expression = {literal > other} | ||
def >= (other: Symbol):Expression = {literal >= other} | ||
def === (other: Symbol):Expression = {literal === other} | ||
def <=> (other: Symbol):Expression = {literal <=> other} | ||
def !== (other: Symbol):Expression = {literal !== other} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a space after There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks. |
||
} | ||
|
||
implicit class DslSymbol(sym: Symbol) extends ImplicitAttribute { def s = sym.name } | ||
// TODO more implicit class for literal? | ||
implicit class DslString(val s: String) extends ImplicitOperators { | ||
|
@@ -182,16 +207,16 @@ package object dsl { | |
/** Creates a new AttributeReference of type string */ | ||
def string = AttributeReference(s, StringType, nullable = true)() | ||
|
||
/** Creates a new AttributeReference of type date */ | ||
def date = AttributeReference(s, DateType, nullable = true)() | ||
|
||
/** Creates a new AttributeReference of type decimal */ | ||
def decimal = AttributeReference(s, DecimalType.Unlimited, nullable = true)() | ||
|
||
/** Creates a new AttributeReference of type decimal */ | ||
def decimal(precision: Int, scale: Int) = | ||
AttributeReference(s, DecimalType(precision, scale), nullable = true)() | ||
|
||
/** Creates a new AttributeReference of type date */ | ||
def date = AttributeReference(s, DateType, nullable = true)() | ||
|
||
/** Creates a new AttributeReference of type timestamp */ | ||
def timestamp = AttributeReference(s, TimestampType, nullable = true)() | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ | |
|
||
package org.apache.spark.sql.catalyst.expressions | ||
|
||
import java.sql.{Date, Timestamp} | ||
|
||
/** | ||
* A [[Projection]] that is calculated by calling the `eval` of each of the specified expressions. | ||
|
@@ -139,6 +140,12 @@ class JoinedRow extends Row { | |
def getString(i: Int): String = | ||
if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size) | ||
|
||
def getDate(i: Int): Date = | ||
if (i < row1.size) row1.getDate(i) else row2.getDate(i - row1.size) | ||
|
||
def getTimestamp(i: Int): Timestamp = | ||
if (i < row1.size) row1.getTimestamp(i) else row2.getTimestamp(i - row1.size) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Getters like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. And you can always replace these specialized getters with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I see. I am removing them. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry for not being clear enough, all getters/setters for
|
||
override def getAs[T](i: Int): T = | ||
if (i < row1.size) row1.getAs[T](i) else row2.getAs[T](i - row1.size) | ||
|
||
|
@@ -231,6 +238,13 @@ class JoinedRow2 extends Row { | |
def getString(i: Int): String = | ||
if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size) | ||
|
||
|
||
def getDate(i: Int): Date = | ||
if (i < row1.size) row1.getDate(i) else row2.getDate(i - row1.size) | ||
|
||
def getTimestamp(i: Int): Timestamp = | ||
if (i < row1.size) row1.getTimestamp(i) else row2.getTimestamp(i - row1.size) | ||
|
||
override def getAs[T](i: Int): T = | ||
if (i < row1.size) row1.getAs[T](i) else row2.getAs[T](i - row1.size) | ||
|
||
|
@@ -317,6 +331,13 @@ class JoinedRow3 extends Row { | |
def getString(i: Int): String = | ||
if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size) | ||
|
||
|
||
def getDate(i: Int): Date = | ||
if (i < row1.size) row1.getDate(i) else row2.getDate(i - row1.size) | ||
|
||
def getTimestamp(i: Int): Timestamp = | ||
if (i < row1.size) row1.getTimestamp(i) else row2.getTimestamp(i - row1.size) | ||
|
||
override def getAs[T](i: Int): T = | ||
if (i < row1.size) row1.getAs[T](i) else row2.getAs[T](i - row1.size) | ||
|
||
|
@@ -403,6 +424,13 @@ class JoinedRow4 extends Row { | |
def getString(i: Int): String = | ||
if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size) | ||
|
||
|
||
def getDate(i: Int): Date = | ||
if (i < row1.size) row1.getDate(i) else row2.getDate(i - row1.size) | ||
|
||
def getTimestamp(i: Int): Timestamp = | ||
if (i < row1.size) row1.getTimestamp(i) else row2.getTimestamp(i - row1.size) | ||
|
||
override def getAs[T](i: Int): T = | ||
if (i < row1.size) row1.getAs[T](i) else row2.getAs[T](i - row1.size) | ||
|
||
|
@@ -489,6 +517,13 @@ class JoinedRow5 extends Row { | |
def getString(i: Int): String = | ||
if (i < row1.size) row1.getString(i) else row2.getString(i - row1.size) | ||
|
||
|
||
def getDate(i: Int): Date = | ||
if (i < row1.size) row1.getDate(i) else row2.getDate(i - row1.size) | ||
|
||
def getTimestamp(i: Int): Timestamp = | ||
if (i < row1.size) row1.getTimestamp(i) else row2.getTimestamp(i - row1.size) | ||
|
||
override def getAs[T](i: Int): T = | ||
if (i < row1.size) row1.getAs[T](i) else row2.getAs[T](i - row1.size) | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,9 @@ | |
|
||
package org.apache.spark.sql.catalyst.expressions | ||
|
||
import org.apache.spark.sql.catalyst.types.NativeType | ||
import org.apache.spark.sql.catalyst.types._ | ||
import java.sql.{Date, Timestamp} | ||
import java.math.BigDecimal | ||
|
||
object Row { | ||
/** | ||
|
@@ -42,6 +44,31 @@ object Row { | |
* This method can be used to construct a [[Row]] from a [[Seq]] of values. | ||
*/ | ||
def fromSeq(values: Seq[Any]): Row = new GenericRow(values.toArray) | ||
|
||
/** | ||
* This method can be used to construct a [[Row]] from a [[Seq]] of Strings, | ||
* converting each item to the type specified in a [[StructType]] schema. | ||
* Only primitive types can be used. | ||
*/ | ||
def fromStringsBySchema(strings: Seq[String], schema: StructType): Row = { | ||
val values = for { | ||
(field, str) <- schema.fields zip strings | ||
item = field.dataType match { | ||
case IntegerType => str.toInt | ||
case LongType => str.toLong | ||
case DoubleType => str.toDouble | ||
case FloatType => str.toFloat | ||
case ByteType => str.toByte | ||
case ShortType => str.toShort | ||
case StringType => str | ||
case BooleanType => (str != "") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This can be rather error prone: string |
||
case DateType => Date.valueOf(str) | ||
case TimestampType => Timestamp.valueOf(str) | ||
case DecimalType() => new BigDecimal(str) | ||
} | ||
} yield item | ||
new GenericRow(values.toArray) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would you mind to elaborate on the reason why we need this function? Esp. I don't see it's used/tested in this PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It doesn't need to be a Row method. I will move it outside ( and worry about the Boolean case.) There is an issue lurking here, however. A SchemaRDD should have Rows which match its schema. But mismatches generate no errors until the last stage, e.g. when one collects the results of a query. My thought was that it would be helpful to have a "safe" Row constructor which would check its data against a schema and raise an error if they do not match (after applying whatever implicit conversions are available. (Yes, this is a separate issue and does not belong here.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. OK I see you point. However, |
||
} | ||
|
||
/** | ||
|
@@ -64,6 +91,8 @@ trait Row extends Seq[Any] with Serializable { | |
def getShort(i: Int): Short | ||
def getByte(i: Int): Byte | ||
def getString(i: Int): String | ||
def getDate(i: Int): Date | ||
def getTimestamp(i: Int): Timestamp | ||
def getAs[T](i: Int): T = apply(i).asInstanceOf[T] | ||
|
||
override def toString() = | ||
|
@@ -99,6 +128,8 @@ trait MutableRow extends Row { | |
def setByte(ordinal: Int, value: Byte) | ||
def setFloat(ordinal: Int, value: Float) | ||
def setString(ordinal: Int, value: String) | ||
def setDate(ordinal: Int, value: Date) | ||
def setTimestamp(ordinal: Int, value: Timestamp) | ||
} | ||
|
||
/** | ||
|
@@ -119,6 +150,9 @@ object EmptyRow extends Row { | |
def getShort(i: Int): Short = throw new UnsupportedOperationException | ||
def getByte(i: Int): Byte = throw new UnsupportedOperationException | ||
def getString(i: Int): String = throw new UnsupportedOperationException | ||
def getDate(i: Int): Date = throw new UnsupportedOperationException | ||
def getTimestamp(i: Int): Timestamp = throw new UnsupportedOperationException | ||
|
||
override def getAs[T](i: Int): T = throw new UnsupportedOperationException | ||
|
||
def copy() = this | ||
|
@@ -183,6 +217,16 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row { | |
values(i).asInstanceOf[String] | ||
} | ||
|
||
def getDate(i: Int): Date = { | ||
if (values(i) == null) sys.error("Failed to check null bit for primitive String value.") | ||
values(i).asInstanceOf[Date] | ||
} | ||
|
||
def getTimestamp(i: Int): Timestamp = { | ||
if (values(i) == null) sys.error("Failed to check null bit for primitive String value.") | ||
values(i).asInstanceOf[Timestamp] | ||
} | ||
|
||
// Custom hashCode function that matches the efficient code generated version. | ||
override def hashCode(): Int = { | ||
var result: Int = 37 | ||
|
@@ -226,6 +270,8 @@ class GenericMutableRow(size: Int) extends GenericRow(size) with MutableRow { | |
override def setInt(ordinal: Int, value: Int): Unit = { values(ordinal) = value } | ||
override def setLong(ordinal: Int, value: Long): Unit = { values(ordinal) = value } | ||
override def setString(ordinal: Int, value: String): Unit = { values(ordinal) = value } | ||
override def setDate(ordinal: Int,value: Date): Unit = { values(ordinal) = value } | ||
override def setTimestamp(ordinal: Int,value: Timestamp): Unit = { values(ordinal) = value } | ||
|
||
override def setNullAt(i: Int): Unit = { values(i) = null } | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -92,9 +92,9 @@ object DataType { | |
| "LongType" ^^^ LongType | ||
| "BinaryType" ^^^ BinaryType | ||
| "BooleanType" ^^^ BooleanType | ||
| "DateType" ^^^ DateType | ||
| "DecimalType()" ^^^ DecimalType.Unlimited | ||
| fixedDecimalType | ||
| "DateType" ^^^ DateType | ||
| "TimestampType" ^^^ TimestampType | ||
) | ||
|
||
|
@@ -187,7 +187,8 @@ case object NullType extends DataType | |
|
||
object NativeType { | ||
val all = Seq( | ||
IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType) | ||
IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, | ||
ByteType, StringType, DateType, TimestampType) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The original line doesn't exceed 100 columns thus doesn't require a wrap. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, but the new line does exceed 100 columns due to the addition of two new items DateType and TimestampType. (Or so the compiler tells me.) Perhaps there is a reason why these types should not be regarded as "Native" but I could not see any such reason. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh yes, sorry I missed the newly added |
||
|
||
def unapply(dt: DataType): Boolean = all.contains(dt) | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess you moved this line here because
DateType
andTimestampType
are more tightly related. This is consideration is legitimate and I'd also choose this order if I were the original author. However, this change introduces larger probability of future merge conflicts, while the benefit is rather limited. Same principle applies to all other similar reordering changes related toDate
andTimestamp
in this PR.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. Well, the truth is that I was originally working with the source for Spark 1.1.0. When I pulled the current code it turned out that many lines such as this one had already been added after the release of 1.1.0, so I had to go through and remove many duplicates. In the course of that process it seemed better for me to have some logical scheme to guide me in choosing which of the two duplicates to delete.