Window function on msq #15470

Merged: 54 commits, Mar 28, 2024
Commits
4ef900d
Initial code
somu-imply Nov 14, 2023
5e8dab1
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Nov 17, 2023
9ea01fc
Hacky way of atleast getting things to work
somu-imply Nov 30, 2023
9c4ac74
Temp unfinished changes
somu-imply Dec 1, 2023
54f9ac3
Converting rac back to frames
somu-imply Dec 7, 2023
d6cef47
Working UTs
somu-imply Dec 7, 2023
40a18f1
Fixing running window function in console
somu-imply Dec 8, 2023
3ecc96a
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Dec 31, 2023
7e34aa8
Adding shuffle spec and separating out stages for each window
somu-imply Jan 3, 2024
f5a1f59
serde stuff by adding ops to proc factory
somu-imply Jan 8, 2024
ab6e317
Updating for first set of reviews
somu-imply Jan 9, 2024
f1efec3
Changes for partition boundary detection
somu-imply Jan 16, 2024
1dae450
cleaning up some code, adding some tests
somu-imply Jan 19, 2024
ccfe473
Fixing up shuffle in group by if window afterwards
somu-imply Jan 19, 2024
500f54f
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Jan 20, 2024
98f4ba5
fix after merge
somu-imply Jan 20, 2024
ee61333
More updates and ignoring the insert cases FOR NOW..
somu-imply Jan 20, 2024
8f8bfdc
A possible fix for the insert case
somu-imply Jan 20, 2024
ec1f164
Support for leaf operators in window functions in MSQ
somu-imply Jan 23, 2024
465598a
in case of MSQ engine planning the query with leafOperator as a windo…
somu-imply Jan 24, 2024
d490d78
Removing inspection profile
somu-imply Jan 24, 2024
836b693
Revert "Removing inspection profile"
somu-imply Jan 24, 2024
aee40ba
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Jan 24, 2024
3f3d1b0
Updating inspection profile
somu-imply Jan 24, 2024
aa1b753
Updating scan query kit to handle shuffle for next window
somu-imply Jan 24, 2024
634d5bc
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Jan 27, 2024
bd5f27b
Some comments addressed
somu-imply Jan 29, 2024
5338b76
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Jan 29, 2024
b688755
Throwing exceptions in 2 cases
somu-imply Jan 29, 2024
ceff35d
Moving examples to a new file and adding new examples with join
somu-imply Feb 1, 2024
fc2e2b6
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Feb 1, 2024
751e947
More tests now window functions over unnest
somu-imply Feb 1, 2024
d7840a3
Addressing review commets part 1
somu-imply Feb 2, 2024
681ee1b
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Feb 5, 2024
b487d2a
Changes to frame processor to move from base leaf to frame processor …
somu-imply Feb 5, 2024
c3baa1d
Some more refactoring and addressing comments
somu-imply Feb 5, 2024
9768da2
Updating more tests
somu-imply Feb 6, 2024
584fa8f
Making build pass
somu-imply Feb 6, 2024
0813cc8
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Mar 14, 2024
d3755e1
Addressing latest set of comments part 1
somu-imply Mar 14, 2024
c0c74a0
Addressing latest review comments part 2
somu-imply Mar 14, 2024
60c6290
Minor refactoring and new tests after review
somu-imply Mar 20, 2024
91edf9e
Adding guardrails for materialization and avoiding partition bossting…
somu-imply Mar 20, 2024
399a78c
Adding more javadocs, guardrails and tests
somu-imply Mar 21, 2024
806c801
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Mar 21, 2024
04424d7
Changes to one test case
somu-imply Mar 21, 2024
f0946b1
More fixes around guardrails and addressing last set of review comments
somu-imply Mar 27, 2024
ea7882c
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Mar 27, 2024
83c96b9
Fixing a testcase after the merge
somu-imply Mar 27, 2024
cfca6a5
Fixing a test by using correct in filters for sql compat mode
somu-imply Mar 27, 2024
c3e2c29
Not documenting context flag and 1 more test change
somu-imply Mar 27, 2024
1464dae
Merge remote-tracking branch 'upstream/master' into windowFunctionOnMSQ
somu-imply Mar 27, 2024
520ab4e
New test for inner limit on group by
somu-imply Mar 28, 2024
16b75ce
Adding to known issues
somu-imply Mar 28, 2024
6 changes: 6 additions & 0 deletions docs/multi-stage-query/known-issues.md
Original file line number Diff line number Diff line change
@@ -62,3 +62,9 @@ properties, and the `indexSpec` [`tuningConfig`](../ingestion/ingestion-spec.md#
- `EXTERN` with input sources that match large numbers of files may exhaust available memory on the controller task.

- `EXTERN` refers to external files. Use `FROM` to access `druid` input sources.

## `WINDOW` Function

- A window cannot contain more than 100,000 elements.
- To avoid `leafOperators` in the MSQ engine, window functions add an extra scan stage after the window stage in cases
where the native engine has a non-empty `leafOperator`.
Original file line number Diff line number Diff line change
@@ -76,7 +76,6 @@
import org.apache.druid.sql.calcite.BaseCalciteQueryTest;
import org.apache.druid.sql.calcite.SqlTestFrameworkConfig;
import org.apache.druid.sql.calcite.filtration.Filtration;
import org.apache.druid.sql.calcite.planner.PlannerContext;
import org.apache.druid.sql.calcite.util.CacheTestHelperModule.ResultCacheMode;
import org.apache.druid.sql.calcite.util.CalciteTests;
import org.apache.druid.sql.calcite.util.TestDataBuilder;
@@ -1164,7 +1163,6 @@ public void testHllEstimateAsVirtualColumnWithTopN()
public void testHllWithOrderedWindowing()
{
testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql(
"SELECT dim1,coalesce(cast(l1 as integer),-999),"
+ " HLL_SKETCH_ESTIMATE( DS_HLL(dim1) OVER ( ORDER BY l1 ), true)"
@@ -1191,7 +1189,6 @@ public void testResultCacheWithWindowing()
skipVectorize();
for (int i = 0; i < 2; i++) {
testBuilder()
.queryContext(ImmutableMap.of(PlannerContext.CTX_ENABLE_WINDOW_FNS, true))
.sql(
"SELECT "
+ " TIME_FLOOR(__time, 'P1D') as dayLvl,\n"
Original file line number Diff line number Diff line change
@@ -167,6 +167,7 @@
import org.apache.druid.msq.querykit.QueryKit;
import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.querykit.ShuffleSpecFactory;
import org.apache.druid.msq.querykit.WindowOperatorQueryKit;
import org.apache.druid.msq.querykit.groupby.GroupByQueryKit;
import org.apache.druid.msq.querykit.results.ExportResultsFrameProcessorFactory;
import org.apache.druid.msq.querykit.results.QueryResultFrameProcessorFactory;
@@ -186,6 +187,7 @@
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.groupby.GroupByQuery;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.operator.WindowOperatorQuery;
import org.apache.druid.query.scan.ScanQuery;
import org.apache.druid.segment.DimensionHandlerUtils;
import org.apache.druid.segment.column.ColumnHolder;
@@ -1201,6 +1203,7 @@ private QueryKit makeQueryControllerToolKit()
ImmutableMap.<Class<? extends Query>, QueryKit>builder()
.put(ScanQuery.class, new ScanQueryKit(context.jsonMapper()))
.put(GroupByQuery.class, new GroupByQueryKit(context.jsonMapper()))
.put(WindowOperatorQuery.class, new WindowOperatorQueryKit(context.jsonMapper()))
.build();

return new MultiQueryKit(kitMap);
@@ -2769,7 +2772,6 @@ private void startStages() throws IOException, InterruptedException
if (isFailOnEmptyInsertEnabled && Boolean.TRUE.equals(isShuffleStageOutputEmpty)) {
throw new MSQException(new InsertCannotBeEmptyFault(task.getDataSource()));
}

final ClusterByPartitions partitionBoundaries =
queryKernel.getResultPartitionBoundariesForStage(shuffleStageId);
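
The `kitMap` registration in `makeQueryControllerToolKit()` above keys each `QueryKit` by query class, so adding `WindowOperatorQuery.class` is what lets the controller plan window queries. A minimal sketch of that class-keyed dispatch follows. It assumes a lookup by the query's runtime class; how `MultiQueryKit` actually resolves a kit is not shown in this diff, and every type below is a hypothetical stand-in, not a Druid class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of class-keyed dispatch; not Druid's MultiQueryKit.
final class QueryKitDispatchSketch
{
  // Stand-ins for the real query types registered in the diff.
  interface Query {}
  static final class ScanQuery implements Query {}
  static final class GroupByQuery implements Query {}
  static final class WindowOperatorQuery implements Query {}

  // Stand-in for QueryKit: turns a query into some plan description.
  interface QueryKit
  {
    String describe(Query query);
  }

  private final Map<Class<? extends Query>, QueryKit> kitMap = new HashMap<>();

  void register(Class<? extends Query> queryClass, QueryKit kit)
  {
    kitMap.put(queryClass, kit);
  }

  // Looks up the kit by the query's exact runtime class (an assumption of this sketch).
  String plan(Query query)
  {
    final QueryKit kit = kitMap.get(query.getClass());
    if (kit == null) {
      throw new IllegalArgumentException("Unsupported query class: " + query.getClass().getName());
    }
    return kit.describe(query);
  }
}
```

In this model, a query type with no registered kit is rejected at planning time, which is the situation window queries would have been in before the new `put` call.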

Original file line number Diff line number Diff line change
@@ -96,4 +96,11 @@ public class Limits
* Max number of partition buckets for ingestion queries.
*/
public static final int MAX_PARTITION_BUCKETS = 5_000;

/**
* Max number of rows with the same key in a window. This acts as a guardrail for
* data distribution with high cardinality
*/
public static final int MAX_ROWS_MATERIALIZED_IN_WINDOW = 100_000;

}
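
To illustrate what a limit like `MAX_ROWS_MATERIALIZED_IN_WINDOW` guards against, here is a minimal sketch of a check applied while buffering the rows of one window partition. Where and how the PR performs this check is not visible in this excerpt; `WindowGuardrailSketch`, `materializePartition`, and `TooManyRowsException` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a materialization guardrail; not the PR's implementation.
final class WindowGuardrailSketch
{
  // Mirrors the value of Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW shown in the diff.
  static final int MAX_ROWS_MATERIALIZED_IN_WINDOW = 100_000;

  // Stand-in for the fault raised when a partition grows past the limit.
  static final class TooManyRowsException extends RuntimeException
  {
    final int numRows;
    final int maxRows;

    TooManyRowsException(int numRows, int maxRows)
    {
      super("Too many rows in a window (requested = " + numRows + ", max = " + maxRows + ")");
      this.numRows = numRows;
      this.maxRows = maxRows;
    }
  }

  // Buffers the rows of a single partition, failing as soon as the buffer
  // would grow beyond maxRows. Exactly maxRows rows are still allowed.
  static <T> List<T> materializePartition(Iterable<T> rows, int maxRows)
  {
    final List<T> buffer = new ArrayList<>();
    for (T row : rows) {
      if (buffer.size() >= maxRows) {
        throw new TooManyRowsException(buffer.size() + 1, maxRows);
      }
      buffer.add(row);
    }
    return buffer;
  }
}
```

Failing before the offending row is added keeps memory use bounded by the limit, which is the point of a guardrail of this kind.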
Original file line number Diff line number Diff line change
@@ -84,6 +84,7 @@
import org.apache.druid.msq.input.table.TableInputSpec;
import org.apache.druid.msq.kernel.NilExtraInfoHolder;
import org.apache.druid.msq.querykit.InputNumberDataSource;
import org.apache.druid.msq.querykit.WindowOperatorQueryFrameProcessorFactory;
import org.apache.druid.msq.querykit.common.OffsetLimitFrameProcessorFactory;
import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
import org.apache.druid.msq.querykit.groupby.GroupByPostShuffleFrameProcessorFactory;
@@ -159,6 +160,7 @@ public List<? extends Module> getJacksonModules()
NilExtraInfoHolder.class,
SortMergeJoinFrameProcessorFactory.class,
QueryResultFrameProcessorFactory.class,
WindowOperatorQueryFrameProcessorFactory.class,
ExportResultsFrameProcessorFactory.class,

// DataSource classes (note: ExternalDataSource is in MSQSqlModule)
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.indexing.error;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.msq.util.MultiStageQueryContext;

import java.util.Objects;

@JsonTypeName(TooManyRowsInAWindowFault.CODE)
public class TooManyRowsInAWindowFault extends BaseMSQFault
{

static final String CODE = "TooManyRowsInAWindow";

private final int numRows;
private final int maxRows;

@JsonCreator
public TooManyRowsInAWindowFault(
@JsonProperty("numRows") final int numRows,
@JsonProperty("maxRows") final int maxRows
)
{
super(
CODE,
"Too many rows in a window (requested = %d, max = %d)."
+ " Try creating a window with a higher cardinality column or change the query shape."
+ " Or you can change the max using query context param %s."
+ " Use it carefully as a higher value can lead to OutOfMemory errors.",
numRows,
maxRows,
MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW
);
this.numRows = numRows;
this.maxRows = maxRows;
}

@JsonProperty
public int getNumRows()
{
return numRows;
}

@JsonProperty
public int getMaxRows()
{
return maxRows;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
TooManyRowsInAWindowFault that = (TooManyRowsInAWindowFault) o;
return numRows == that.numRows && maxRows == that.maxRows;
}

@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), numRows, maxRows);
}
}
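
As a rough illustration of what a user would see from `TooManyRowsInAWindowFault`, the sketch below renders an abbreviated version of the constructor's message template with `String.format`. How `BaseMSQFault` formats its arguments is not shown in this diff, and the context parameter string used here is an assumed value for `MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW`, which the diff references only by constant name.

```java
import java.util.Locale;

// Hypothetical sketch of rendering the fault message; not Druid code.
final class FaultMessageSketch
{
  // Assumed string value of the context parameter; not confirmed by the diff.
  static final String ASSUMED_CONTEXT_PARAM = "maxRowsMaterializedInWindow";

  // Renders an abbreviated form of the message template from the constructor.
  static String render(int numRows, int maxRows)
  {
    return String.format(
        Locale.ROOT,
        "Too many rows in a window (requested = %d, max = %d)."
        + " You can change the max using query context param %s.",
        numRows,
        maxRows,
        ASSUMED_CONTEXT_PARAM
    );
  }
}
```

Carrying `numRows` and `maxRows` as separate JSON properties, as the class above does, lets clients inspect the numbers without parsing this text.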
Original file line number Diff line number Diff line change
@@ -43,6 +43,7 @@
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.kernel.StageDefinitionBuilder;
import org.apache.druid.msq.querykit.common.SortMergeJoinFrameProcessorFactory;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.FilteredDataSource;
import org.apache.druid.query.InlineDataSource;
@@ -88,7 +89,6 @@ public class DataSourcePlan
* of subqueries.
*/
private static final Map<String, Object> CONTEXT_MAP_NO_SEGMENT_GRANULARITY = new HashMap<>();

private static final Logger log = new Logger(DataSourcePlan.class);

static {
@@ -209,7 +209,8 @@ public static DataSourcePlan forDataSource(
(QueryDataSource) dataSource,
maxWorkerCount,
minStageNumber,
- broadcast
+ broadcast,
+ queryContext
);
} else if (dataSource instanceof UnionDataSource) {
return forUnion(
@@ -419,15 +420,25 @@ private static DataSourcePlan forQuery(
final QueryDataSource dataSource,
final int maxWorkerCount,
final int minStageNumber,
- final boolean broadcast
+ final boolean broadcast,
+ @Nullable final QueryContext parentContext
)
{
// check if parentContext has a window operator
final Map<String, Object> windowShuffleMap = new HashMap<>();
if (parentContext != null && parentContext.containsKey(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL)) {
windowShuffleMap.put(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL, parentContext.get(MultiStageQueryContext.NEXT_WINDOW_SHUFFLE_COL));
}
final QueryDefinition subQueryDef = queryKit.makeQueryDefinition(
queryId,

// Subqueries ignore SQL_INSERT_SEGMENT_GRANULARITY, even if set in the context. It's only used for the
// outermost query, and setting it for the subquery makes us erroneously add bucketing where it doesn't belong.
- dataSource.getQuery().withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY),
+ windowShuffleMap.isEmpty()
+ ? dataSource.getQuery()
+ .withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY)
+ : dataSource.getQuery()
+ .withOverriddenContext(CONTEXT_MAP_NO_SEGMENT_GRANULARITY)
+ .withOverriddenContext(windowShuffleMap),
queryKit,
ShuffleSpecFactories.globalSortWithMaxPartitionCount(maxWorkerCount),
maxWorkerCount,
@@ -683,7 +694,8 @@ private static DataSourcePlan forSortMergeJoin(
(QueryDataSource) dataSource.getLeft(),
maxWorkerCount,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
- false
+ false,
+ null
);
leftPlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll);

@@ -696,7 +708,8 @@ private static DataSourcePlan forSortMergeJoin(
(QueryDataSource) dataSource.getRight(),
maxWorkerCount,
Math.max(minStageNumber, subQueryDefBuilder.getNextStageNumber()),
- false
+ false,
+ null
);
rightPlan.getSubQueryDefBuilder().ifPresent(subQueryDefBuilder::addAll);
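
The `forQuery` change above forwards one key from the parent query's context to the subquery while still clearing the segment granularity. A minimal sketch of that merge using plain maps follows. It assumes `withOverriddenContext` behaves like "copy, then apply overrides" and that `CONTEXT_MAP_NO_SEGMENT_GRANULARITY` maps the granularity key to `null`; neither detail is shown in this diff, and both key strings below are hypothetical stand-ins for the real constants.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of subquery context propagation; not Druid's DataSourcePlan.
final class SubqueryContextSketch
{
  // Assumed key strings; the diff shows only the constant names.
  static final String SEGMENT_GRANULARITY_KEY = "sqlInsertSegmentGranularity";
  static final String NEXT_WINDOW_SHUFFLE_COL = "nextWindowShuffleCol";

  // Simplified model of withOverriddenContext: copy the base, then apply overrides.
  static Map<String, Object> override(Map<String, Object> base, Map<String, Object> overrides)
  {
    final Map<String, Object> result = new HashMap<>(base);
    result.putAll(overrides);
    return result;
  }

  // Builds the context a subquery would be planned with.
  static Map<String, Object> subqueryContext(Map<String, Object> subquery, Map<String, Object> parent)
  {
    final Map<String, Object> overrides = new HashMap<>();
    // Subqueries ignore the insert segment granularity (modeled here as a null value).
    overrides.put(SEGMENT_GRANULARITY_KEY, null);
    // Forward the window shuffle column only when the parent actually set it.
    if (parent != null && parent.containsKey(NEXT_WINDOW_SHUFFLE_COL)) {
      overrides.put(NEXT_WINDOW_SHUFFLE_COL, parent.get(NEXT_WINDOW_SHUFFLE_COL));
    }
    return override(subquery, overrides);
  }
}
```

Passing `null` as the parent, as the two `forSortMergeJoin` call sites do, leaves the subquery context without a shuffle column in this model.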
