Add a limit to the number of columns in the CLUSTERED BY clause (#13352)
* Add clustered by limit

* change semantics, add docs

* add fault class to the module

* add test

* unambiguate test
LakshSingla authored Nov 15, 2022
1 parent 309cae7 commit 9e938b5
Showing 7 changed files with 163 additions and 0 deletions.
2 changes: 2 additions & 0 deletions docs/multi-stage-query/reference.md
@@ -232,6 +232,7 @@ The following table lists query limits:
| Number of input files/segments per worker. | 10,000 | `TooManyInputFiles` |
| Number of output partitions for any one stage. Number of segments generated during ingestion. | 25,000 | `TooManyPartitions` |
| Number of output columns for any one stage. | 2,000 | `TooManyColumns` |
| Number of `CLUSTERED BY` columns that can appear in any one stage. | 1,500 | `TooManyClusteredByColumns` |
| Number of workers for any one stage. | Hard limit is 1,000. Memory-dependent soft limit may be lower. | `TooManyWorkers` |
| Maximum memory occupied by broadcasted tables. | 30% of each [processor memory bundle](concepts.md#memory-usage). | `BroadcastTablesTooLarge` |

@@ -262,6 +263,7 @@ The following table describes error codes you may encounter in the `multiStageQu
| `TooManyBuckets` | Exceeded the number of partition buckets for a stage. Partition buckets are only used for `segmentGranularity` during INSERT queries. The most common reason for this error is that your `segmentGranularity` is too narrow relative to the data. See the [Limits](#limits) table for the specific limit. | `maxBuckets`: The limit on buckets. |
| `TooManyInputFiles` | Exceeded the number of input files/segments per worker. See the [Limits](#limits) table for the specific limit. | `numInputFiles`: The total number of input files/segments for the stage.<br /><br />`maxInputFiles`: The maximum number of input files/segments per worker per stage.<br /><br />`minNumWorker`: The minimum number of workers required for a successful run. |
| `TooManyPartitions` | Exceeded the number of partitions for a stage. The most common reason for this is that the final stage of an INSERT or REPLACE query generated too many segments. See the [Limits](#limits) table for the specific limit. | `maxPartitions`: The limit on partitions which was exceeded |
| `TooManyClusteredByColumns` | Exceeded the number of `CLUSTERED BY` columns for a stage. See the [Limits](#limits) table for the specific limit. | `numColumns`: The number of columns requested.<br /><br />`maxColumns`: The limit on columns which was exceeded.<br /><br />`stage`: The stage number that exceeded the limit. |
| `TooManyColumns` | Exceeded the number of columns for a stage. See the [Limits](#limits) table for the specific limit. | `numColumns`: The number of columns requested.<br /><br />`maxColumns`: The limit on columns which was exceeded. |
| `TooManyWarnings` | Exceeded the allowed number of warnings of a particular type. | `rootErrorCode`: The error code corresponding to the exception that exceeded the required limit. <br /><br />`maxWarnings`: Maximum number of warnings that are allowed for the corresponding `rootErrorCode`. |
| `TooManyWorkers` | Exceeded the supported number of workers running simultaneously. See the [Limits](#limits) table for the specific limit. | `workers`: The number of simultaneously running workers that exceeded a hard or soft limit. This may be larger than the number of workers in any one stage if multiple stages are running simultaneously. <br /><br />`maxWorkers`: The hard or soft limit on workers that was exceeded. |
@@ -29,6 +29,16 @@ public class Limits
*/
public static final int MAX_FRAME_COLUMNS = 2000;

/**
* Maximum number of columns that can appear in the CLUSTERED BY clause.
*
* The limit is somewhat arbitrary, but it is chosen so that the DataSketches sketches used to compute partitions
* for the CLUSTERED BY keys do not blow up in memory.
* This limit, together with sequential merging of the sketches, helps prevent OOMs in both the worker and the
* controller tasks.
*/
public static final int MAX_CLUSTERED_BY_COLUMNS = (int) (MAX_FRAME_COLUMNS * 0.75);
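// With MAX_FRAME_COLUMNS = 2000 this works out to 1,500 columns, matching the limit documented in
// docs/multi-stage-query/reference.md.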

/**
* Maximum number of workers that can be used in a stage, regardless of available memory.
*/
@@ -23,6 +23,7 @@
import com.google.common.primitives.Ints;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
import org.apache.druid.msq.indexing.error.TooManyWorkersFault;
@@ -48,6 +49,17 @@ public static void validateQueryDef(final QueryDefinition queryDef)
throw new MSQException(new TooManyColumnsFault(numColumns, Limits.MAX_FRAME_COLUMNS));
}

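// Fail fast when a stage's CLUSTERED BY key has more columns than the partition-computing sketches can handle.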
final int numClusteredByColumns = stageDef.getClusterBy().getColumns().size();
if (numClusteredByColumns > Limits.MAX_CLUSTERED_BY_COLUMNS) {
throw new MSQException(
new TooManyClusteredByColumnsFault(
numClusteredByColumns,
Limits.MAX_CLUSTERED_BY_COLUMNS,
stageDef.getStageNumber()
)
);
}

final int numWorkers = stageDef.getMaxWorkerCount();
if (numWorkers > Limits.MAX_WORKERS) {
throw new MSQException(new TooManyWorkersFault(numWorkers, Limits.MAX_WORKERS));
@@ -59,6 +59,7 @@
import org.apache.druid.msq.indexing.error.RowTooLargeFault;
import org.apache.druid.msq.indexing.error.TaskStartTimeoutFault;
import org.apache.druid.msq.indexing.error.TooManyBucketsFault;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyColumnsFault;
import org.apache.druid.msq.indexing.error.TooManyInputFilesFault;
import org.apache.druid.msq.indexing.error.TooManyPartitionsFault;
@@ -118,6 +119,7 @@ public class MSQIndexingModule implements DruidModule
RowTooLargeFault.class,
TaskStartTimeoutFault.class,
TooManyBucketsFault.class,
TooManyClusteredByColumnsFault.class,
TooManyColumnsFault.class,
TooManyInputFilesFault.class,
TooManyPartitionsFault.class,
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.msq.indexing.error;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;

import java.util.Objects;

@JsonTypeName(TooManyClusteredByColumnsFault.CODE)
public class TooManyClusteredByColumnsFault extends BaseMSQFault
{
static final String CODE = "TooManyClusteredByColumns";
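// Jackson serializes this fault with CODE as its type name (see @JsonTypeName above); numColumns, maxColumns,
// and stage are exposed as JSON properties via the getters below.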

private final int numColumns;
private final int maxColumns;
private final int stage;

@JsonCreator
public TooManyClusteredByColumnsFault(
@JsonProperty("numColumns") final int numColumns,
@JsonProperty("maxColumns") final int maxColumns,
@JsonProperty("stage") final int stage
)
{
super(CODE, "Too many cluster by columns present in stage [%s] (requested = %d, max = %d)", stage, numColumns, maxColumns);
this.numColumns = numColumns;
this.maxColumns = maxColumns;
this.stage = stage;
}

@JsonProperty
public int getNumColumns()
{
return numColumns;
}

@JsonProperty
public int getMaxColumns()
{
return maxColumns;
}

@JsonProperty
public int getStage()
{
return stage;
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
if (!super.equals(o)) {
return false;
}
TooManyClusteredByColumnsFault that = (TooManyClusteredByColumnsFault) o;
return numColumns == that.numColumns && maxColumns == that.maxColumns && stage == that.stage;
}

@Override
public int hashCode()
{
return Objects.hash(super.hashCode(), numColumns, maxColumns, stage);
}
}
@@ -27,10 +27,12 @@
import org.apache.druid.hll.HyperLogLogCollector;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.msq.indexing.error.ColumnNameRestrictedFault;
import org.apache.druid.msq.indexing.error.InsertTimeNullFault;
import org.apache.druid.msq.indexing.error.RowTooLargeFault;
import org.apache.druid.msq.indexing.error.TooManyClusteredByColumnsFault;
import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
@@ -53,6 +55,8 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class MSQInsertTest extends MSQTestBase
{
@@ -444,6 +448,49 @@ public void testIncorrectInsertQuery()
.verifyPlanningErrors();
}

@Test
public void testInsertWithHugeClusteringKeys()
{
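// Builds an ingest query whose CLUSTERED BY clause lists 1,700 column ordinals, well above
// Limits.MAX_CLUSTERED_BY_COLUMNS (1,500). The expected fault reports numColumns + 2 columns; the extra two are
// presumably keys the engine adds to the cluster key itself (an assumption, e.g. time bucket / boost columns)
// rather than columns from the user query.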
RowSignature dummyRowSignature = RowSignature.builder().add("__time", ColumnType.LONG).build();

final int numColumns = 1700;

String columnNames = IntStream.range(1, numColumns)
.mapToObj(i -> "col" + i).collect(Collectors.joining(", "));

String clusteredByClause = IntStream.range(1, numColumns + 1)
.mapToObj(String::valueOf)
.collect(Collectors.joining(", "));

String externSignature = IntStream.range(1, numColumns)
.mapToObj(i -> StringUtils.format(
"{\"name\": \"col%d\", \"type\": \"string\"}",
i
))
.collect(Collectors.joining(", "));

testIngestQuery()
.setSql(StringUtils.format(
" insert into foo1 SELECT\n"
+ " floor(TIME_PARSE(\"timestamp\") to day) AS __time,\n"
+ " %s\n"
+ "FROM TABLE(\n"
+ " EXTERN(\n"
+ " '{ \"files\": [\"ignored\"],\"type\":\"local\"}',\n"
+ " '{\"type\": \"json\"}',\n"
+ " '[{\"name\": \"timestamp\", \"type\": \"string\"}, %s]'\n"
+ " )\n"
+ ") PARTITIONED by day CLUSTERED BY %s",
columnNames,
externSignature,
clusteredByClause
))
.setExpectedDataSource("foo1")
.setExpectedRowSignature(dummyRowSignature)
.setExpectedMSQFault(new TooManyClusteredByColumnsFault(numColumns + 2, 1500, 0))
.verifyResults();
}

@Test
public void testInsertRestrictedColumns()
{
@@ -70,6 +70,7 @@ public void testFaultSerde() throws IOException
assertFaultSerde(new TaskStartTimeoutFault(10));
assertFaultSerde(new TooManyBucketsFault(10));
assertFaultSerde(new TooManyColumnsFault(10, 8));
assertFaultSerde(new TooManyClusteredByColumnsFault(10, 8, 1));
assertFaultSerde(new TooManyInputFilesFault(15, 10, 5));
assertFaultSerde(new TooManyPartitionsFault(10));
assertFaultSerde(new TooManyWarningsFault(10, "the error"));
