Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Planning time limit #7213

Merged
merged 2 commits into from
Apr 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public final class SystemSessionProperties
public static final String QUERY_MAX_MEMORY = "query_max_memory";
public static final String QUERY_MAX_TOTAL_MEMORY = "query_max_total_memory";
public static final String QUERY_MAX_EXECUTION_TIME = "query_max_execution_time";
public static final String QUERY_MAX_PLANNING_TIME = "query_max_planning_time";
public static final String QUERY_MAX_RUN_TIME = "query_max_run_time";
public static final String RESOURCE_OVERCOMMIT = "resource_overcommit";
public static final String QUERY_MAX_CPU_TIME = "query_max_cpu_time";
Expand Down Expand Up @@ -261,6 +262,11 @@ public SystemSessionProperties(
"Maximum execution time of a query",
queryManagerConfig.getQueryMaxExecutionTime(),
false),
durationProperty(
QUERY_MAX_PLANNING_TIME,
"Maximum planning time of a query",
queryManagerConfig.getQueryMaxPlanningTime(),
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
false),
durationProperty(
QUERY_MAX_CPU_TIME,
"Maximum CPU time of a query",
Expand Down Expand Up @@ -737,6 +743,11 @@ public static Duration getQueryMaxExecutionTime(Session session)
return session.getSystemProperty(QUERY_MAX_EXECUTION_TIME, Duration.class);
}

public static Duration getQueryMaxPlanningTime(Session session)
{
return session.getSystemProperty(QUERY_MAX_PLANNING_TIME, Duration.class);
}

public static boolean resourceOvercommit(Session session)
{
return session.getSystemProperty(RESOURCE_OVERCOMMIT, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ public Optional<DateTime> getExecutionStartTime()
return getEndTime();
}

@Override
public Optional<Duration> getPlanningTime()
{
return Optional.empty();
}

@Override
public Optional<DateTime> getEndTime()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,12 @@ public Optional<DateTime> getExecutionStartTime()
return stateMachine.getExecutionStartTime();
}

@Override
public Optional<Duration> getPlanningTime()
{
return stateMachine.getPlanningTime();
}

@Override
public Optional<DateTime> getEndTime()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,12 @@ public QueryState getState()
return stateMachine.getQueryState();
}

@Override
public Optional<Duration> getPlanningTime()
{
return stateMachine.getPlanningTime();
}

public List<Expression> getParameters()
{
return parameters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class QueryManagerConfig
private String queryExecutionPolicy = "all-at-once";
private Duration queryMaxRunTime = new Duration(100, TimeUnit.DAYS);
private Duration queryMaxExecutionTime = new Duration(100, TimeUnit.DAYS);
private Duration queryMaxPlanningTime = new Duration(10, TimeUnit.MINUTES);
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
private Duration queryMaxCpuTime = new Duration(1_000_000_000, TimeUnit.DAYS);
private Optional<DataSize> queryMaxScanPhysicalBytes = Optional.empty();

Expand Down Expand Up @@ -284,6 +285,19 @@ public QueryManagerConfig setQueryMaxExecutionTime(Duration queryMaxExecutionTim
return this;
}

@NotNull
skrzypo987 marked this conversation as resolved.
Show resolved Hide resolved
public Duration getQueryMaxPlanningTime()
{
return queryMaxPlanningTime;
}

@Config("query.max-planning-time")
public QueryManagerConfig setQueryMaxPlanningTime(Duration queryMaxPlanningTime)
{
this.queryMaxPlanningTime = queryMaxPlanningTime;
return this;
}

@NotNull
@MinDuration("1ns")
public Duration getQueryMaxCpuTime()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1009,6 +1009,13 @@ public Optional<DateTime> getExecutionStartTime()
return queryStateTimer.getExecutionStartTime();
}

public Optional<Duration> getPlanningTime()
{
// Execution start time is empty if planning has not started
return queryStateTimer.getExecutionStartTime()
.map(ignored -> queryStateTimer.getPlanningTime());
}

public DateTime getLastHeartbeat()
{
return queryStateTimer.getLastHeartbeat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,11 @@ public Optional<DateTime> getExecutionStartTime()
return toDateTime(beginPlanningNanos);
}

public Optional<DateTime> getPlanningStartTime()
{
return toDateTime(beginPlanningNanos);
}

public Duration getElapsedTime()
{
if (endNanos.get() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import static com.google.common.base.Preconditions.checkState;
import static io.trino.SystemSessionProperties.getQueryMaxExecutionTime;
import static io.trino.SystemSessionProperties.getQueryMaxPlanningTime;
import static io.trino.SystemSessionProperties.getQueryMaxRunTime;
import static io.trino.spi.StandardErrorCode.ABANDONED_QUERY;
import static io.trino.spi.StandardErrorCode.EXCEEDED_TIME_LIMIT;
Expand Down Expand Up @@ -178,11 +179,16 @@ private void enforceTimeLimits()
}
Duration queryMaxRunTime = getQueryMaxRunTime(query.getSession());
Duration queryMaxExecutionTime = getQueryMaxExecutionTime(query.getSession());
Duration queryMaxPlanningTime = getQueryMaxPlanningTime(query.getSession());
Optional<DateTime> executionStartTime = query.getExecutionStartTime();
Optional<Duration> planningTime = query.getPlanningTime();
DateTime createTime = query.getCreateTime();
if (executionStartTime.isPresent() && executionStartTime.get().plus(queryMaxExecutionTime.toMillis()).isBeforeNow()) {
query.fail(new TrinoException(EXCEEDED_TIME_LIMIT, "Query exceeded the maximum execution time limit of " + queryMaxExecutionTime));
}
planningTime
.filter(duration -> duration.compareTo(queryMaxPlanningTime) > 0)
.ifPresent(ignored -> query.fail(new TrinoException(EXCEEDED_TIME_LIMIT, "Query exceeded the maximum planning time limit of " + queryMaxPlanningTime)));
if (createTime.plus(queryMaxRunTime.toMillis()).isBeforeNow()) {
query.fail(new TrinoException(EXCEEDED_TIME_LIMIT, "Query exceeded maximum time limit of " + queryMaxRunTime));
}
Expand Down Expand Up @@ -288,6 +294,8 @@ public interface TrackedQuery

Optional<DateTime> getExecutionStartTime();

Optional<Duration> getPlanningTime();

DateTime getLastHeartbeat();

Optional<DateTime> getEndTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,18 @@
import static com.google.common.base.Preconditions.checkArgument;
skrzypo987 marked this conversation as resolved.
Show resolved Hide resolved
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Throwables.throwIfInstanceOf;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.succinctBytes;
import static io.trino.SystemSessionProperties.isEnableDynamicFiltering;
import static io.trino.execution.QueryState.FAILED;
import static io.trino.execution.QueryState.PLANNING;
import static io.trino.execution.buffer.OutputBuffers.BROADCAST_PARTITION_ID;
import static io.trino.execution.buffer.OutputBuffers.createInitialEmptyOutputBuffers;
import static io.trino.execution.scheduler.SqlQueryScheduler.createSqlQueryScheduler;
import static io.trino.server.DynamicFilterService.DynamicFiltersStats;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.sql.ParameterUtils.parameterExtractor;
import static java.lang.Thread.currentThread;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -335,6 +339,12 @@ public Optional<DateTime> getExecutionStartTime()
return stateMachine.getExecutionStartTime();
}

@Override
public Optional<Duration> getPlanningTime()
{
return stateMachine.getPlanningTime();
}

@Override
public DateTime getLastHeartbeat()
{
Expand Down Expand Up @@ -379,11 +389,33 @@ public void start()
return;
}

PlanRoot plan = planQuery();
// DynamicFilterService needs plan for query to be registered.
// Query should be registered before dynamic filter suppliers are requested in distribution planning.
registerDynamicFilteringQuery(plan);
planDistribution(plan);
AtomicReference<Thread> planningThread = new AtomicReference<>(currentThread());
stateMachine.getStateChange(PLANNING).addListener(() -> {
skrzypo987 marked this conversation as resolved.
Show resolved Hide resolved
if (stateMachine.getQueryState() == FAILED) {
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
synchronized (this) {
Thread thread = planningThread.get();
if (thread != null) {
thread.interrupt();
}
}
}
}, directExecutor());

try {
PlanRoot plan = planQuery();
// DynamicFilterService needs plan for query to be registered.
// Query should be registered before dynamic filter suppliers are requested in distribution planning.
registerDynamicFilteringQuery(plan);
planDistribution(plan);
}
finally {
synchronized (this) {
planningThread.set(null);
// Clear the interrupted flag in case there was a race condition where
// the planning thread was interrupted right after planning completes above
Thread.interrupted();
}
}

if (!stateMachine.transitionToStarting()) {
// query already started or finished
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,41 +19,45 @@
import org.testng.annotations.Test;

import java.util.Map;
import java.util.concurrent.TimeUnit;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.airlift.units.DataSize.Unit.KILOBYTE;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.HOURS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;

public class TestQueryManagerConfig
{
@Test
public void testDefaults()
{
assertRecordedDefaults(recordDefaults(QueryManagerConfig.class)
.setMinQueryExpireAge(new Duration(15, TimeUnit.MINUTES))
.setMinQueryExpireAge(new Duration(15, MINUTES))
.setMaxQueryHistory(100)
.setMaxQueryLength(1_000_000)
.setMaxStageCount(100)
.setStageCountWarningThreshold(50)
.setClientTimeout(new Duration(5, TimeUnit.MINUTES))
.setClientTimeout(new Duration(5, MINUTES))
.setScheduleSplitBatchSize(1000)
.setMinScheduleSplitBatchSize(100)
.setMaxConcurrentQueries(1000)
.setMaxQueuedQueries(5000)
.setInitialHashPartitions(100)
.setQueryManagerExecutorPoolSize(5)
.setRemoteTaskMinErrorDuration(new Duration(5, TimeUnit.MINUTES))
.setRemoteTaskMaxErrorDuration(new Duration(5, TimeUnit.MINUTES))
.setRemoteTaskMinErrorDuration(new Duration(5, MINUTES))
.setRemoteTaskMaxErrorDuration(new Duration(5, MINUTES))
.setRemoteTaskMaxCallbackThreads(1000)
.setQueryExecutionPolicy("all-at-once")
.setQueryMaxRunTime(new Duration(100, TimeUnit.DAYS))
.setQueryMaxExecutionTime(new Duration(100, TimeUnit.DAYS))
.setQueryMaxCpuTime(new Duration(1_000_000_000, TimeUnit.DAYS))
.setQueryMaxRunTime(new Duration(100, DAYS))
.setQueryMaxExecutionTime(new Duration(100, DAYS))
.setQueryMaxPlanningTime(new Duration(10, MINUTES))
.setQueryMaxCpuTime(new Duration(1_000_000_000, DAYS))
.setQueryMaxScanPhysicalBytes(null)
.setRequiredWorkers(1)
.setRequiredWorkersMaxWait(new Duration(5, TimeUnit.MINUTES)));
.setRequiredWorkersMaxWait(new Duration(5, MINUTES)));
}

@Test
Expand All @@ -78,35 +82,37 @@ public void testExplicitPropertyMappings()
.put("query.execution-policy", "phased")
.put("query.max-run-time", "2h")
.put("query.max-execution-time", "3h")
.put("query.max-planning-time", "1h")
.put("query.max-cpu-time", "2d")
.put("query.max-scan-physical-bytes", "1kB")
.put("query-manager.required-workers", "333")
.put("query-manager.required-workers-max-wait", "33m")
.build();

QueryManagerConfig expected = new QueryManagerConfig()
.setMinQueryExpireAge(new Duration(30, TimeUnit.SECONDS))
.setMinQueryExpireAge(new Duration(30, SECONDS))
.setMaxQueryHistory(10)
.setMaxQueryLength(10000)
.setMaxStageCount(12345)
.setStageCountWarningThreshold(12300)
.setClientTimeout(new Duration(10, TimeUnit.SECONDS))
.setClientTimeout(new Duration(10, SECONDS))
.setScheduleSplitBatchSize(99)
.setMinScheduleSplitBatchSize(9)
.setMaxConcurrentQueries(10)
.setMaxQueuedQueries(15)
.setInitialHashPartitions(16)
.setQueryManagerExecutorPoolSize(11)
.setRemoteTaskMinErrorDuration(new Duration(60, TimeUnit.SECONDS))
.setRemoteTaskMaxErrorDuration(new Duration(60, TimeUnit.SECONDS))
.setRemoteTaskMinErrorDuration(new Duration(60, SECONDS))
.setRemoteTaskMaxErrorDuration(new Duration(60, SECONDS))
.setRemoteTaskMaxCallbackThreads(10)
.setQueryExecutionPolicy("phased")
.setQueryMaxRunTime(new Duration(2, TimeUnit.HOURS))
.setQueryMaxExecutionTime(new Duration(3, TimeUnit.HOURS))
.setQueryMaxCpuTime(new Duration(2, TimeUnit.DAYS))
.setQueryMaxRunTime(new Duration(2, HOURS))
.setQueryMaxExecutionTime(new Duration(3, HOURS))
.setQueryMaxPlanningTime(new Duration(1, HOURS))
.setQueryMaxCpuTime(new Duration(2, DAYS))
.setQueryMaxScanPhysicalBytes(DataSize.of(1, KILOBYTE))
.setRequiredWorkers(333)
.setRequiredWorkersMaxWait(new Duration(33, TimeUnit.MINUTES));
.setRequiredWorkersMaxWait(new Duration(33, MINUTES));

assertFullMapping(properties, expected);
}
Expand Down
12 changes: 12 additions & 0 deletions docs/src/main/sphinx/admin/properties-query-management.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@ The maximum allowed time for a query to be actively executing on the
cluster, before it is terminated. Compared to the run time below, execution
time does not include analysis, query planning or wait times in a queue.

``query.max-planning-time``
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please review @mosabua

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM.

^^^^^^^^^^^^^^^^^^^^^^^^^^^

* **Type:** ``duration``
* **Default value:** ``10 minutes``
* **Session property:** ``query_max_planning_time``

The maximum allowed time for a query to be actively planning the execution.
After this period the coordinator will make its best effort to stop the
skrzypo987 marked this conversation as resolved.
Show resolved Hide resolved
query. Note that some operations in planning phase are not easily cancellable
and may not terminate immediately.

``query.max-run-time``
^^^^^^^^^^^^^^^^^^^^^^

Expand Down
Loading