StreamFrame API alternatives. #17
Conversation
@@ -0,0 +1,28 @@
package org.apache.spark.sql.streamv1

trait WindowSpec
Please ignore this file for now.
* A1. Blocking operations return new StreamFrames, and emit a new tuple for every update.
* As an example, sf.groupby("id").count() will emit a tuple every time we see a new record
* for "id", i.e. a running count. Note that these operations will be expensive, because
* they require ordering all the inputs by time.
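To make the A1 semantics concrete, here is a minimal sketch of what "emit a tuple for every update" means for `sf.groupby("id").count()`. This is plain Python mimicking the described behavior, not the proposed Scala API; `running_counts` is a hypothetical helper name:

```python
from collections import defaultdict

def running_counts(records):
    """Emit one (id, count) tuple per input record: a running count per id."""
    counts = defaultdict(int)
    out = []
    for rid in records:
        counts[rid] += 1
        out.append((rid, counts[rid]))  # every new record triggers an updated tuple
    return out

print(running_counts(["a", "b", "a"]))
# → [('a', 1), ('b', 1), ('a', 2)]
```

Note that the downstream consumer sees three output tuples for three input records, i.e. an update stream rather than a single final answer.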
What does "emitting" a tuple mean? In other words, how does this API tie in with event time and stuff like that?
Generally, the way I understood event time is that you want to define a query based on the event time in the data, and then evaluate it at possibly multiple points in "real" (processing) time to get a different answer for the same event time window.
If you mean that this kind of stream is the same as one with an infinite window, that does make sense.
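One way to picture the event-time point above: each record carries its own timestamp, and the same event-time window can be evaluated at several processing times, giving a different answer for the same window as late records arrive. A minimal sketch (plain Python, illustrative names only, not part of the proposed API):

```python
def window_start(event_time_ms, window_ms):
    """Start of the fixed event-time window that a timestamp falls into."""
    return event_time_ms - (event_time_ms % window_ms)

def count_in_window(event_times, start_ms, window_ms):
    """Evaluate one window's count over whatever records have arrived so far."""
    return sum(1 for t in event_times if window_start(t, window_ms) == start_ms)

# Processing time T1: events 1100 and 1900 have arrived for window [1000, 2000).
seen = [1100, 1900]
print(count_in_window(seen, 1000, 1000))  # → 2

# Processing time T2: a late record with event time 1200 arrives;
# re-evaluating the same event-time window now gives a new answer.
seen.append(1200)
print(count_in_window(seen, 1000, 1000))  # → 3
```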
Hey, so looking at this quickly, it seems to me that some applications will want infinite windows, and in that case, we can make the StreamFrame-without-a-window be the same thing as a windowed StreamFrame with an infinite window. So I'd go for A1 with these semantics.
@mateiz do you mean an infinite window, or a landmark window, i.e. one that is from the beginning of time till now?
I mean from the beginning of time until now (in event time). Isn't that what Dataflow also has? In any case, though, the tricky thing seems to be how it will combine with grouping, since people sometimes want to define a window per group. I think you also have to add that to the API before we can decide whether it makes sense.
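The "window per group" concern can be sketched by keying records on (group id, event-time window start), where the window size may differ per group. Plain Python with hypothetical names, not a concrete API proposal:

```python
from collections import defaultdict

def count_by_id_and_window(events, window_ms_for):
    """events: iterable of (id, event_time_ms); window_ms_for: id -> window size.

    Groups each event under (id, window start), so each id can use its own window.
    """
    counts = defaultdict(int)
    for rid, t in events:
        w = window_ms_for(rid)
        counts[(rid, t - t % w)] += 1  # window start computed per group
    return dict(counts)

events = [("a", 100), ("a", 2500), ("b", 100)]
# "a" uses 1-second windows, "b" uses 5-second windows.
print(count_by_id_and_window(events, lambda rid: 1000 if rid == "a" else 5000))
# → {('a', 0): 1, ('a', 2000): 1, ('b', 0): 1}
```

The open design question is whether the window spec attaches to the grouping key (as above) or to the whole StreamFrame.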
I'd definitely rule out A2 since it seems like a hacky version of option B. Between A1 and B, I'd prefer B. @mateiz Unifying the concepts makes technical sense, but I feel like it's more confusing for people not experienced with streaming. It seems simpler to separate the APIs for expensive operations (blocking) vs. cheap transformations. Also, with A1, the natural default behavior is to use infinite windows since the beginning of time. Unless we think that's the right thing for most use cases, I'd prefer to force users to set a window explicitly.
Sure, but just look at the Dataflow paper / API for their motivation and see whether you agree with that. Part of their motivation was to let you use the "same" program for both streaming and batch analysis, which is something that people have complained they can't do with Spark. So we should see how that's addressed (common superclass, easy way to get a snapshot of a stream as a DataFrame, or whatever). That's really the main reason I was thinking about this. |
I see, that's a good argument. I'll take a look at the paper. |
@mateiz Google Dataflow uses "event time" (determined by the timestamp on the data element itself). I think Spark Streaming needs to support it also for the sessionization. |
Thanks for commenting, @rbkim. Yes indeed. Those are included here too. |
@rxin yes, but we should also consider the timestamp in the DStream, because it is not the "event time".
@rbkim there is nothing about dstream in this API, is there? |
@rxin you're right. nothing about dstream in here. I thought you might have a plan to add some APIs related to dstream. |
…onfig option.

## What changes were proposed in this pull request?

Currently, the `OptimizeIn` optimizer replaces an `In` expression with an `InSet` expression if the size of the set is greater than a constant, 10. This issue aims to add a configuration, `spark.sql.optimizer.inSetConversionThreshold`, for that threshold. After this PR, `OptimizeIn` is configurable.

```scala
scala> sql("select a in (1,2,3) from (select explode(array(1,2)) a) T").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [a#7 IN (1,2,3) AS (a IN (1, 2, 3))#8]
:     +- INPUT
+- Generate explode([1,2]), false, false, [a#7]
   +- Scan OneRowRelation[]

scala> sqlContext.setConf("spark.sql.optimizer.inSetConversionThreshold", "2")

scala> sql("select a in (1,2,3) from (select explode(array(1,2)) a) T").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [a#16 INSET (1,2,3) AS (a IN (1, 2, 3))#17]
:     +- INPUT
+- Generate explode([1,2]), false, false, [a#16]
   +- Scan OneRowRelation[]
```

## How was this patch tested?

Pass the Jenkins tests (with a new testcase).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes apache#12562 from dongjoon-hyun/SPARK-14796.
…aggregations

## What changes were proposed in this pull request?

Partial aggregations are generated in `EnsureRequirements`, but the planner fails to check whether a partial aggregation satisfies sort requirements. For the following query:

```scala
val df2 = (0 to 1000).map(x => (x % 2, x.toString)).toDF("a", "b").createOrReplaceTempView("t2")
spark.sql("select max(b) from t2 group by a").explain(true)
```

the planner won't insert a Sort operator before the partial aggregation, which breaks sort-based partial aggregation:

```
== Physical Plan ==
SortAggregate(key=[a#5], functions=[max(b#6)], output=[max(b)#17])
+- *Sort [a#5 ASC], false, 0
   +- Exchange hashpartitioning(a#5, 200)
      +- SortAggregate(key=[a#5], functions=[partial_max(b#6)], output=[a#5, max#19])
         +- LocalTableScan [a#5, b#6]
```

Actually, a correct plan is:

```
== Physical Plan ==
SortAggregate(key=[a#5], functions=[max(b#6)], output=[max(b)#17])
+- *Sort [a#5 ASC], false, 0
   +- Exchange hashpartitioning(a#5, 200)
      +- SortAggregate(key=[a#5], functions=[partial_max(b#6)], output=[a#5, max#19])
         +- *Sort [a#5 ASC], false, 0
            +- LocalTableScan [a#5, b#6]
```

## How was this patch tested?

Added tests in `PlannerSuite`.

Author: Takeshi YAMAMURO <linguin.m.s@gmail.com>

Closes apache#14865 from maropu/SPARK-17289.
This pull request proposes three alternatives for the StreamFrame API.
At this point, we are mostly looking for feedback on the high-level alternatives (i.e. not the detailed function names, window specs, etc.).