diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
index 8e885856bac6..cd6c48a16ea6 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
@@ -66,6 +66,7 @@
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.generator.DataGenerator;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
@@ -401,6 +402,7 @@ public String getFormatString()
             configSupplier,
             bufferPool,
             mergePool,
+            TestHelper.makeJsonMapper(),
             new ObjectMapper(new SmileFactory()),
             QueryBenchmarkUtil.NOOP_QUERYWATCHER
         )
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
index bc480c1063b4..4b0d55c2c62a 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/CachingClusteredClientBenchmark.java
@@ -378,6 +378,7 @@ private static GroupByQueryRunnerFactory makeGroupByQueryRunnerFactory(
             bufferPool,
             mergeBufferPool,
             mapper,
+            mapper,
             QueryRunnerTestHelper.NOOP_QUERYWATCHER
         )
     );
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
index 9dd0ea5c5eac..602126ba4a26 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
@@ -81,6 +81,7 @@
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.generator.DataGenerator;
@@ -516,6 +517,7 @@ public String getFormatString()
             configSupplier,
             bufferPool,
             mergePool,
+            TestHelper.makeJsonMapper(),
             new ObjectMapper(new SmileFactory()),
             QueryBenchmarkUtil.NOOP_QUERYWATCHER
         )
diff --git a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
index 3e0cda4aedbd..cd746aaff78e 100644
--- a/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
+++ b/extensions-contrib/virtual-columns/src/test/java/org/apache/druid/segment/MapVirtualColumnGroupByTest.java
@@ -101,6 +101,7 @@ public int getNumThreads()
         GroupByQueryConfig::new,
         new StupidPool<>("map-virtual-column-groupby-test", () -> ByteBuffer.allocate(1024)),
         new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1),
+        TestHelper.makeJsonMapper(),
         new DefaultObjectMapper(),
         QueryRunnerTestHelper.NOOP_QUERYWATCHER
     )
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index b5bb6bdd2bfb..cf5a8902a1ce 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.exec;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -523,7 +524,8 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
     final QueryDefinition queryDef = makeQueryDefinition(
         id(),
         makeQueryControllerToolKit(),
-        task.getQuerySpec()
+        task.getQuerySpec(),
+        context.jsonMapper()
     );
 
     QueryValidator.validateQueryDef(queryDef);
@@ -1311,7 +1313,8 @@ private void cleanUpDurableStorageIfNeeded()
   private static QueryDefinition makeQueryDefinition(
       final String queryId,
       @SuppressWarnings("rawtypes") final QueryKit toolKit,
-      final MSQSpec querySpec
+      final MSQSpec querySpec,
+      final ObjectMapper jsonMapper
   )
   {
     final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
@@ -1395,7 +1398,9 @@ private static QueryDefinition makeQueryDefinition(
     }
 
     // Then, add a segment-generation stage.
-    final DataSchema dataSchema = generateDataSchema(querySpec, querySignature, queryClusterBy, columnMappings);
+    final DataSchema dataSchema =
+        generateDataSchema(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper);
+
     builder.add(
         StageDefinition.builder(queryDef.getNextStageNumber())
                        .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
@@ -1421,7 +1426,8 @@ private static DataSchema generateDataSchema(
       MSQSpec querySpec,
       RowSignature querySignature,
       ClusterBy queryClusterBy,
-      ColumnMappings columnMappings
+      ColumnMappings columnMappings,
+      ObjectMapper jsonMapper
   )
   {
     final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
@@ -1442,7 +1448,7 @@ private static DataSchema generateDataSchema(
         new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
         new DimensionsSpec(dimensionsAndAggregators.lhs),
         dimensionsAndAggregators.rhs.toArray(new AggregatorFactory[0]),
-        makeGranularitySpecForIngestion(querySpec.getQuery(), querySpec.getColumnMappings(), isRollupQuery),
+        makeGranularitySpecForIngestion(querySpec.getQuery(), querySpec.getColumnMappings(), isRollupQuery, jsonMapper),
         new TransformSpec(null, Collections.emptyList())
     );
   }
@@ -1450,18 +1456,25 @@ private static DataSchema generateDataSchema(
   private static GranularitySpec makeGranularitySpecForIngestion(
       final Query<?> query,
       final ColumnMappings columnMappings,
-      final boolean isRollupQuery
+      final boolean isRollupQuery,
+      final ObjectMapper jsonMapper
   )
   {
     if (isRollupQuery) {
-      final String queryGranularity = query.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, "");
+      final String queryGranularityString =
+          query.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, "");
 
-      if (timeIsGroupByDimension((GroupByQuery) query, columnMappings) && !queryGranularity.isEmpty()) {
-        return new ArbitraryGranularitySpec(
-            Granularity.fromString(queryGranularity),
-            true,
-            Intervals.ONLY_ETERNITY
-        );
+      if (timeIsGroupByDimension((GroupByQuery) query, columnMappings) && !queryGranularityString.isEmpty()) {
+        final Granularity queryGranularity;
+
+        try {
+          queryGranularity = jsonMapper.readValue(queryGranularityString, Granularity.class);
+        }
+        catch (JsonProcessingException e) {
+          throw new RuntimeException(e);
+        }
+
+        return new ArbitraryGranularitySpec(queryGranularity, true, Intervals.ONLY_ETERNITY);
       }
       return new ArbitraryGranularitySpec(Granularities.NONE, true, Intervals.ONLY_ETERNITY);
     } else {
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java b/processing/src/main/java/org/apache/druid/query/QueryContext.java
index 0ed8e184664d..624bc1cb3d15 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContext.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.query;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
@@ -26,7 +27,7 @@
 import org.apache.druid.segment.QueryableIndexStorageAdapter;
 
 import javax.annotation.Nullable;
-
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
@@ -231,16 +232,18 @@ public <E extends Enum<E>> E getEnum(String key, Class<E> clazz, E defaultValue)
   {
     return QueryContexts.getAsEnum(key, get(key), clazz, defaultValue);
   }
 
-  public Granularity getGranularity(String key)
+  public Granularity getGranularity(String key, ObjectMapper jsonMapper)
   {
-    final Object value = get(key);
-    if (value == null) {
+    final String granularityString = getString(key);
+    if (granularityString == null) {
       return null;
     }
-    if (value instanceof Granularity) {
-      return (Granularity) value;
-    } else {
-      throw QueryContexts.badTypeException(key, "a Granularity", value);
+
+    try {
+      return jsonMapper.readValue(granularityString, Granularity.class);
+    }
+    catch (IOException e) {
+      throw QueryContexts.badTypeException(key, "a Granularity", granularityString);
     }
   }
diff --git a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
index 0ee5078efd81..cda33f459edf 100644
--- a/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
+++ b/processing/src/main/java/org/apache/druid/query/groupby/strategy/GroupByStrategyV2.java
@@ -31,6 +31,7 @@
 import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.collections.ReferenceCountingResourceHolder;
 import org.apache.druid.guice.annotations.Global;
+import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.guice.annotations.Merging;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.Intervals;
@@ -98,6 +99,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
   private final Supplier<GroupByQueryConfig> configSupplier;
   private final NonBlockingPool<ByteBuffer> bufferPool;
   private final BlockingPool<ByteBuffer> mergeBufferPool;
+  private final ObjectMapper jsonMapper;
   private final ObjectMapper spillMapper;
   private final QueryWatcher queryWatcher;
 
@@ -107,6 +109,7 @@ public GroupByStrategyV2(
       Supplier<GroupByQueryConfig> configSupplier,
       @Global NonBlockingPool<ByteBuffer> bufferPool,
       @Merging BlockingPool<ByteBuffer> mergeBufferPool,
+      @Json ObjectMapper jsonMapper,
       @Smile ObjectMapper spillMapper,
       QueryWatcher queryWatcher
   )
@@ -115,6 +118,7 @@ public GroupByStrategyV2(
     this.configSupplier = configSupplier;
     this.bufferPool = bufferPool;
     this.mergeBufferPool = mergeBufferPool;
+    this.jsonMapper = jsonMapper;
     this.spillMapper = spillMapper;
     this.queryWatcher = queryWatcher;
   }
@@ -252,7 +256,7 @@ public Sequence<ResultRow> mergeResults(
       // the granularity and dimensions are slightly different.
       // now, part of the query plan logic is handled in GroupByStrategyV2, not only in DruidQuery.toGroupByQuery()
       final Granularity timestampResultFieldGranularity
-          = queryContext.getGranularity(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY);
+          = queryContext.getGranularity(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, jsonMapper);
       dimensionSpecs =
           query.getDimensions()
               .stream()
diff --git a/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java b/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java
index 3fed0fc1cd05..b537e9d69d57 100644
--- a/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java
+++ b/processing/src/main/java/org/apache/druid/segment/RowBasedStorageAdapter.java
@@ -66,6 +66,17 @@ public class RowBasedStorageAdapter<RowType> implements StorageAdapter
     this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature");
   }
 
+  /**
+   * Whether the provided time interval and granularity combination is allowed.
+   *
+   * We restrict ETERNITY with non-ALL granularity, because allowing it would involve creating a very high number
+   * of time grains. This would cause queries to take an excessive amount of time or run out of memory.
+   */
+  public static boolean isQueryGranularityAllowed(final Interval interval, final Granularity granularity)
+  {
+    return Granularities.ALL.equals(granularity) || !Intervals.ETERNITY.equals(interval);
+  }
+
   @Override
   public Interval getInterval()
   {
@@ -172,11 +183,13 @@ public Sequence<Cursor> makeCursors(
     if (actualInterval == null) {
       return Sequences.empty();
     }
-    // Incase time interval is ETERNITY with non-ALL granularity, don't risk creating time grains.
-    // For all non-ALL granularities, the time grains will be very high in number and that can either OOM the heap
-    // or create an very very long query.
-    if (actualInterval.contains(Intervals.ETERNITY) && !gran.equals(Granularities.ALL)) {
-      throw new IAE("Cannot support ETERNITY interval with %s time granluarity", gran);
+
+    if (!isQueryGranularityAllowed(actualInterval, gran)) {
+      throw new IAE(
+          "Cannot support interval [%s] with granularity [%s]",
+          Intervals.ETERNITY.equals(actualInterval) ? "ETERNITY" : actualInterval,
+          gran
+      );
     }
 
     final RowWalker<RowType> rowWalker = new RowWalker<>(
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
index 0dfdac7fb4df..88f3ea260bbf 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
@@ -69,6 +69,7 @@
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@@ -352,6 +353,7 @@ public String getFormatString()
         configSupplier,
         bufferPool,
         mergePool,
+        TestHelper.makeJsonMapper(),
         new ObjectMapper(new SmileFactory()),
         NOOP_QUERYWATCHER
     )
@@ -369,6 +371,7 @@ public String getFormatString()
         configSupplier,
         bufferPool,
         tooSmallMergePool,
+        TestHelper.makeJsonMapper(),
         new ObjectMapper(new SmileFactory()),
         NOOP_QUERYWATCHER
     )
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
index 73685d7aa5d8..25b76e9f52a1 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
@@ -74,6 +74,7 @@
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
@@ -601,6 +602,7 @@ public String getFormatString()
         configSupplier,
         bufferPool,
         mergePool,
+        TestHelper.makeJsonMapper(),
         new ObjectMapper(new SmileFactory()),
         NOOP_QUERYWATCHER
     )
@@ -618,6 +620,7 @@ public String getFormatString()
         configSupplier,
         bufferPool,
         mergePool2,
+        TestHelper.makeJsonMapper(),
         new ObjectMapper(new SmileFactory()),
         NOOP_QUERYWATCHER
     )
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
index b18b4d9252c2..ef68abbbbb70 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
@@ -66,6 +66,7 @@
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@@ -266,6 +267,7 @@ public String getFormatString()
         configSupplier,
         bufferPool,
         mergePool,
+        TestHelper.makeJsonMapper(),
         new ObjectMapper(new SmileFactory()),
         NOOP_QUERYWATCHER
     )
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java
index 00d638af6938..e31249d5b165 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryMergeBufferTest.java
@@ -40,6 +40,7 @@
 import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
 import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
 import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -139,6 +140,7 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory(
         configSupplier,
         BUFFER_POOL,
         MERGE_BUFFER_POOL,
+        TestHelper.makeJsonMapper(),
         mapper,
         QueryRunnerTestHelper.NOOP_QUERYWATCHER
     )
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
index 9ba93ef63db8..acfc31892f87 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
@@ -43,6 +43,7 @@
 import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
 import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
 import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
+import org.apache.druid.segment.TestHelper;
 import org.junit.AfterClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -110,6 +111,7 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory(
         configSupplier,
         BUFFER_POOL,
         MERGE_BUFFER_POOL,
+        TestHelper.makeJsonMapper(),
         mapper,
         QueryRunnerTestHelper.NOOP_QUERYWATCHER
     )
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
index db1c689e8414..f9060db4c94c 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerTest.java
@@ -412,6 +412,7 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
         configSupplier,
         bufferPools.getProcessingPool(),
         bufferPools.getMergePool(),
+        TestHelper.makeJsonMapper(),
         mapper,
         QueryRunnerTestHelper.NOOP_QUERYWATCHER
     )
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
index 0877cccbb498..8204f139141b 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByTimeseriesQueryRunnerTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.query.groupby;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -47,6 +49,7 @@
 import org.apache.druid.query.timeseries.TimeseriesQuery;
 import org.apache.druid.query.timeseries.TimeseriesQueryRunnerTest;
 import org.apache.druid.query.timeseries.TimeseriesResultValue;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnType;
@@ -104,6 +107,7 @@ public static Iterable<Object[]> constructorFeeder()
           @Override
           public Sequence<Result<TimeseriesResultValue>> run(QueryPlus<Result<TimeseriesResultValue>> queryPlus, ResponseContext responseContext)
           {
+            final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();
             TimeseriesQuery tsQuery = (TimeseriesQuery) queryPlus.getQuery();
             QueryRunner<ResultRow> newRunner = factory.mergeRunners(
                 Execs.directExecutor(), ImmutableList.of(runner)
             );
@@ -133,7 +137,15 @@ public Sequence<Result<TimeseriesResultValue>> run(QueryPlus<Result<TimeseriesResultValue>> queryPlus, ResponseContext responseContext)
               );
 
               theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, timeDimension);
-              theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, granularity);
+              try {
+                theContext.put(
+                    GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
+                    jsonMapper.writeValueAsString(granularity)
+                );
+              }
+              catch (JsonProcessingException e) {
+                throw new RuntimeException(e);
+              }
               theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, 0);
             }
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
index 8592fca11e0b..f96805506377 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/NestedQueryPushDownTest.java
@@ -73,6 +73,7 @@
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
@@ -311,6 +312,7 @@ public String getFormatString()
         configSupplier,
         bufferPool,
         mergePool,
+        TestHelper.makeJsonMapper(),
         new ObjectMapper(new SmileFactory()),
         NOOP_QUERYWATCHER
     )
@@ -328,6 +330,7 @@ public String getFormatString()
         configSupplier,
         bufferPool,
         mergePool2,
+        TestHelper.makeJsonMapper(),
         new ObjectMapper(new SmileFactory()),
         NOOP_QUERYWATCHER
     )
diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
index 44b0786cb69b..14197005d7a1 100644
--- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
+++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java
@@ -63,6 +63,7 @@
 import org.apache.druid.query.groupby.orderby.OrderByColumnSpec;
 import org.apache.druid.query.ordering.StringComparator;
 import org.apache.druid.query.ordering.StringComparators;
+import org.apache.druid.query.planning.DataSourceAnalysis;
 import org.apache.druid.query.scan.ScanQuery;
 import org.apache.druid.query.timeboundary.TimeBoundaryQuery;
 import org.apache.druid.query.timeseries.TimeseriesQuery;
@@ -71,6 +72,7 @@
 import org.apache.druid.query.topn.NumericTopNMetricSpec;
 import org.apache.druid.query.topn.TopNMetricSpec;
 import org.apache.druid.query.topn.TopNQuery;
+import org.apache.druid.segment.RowBasedStorageAdapter;
 import org.apache.druid.segment.VirtualColumn;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.ColumnCapabilities;
@@ -90,6 +92,7 @@
 import org.apache.druid.sql.calcite.rule.GroupByRules;
 import org.apache.druid.sql.calcite.run.EngineFeature;
 import org.apache.druid.sql.calcite.table.RowSignatures;
+import org.joda.time.Interval;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
@@ -118,6 +121,13 @@ public class DruidQuery
    */
   public static final String CTX_SCAN_SIGNATURE = "scanSignature";
 
+  /**
+   * Maximum number of time-granular buckets that we allow for non-Druid tables.
+   *
+   * Used by {@link #canUseQueryGranularity}.
+   */
+  private static final int MAX_TIME_GRAINS_NON_DRUID_TABLE = 100000;
+
   private final DataSource dataSource;
 
   private final PlannerContext plannerContext;
@@ -774,6 +784,53 @@ private static Filtration toFiltration(DimFilter filter, VirtualColumnRegistry virtualColumnRegistry)
     return Filtration.create(filter).optimize(virtualColumnRegistry.getFullRowSignature());
   }
 
+  /**
+   * Whether the provided combination of dataSource, filtration, and queryGranularity is safe to use in queries.
+   *
+   * Necessary because some combinations are unsafe, mainly because they would lead to the creation of too many
+   * time-granular buckets during query processing.
+   */
+  private static boolean canUseQueryGranularity(
+      final DataSource dataSource,
+      final Filtration filtration,
+      final Granularity queryGranularity
+  )
+  {
+    if (Granularities.ALL.equals(queryGranularity)) {
+      // Always OK: no storage adapter has a problem with ALL.
+      return true;
+    }
+
+    if (DataSourceAnalysis.forDataSource(dataSource).isConcreteTableBased()) {
+      // Always OK: queries on concrete tables (regular Druid datasources) use segment-based storage adapters
+      // (IncrementalIndex or QueryableIndex). These clip the query interval to the data interval, making wide
+      // query intervals safer. They do not have special checks for granularity and interval safety.
+      return true;
+    }
+
+    // Query is against something other than a regular Druid table. Apply additional checks, because we can't
+    // count on interval-clipping to save us.
+
+    for (final Interval filtrationInterval : filtration.getIntervals()) {
+      // Query may be using RowBasedStorageAdapter. We don't know for sure, so check
+      // RowBasedStorageAdapter#isQueryGranularityAllowed to be safe.
+      if (!RowBasedStorageAdapter.isQueryGranularityAllowed(filtrationInterval, queryGranularity)) {
+        return false;
+      }
+
+      // Validate the interval against MAX_TIME_GRAINS_NON_DRUID_TABLE.
+      // Estimate based on the size of the first bucket, to avoid computing them all. (That's what we're
+      // trying to avoid!)
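+      // For example, a ten-year interval bucketed by MINUTE works out to roughly 10 * 365 * 24 * 60,
+      // about 5.3 million buckets, which exceeds MAX_TIME_GRAINS_NON_DRUID_TABLE and is rejected below.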
+      final Interval firstBucket = queryGranularity.bucket(filtrationInterval.getStart());
+      final long estimatedNumBuckets = filtrationInterval.toDurationMillis() / firstBucket.toDurationMillis();
+      if (estimatedNumBuckets > MAX_TIME_GRAINS_NON_DRUID_TABLE) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
   public DataSource getDataSource()
   {
     return dataSource;
   }
@@ -1004,6 +1061,10 @@ private TimeseriesQuery toTimeseriesQuery()
     final DataSource newDataSource = dataSourceFiltrationPair.lhs;
     final Filtration filtration = dataSourceFiltrationPair.rhs;
 
+    if (!canUseQueryGranularity(dataSource, filtration, queryGranularity)) {
+      return null;
+    }
+
     final List<PostAggregator> postAggregators = new ArrayList<>(grouping.getPostAggregators());
     if (sorting != null && sorting.getProjection() != null) {
       postAggregators.addAll(sorting.getProjection().getPostAggregators());
@@ -1208,7 +1269,8 @@ private GroupByQuery toGroupByQuery()
           dimensionExpression.getDruidExpression(),
           plannerContext.getExprMacroTable()
       );
-      if (granularity == null) {
+      if (granularity == null || !canUseQueryGranularity(dataSource, filtration, granularity)) {
+        // Can't, or won't, convert this dimension to a query granularity.
         continue;
       }
       if (queryGranularity != null) {
@@ -1219,10 +1281,20 @@
       }
       queryGranularity = granularity;
       int timestampDimensionIndexInDimensions = grouping.getDimensions().indexOf(dimensionExpression);
+
       // these settings will only affect the most inner query sent to the down streaming compute nodes
       theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, dimensionExpression.getOutputName());
       theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, timestampDimensionIndexInDimensions);
-      theContext.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, queryGranularity);
+
+      try {
+        theContext.put(
+            GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
+            plannerContext.getJsonMapper().writeValueAsString(queryGranularity)
+        );
+      }
+      catch (Exception e) {
+        throw new RuntimeException(e);
+      }
     }
   }
   if (queryGranularity == null) {
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
index 617e5acd58fc..020166d11028 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/BaseCalciteQueryTest.java
@@ -1275,7 +1275,14 @@ protected Map<String, Object> withTimestampResultContext(
   {
     Map<String, Object> output = new HashMap<>(input);
     output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, timestampResultField);
-    output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, granularity);
+
+    try {
+      output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, queryJsonMapper.writeValueAsString(granularity));
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+
     output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, timestampResultFieldIndex);
     return output;
   }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java
index b019ad090ba2..3a1ac2db9aa2 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteCorrelatedQueryTest.java
@@ -49,7 +49,6 @@
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 
 @RunWith(JUnitParamsRunner.class)
@@ -546,10 +545,6 @@ private Map<String, Object> withTimestampResultContext(
       Granularity granularity
   )
   {
-    Map<String, Object> output = new HashMap<>(input);
-    output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD, timestampResultField);
-    output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, granularity);
-    output.put(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_INDEX, 0);
-    return output;
+    return withTimestampResultContext(input, timestampResultField, 0, granularity);
   }
 }
diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
index 0e0579c1a2e7..87338a40e68d 100644
--- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
+++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java
@@ -28,7 +28,6 @@
 import org.apache.druid.common.config.NullHandling;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.HumanReadableBytes;
-import org.apache.druid.java.util.common.IAE;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.JodaUtils;
 import org.apache.druid.java.util.common.StringUtils;
@@ -6948,6 +6947,87 @@ public void testExactCountDistinctUsingSubqueryOnUnionAllTables()
     );
   }
 
+  @Test
+  public void testUseTimeFloorInsteadOfGranularityOnJoinResult()
+  {
+    cannotVectorize();
+
+    testQuery(
+        "WITH main AS (SELECT * FROM foo LIMIT 2)\n"
+        + "SELECT TIME_FLOOR(__time, 'PT1H') AS \"time\", dim1, COUNT(*)\n"
+        + "FROM main\n"
+        + "WHERE dim1 IN (SELECT dim1 FROM main GROUP BY 1 ORDER BY COUNT(*) DESC LIMIT 5)\n"
+        + "GROUP BY 1, 2",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(
+                            join(
+                                new QueryDataSource(
+                                    newScanQueryBuilder()
+                                        .dataSource(CalciteTests.DATASOURCE1)
+                                        .intervals(querySegmentSpec(Intervals.ETERNITY))
+                                        .columns("__time", "dim1")
+                                        .limit(2)
+                                        .build()
+                                ),
+                                new QueryDataSource(
+                                    GroupByQuery.builder()
+                                                .setDataSource(
+                                                    new QueryDataSource(
+                                                        newScanQueryBuilder()
+                                                            .dataSource(CalciteTests.DATASOURCE1)
+                                                            .intervals(querySegmentSpec(Intervals.ETERNITY))
+                                                            .columns("dim1")
+                                                            .limit(2)
+                                                            .build()
+                                                    )
+                                                )
+                                                .setInterval(querySegmentSpec(Intervals.ETERNITY))
+                                                .setGranularity(Granularities.ALL)
+                                                .setDimensions(dimensions(new DefaultDimensionSpec("dim1", "d0")))
+                                                .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
+                                                .setLimitSpec(
+                                                    new DefaultLimitSpec(
+                                                        ImmutableList.of(
+                                                            new OrderByColumnSpec(
+                                                                "a0",
+                                                                Direction.DESCENDING,
+                                                                StringComparators.NUMERIC
+                                                            )
+                                                        ),
+                                                        5
+                                                    )
+                                                )
+                                                .build()
+                                ),
+                                "j0.",
+                                "(\"dim1\" == \"j0.d0\")",
+                                JoinType.INNER
+                            )
+                        )
+                        .setInterval(querySegmentSpec(Filtration.eternity()))
+                        .setGranularity(Granularities.ALL)
+                        .setVirtualColumns(
+                            expressionVirtualColumn(
+                                "v0",
+                                "timestamp_floor(\"__time\",'PT1H',null,'UTC')",
+                                ColumnType.LONG
+                            )
+                        )
+                        .setDimensions(dimensions(
+                            new DefaultDimensionSpec("v0", "d0", ColumnType.LONG),
+                            new DefaultDimensionSpec("dim1", "d1")
+                        ))
+                        .setAggregatorSpecs(aggregators(new CountAggregatorFactory("a0")))
+                        .setContext(QUERY_CONTEXT_DEFAULT)
+                        .build()
+        ),
+        NullHandling.sqlCompatible()
+        ? ImmutableList.of(new Object[]{946684800000L, "", 1L}, new Object[]{946771200000L, "10.1", 1L})
+        : ImmutableList.of(new Object[]{946771200000L, "10.1", 1L})
+    );
+  }
+
   @Test
   public void testMinMaxAvgDailyCountWithLimit()
   {
@@ -14168,29 +14248,47 @@ public void testTimeseriesQueryWithEmptyInlineDatasourceAndGranularity()
   {
     // the SQL query contains an always FALSE filter ('bar' = 'baz'), which optimizes the query to also remove time
     // filter. the converted query hence contains ETERNITY interval but still a MONTH granularity due to the grouping.
-    // Such a query should fail since it will create a huge amount of time grains which can lead to OOM or a very very
-    // high query time.
-    Assert.assertThrows(IAE.class, () ->
-        testQuery(
-            "SELECT TIME_FLOOR(__time, 'P1m'), max(m1) from \"foo\"\n"
-            + "WHERE __time > CURRENT_TIMESTAMP - INTERVAL '3' MONTH AND 'bar'='baz'\n"
-            + "GROUP BY 1\n"
-            + "ORDER BY 1 DESC",
-            ImmutableList.of(
-                Druids.newTimeseriesQueryBuilder()
-                      .dataSource(
-                          InlineDataSource.fromIterable(
-                              ImmutableList.of(),
-                              RowSignature.builder().addTimeColumn().add("m1", ColumnType.STRING).build()
-                          ))
-                      .intervals(ImmutableList.of(Intervals.ETERNITY))
-                      .descending(true)
-                      .granularity(Granularities.MONTH)
-                      .aggregators(new LongMaxAggregatorFactory("a0", "m1"))
-                      .build()
-            ),
-            ImmutableList.of()
-        )
+    // Such a query should plan into a GroupBy query with a timestamp_floor function, instead of a timeseries
+    // with granularity MONTH, to avoid excessive materialization of time grains.
+    //
+    // See DruidQuery#canUseQueryGranularity for the relevant check.
+
+    cannotVectorize();
+
+    testQuery(
+        "SELECT TIME_FLOOR(__time, 'P1m'), max(m1) from \"foo\"\n"
+        + "WHERE __time > CURRENT_TIMESTAMP - INTERVAL '3' MONTH AND 'bar'='baz'\n"
+        + "GROUP BY 1\n"
+        + "ORDER BY 1 DESC",
+        ImmutableList.of(
+            GroupByQuery.builder()
+                        .setDataSource(InlineDataSource.fromIterable(
+                            ImmutableList.of(),
+                            RowSignature.builder()
+                                        .addTimeColumn()
+                                        .add("m1", ColumnType.FLOAT)
+                                        .build()
+                        ))
+                        .setInterval(querySegmentSpec(Intervals.ETERNITY))
+                        .setVirtualColumns(expressionVirtualColumn(
+                            "v0",
+                            "timestamp_floor(\"__time\",'P1m',null,'UTC')",
+                            ColumnType.LONG
+                        ))
+                        .setGranularity(Granularities.ALL)
+                        .addDimension(new DefaultDimensionSpec("v0", "d0", ColumnType.LONG))
+                        .addAggregator(new FloatMaxAggregatorFactory("a0", "m1"))
+                        .setLimitSpec(
+                            new DefaultLimitSpec(
+                                ImmutableList.of(
+                                    new OrderByColumnSpec("d0", Direction.DESCENDING, StringComparators.NUMERIC)
+                                ),
+                                null
+                            )
+                        )
+                        .build()
+        ),
+        ImmutableList.of()
     );
   }
 }
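
Note on the serde round trip introduced by this patch (editorial sketch, not part of the diff): CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY now travels through the query context as a JSON string, written by the SQL planner (DruidQuery.toGroupByQuery) and read back with a Druid-aware JSON mapper (QueryContext.getGranularity, ControllerImpl.makeGranularitySpecForIngestion). A minimal, self-contained illustration of that round trip, assuming DefaultObjectMapper stands in for the injected @Json mapper, the context key literal is written out by hand, and the PeriodGranularity value is merely an example:

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.joda.time.Period;

import java.util.HashMap;
import java.util.Map;

public class GranularityContextRoundTrip
{
  public static void main(String[] args) throws Exception
  {
    // Granularity-aware mapper; production code receives one via @Json injection instead.
    final ObjectMapper jsonMapper = new DefaultObjectMapper();

    // Planner side: store the granularity as a JSON string, not a live object,
    // so the context holds only plain values that survive query serialization.
    final Granularity hourly = new PeriodGranularity(Period.hours(1), null, null);
    final Map<String, Object> context = new HashMap<>();
    context.put("timestampResultFieldGranularity", jsonMapper.writeValueAsString(hourly));

    // Consumer side: parse the string back into a Granularity.
    final Granularity roundTripped =
        jsonMapper.readValue((String) context.get("timestampResultFieldGranularity"), Granularity.class);

    // Prints "true": the granularity survives the string round trip.
    System.out.println(hourly.equals(roundTripped));
  }
}

This is also why GroupByStrategyV2 now takes both mappers: the existing @Smile spillMapper keeps handling merge-buffer spills, while the new @Json jsonMapper decodes this context value.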