-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23927][SQL] Add "sequence" expression #21155
Conversation
5cc7d87
to
8fb427c
Compare
ok to test |
cc @ueshin |
Test build #89851 has finished for PR 21155 at commit
|
val stepMonths = ctx.freshName("stepMonths") | ||
val stepMicros = ctx.freshName("stepMicros") | ||
|
||
lazy val stepScaled = ctx.freshName("stepScaled") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any reason to use lazy
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No reason. It's a left over, sorry.
Fixed.
Test build #89854 has finished for PR 21155 at commit
|
Test build #89852 has finished for PR 21155 at commit
|
Test build #89860 has finished for PR 21155 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you annotate the pr title as [SPARK-23927][SQL]
?
I left some comments, but since this is kind of large, let me have some more time to review thoroughly.
Thanks!
@@ -536,6 +536,15 @@ object TypeCoercion { | |||
case None => c | |||
} | |||
|
|||
case s @ Sequence(_, _, _, timeZoneId) if !haveSameType(s.children) => | |||
val types = s.children.map(_.dataType) | |||
findWiderCommonType(types) match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if step is interval?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we should use ExpectsInputTypes
or ImplicitCastInputTypes
instead of TypeCoercion
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, thank you. I've added a unit test and fixed coercion when the step is interval.
Regarding ImplicitCastInputTypes
I tried it, but it isn't flexible enough for my case. To support actual type coercion for both integral and temporal sequences I had to resort to TypeCoercion
. Please check the fixed version. Is there any better way how to do it?
middle: Expression, | ||
right: Expression, | ||
timeZoneId: Option[String] = None) | ||
extends TernaryExpression with TimeZoneAwareExpression { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: style
case class Sequence(
left: Expression,
middle: Expression,
right: Expression,
timeZoneId: Option[String] = None)
extends TernaryExpression with TimeZoneAwareExpression {
...
And should we use start
, stop
, step
instead of left
, middle
, right
respectively for readability?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about making step: Option[Expression]
so we don't need UnresolvedLiteral
, I guess. Similar usage is in ArrayJoin
expression.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to Option[Expression]
, removed UnresolvedLiteral
. Thanks for suggestion.
|
||
import Sequence._ | ||
|
||
def this(arg0: Expression, arg1: Expression) = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start
, stop
?
def this(arg0: Expression, arg1: Expression) = | ||
this(arg0, arg1, Sequence.defaultStepExpression(arg0, arg1), None) | ||
|
||
def this(arg0: Expression, arg1: Expression, arg2: Expression) = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start
, stop
, step
?
@@ -1059,3 +1063,316 @@ case class Flatten(child: Expression) extends UnaryExpression { | |||
|
|||
override def prettyName: String = "flatten" | |||
} | |||
|
|||
@ExpressionDescription( | |||
usage = """ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we need to fix indent in the usage, arguments, examples.
object Sequence { | ||
|
||
private trait SequenceImpl { | ||
def eval(input1: Any, input2: Any, input3: Any): Any |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start
, stop
, step
?
|
||
def genCode(start: String, stop: String, step: String) | ||
(arr: String, elemType: String) | ||
(implicit ctx: CodegenContext): String |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: style
def genCode
(start: String, stop: String, step: String)
(arr: String, elemType: String)
(implicit ctx: CodegenContext): String
|
||
override def genCode(start: String, stop: String, step: String) | ||
(arr: String, elemType: String) | ||
(implicit ctx: CodegenContext): String = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
| $i--; | ||
| $arr[$i] = ($elemType) ($start + $step * $i); | ||
| } | ||
""".stripMargin |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: indent
} | ||
|
||
private class TemporalSequenceImpl[T: ClassTag] | ||
(dt: IntegralType, scale: Long, fromLong: Long => T, timeZone: TimeZone) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: indent
Thanks you for the comments. I'll review and make changes shortly. |
Regarding
|
a7c0ccd
to
d15dad8
Compare
Test build #90207 has finished for PR 21155 at commit
|
d15dad8
to
8786192
Compare
override lazy val dataType: ArrayType = ArrayType(start.dataType, containsNull = false) | ||
|
||
override def checkInputDataTypes(): TypeCheckResult = { | ||
val Seq(startType, stopType) = Seq(start, stop).map(_.dataType) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about val StartType = start.dataType
? Then, replace stopType
with stop.dataType
at line 1540.
Test build #90330 has finished for PR 21155 at commit
|
Test build #90329 has finished for PR 21155 at commit
|
Seq(elemType, elemType) ++ | ||
stepOpt.map(_ => elemType match { | ||
case DateType | TimestampType => CalendarIntervalType | ||
case _: IntegralType => elemType |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might need to override checkInputDataTypes
here to check if elemType
is one of DateType
, TimestampType
or IntegralType
. And also the validity of type of step expression.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, isn't it exactly what I had before? Just yesterday I followed @ueshin suggestion and replaced checkInputDataTypes
with ExpectsInputTypes
trait + inputTypes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Both implementations basically do the same thing - check the validity of the element and step types. Unless I am missing something. There are unit tests for mixed types cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Default checkInputDataTypes
just checks if data types of children matches inputTypes
. Looks like for now here we don't have any limit. So even an invalid data type is given, checkInputDataTypes
won't complain because inputTypes
simply picks up children's data types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems there are no test cases with invalid data types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would fail with match error, but I see your point.
Reverted to checkInputDataTypes
, improved type check conditions, added tests for invalid data types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering what was wrong with checkInputDataTypes
, but I can't see the difference because the commit was just overwritten.
Test build #90428 has finished for PR 21155 at commit
|
Test build #90456 has finished for PR 21155 at commit
|
0a0a1e6
to
3ef9566
Compare
Test build #90892 has finished for PR 21155 at commit
|
4b8af57
to
2ccb9bb
Compare
Test build #90899 has finished for PR 21155 at commit
|
Test build #90893 has finished for PR 21155 at commit
|
Test build #90909 has finished for PR 21155 at commit
|
Any other comments on this one? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm sorry for the super delay.
I left some comments. Thanks.
|
||
def coercibleChildren: Seq[Expression] = children.filter(_.dataType != CalendarIntervalType) | ||
|
||
def castChildrenTo(widerType: DataType): Expression = Sequence( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can use withNewChildren
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what is the purpose? Copy constructor works the best in this case as it's type safe and does exactly what I need to do.
I tried to use withNewChildren
like this:
def castChildrenTo(widerType: DataType): Expression = withNewChildren(Seq(
Cast(start, widerType),
Cast(stop, widerType)) ++
stepOpt.map(step => if (step.dataType != CalendarIntervalType) Cast(step, widerType) else step))
but it doesn't work as expected for some reason. I didn't grasp all the magic it does yet, but do we really need to complicate things? Why is a copy constructor a bad choice in this case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just wanted to confirm if we can use withNewChildren
or not, and it's okay to use the copy constructor if it's hard to use the method.
| final int $stepSign = $stopMicros > $startMicros ? +1 : -1; | ||
| final long $exclusiveItem = $stopMicros + $stepSign; | ||
| final java.util.TimeZone $genTimeZone = | ||
| java.util.TimeZone.getTimeZone("${timeZone.getID}"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should use ctx.addReferenctObj("timeZone", timeZone)
instead to follow the implementations of datetime expressions? Doing TimeZone.getTimeZone()
for every row should be slow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. Done.
|
||
val len = if (start == stop) 1L else 1L + (stop.toLong - start.toLong) / step.toLong | ||
|
||
require(len <= Int.MaxValue, s"Too long sequence: $len. Should be <= ${Int.MaxValue}") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ByteArrayMethods.MAX_ROUNDED_ARRAY_LENGTH
instead of Int.MaxValue
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
s""" | ||
|if (!($step > 0 && $start <= $stop|| | ||
| $step < 0 && $start >= $stop|| | ||
| $step == 0 && $start == $stop)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you add more parenthesis to be clear the boundaries between &&
and ||
? The combinations of &&
and ||
without parenthesis may cause an unexpected behavior.
( ... && ... ) || ( ... && ... ) || ...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
require( | ||
step > num.zero && start <= stop | ||
|| step < num.zero && start >= stop | ||
|| step == 0 && start == stop, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
} else { | ||
ev.copy(code = | ||
s""" | ||
|boolean ${ev.isNull} = false; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we don't need this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
@@ -308,6 +313,292 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper | |||
ArrayMax(Literal.create(Seq(1.123, 0.1234, 1.121), ArrayType(DoubleType))), 1.123) | |||
} | |||
|
|||
test("Sequence") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we need additional words in the test title?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed
} | ||
|
||
test("Sequence on DST boundaries") { | ||
val timeZone = TimeZone.getTimeZone("CET") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you use a long name? 3-letter name might not work in some environment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed
ok to test |
Test build #91617 has finished for PR 21155 at commit
|
2ccb9bb
to
c54857a
Compare
Test build #92217 has finished for PR 21155 at commit
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you split the current updates into some PRs to add sequence
function and refactor other functions?
We usually do one thing in one PR unless we have a special reason. Thanks!
def castChildrenTo(widerType: DataType): Expression = Sequence( | ||
Cast(start, widerType), | ||
Cast(stop, widerType), | ||
stepOpt.map(step => if (step.dataType != CalendarIntervalType) Cast(step, widerType) else step), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can try to cast to CalendarIntervalType
if widerType
is DateType
or TimestampType
and step.dataType
is StringType
? We might want to use StringType
for the interval.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How can it be useful? Do you have a use case example?
When using SQL notation an interval step is automatically parsed as a correct type, so I never get it as String
in this method. Unless I am mistaken the only way for step
to be of String
type is if Sequence
expression is created programmatically with a string type expression passed as 3rd argument. But in this case having strong typing is rather a good thing, isn't it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Thanks!
| "Illegal sequence boundaries: " + $start + " to " + $stop + " by " + $step); | ||
|} | ||
|long $longLen = $stop == $start ? 1L : 1L + ((long) $stop - $start) / $step; | ||
|if ($longLen > Integer.MAX_VALUE) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MAX_ROUNDED_ARRAY_LENGTH
instead of Integer.MAX_VALUE
?
c54857a
to
def678b
Compare
Test build #92295 has finished for PR 21155 at commit
|
def678b
to
140225d
Compare
Test build #92326 has finished for PR 21155 at commit
|
please retest |
Jenkins, retest this please. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except for one comment.
require( | ||
(step > num.zero && start <= stop) | ||
|| (step < num.zero && start >= stop) | ||
|| (step == 0 && start == stop), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
num.zero
instead of 0
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed. Although I wonder what difference it makes besides the readability? Doesn't equals(0)
work consistently on every numeric types in Scala? Or is it related to autoboxing somehow? It's not very clear to me in this context. Thanks.
Test build #92331 has finished for PR 21155 at commit
|
Test build #92334 has finished for PR 21155 at commit
|
Thanks! merging to master. |
## What changes were proposed in this pull request? This pr is a follow-up pr of #21155. The #21155 removed unnecessary import at that time, but the import became necessary in another pr. ## How was this patch tested? Existing tests. Author: Takuya UESHIN <ueshin@databricks.com> Closes #21646 from ueshin/issues/SPARK-23927/fup1.
What changes were proposed in this pull request?
The PR adds the SQL function
sequence
.https://issues.apache.org/jira/browse/SPARK-23927
The behavior of the function is based on Presto's one.
Ref: https://prestodb.io/docs/current/functions/array.html
sequence(start, stop) → array<bigint>
Generate a sequence of integers from
start
tostop
, incrementing by1
ifstart
is less than or equal tostop
, otherwise-1
.sequence(start, stop, step) → array<bigint>
Generate a sequence of integers from
start
tostop
, incrementing bystep
.sequence(start_date, stop_date) → array<date>
Generate a sequence of dates from
start_date
tostop_date
, incrementing byinterval 1 day
ifstart_date
is less than or equal tostop_date
, otherwise- interval 1 day
.sequence(start_date, stop_date, step_interval) → array<date>
Generate a sequence of dates from
start_date
tostop_date
, incrementing bystep_interval
. The type ofstep_interval
isCalendarInterval
.sequence(start_timestemp, stop_timestemp) → array<timestamp>
Generate a sequence of timestamps from
start_timestamps
tostop_timestamps
, incrementing byinterval 1 day
ifstart_date
is less than or equal tostop_date
, otherwise- interval 1 day
.sequence(start_timestamp, stop_timestamp, step_interval) → array<timestamp>
Generate a sequence of timestamps from
start_timestamps
tostop_timestamps
, incrementing bystep_interval
. The type ofstep_interval
isCalendarInterval
.How was this patch tested?
Added unit tests.