SQL: Use timestamp_floor when granularity is not safe.
PR apache#12944 added a check at the execution layer to avoid materializing
an excessive number of time-granular buckets. This patch modifies the SQL
planner to avoid generating queries that would throw such errors, by
switching certain plans to use the timestamp_floor function instead of
query granularities. This applies to both the Timeseries query type and
the GroupBy timestampResultFieldGranularity feature.

The patch also goes a step further: we switch to timestamp_floor
not just in the ETERNITY + non-ALL case, but also whenever the estimated
number of time-granular buckets exceeds 100,000.
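
A rough sketch of how such an estimate could be computed, assuming Joda-Time types. The threshold constant, method name, and handling of calendar-based periods are assumptions for illustration, not this patch's actual planner code.

import org.joda.time.Interval;
import org.joda.time.Period;

public class BucketEstimateSketch
{
  private static final long MAX_TIME_BUCKETS = 100_000L;

  /**
   * Returns true when pushing the granularity into the native query looks safe, i.e. the
   * estimated number of time buckets stays at or below the threshold. Calendar-based
   * periods (months, years) would need an approximate length rather than toStandardDuration().
   */
  public static boolean withinBucketLimit(final Interval interval, final Period bucketPeriod)
  {
    final long bucketMillis = bucketPeriod.toStandardDuration().getMillis();
    if (bucketMillis <= 0) {
      return false;
    }
    final long estimatedBuckets = interval.toDurationMillis() / bucketMillis;
    return estimatedBuckets <= MAX_TIME_BUCKETS;
  }
}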

Finally, the patch modifies the timestampResultFieldGranularity
field to consistently be a String rather than a Granularity. This
ensures that it can be round-trip serialized and deserialized, which is
useful when executing the results of "EXPLAIN PLAN FOR" with GroupBy
queries that use the timestampResultFieldGranularity feature.
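
A minimal sketch of the String round trip this enables, assuming a Druid-configured Jackson mapper (e.g. DefaultObjectMapper, as TestHelper.makeJsonMapper() provides in the tests below) that can serialize Granularity. The surrounding class is illustrative; the read side corresponds to the new GroupByStrategyV2 code in this diff.

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;

public class GranularityRoundTripSketch
{
  public static void main(String[] args) throws JsonProcessingException
  {
    final ObjectMapper jsonMapper = new DefaultObjectMapper();

    // Write side (the SQL planner): store the granularity in the query context as a String.
    final String contextValue = jsonMapper.writeValueAsString(Granularities.DAY);

    // Read side (GroupByStrategyV2.mergeResults): recover the Granularity from that String.
    final Granularity granularity = jsonMapper.readValue(contextValue, Granularity.class);

    System.out.println(contextValue + " -> " + granularity);
  }
}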
gianm committed Oct 11, 2022
1 parent 2a24c20 commit a9c9877
Showing 17 changed files with 261 additions and 42 deletions.
@@ -66,6 +66,7 @@
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.generator.DataGenerator;
import org.apache.druid.segment.generator.GeneratorBasicSchemas;
@@ -401,6 +402,7 @@ public String getFormatString()
configSupplier,
bufferPool,
mergePool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
)
@@ -378,6 +378,7 @@ private static GroupByQueryRunnerFactory makeGroupByQueryRunnerFactory(
bufferPool,
mergeBufferPool,
mapper,
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
);
@@ -81,6 +81,7 @@
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.generator.DataGenerator;
@@ -516,6 +517,7 @@ public String getFormatString()
configSupplier,
bufferPool,
mergePool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
QueryBenchmarkUtil.NOOP_QUERYWATCHER
)
@@ -101,6 +101,7 @@ public int getNumThreads()
GroupByQueryConfig::new,
new StupidPool<>("map-virtual-column-groupby-test", () -> ByteBuffer.allocate(1024)),
new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1),
TestHelper.makeJsonMapper(),
new DefaultObjectMapper(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
@@ -19,6 +19,7 @@

package org.apache.druid.query.groupby.strategy;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
@@ -31,6 +32,7 @@
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.collections.ReferenceCountingResourceHolder;
import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.Intervals;
@@ -97,6 +99,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
private final Supplier<GroupByQueryConfig> configSupplier;
private final NonBlockingPool<ByteBuffer> bufferPool;
private final BlockingPool<ByteBuffer> mergeBufferPool;
private final ObjectMapper jsonMapper;
private final ObjectMapper spillMapper;
private final QueryWatcher queryWatcher;

@@ -106,6 +109,7 @@ public GroupByStrategyV2(
Supplier<GroupByQueryConfig> configSupplier,
@Global NonBlockingPool<ByteBuffer> bufferPool,
@Merging BlockingPool<ByteBuffer> mergeBufferPool,
@Json ObjectMapper jsonMapper,
@Smile ObjectMapper spillMapper,
QueryWatcher queryWatcher
)
@@ -114,6 +118,7 @@
this.configSupplier = configSupplier;
this.bufferPool = bufferPool;
this.mergeBufferPool = mergeBufferPool;
this.jsonMapper = jsonMapper;
this.spillMapper = spillMapper;
this.queryWatcher = queryWatcher;
}
@@ -248,8 +253,16 @@ public Sequence<ResultRow> mergeResults(
// the query generated by "explain plan for select ..." doesn't match to the native query ACTUALLY being executed,
// the granularity and dimensions are slightly different.
// now, part of the query plan logic is handled in GroupByStrategyV2, not only in DruidQuery.toGroupByQuery()
final Granularity timestampResultFieldGranularity
= query.getContextValue(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY);
final Granularity timestampResultFieldGranularity;
try {
timestampResultFieldGranularity = jsonMapper.readValue(
(String) query.getContextValue(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY),
Granularity.class
);
}
catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
dimensionSpecs =
query.getDimensions()
.stream()
@@ -66,6 +66,17 @@ public class RowBasedStorageAdapter<RowType> implements StorageAdapter
this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature");
}

/**
* Whether the provided time interval and granularity combination is allowed.
*
* We restrict ETERNITY with non-ALL granularity, because allowing it would involve creating a very high number
* of time grains. This would cause queries to take an excessive amount of time or run out of memory.
*/
public static boolean isQueryGranularityAllowed(final Interval interval, final Granularity granularity)
{
return Granularities.ALL.equals(granularity) || !Intervals.ETERNITY.equals(interval);
}

@Override
public Interval getInterval()
{
@@ -172,11 +183,13 @@ public Sequence<Cursor> makeCursors(
if (actualInterval == null) {
return Sequences.empty();
}
// Incase time interval is ETERNITY with non-ALL granularity, don't risk creating time grains.
// For all non-ALL granularities, the time grains will be very high in number and that can either OOM the heap
// or create an very very long query.
if (actualInterval.contains(Intervals.ETERNITY) && !gran.equals(Granularities.ALL)) {
throw new IAE("Cannot support ETERNITY interval with %s time granluarity", gran);

if (!isQueryGranularityAllowed(actualInterval, gran)) {
throw new IAE(
"Cannot support interval [%s] with granularity [%s]",
Intervals.ETERNITY.equals(actualInterval) ? "ETERNITY" : actualInterval,
gran
);
}

final RowWalker<RowType> rowWalker = new RowWalker<>(
@@ -69,6 +69,7 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@@ -352,6 +353,7 @@ public String getFormatString()
configSupplier,
bufferPool,
mergePool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
@@ -369,6 +371,7 @@ public String getFormatString()
configSupplier,
bufferPool,
tooSmallMergePool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
@@ -74,6 +74,7 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.column.ColumnType;
@@ -601,6 +602,7 @@ public String getFormatString()
configSupplier,
bufferPool,
mergePool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
@@ -618,6 +620,7 @@ public String getFormatString()
configSupplier,
bufferPool,
mergePool2,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
@@ -66,6 +66,7 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.ColumnConfig;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@@ -266,6 +267,7 @@ public String getFormatString()
configSupplier,
bufferPool,
mergePool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
@@ -40,6 +40,7 @@
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -139,6 +140,7 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory(
configSupplier,
BUFFER_POOL,
MERGE_BUFFER_POOL,
TestHelper.makeJsonMapper(),
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
@@ -43,6 +43,7 @@
import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
import org.apache.druid.segment.TestHelper;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
@@ -110,6 +111,7 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory(
configSupplier,
BUFFER_POOL,
MERGE_BUFFER_POOL,
TestHelper.makeJsonMapper(),
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
@@ -412,6 +412,7 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
configSupplier,
bufferPools.getProcessingPool(),
bufferPools.getMergePool(),
TestHelper.makeJsonMapper(),
mapper,
QueryRunnerTestHelper.NOOP_QUERYWATCHER
)
@@ -73,6 +73,7 @@
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexSegment;
import org.apache.druid.segment.Segment;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
@@ -311,6 +312,7 @@ public String getFormatString()
configSupplier,
bufferPool,
mergePool,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
@@ -328,6 +330,7 @@ public String getFormatString()
configSupplier,
bufferPool,
mergePool2,
TestHelper.makeJsonMapper(),
new ObjectMapper(new SmileFactory()),
NOOP_QUERYWATCHER
)
