SQL: Use timestamp_floor when granularity is not safe. #13206

Merged (6 commits) on Oct 17, 2022
Changes from all commits
----------------------------------------------------------------------
@@ -66,6 +66,7 @@
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.generator.DataGenerator;
 import org.apache.druid.segment.generator.GeneratorBasicSchemas;
@@ -401,6 +402,7 @@ public String getFormatString()
         configSupplier,
         bufferPool,
         mergePool,
+        TestHelper.makeJsonMapper(),
         new ObjectMapper(new SmileFactory()),
         QueryBenchmarkUtil.NOOP_QUERYWATCHER
     )
----------------------------------------------------------------------
@@ -378,6 +378,7 @@ private static GroupByQueryRunnerFactory makeGroupByQueryRunnerFactory(
         bufferPool,
         mergeBufferPool,
+        mapper,
         mapper,
         QueryRunnerTestHelper.NOOP_QUERYWATCHER
     )
 );
----------------------------------------------------------------------
@@ -81,6 +81,7 @@
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.generator.DataGenerator;
@@ -516,6 +517,7 @@ public String getFormatString()
         configSupplier,
         bufferPool,
         mergePool,
+        TestHelper.makeJsonMapper(),
         new ObjectMapper(new SmileFactory()),
         QueryBenchmarkUtil.NOOP_QUERYWATCHER
     )
----------------------------------------------------------------------
@@ -101,6 +101,7 @@ public int getNumThreads()
         GroupByQueryConfig::new,
         new StupidPool<>("map-virtual-column-groupby-test", () -> ByteBuffer.allocate(1024)),
         new DefaultBlockingPool<>(() -> ByteBuffer.allocate(1024), 1),
+        TestHelper.makeJsonMapper(),
         new DefaultObjectMapper(),
         QueryRunnerTestHelper.NOOP_QUERYWATCHER
     )
----------------------------------------------------------------------
@@ -19,6 +19,7 @@
 
 package org.apache.druid.msq.exec;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -523,7 +524,8 @@ private QueryDefinition initializeQueryDefAndState(final Closer closer)
     final QueryDefinition queryDef = makeQueryDefinition(
         id(),
         makeQueryControllerToolKit(),
-        task.getQuerySpec()
+        task.getQuerySpec(),
+        context.jsonMapper()
     );
 
     QueryValidator.validateQueryDef(queryDef);
@@ -1311,7 +1313,8 @@ private void cleanUpDurableStorageIfNeeded()
   private static QueryDefinition makeQueryDefinition(
       final String queryId,
       @SuppressWarnings("rawtypes") final QueryKit toolKit,
-      final MSQSpec querySpec
+      final MSQSpec querySpec,
+      final ObjectMapper jsonMapper
   )
   {
     final MSQTuningConfig tuningConfig = querySpec.getTuningConfig();
@@ -1395,7 +1398,9 @@ private static QueryDefinition makeQueryDefinition(
     }
 
     // Then, add a segment-generation stage.
-    final DataSchema dataSchema = generateDataSchema(querySpec, querySignature, queryClusterBy, columnMappings);
+    final DataSchema dataSchema =
+        generateDataSchema(querySpec, querySignature, queryClusterBy, columnMappings, jsonMapper);
 
     builder.add(
         StageDefinition.builder(queryDef.getNextStageNumber())
                        .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
@@ -1421,7 +1426,8 @@ private static DataSchema generateDataSchema(
       MSQSpec querySpec,
       RowSignature querySignature,
       ClusterBy queryClusterBy,
-      ColumnMappings columnMappings
+      ColumnMappings columnMappings,
+      ObjectMapper jsonMapper
   )
   {
     final DataSourceMSQDestination destination = (DataSourceMSQDestination) querySpec.getDestination();
@@ -1442,26 +1448,33 @@ private static DataSchema generateDataSchema(
         new TimestampSpec(ColumnHolder.TIME_COLUMN_NAME, "millis", null),
         new DimensionsSpec(dimensionsAndAggregators.lhs),
         dimensionsAndAggregators.rhs.toArray(new AggregatorFactory[0]),
-        makeGranularitySpecForIngestion(querySpec.getQuery(), querySpec.getColumnMappings(), isRollupQuery),
+        makeGranularitySpecForIngestion(querySpec.getQuery(), querySpec.getColumnMappings(), isRollupQuery, jsonMapper),
         new TransformSpec(null, Collections.emptyList())
     );
   }
 
   private static GranularitySpec makeGranularitySpecForIngestion(
       final Query<?> query,
       final ColumnMappings columnMappings,
-      final boolean isRollupQuery
+      final boolean isRollupQuery,
+      final ObjectMapper jsonMapper
   )
   {
     if (isRollupQuery) {
-      final String queryGranularity = query.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, "");
+      final String queryGranularityString =
+          query.context().getString(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, "");
 
-      if (timeIsGroupByDimension((GroupByQuery) query, columnMappings) && !queryGranularity.isEmpty()) {
-        return new ArbitraryGranularitySpec(
-            Granularity.fromString(queryGranularity),
-            true,
-            Intervals.ONLY_ETERNITY
-        );
+      if (timeIsGroupByDimension((GroupByQuery) query, columnMappings) && !queryGranularityString.isEmpty()) {
+        final Granularity queryGranularity;
+
+        try {
+          queryGranularity = jsonMapper.readValue(queryGranularityString, Granularity.class);
+        }
+        catch (JsonProcessingException e) {
+          throw new RuntimeException(e);
+        }
+
+        return new ArbitraryGranularitySpec(queryGranularity, true, Intervals.ONLY_ETERNITY);
+      }
       return new ArbitraryGranularitySpec(Granularities.NONE, true, Intervals.ONLY_ETERNITY);
     } else {
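The controller change above replaces `Granularity.fromString(queryGranularity)` with JSON deserialization through the injected mapper. A minimal sketch of why, not part of this PR: the context entry is the JSON serialization of a `Granularity`, and a period granularity serializes to a JSON object that `fromString` cannot parse. `TestHelper.makeJsonMapper()` is the test mapper this PR already threads through elsewhere.

```java
// Hedged sketch, assuming Druid's standard test mapper; production code uses
// the @Json mapper obtained from context.jsonMapper().
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.PeriodGranularity;
import org.apache.druid.segment.TestHelper;
import org.joda.time.Period;

public class GranularityRoundTripSketch
{
  public static void main(String[] args) throws Exception
  {
    final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();

    // A period granularity with a time zone has no simple name, so it serializes
    // to a JSON object (roughly {"type":"period","period":"PT6H",...}).
    final Granularity granularity = new PeriodGranularity(
        new Period("PT6H"),
        null,
        DateTimes.inferTzFromString("America/Los_Angeles")
    );
    final String json = jsonMapper.writeValueAsString(granularity);

    // Granularity.fromString(json) would not understand that object form;
    // readValue round-trips it, which is what makeGranularitySpecForIngestion relies on.
    final Granularity parsed = jsonMapper.readValue(json, Granularity.class);
    System.out.println(granularity.equals(parsed)); // true
  }
}
```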
----------------------------------------------------------------------
processing/src/main/java/org/apache/druid/query/QueryContext.java (19 changes: 11 additions & 8 deletions)
@@ -19,14 +19,15 @@
 
 package org.apache.druid.query;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.java.util.common.HumanReadableBytes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.query.QueryContexts.Vectorize;
 import org.apache.druid.segment.QueryableIndexStorageAdapter;
 
 import javax.annotation.Nullable;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
@@ -231,16 +232,18 @@ public <E extends Enum<E>> E getEnum(String key, Class<E> clazz, E defaultValue)
     return QueryContexts.getAsEnum(key, get(key), clazz, defaultValue);
   }
 
-  public Granularity getGranularity(String key)
+  public Granularity getGranularity(String key, ObjectMapper jsonMapper)
   {
-    final Object value = get(key);
-    if (value == null) {
+    final String granularityString = getString(key);
+    if (granularityString == null) {
       return null;
     }
-    if (value instanceof Granularity) {
-      return (Granularity) value;
-    } else {
-      throw QueryContexts.badTypeException(key, "a Granularity", value);
+
+    try {
+      return jsonMapper.readValue(granularityString, Granularity.class);
+    }
+    catch (IOException e) {
+      throw QueryContexts.badTypeException(key, "a Granularity", granularityString);
    }
  }
 
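After this change, the context entry is expected to hold a JSON string rather than a live `Granularity` object, and callers must supply a JSON `ObjectMapper`. A hedged usage sketch; the `ImmutableMap` context and the `QueryContext(Map)` construction are illustrative assumptions that may differ across Druid versions:

```java
// Hypothetical caller of the new two-argument accessor.
final ObjectMapper jsonMapper = TestHelper.makeJsonMapper();

// The planner stores the granularity as its JSON serialization...
final Map<String, Object> context = ImmutableMap.of(
    GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY,
    jsonMapper.writeValueAsString(Granularities.HOUR)
);

// ...and consumers read it back by deserializing the string:
final Granularity granularity = new QueryContext(context)
    .getGranularity(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, jsonMapper);
```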
----------------------------------------------------------------------
@@ -31,6 +31,7 @@
 import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.collections.ReferenceCountingResourceHolder;
 import org.apache.druid.guice.annotations.Global;
+import org.apache.druid.guice.annotations.Json;
 import org.apache.druid.guice.annotations.Merging;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.Intervals;
@@ -98,6 +99,7 @@ public class GroupByStrategyV2 implements GroupByStrategy
   private final Supplier<GroupByQueryConfig> configSupplier;
   private final NonBlockingPool<ByteBuffer> bufferPool;
   private final BlockingPool<ByteBuffer> mergeBufferPool;
+  private final ObjectMapper jsonMapper;
   private final ObjectMapper spillMapper;
   private final QueryWatcher queryWatcher;
 
@@ -107,6 +109,7 @@ public GroupByStrategyV2(
       Supplier<GroupByQueryConfig> configSupplier,
       @Global NonBlockingPool<ByteBuffer> bufferPool,
       @Merging BlockingPool<ByteBuffer> mergeBufferPool,
+      @Json ObjectMapper jsonMapper,
       @Smile ObjectMapper spillMapper,
       QueryWatcher queryWatcher
   )
@@ -115,6 +118,7 @@
     this.configSupplier = configSupplier;
     this.bufferPool = bufferPool;
     this.mergeBufferPool = mergeBufferPool;
+    this.jsonMapper = jsonMapper;
     this.spillMapper = spillMapper;
     this.queryWatcher = queryWatcher;
   }
@@ -252,7 +256,7 @@ public Sequence<ResultRow> mergeResults(
     // the granularity and dimensions are slightly different.
     // now, part of the query plan logic is handled in GroupByStrategyV2, not only in DruidQuery.toGroupByQuery()
     final Granularity timestampResultFieldGranularity
-        = queryContext.getGranularity(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY);
+        = queryContext.getGranularity(GroupByQuery.CTX_TIMESTAMP_RESULT_FIELD_GRANULARITY, jsonMapper);
     dimensionSpecs =
         query.getDimensions()
              .stream()
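`GroupByStrategyV2` now takes two mappers: the new `@Json` mapper, used only to parse the granularity out of the query context, and the existing `@Smile` mapper, used for spilling. The test diffs in this PR construct it as sketched below; the leading `DruidProcessingConfig` argument sits above the visible hunks and is assumed here:

```java
// Construction sketch mirroring this PR's test changes; in production, Guice
// injects the @Json and @Smile mappers. processingConfig, configSupplier,
// bufferPool, and mergeBufferPool are assumed to be in scope.
final GroupByStrategyV2 strategy = new GroupByStrategyV2(
    processingConfig,                      // DruidProcessingConfig
    configSupplier,                        // Supplier<GroupByQueryConfig>
    bufferPool,                            // @Global NonBlockingPool<ByteBuffer>
    mergeBufferPool,                       // @Merging BlockingPool<ByteBuffer>
    TestHelper.makeJsonMapper(),           // @Json: parses granularity from the query context
    new ObjectMapper(new SmileFactory()),  // @Smile: serializes spilled merge data
    QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
```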
----------------------------------------------------------------------
@@ -66,6 +66,17 @@ public class RowBasedStorageAdapter<RowType> implements StorageAdapter
     this.rowSignature = Preconditions.checkNotNull(rowSignature, "rowSignature");
   }
 
+  /**
+   * Whether the provided time interval and granularity combination is allowed.
+   *
+   * We restrict ETERNITY with non-ALL granularity, because allowing it would involve creating a very high number
+   * of time grains. This would cause queries to take an excessive amount of time or run out of memory.
+   */
+  public static boolean isQueryGranularityAllowed(final Interval interval, final Granularity granularity)
+  {
+    return Granularities.ALL.equals(granularity) || !Intervals.ETERNITY.equals(interval);
+  }
+
   @Override
   public Interval getInterval()
   {
@@ -172,11 +183,13 @@ public Sequence<Cursor> makeCursors(
     if (actualInterval == null) {
       return Sequences.empty();
     }
-    // Incase time interval is ETERNITY with non-ALL granularity, don't risk creating time grains.
-    // For all non-ALL granularities, the time grains will be very high in number and that can either OOM the heap
-    // or create an very very long query.
-    if (actualInterval.contains(Intervals.ETERNITY) && !gran.equals(Granularities.ALL)) {
-      throw new IAE("Cannot support ETERNITY interval with %s time granluarity", gran);
+
+    if (!isQueryGranularityAllowed(actualInterval, gran)) {
+      throw new IAE(
+          "Cannot support interval [%s] with granularity [%s]",
+          Intervals.ETERNITY.equals(actualInterval) ? "ETERNITY" : actualInterval,
+          gran
+      );
     }
 
     final RowWalker<RowType> rowWalker = new RowWalker<>(
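The new guard reads as a small truth table: ALL granularity is always safe, and any other granularity is safe only on a bounded interval. The assertions below follow directly from the method body above:

```java
// Behavior of the new guard; values follow from isQueryGranularityAllowed.
assert RowBasedStorageAdapter.isQueryGranularityAllowed(Intervals.ETERNITY, Granularities.ALL);
assert !RowBasedStorageAdapter.isQueryGranularityAllowed(Intervals.ETERNITY, Granularities.MINUTE);
assert RowBasedStorageAdapter.isQueryGranularityAllowed(Intervals.of("2000/2001"), Granularities.MINUTE);
```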
----------------------------------------------------------------------
@@ -69,6 +69,7 @@
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@@ -352,6 +353,7 @@ public String getFormatString()
         configSupplier,
         bufferPool,
         mergePool,
+        TestHelper.makeJsonMapper(),
         new ObjectMapper(new SmileFactory()),
         NOOP_QUERYWATCHER
     )
@@ -369,6 +371,7 @@ public String getFormatString()
         configSupplier,
         bufferPool,
         tooSmallMergePool,
+        TestHelper.makeJsonMapper(),
         new ObjectMapper(new SmileFactory()),
         NOOP_QUERYWATCHER
     )
----------------------------------------------------------------------
@@ -74,6 +74,7 @@
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.column.ColumnHolder;
 import org.apache.druid.segment.column.ColumnType;
@@ -601,6 +602,7 @@ public String getFormatString()
         configSupplier,
         bufferPool,
         mergePool,
+        TestHelper.makeJsonMapper(),
         new ObjectMapper(new SmileFactory()),
         NOOP_QUERYWATCHER
     )
@@ -618,6 +620,7 @@ public String getFormatString()
         configSupplier,
         bufferPool,
         mergePool2,
+        TestHelper.makeJsonMapper(),
         new ObjectMapper(new SmileFactory()),
         NOOP_QUERYWATCHER
     )
----------------------------------------------------------------------
@@ -66,6 +66,7 @@
 import org.apache.druid.segment.QueryableIndex;
 import org.apache.druid.segment.QueryableIndexSegment;
 import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
@@ -266,6 +267,7 @@ public String getFormatString()
         configSupplier,
         bufferPool,
         mergePool,
+        TestHelper.makeJsonMapper(),
         new ObjectMapper(new SmileFactory()),
         NOOP_QUERYWATCHER
     )
----------------------------------------------------------------------
@@ -40,6 +40,7 @@
 import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
 import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
 import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
+import org.apache.druid.segment.TestHelper;
 import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -139,6 +140,7 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory(
         configSupplier,
         BUFFER_POOL,
         MERGE_BUFFER_POOL,
+        TestHelper.makeJsonMapper(),
         mapper,
         QueryRunnerTestHelper.NOOP_QUERYWATCHER
     )
----------------------------------------------------------------------
@@ -43,6 +43,7 @@
 import org.apache.druid.query.groupby.strategy.GroupByStrategySelector;
 import org.apache.druid.query.groupby.strategy.GroupByStrategyV1;
 import org.apache.druid.query.groupby.strategy.GroupByStrategyV2;
+import org.apache.druid.segment.TestHelper;
 import org.junit.AfterClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -110,6 +111,7 @@ private static GroupByQueryRunnerFactory makeQueryRunnerFactory(
         configSupplier,
         BUFFER_POOL,
         MERGE_BUFFER_POOL,
+        TestHelper.makeJsonMapper(),
         mapper,
         QueryRunnerTestHelper.NOOP_QUERYWATCHER
     )
----------------------------------------------------------------------
@@ -412,6 +412,7 @@ public static GroupByQueryRunnerFactory makeQueryRunnerFactory(
         configSupplier,
         bufferPools.getProcessingPool(),
         bufferPools.getMergePool(),
+        TestHelper.makeJsonMapper(),
         mapper,
         QueryRunnerTestHelper.NOOP_QUERYWATCHER
     )