Window function on msq #15470
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java (Fixed)
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java (Outdated, resolved)
processing/src/main/java/org/apache/druid/query/rowsandcols/LazilyDecoratedRowsAndColumns.java (Outdated, resolved)
    catch (IOException e) {
      throw new RuntimeException(e);
    }
    return Operator.Signal.GO;
Instead of always returning GO, check whether the frames can be paused and, if so, return that signal instead. Pausing frames through the MSQ framework also needs to be tested properly.
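A minimal sketch of what the suggestion could look like, assuming the operator signal has a GO/PAUSE/STOP shape. The enum `Signal`, the class `BoundedReceiver` and its `capacity` are hypothetical stand-ins, not Druid's actual API: the receiver buffers rows and asks the upstream operator to pause once its buffer is full, instead of unconditionally returning GO.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PauseSketch
{
  // Hypothetical stand-in for Operator.Signal; assumed to have GO, PAUSE and STOP.
  enum Signal { GO, PAUSE, STOP }

  // Hypothetical receiver that buffers rows until a downstream writer drains them.
  static class BoundedReceiver
  {
    private final Deque<String> buffer = new ArrayDeque<>();
    private final int capacity;

    BoundedReceiver(int capacity)
    {
      this.capacity = capacity;
    }

    // Accept one row, then tell the upstream operator whether to keep pushing.
    Signal push(String row)
    {
      buffer.addLast(row);
      // Instead of always returning GO, ask upstream to pause once the buffer is full.
      return buffer.size() >= capacity ? Signal.PAUSE : Signal.GO;
    }

    // Drain everything buffered so far, e.g. when a frame has been written out.
    int drain()
    {
      int drained = buffer.size();
      buffer.clear();
      return drained;
    }
  }

  public static void main(String[] args)
  {
    BoundedReceiver receiver = new BoundedReceiver(2);
    System.out.println(receiver.push("row1")); // GO
    System.out.println(receiver.push("row2")); // PAUSE
    System.out.println(receiver.drain());      // 2
    System.out.println(receiver.push("row3")); // GO
  }
}
```

After a PAUSE, the framework would have to resume the operator once the buffer is drained; that resume path is the part the comment says still needs testing.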
        .inputs(new StageInputSpec(firstStageNumber - 1))
        .signature(rowSignature)
        .maxWorkerCount(maxWorkerCount)
        .shuffleSpec(null)
Currently the shuffle spec is null. Tell the previous stage to shuffle by the appropriate partition here so that the data arrives correctly partitioned. For example, if the previous stage is a groupByPostShuffle stage, find a way to tell it to set a shuffle spec for the next stage. Since the inner query has no knowledge of the outer operators, we can use the query context to pass this information.
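One way to read the "use the context" suggestion, sketched under assumptions: the outer window query writes its PARTITION BY columns into a copy of the query context, and the inner query kit reads them back to decide what its output stage should shuffle by. The key `windowPartitionColumns` and both helper methods are hypothetical, not existing Druid context parameters or methods.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ContextHandoffSketch
{
  // Hypothetical context key; not an actual Druid context parameter.
  static final String PARTITION_COLUMNS_KEY = "windowPartitionColumns";

  // Outer (window) query: record the PARTITION BY columns before planning the inner query.
  // A copy is returned so the caller's context map is left untouched.
  static Map<String, Object> withPartitionColumns(Map<String, Object> context, List<String> partitionColumns)
  {
    Map<String, Object> copy = new HashMap<>(context);
    copy.put(PARTITION_COLUMNS_KEY, List.copyOf(partitionColumns));
    return copy;
  }

  // Inner query kit: read the columns back to decide how its output stage should shuffle.
  // An empty list means no window follows, so the stage keeps its default shuffle.
  @SuppressWarnings("unchecked")
  static List<String> shuffleColumnsFor(Map<String, Object> context)
  {
    Object value = context.get(PARTITION_COLUMNS_KEY);
    return value == null ? Collections.emptyList() : (List<String>) value;
  }

  public static void main(String[] args)
  {
    Map<String, Object> base = new HashMap<>();
    Map<String, Object> ctx = withPartitionColumns(base, List.of("countryName"));
    System.out.println(shuffleColumnsFor(ctx));  // [countryName]
    System.out.println(shuffleColumnsFor(base)); // []
  }
}
```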
The shuffle spec of a stage tells it how to partition its output for the next stage. Therefore, it should use the resultShuffleSpecFactory, in combination with the window's partitioning, to construct the final shuffleSpec.
If you want the data in a particular layout inside a stage, its input should always be a stage, and the shuffle spec of that input stage should be set accordingly. Hash shuffle uses similar logic.
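To illustrate why the input stage's shuffle spec matters, here is a generic hash-partitioning sketch (not Druid's actual frame-based hash shuffle; rows are modeled as plain `Map`s and `shuffle` is a hypothetical helper). Hashing only the window's PARTITION BY columns guarantees that every row of a given window partition lands in the same output partition, so the next stage can evaluate the window locally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class HashShuffleSketch
{
  // Assign each row to a partition by hashing only its window PARTITION BY columns,
  // so every row of a given window partition ends up in the same output partition.
  static List<List<Map<String, Object>>> shuffle(
      List<Map<String, Object>> rows,
      List<String> partitionColumns,
      int numPartitions
  )
  {
    List<List<Map<String, Object>>> partitions = new ArrayList<>();
    for (int i = 0; i < numPartitions; i++) {
      partitions.add(new ArrayList<>());
    }
    for (Map<String, Object> row : rows) {
      Object[] key = new Object[partitionColumns.size()];
      for (int c = 0; c < key.length; c++) {
        key[c] = row.get(partitionColumns.get(c));
      }
      // floorMod keeps the index non-negative even when the hash is negative.
      int target = Math.floorMod(Objects.hash(key), numPartitions);
      partitions.get(target).add(row);
    }
    return partitions;
  }

  public static void main(String[] args)
  {
    List<Map<String, Object>> rows = List.of(
        Map.of("country", "US", "added", 1),
        Map.of("country", "FR", "added", 2),
        Map.of("country", "US", "added", 3)
    );
    List<List<Map<String, Object>>> out = shuffle(rows, List.of("country"), 4);
    for (int i = 0; i < out.size(); i++) {
      System.out.println("partition " + i + ": " + out.get(i));
    }
  }
}
```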
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java (Fixed)
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java (Fixed)
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java (Fixed)
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java (Outdated, resolved)
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/Limits.java (resolved)
...e/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java (Outdated, resolved)
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java (resolved)
CodeQL found more than 20 potential problems in the proposed changes. Check the Files changed tab for more details.
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java (Outdated, resolved)
...stage-query/src/main/java/org/apache/druid/msq/indexing/error/TooManyRowsInAWindowFault.java (Outdated, resolved)
...s-core/multi-stage-query/src/main/java/org/apache/druid/msq/util/MultiStageQueryContext.java (Outdated, resolved)
...e/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/groupby/GroupByQueryKit.java (resolved)
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java (Outdated, resolved)
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java (Outdated, resolved)
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java (resolved)
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java (resolved)
...age-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java (Outdated, resolved)
735b621 to 1464dae
  {
    super(
        CODE,
        "Too many rows in a window (requested = %d, max = %d). Try creating a window with a higher cardinality column or change the query shape.",
We should also mention that the user can set the MAX_ROWS_MATERIALIZED_IN_WINDOW config in the query context, and warn that raising it can lead to OOM errors, so it should be used with caution.
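A sketch of an error message that follows this suggestion. The context key string `maxRowsMaterializedInWindow` is an assumption about what MAX_ROWS_MATERIALIZED_IN_WINDOW resolves to and should be checked against MultiStageQueryContext; `TooManyRowsMessageSketch` is a hypothetical helper, not the fault class itself.

```java
public class TooManyRowsMessageSketch
{
  // Assumed context key behind MAX_ROWS_MATERIALIZED_IN_WINDOW; verify against MultiStageQueryContext.
  static final String MAX_ROWS_KEY = "maxRowsMaterializedInWindow";

  // Original message text, plus a hint about the context parameter and an OOM warning.
  static String message(int requested, int max)
  {
    return String.format(
        "Too many rows in a window (requested = %d, max = %d). "
        + "Try creating a window with a higher cardinality column or change the query shape. "
        + "Alternatively, raise the limit with the query context parameter [%s]; "
        + "note that a higher limit can cause out-of-memory (OOM) errors, so use it with caution.",
        requested,
        max,
        MAX_ROWS_KEY
    );
  }

  public static void main(String[] args)
  {
    System.out.println(message(150_000, 100_000));
  }
}
```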
        .inputs(new StageInputSpec(firstStageNumber))
        .signature(stageSignature)
        .maxWorkerCount(maxWorkerCount)
        .shuffleSpec(nextShuffleWindowSpec)
In the case of a limit, shouldn't this be something other than nextShuffleWindowSpec?
In the case of a limit on the inner query, the window operates on the result of the limit, so I think it should be nextShuffleWindowSpec, since it contains the PARTITION BY for the next window.
Let's add a unit test for this if there isn't one already.
I think the limit and offset should be applied on the grouping key, so it should be shuffleSpecFactoryPostAggregation != null ? : null
Also, we can short-circuit the shuffle spec of the OffsetLimitProcessor to null, since the limit is always applied on one worker and one partition. That way we are fine when a window processor is the next stage, because the data is already sorted :)
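The reasoning in this comment can be checked with a small sketch (hypothetical helpers over plain string keys, not Druid's OffsetLimitFrameProcessor): slicing OFFSET/LIMIT out of a single sorted partition keeps the rows sorted, so a following window stage can find window-partition boundaries in one pass by comparing adjacent keys, with no extra shuffle.

```java
import java.util.ArrayList;
import java.util.List;

public class SingleWorkerLimitSketch
{
  // Apply OFFSET/LIMIT to rows that are already sorted, as a single worker with one partition would.
  // Slicing a sorted list keeps it sorted, which is why no extra shuffle is needed afterwards.
  static List<String> offsetLimit(List<String> sortedKeys, int offset, int limit)
  {
    int from = Math.min(offset, sortedKeys.size());
    // Widen to long so a very large limit cannot overflow.
    int to = (int) Math.min((long) from + limit, sortedKeys.size());
    return new ArrayList<>(sortedKeys.subList(from, to));
  }

  // A downstream window stage can find window-partition boundaries in one pass
  // by comparing each key with the previous one, because equal keys are adjacent.
  static List<List<String>> windowPartitions(List<String> sortedKeys)
  {
    List<List<String>> groups = new ArrayList<>();
    List<String> current = new ArrayList<>();
    for (String key : sortedKeys) {
      if (!current.isEmpty() && !current.get(current.size() - 1).equals(key)) {
        groups.add(current);
        current = new ArrayList<>();
      }
      current.add(key);
    }
    if (!current.isEmpty()) {
      groups.add(current);
    }
    return groups;
  }

  public static void main(String[] args)
  {
    List<String> sorted = List.of("a", "a", "b", "b", "b", "c", "d");
    List<String> limited = offsetLimit(sorted, 1, 4); // [a, b, b, b]
    System.out.println(windowPartitions(limited));     // [[a], [b, b, b]]
  }
}
```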
I feel the PR is almost there. Left some comments.
Thanks for working on this.
  private final List<OperatorFactory> operatorFactoryList;
  private final ObjectMapper jsonMapper;
  private final ArrayList<RowsAndColumns> frameRowsAndCols;
Who clears the frameRowsAndCols ArrayList? I was expecting frameRowsAndCols to be cleared once its contents have been added to the result.
Yes, it is cleared once the result is written.
It seems to be cleared only after we have finished writing the results to frames, which seems suspect.
Shouldn't it be cleared as soon as its contents have been added to resultRowsAndCols?
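A sketch of the ordering the reviewer suggests, using hypothetical string stand-ins for RowsAndColumns: the buffer is cleared right after its contents are moved into the result, so the same rows are not held by both lists while the result is being written. This illustrates the suggestion only; it is not the processor's actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class EagerClearSketch
{
  // Hypothetical stand-ins for frameRowsAndCols and resultRowsAndCols (strings instead of RowsAndColumns).
  private final List<String> frameRowsAndCols = new ArrayList<>();
  private final List<String> resultRowsAndCols = new ArrayList<>();

  void addFrame(String frame)
  {
    frameRowsAndCols.add(frame);
  }

  // Move buffered frames into the result and clear the buffer immediately,
  // rather than waiting until the result has been written out.
  void moveToResult()
  {
    resultRowsAndCols.addAll(frameRowsAndCols);
    frameRowsAndCols.clear();
  }

  // Writing the result no longer needs to touch frameRowsAndCols.
  List<String> writeResult()
  {
    List<String> written = new ArrayList<>(resultRowsAndCols);
    resultRowsAndCols.clear();
    return written;
  }

  int bufferedFrames()
  {
    return frameRowsAndCols.size();
  }

  public static void main(String[] args)
  {
    EagerClearSketch sketch = new EagerClearSketch();
    sketch.addFrame("frame-1");
    sketch.addFrame("frame-2");
    sketch.moveToResult();
    System.out.println(sketch.bufferedFrames()); // 0
    System.out.println(sketch.writeResult());    // [frame-1, frame-2]
  }
}
```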
...ry/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java (resolved)
Changes LGTM. Thanks for your patience, @somu-imply!
Updated the release notes to also take the follow-up PR #16229 into account.
This PR aims to introduce window functions on MSQ by doing the following:
WINDOW_LEAF_OPERATOR, which is set only for the MSQ engine. When this feature is present, the planner plans without the leaf operators by creating a window query over an inner scan query. For the native engine this feature is set to false, and the planner generates the leafOperators.
Release notes
Add support in MSQE to run window functions using the context flag enableWindowing:true. In the native engine, a GROUP BY clause is required to enable window functions. In MSQE, the requirement to provide a mandatory GROUP BY clause to enable window functions is removed.
This PR has: