Separate retries configuration for queries and tasks
losipiuk committed Mar 10, 2022
1 parent 2988194 commit 99ef4ec
Showing 7 changed files with 105 additions and 29 deletions.
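In short, the single retry-attempts setting is split by retry policy: query-retry-attempts (default 4, with retry-attempts kept as a legacy alias) bounds whole-query retries under RetryPolicy.QUERY, while the new task-retry-attempts-overall (default unlimited) and task-retry-attempts-per-task (default 2) bound task-level retries in fault-tolerant execution. Matching session properties are added for each.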
SystemSessionProperties.java

@@ -145,7 +145,9 @@ public final class SystemSessionProperties
     public static final String INCREMENTAL_HASH_ARRAY_LOAD_FACTOR_ENABLED = "incremental_hash_array_load_factor_enabled";
     public static final String MAX_PARTIAL_TOP_N_MEMORY = "max_partial_top_n_memory";
     public static final String RETRY_POLICY = "retry_policy";
-    public static final String RETRY_ATTEMPTS = "retry_attempts";
+    public static final String QUERY_RETRY_ATTEMPTS = "query_retry_attempts";
+    public static final String TASK_RETRY_ATTEMPTS_OVERALL = "task_retry_attempts_overall";
+    public static final String TASK_RETRY_ATTEMPTS_PER_TASK = "task_retry_attempts_per_task";
     public static final String RETRY_INITIAL_DELAY = "retry_initial_delay";
     public static final String RETRY_MAX_DELAY = "retry_max_delay";
     public static final String HIDE_INACCESSIBLE_COLUMNS = "hide_inaccessible_columns";
@@ -683,9 +685,19 @@ public SystemSessionProperties(
                         queryManagerConfig.getRetryPolicy(),
                         false),
                 integerProperty(
-                        RETRY_ATTEMPTS,
-                        "Maximum number of retry attempts",
-                        queryManagerConfig.getRetryAttempts(),
+                        QUERY_RETRY_ATTEMPTS,
+                        "Maximum number of query retry attempts",
+                        queryManagerConfig.getQueryRetryAttempts(),
+                        false),
+                integerProperty(
+                        TASK_RETRY_ATTEMPTS_OVERALL,
+                        "Maximum number of task retry attempts overall",
+                        queryManagerConfig.getTaskRetryAttemptsOverall(),
+                        false),
+                integerProperty(
+                        TASK_RETRY_ATTEMPTS_PER_TASK,
+                        "Maximum number of task retry attempts per single task",
+                        queryManagerConfig.getTaskRetryAttemptsPerTask(),
                         false),
                 durationProperty(
                         RETRY_INITIAL_DELAY,
@@ -1264,9 +1276,19 @@ public static RetryPolicy getRetryPolicy(Session session)
         return retryPolicy;
     }
 
-    public static int getRetryAttempts(Session session)
+    public static int getQueryRetryAttempts(Session session)
     {
+        return session.getSystemProperty(QUERY_RETRY_ATTEMPTS, Integer.class);
+    }
+
+    public static int getTaskRetryAttemptsOverall(Session session)
+    {
+        return session.getSystemProperty(TASK_RETRY_ATTEMPTS_OVERALL, Integer.class);
+    }
+
+    public static int getTaskRetryAttemptsPerTask(Session session)
+    {
-        return session.getSystemProperty(RETRY_ATTEMPTS, Integer.class);
+        return session.getSystemProperty(TASK_RETRY_ATTEMPTS_PER_TASK, Integer.class);
     }
 
     public static Duration getRetryInitialDelay(Session session)
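The new getters read straight from the session. As a usage illustration only — not part of this commit; testSessionBuilder is assumed from Trino's test utilities, and the printed value assumes the defaults introduced below:

    import io.trino.Session;
    import io.trino.SystemSessionProperties;

    import static io.trino.testing.TestingSession.testSessionBuilder;

    public class RetrySessionPropertyExample
    {
        public static void main(String[] args)
        {
            // Session properties default to the QueryManagerConfig values,
            // so an explicit override applies to a single query only.
            Session session = testSessionBuilder()
                    .setSystemProperty("task_retry_attempts_per_task", "4")
                    .build();
            // prints 4 instead of the config default of 2
            System.out.println(SystemSessionProperties.getTaskRetryAttemptsPerTask(session));
        }
    }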
QueryManagerConfig.java

@@ -76,7 +76,9 @@ public class QueryManagerConfig
     private Duration requiredWorkersMaxWait = new Duration(5, TimeUnit.MINUTES);
 
     private RetryPolicy retryPolicy = RetryPolicy.NONE;
-    private int retryAttempts = 4;
+    private int queryRetryAttempts = 4;
+    private int taskRetryAttemptsPerTask = 2;
+    private int taskRetryAttemptsOverall = Integer.MAX_VALUE;
     private Duration retryInitialDelay = new Duration(10, SECONDS);
     private Duration retryMaxDelay = new Duration(1, MINUTES);
 
@@ -414,15 +416,42 @@ public QueryManagerConfig setRetryPolicy(RetryPolicy retryPolicy)
     }
 
     @Min(0)
-    public int getRetryAttempts()
+    public int getQueryRetryAttempts()
     {
-        return retryAttempts;
+        return queryRetryAttempts;
     }
 
-    @Config("retry-attempts")
-    public QueryManagerConfig setRetryAttempts(int retryAttempts)
+    @Config("query-retry-attempts")
+    @LegacyConfig("retry-attempts")
+    public QueryManagerConfig setQueryRetryAttempts(int queryRetryAttempts)
     {
-        this.retryAttempts = retryAttempts;
+        this.queryRetryAttempts = queryRetryAttempts;
         return this;
     }
 
+    @Min(0)
+    public int getTaskRetryAttemptsOverall()
+    {
+        return taskRetryAttemptsOverall;
+    }
+
+    @Config("task-retry-attempts-overall")
+    public QueryManagerConfig setTaskRetryAttemptsOverall(int taskRetryAttemptsOverall)
+    {
+        this.taskRetryAttemptsOverall = taskRetryAttemptsOverall;
+        return this;
+    }
+
+    @Min(0)
+    public int getTaskRetryAttemptsPerTask()
+    {
+        return taskRetryAttemptsPerTask;
+    }
+
+    @Config("task-retry-attempts-per-task")
+    public QueryManagerConfig setTaskRetryAttemptsPerTask(int taskRetryAttemptsPerTask)
+    {
+        this.taskRetryAttemptsPerTask = taskRetryAttemptsPerTask;
+        return this;
+    }
+
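Note the @LegacyConfig("retry-attempts") annotation: deployments that still set retry-attempts keep working, with the value routed to the renamed property. A rough sketch of that behavior, assuming airlift's ConfigurationFactory test-style entry point (the sketch itself is not part of this diff):

    import com.google.common.collect.ImmutableMap;
    import io.airlift.configuration.ConfigurationFactory;
    import io.trino.execution.QueryManagerConfig;

    public class LegacyConfigExample
    {
        public static void main(String[] args)
        {
            // The legacy name still binds, now through setQueryRetryAttempts.
            ConfigurationFactory factory = new ConfigurationFactory(ImmutableMap.of("retry-attempts", "7"));
            QueryManagerConfig config = factory.build(QueryManagerConfig.class);
            System.out.println(config.getQueryRetryAttempts()); // prints 7
        }
    }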
FaultTolerantStageScheduler.java

@@ -95,6 +95,7 @@ public class FaultTolerantStageScheduler
     private final NodeAllocator nodeAllocator;
     private final TaskDescriptorStorage taskDescriptorStorage;
     private final PartitionMemoryEstimator partitionMemoryEstimator;
+    private final int maxRetryAttemptsPerTask;
 
     private final TaskLifecycleListener taskLifecycleListener;
     // empty when the results are consumed via a direct exchange
@@ -130,7 +131,9 @@ public class FaultTolerantStageScheduler
     @GuardedBy("this")
     private final Set<Integer> finishedPartitions = new HashSet<>();
     @GuardedBy("this")
-    private int remainingRetryAttempts;
+    private int remainingRetryAttemptsOverall;
+    @GuardedBy("this")
+    private final Map<Integer, Integer> remainingAttemptsPerTask = new HashMap<>();
     @GuardedBy("this")
     private Map<Integer, MemoryRequirements> partitionMemoryRequirements = new HashMap<>();
 
@@ -153,7 +156,8 @@ public FaultTolerantStageScheduler(
             Map<PlanFragmentId, Exchange> sourceExchanges,
             Optional<int[]> sourceBucketToPartitionMap,
             Optional<BucketNodeMap> sourceBucketNodeMap,
-            int retryAttempts)
+            int taskRetryAttemptsOverall,
+            int taskRetryAttemptsPerTask)
     {
         checkArgument(!stage.getFragment().getStageExecutionDescriptor().isStageGroupedExecution(), "grouped execution is expected to be disabled");
 
@@ -170,8 +174,9 @@
         this.sourceExchanges = ImmutableMap.copyOf(requireNonNull(sourceExchanges, "sourceExchanges is null"));
         this.sourceBucketToPartitionMap = requireNonNull(sourceBucketToPartitionMap, "sourceBucketToPartitionMap is null");
         this.sourceBucketNodeMap = requireNonNull(sourceBucketNodeMap, "sourceBucketNodeMap is null");
-        checkArgument(retryAttempts >= 0, "retryAttempts must be greater than or equal to 0: %s", retryAttempts);
-        this.remainingRetryAttempts = retryAttempts;
+        checkArgument(taskRetryAttemptsOverall >= 0, "taskRetryAttemptsOverall must be greater than or equal to 0: %s", taskRetryAttemptsOverall);
+        this.remainingRetryAttemptsOverall = taskRetryAttemptsOverall;
+        this.maxRetryAttemptsPerTask = taskRetryAttemptsPerTask;
     }
 
     public StageId getStageId()
@@ -531,8 +536,11 @@ private void updateTaskStatus(TaskStatus taskStatus, Optional<ExchangeSinkInstan
                     .orElse(toFailure(new TrinoException(GENERIC_INTERNAL_ERROR, "A task failed for an unknown reason")));
             log.warn(failureInfo.toException(), "Task failed: %s", taskId);
             ErrorCode errorCode = failureInfo.getErrorCode();
-            if (remainingRetryAttempts > 0 && (errorCode == null || errorCode.getType() != USER_ERROR)) {
-                remainingRetryAttempts--;
+
+            int taskRemainingAttempts = remainingAttemptsPerTask.getOrDefault(partitionId, maxRetryAttemptsPerTask);
+            if (remainingRetryAttemptsOverall > 0 && taskRemainingAttempts > 0 && (errorCode == null || errorCode.getType() != USER_ERROR)) {
+                remainingRetryAttemptsOverall--;
+                remainingAttemptsPerTask.put(partitionId, taskRemainingAttempts - 1);
 
                 // update memory limits for next attempt
                 MemoryRequirements memoryLimits = partitionMemoryRequirements.get(partitionId);
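In words: a failed task (other than a user error) is resubmitted only while both budgets are positive — the stage-wide remainingRetryAttemptsOverall and the failed partition's own counter, initialized lazily to maxRetryAttemptsPerTask. A minimal standalone sketch of that accounting (illustrative class and method names, not the actual scheduler):

    import java.util.HashMap;
    import java.util.Map;

    class TaskRetryBudget
    {
        private int remainingOverall;
        private final int maxPerTask;
        private final Map<Integer, Integer> remainingPerTask = new HashMap<>();

        TaskRetryBudget(int overall, int perTask)
        {
            this.remainingOverall = overall;
            this.maxPerTask = perTask;
        }

        // A retry is granted only while both the shared overall budget and
        // the failed partition's own budget are non-zero; both are then
        // decremented. Otherwise the failure becomes fatal for the query.
        synchronized boolean tryRetry(int partitionId)
        {
            int taskRemaining = remainingPerTask.getOrDefault(partitionId, maxPerTask);
            if (remainingOverall == 0 || taskRemaining == 0) {
                return false;
            }
            remainingOverall--;
            remainingPerTask.put(partitionId, taskRemaining - 1);
            return true;
        }
    }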
SqlQueryScheduler.java

@@ -123,10 +123,12 @@
 import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
 import static io.trino.SystemSessionProperties.getConcurrentLifespansPerNode;
 import static io.trino.SystemSessionProperties.getHashPartitionCount;
-import static io.trino.SystemSessionProperties.getRetryAttempts;
+import static io.trino.SystemSessionProperties.getQueryRetryAttempts;
 import static io.trino.SystemSessionProperties.getRetryInitialDelay;
 import static io.trino.SystemSessionProperties.getRetryMaxDelay;
 import static io.trino.SystemSessionProperties.getRetryPolicy;
+import static io.trino.SystemSessionProperties.getTaskRetryAttemptsOverall;
+import static io.trino.SystemSessionProperties.getTaskRetryAttemptsPerTask;
 import static io.trino.SystemSessionProperties.getWriterMinSize;
 import static io.trino.connector.CatalogName.isInternalSystemConnector;
 import static io.trino.execution.BasicStageStats.aggregateBasicStageStats;
@@ -193,7 +195,9 @@ public class SqlQueryScheduler
     private final CoordinatorStagesScheduler coordinatorStagesScheduler;
 
     private final RetryPolicy retryPolicy;
-    private final int maxRetryAttempts;
+    private final int maxQueryRetryAttempts;
+    private final int maxTaskRetryAttemptsOverall;
+    private final int maxTaskRetryAttemptsPerTask;
     private final AtomicInteger currentAttempt = new AtomicInteger();
     private final Duration retryInitialDelay;
     private final Duration retryMaxDelay;
@@ -270,7 +274,9 @@ public SqlQueryScheduler(
                 coordinatorTaskManager);
 
         retryPolicy = getRetryPolicy(queryStateMachine.getSession());
-        maxRetryAttempts = getRetryAttempts(queryStateMachine.getSession());
+        maxQueryRetryAttempts = getQueryRetryAttempts(queryStateMachine.getSession());
+        maxTaskRetryAttemptsOverall = getTaskRetryAttemptsOverall(queryStateMachine.getSession());
+        maxTaskRetryAttemptsPerTask = getTaskRetryAttemptsPerTask(queryStateMachine.getSession());
         retryInitialDelay = getRetryInitialDelay(queryStateMachine.getSession());
         retryMaxDelay = getRetryMaxDelay(queryStateMachine.getSession());
     }
@@ -344,7 +350,8 @@ private synchronized Optional<DistributedStagesScheduler> createDistributedStage
                     exchangeManager,
                     nodePartitioningManager,
                     coordinatorStagesScheduler.getTaskLifecycleListener(),
-                    maxRetryAttempts,
+                    maxTaskRetryAttemptsOverall,
+                    maxTaskRetryAttemptsPerTask,
                     schedulerExecutor,
                     schedulerStats,
                     nodeAllocatorService,
@@ -422,7 +429,7 @@ else if (state == DistributedStagesSchedulerState.CANCELED) {
 
     private boolean shouldRetry(ErrorCode errorCode)
     {
-        return retryPolicy == RetryPolicy.QUERY && currentAttempt.get() < maxRetryAttempts && isRetryableErrorCode(errorCode);
+        return retryPolicy == RetryPolicy.QUERY && currentAttempt.get() < maxQueryRetryAttempts && isRetryableErrorCode(errorCode);
     }
 
     private static boolean isRetryableErrorCode(ErrorCode errorCode)
@@ -1745,7 +1752,8 @@ public static FaultTolerantDistributedStagesScheduler create(
             ExchangeManager exchangeManager,
             NodePartitioningManager nodePartitioningManager,
             TaskLifecycleListener coordinatorTaskLifecycleListener,
-            int retryAttempts,
+            int taskRetryAttemptsOverall,
+            int taskRetryAttemptsPerTask,
             ScheduledExecutorService scheduledExecutorService,
             SplitSchedulerStats schedulerStats,
             NodeAllocatorService nodeAllocatorService,
@@ -1818,7 +1826,8 @@ public static FaultTolerantDistributedStagesScheduler create(
                         sourceExchanges.buildOrThrow(),
                         inputBucketToPartition.getBucketToPartitionMap(),
                         inputBucketToPartition.getBucketNodeMap(),
-                        retryAttempts);
+                        taskRetryAttemptsOverall,
+                        taskRetryAttemptsPerTask);
 
                 schedulers.add(scheduler);
             }
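Query-level retries are unchanged in shape: shouldRetry permits another attempt only under RetryPolicy.QUERY and while currentAttempt is below maxQueryRetryAttempts, with delays driven by retryInitialDelay and retryMaxDelay. A loose sketch of that control flow — the exponential backoff shape here is an assumption for illustration and is not shown in this diff:

    import java.time.Duration;

    class QueryRetryLoop
    {
        // Runs one attempt plus up to maxQueryRetryAttempts retries,
        // doubling the delay between attempts up to maxDelay.
        static void run(int maxQueryRetryAttempts, Duration initialDelay, Duration maxDelay)
                throws InterruptedException
        {
            Duration delay = initialDelay;
            for (int attempt = 0; ; attempt++) {
                if (tryRunQueryOnce()) {
                    return;
                }
                if (attempt >= maxQueryRetryAttempts) {
                    throw new RuntimeException("query failed after " + attempt + " retries");
                }
                Thread.sleep(delay.toMillis());
                delay = delay.multipliedBy(2).compareTo(maxDelay) > 0 ? maxDelay : delay.multipliedBy(2);
            }
        }

        static boolean tryRunQueryOnce()
        {
            return false; // placeholder for scheduling a fresh set of stages
        }
    }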
TestQueryManagerConfig.java

@@ -63,7 +63,9 @@ public void testDefaults()
                 .setRequiredWorkers(1)
                 .setRequiredWorkersMaxWait(new Duration(5, MINUTES))
                 .setRetryPolicy(RetryPolicy.NONE)
-                .setRetryAttempts(4)
+                .setQueryRetryAttempts(4)
+                .setTaskRetryAttemptsOverall(Integer.MAX_VALUE)
+                .setTaskRetryAttemptsPerTask(2)
                 .setRetryInitialDelay(new Duration(10, SECONDS))
                 .setRetryMaxDelay(new Duration(1, MINUTES))
                 .setFaultTolerantExecutionTargetTaskInputSize(DataSize.of(1, GIGABYTE))
@@ -101,7 +103,9 @@ public void testExplicitPropertyMappings()
                 .put("query-manager.required-workers", "333")
                 .put("query-manager.required-workers-max-wait", "33m")
                 .put("retry-policy", "QUERY")
-                .put("retry-attempts", "0")
+                .put("query-retry-attempts", "0")
+                .put("task-retry-attempts-overall", "17")
+                .put("task-retry-attempts-per-task", "9")
                 .put("retry-initial-delay", "1m")
                 .put("retry-max-delay", "1h")
                 .put("fault-tolerant-execution-target-task-input-size", "222MB")
@@ -136,7 +140,9 @@ public void testExplicitPropertyMappings()
                 .setRequiredWorkers(333)
                 .setRequiredWorkersMaxWait(new Duration(33, MINUTES))
                 .setRetryPolicy(RetryPolicy.QUERY)
-                .setRetryAttempts(0)
+                .setQueryRetryAttempts(0)
+                .setTaskRetryAttemptsOverall(17)
+                .setTaskRetryAttemptsPerTask(9)
                 .setRetryInitialDelay(new Duration(1, MINUTES))
                 .setRetryMaxDelay(new Duration(1, HOURS))
                 .setFaultTolerantExecutionTargetTaskInputSize(DataSize.of(222, MEGABYTE))
TestFaultTolerantStageScheduler.java

@@ -516,6 +516,7 @@ private FaultTolerantStageScheduler createFaultTolerantTaskScheduler(
                 sourceExchanges,
                 Optional.empty(),
                 Optional.empty(),
+                retryAttempts,
                 retryAttempts);
     }
 
BaseFailureRecoveryTest.java

@@ -102,7 +102,8 @@ protected final QueryRunner createQueryRunner()
                 .put("exchange.max-error-duration", MAX_ERROR_DURATION.toString())
                 .put("retry-policy", retryPolicy.toString())
                 .put("retry-initial-delay", "0s")
-                .put("retry-attempts", "1")
+                .put("query-retry-attempts", "1")
+                .put("task-retry-attempts-overall", "1")
                 .put("failure-injection.request-timeout", new Duration(REQUEST_TIMEOUT.toMillis() * 2, MILLISECONDS).toString())
                 // making http timeouts shorter so tests which simulate communication timeouts finish in reasonable amount of time
                 .put("exchange.http-client.idle-timeout", REQUEST_TIMEOUT.toString())
