Skip to content

Commit

Permalink
Detect and fail tasks containing stuck splits
Browse files Browse the repository at this point in the history
  • Loading branch information
leetcode-1533 authored and arhimondr committed Jul 19, 2022
1 parent 1f8a79f commit 89bc6af
Show file tree
Hide file tree
Showing 7 changed files with 420 additions and 18 deletions.
150 changes: 148 additions & 2 deletions core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.concurrent.ThreadPoolExecutorMBean;
import io.airlift.log.Logger;
Expand All @@ -35,10 +36,13 @@
import io.trino.execution.buffer.OutputBuffers;
import io.trino.execution.buffer.OutputBuffers.OutputBufferId;
import io.trino.execution.executor.TaskExecutor;
import io.trino.execution.executor.TaskExecutor.RunningSplitInfo;
import io.trino.memory.LocalMemoryManager;
import io.trino.memory.NodeMemoryConfig;
import io.trino.memory.QueryContext;
import io.trino.operator.RetryPolicy;
import io.trino.operator.scalar.JoniRegexpFunctions;
import io.trino.operator.scalar.JoniRegexpReplaceLambdaFunction;
import io.trino.spi.QueryId;
import io.trino.spi.TrinoException;
import io.trino.spi.VersionEmbedder;
Expand All @@ -62,10 +66,12 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Throwables.throwIfUnchecked;
Expand All @@ -76,19 +82,30 @@
import static io.trino.SystemSessionProperties.resourceOvercommit;
import static io.trino.collect.cache.SafeCaches.buildNonEvictableCache;
import static io.trino.execution.SqlTask.createSqlTask;
import static io.trino.execution.executor.PrioritizedSplitRunner.SPLIT_RUN_QUANTA;
import static io.trino.operator.RetryPolicy.TASK;
import static io.trino.spi.StandardErrorCode.ABANDONED_TASK;
import static io.trino.spi.StandardErrorCode.GENERIC_USER_ERROR;
import static io.trino.spi.StandardErrorCode.SERVER_SHUTTING_DOWN;
import static java.lang.Math.min;
import static java.lang.String.format;
import static java.lang.System.lineSeparator;
import static java.util.Arrays.asList;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static java.util.concurrent.Executors.newScheduledThreadPool;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.joining;

public class SqlTaskManager
implements Closeable
{
private static final Logger log = Logger.get(SqlTaskManager.class);
// Fully-qualified names of the Joni regexp function classes that the default stuck-split predicate looks for
private static final Set<String> JONI_REGEXP_FUNCTION_CLASS_NAMES = ImmutableSet.of(
        JoniRegexpFunctions.class.getName(),
        JoniRegexpReplaceLambdaFunction.class.getName());
// Default predicate handed to the stuck-split interrupter: true when any frame of the
// given stack trace belongs to one of the classes listed above
private static final Predicate<List<StackTraceElement>> STUCK_SPLIT_STACK_TRACE_PREDICATE =
        stackTrace -> stackTrace.stream()
                .map(StackTraceElement::getClassName)
                .anyMatch(JONI_REGEXP_FUNCTION_CLASS_NAMES::contains);

private final VersionEmbedder versionEmbedder;
private final ExecutorService taskNotificationExecutor;
Expand All @@ -109,6 +126,7 @@ public class SqlTaskManager
private final long queryMaxMemoryPerNode;

private final CounterStat failedTasks = new CounterStat();
private final Optional<StuckSplitTasksInterrupter> stuckSplitTasksInterrupter;

@Inject
public SqlTaskManager(
Expand All @@ -126,6 +144,41 @@ public SqlTaskManager(
NodeSpillConfig nodeSpillConfig,
GcMonitor gcMonitor,
ExchangeManagerRegistry exchangeManagerRegistry)
{
this(versionEmbedder,
planner,
locationFactory,
taskExecutor,
splitMonitor,
nodeInfo,
localMemoryManager,
taskManagementExecutor,
config,
nodeMemoryConfig,
localSpillManager,
nodeSpillConfig,
gcMonitor,
exchangeManagerRegistry,
STUCK_SPLIT_STACK_TRACE_PREDICATE);
}

@VisibleForTesting
public SqlTaskManager(
VersionEmbedder versionEmbedder,
LocalExecutionPlanner planner,
LocationFactory locationFactory,
TaskExecutor taskExecutor,
SplitMonitor splitMonitor,
NodeInfo nodeInfo,
LocalMemoryManager localMemoryManager,
TaskManagementExecutor taskManagementExecutor,
TaskManagerConfig config,
NodeMemoryConfig nodeMemoryConfig,
LocalSpillManager localSpillManager,
NodeSpillConfig nodeSpillConfig,
GcMonitor gcMonitor,
ExchangeManagerRegistry exchangeManagerRegistry,
Predicate<List<StackTraceElement>> stuckSplitStackTracePredicate)
{
requireNonNull(nodeInfo, "nodeInfo is null");
requireNonNull(config, "config is null");
Expand Down Expand Up @@ -165,6 +218,14 @@ public SqlTaskManager(
maxBroadcastBufferSize,
requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null"),
failedTasks)));

stuckSplitTasksInterrupter = createStuckSplitTasksInterrupter(
config.isInterruptStuckSplitTasksEnabled(),
config.getInterruptStuckSplitTasksWarningThreshold(),
config.getInterruptStuckSplitTasksTimeout(),
config.getInterruptStuckSplitTasksDetectionInterval(),
stuckSplitStackTracePredicate,
taskExecutor);
}

private QueryContext createQueryContext(
Expand Down Expand Up @@ -211,7 +272,19 @@ public void start()
catch (Throwable e) {
log.warn(e, "Error updating stats");
}
}, 0, 1, TimeUnit.SECONDS);
}, 0, 1, SECONDS);

stuckSplitTasksInterrupter.ifPresent(interrupter -> {
long intervalSeconds = interrupter.getStuckSplitsDetectionInterval().roundTo(SECONDS);
taskManagementExecutor.scheduleAtFixedRate(() -> {
try {
failStuckSplitTasks();
}
catch (Throwable e) {
log.warn(e, "Error failing stuck split tasks");
}
}, 0, intervalSeconds, SECONDS);
});
}

@PreDestroy
Expand All @@ -228,7 +301,7 @@ public void close()
}
if (taskCanceled) {
try {
TimeUnit.SECONDS.sleep(5);
SECONDS.sleep(5);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -597,4 +670,77 @@ public QueryContext getQueryContext(QueryId queryId)
{
return queryContexts.getUnchecked(queryId);
}

/**
 * Runs one round of stuck-split handling by delegating to the interrupter.
 * Does nothing when no interrupter was created (the optional is empty).
 */
@VisibleForTesting
public void failStuckSplitTasks()
{
    stuckSplitTasksInterrupter.ifPresent(interrupter -> interrupter.failStuckSplitTasks());
}

/**
 * Builds the stuck-split interrupter when the feature is switched on.
 *
 * @param enableInterruptStuckSplitTasks whether an interrupter should be created at all
 * @param stuckSplitsWarningThreshold forwarded to the interrupter constructor
 * @param interruptStuckSplitTasksTimeout forwarded to the interrupter constructor
 * @param stuckSplitsDetectionInterval forwarded to the interrupter constructor
 * @param stuckSplitStackTracePredicate forwarded to the interrupter constructor
 * @param taskExecutor forwarded to the interrupter constructor
 * @return the new interrupter, or {@link Optional#empty()} when the feature is disabled
 */
private Optional<StuckSplitTasksInterrupter> createStuckSplitTasksInterrupter(
        boolean enableInterruptStuckSplitTasks,
        Duration stuckSplitsWarningThreshold,
        Duration interruptStuckSplitTasksTimeout,
        Duration stuckSplitsDetectionInterval,
        Predicate<List<StackTraceElement>> stuckSplitStackTracePredicate,
        TaskExecutor taskExecutor)
{
    if (enableInterruptStuckSplitTasks) {
        StuckSplitTasksInterrupter interrupter = new StuckSplitTasksInterrupter(
                stuckSplitsWarningThreshold,
                interruptStuckSplitTasksTimeout,
                stuckSplitsDetectionInterval,
                stuckSplitStackTracePredicate,
                taskExecutor);
        return Optional.of(interrupter);
    }
    // Feature disabled: nothing is constructed, so no argument validation runs either
    return Optional.empty();
}

/**
 * Finds tasks reported by the {@link TaskExecutor} as containing stuck splits and fails them.
 * <p>
 * Kept as a non-static inner class because {@code failTask} is not declared here;
 * it is presumably resolved on the enclosing manager instance.
 */
private class StuckSplitTasksInterrupter
{
    private final Duration interruptStuckSplitTasksTimeout;
    private final Duration stuckSplitsDetectionInterval;
    private final Predicate<List<StackTraceElement>> stuckSplitStackTracePredicate;
    private final TaskExecutor taskExecutor;

    /**
     * @param stuckSplitsWarningThreshold only validated here (must not exceed the timeout); not stored
     * @param interruptStuckSplitTasksTimeout passed to the task executor on every detection round; must be at least {@code SPLIT_RUN_QUANTA}
     * @param stuckSplitDetectionInterval exposed through {@link #getStuckSplitsDetectionInterval()}
     * @param stuckSplitStackTracePredicate decides, from a split thread's stack trace, whether the split counts as stuck
     * @param taskExecutor source of the stuck split task ids
     * @throws NullPointerException if any argument is null
     * @throws IllegalArgumentException if the duration constraints above are violated
     */
    public StuckSplitTasksInterrupter(
            Duration stuckSplitsWarningThreshold,
            Duration interruptStuckSplitTasksTimeout,
            Duration stuckSplitDetectionInterval,
            Predicate<List<StackTraceElement>> stuckSplitStackTracePredicate,
            TaskExecutor taskExecutor)
    {
        // Null checks come first: the range checks below dereference both durations,
        // so checking afterwards would only ever produce a message-less NullPointerException
        requireNonNull(stuckSplitsWarningThreshold, "stuckSplitsWarningThreshold is null");
        requireNonNull(interruptStuckSplitTasksTimeout, "interruptStuckSplitTasksTimeout is null");

        checkArgument(interruptStuckSplitTasksTimeout.compareTo(SPLIT_RUN_QUANTA) >= 0, "interruptStuckSplitTasksTimeout must be at least %s", SPLIT_RUN_QUANTA);
        checkArgument(stuckSplitsWarningThreshold.compareTo(interruptStuckSplitTasksTimeout) <= 0, "interruptStuckSplitTasksTimeout cannot be less than stuckSplitsWarningThreshold");

        this.interruptStuckSplitTasksTimeout = interruptStuckSplitTasksTimeout;
        this.stuckSplitsDetectionInterval = requireNonNull(stuckSplitDetectionInterval, "stuckSplitsDetectionInterval is null");
        this.stuckSplitStackTracePredicate = requireNonNull(stuckSplitStackTracePredicate, "stuckSplitStackTracePredicate is null");
        this.taskExecutor = requireNonNull(taskExecutor, "taskExecutor is null");
    }

    /**
     * @return the detection interval supplied at construction
     */
    public Duration getStuckSplitsDetectionInterval()
    {
        return stuckSplitsDetectionInterval;
    }

    /**
     * Asks the task executor for stuck split task ids, passing the timeout and a filter,
     * then fails every returned task with a {@code GENERIC_USER_ERROR}.
     */
    private void failStuckSplitTasks()
    {
        Set<TaskId> stuckSplitTaskIds = taskExecutor.getStuckSplitTaskIds(interruptStuckSplitTasksTimeout,
                (RunningSplitInfo splitInfo) -> {
                    // Snapshot the stack of the thread running the split
                    List<StackTraceElement> stackTraceElements = asList(splitInfo.getThread().getStackTrace());
                    // The printed flag ensures the stack trace is logged only once per split info
                    if (!splitInfo.isPrinted()) {
                        splitInfo.setPrinted();
                        log.warn("%s is long running with stackTrace:\n%s", splitInfo.getSplitInfo(), stackTraceElements.stream().map(Object::toString).collect(joining(lineSeparator())));
                    }

                    return stuckSplitStackTracePredicate.test(stackTraceElements);
                });

        for (TaskId stuckSplitTaskId : stuckSplitTaskIds) {
            failTask(stuckSplitTaskId, new TrinoException(GENERIC_USER_ERROR, format("Task %s is failed, due to containing long running stuck splits.", stuckSplitTaskId)));
        }
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public class TaskManagerConfig
private Duration statusRefreshMaxWait = new Duration(1, TimeUnit.SECONDS);
private Duration infoUpdateInterval = new Duration(3, TimeUnit.SECONDS);

// Stuck split task settings, bound to the task.interrupt-stuck-split-tasks-* config properties
// Enabled by default
private boolean interruptStuckSplitTasksEnabled = true;
// Default 10 minutes; the getter enforces a 1 minute minimum
private Duration interruptStuckSplitTasksWarningThreshold = new Duration(10, TimeUnit.MINUTES);
// Default 15 minutes; the getter enforces a 3 minute minimum
private Duration interruptStuckSplitTasksTimeout = new Duration(15, TimeUnit.MINUTES);
// Default 2 minutes; the getter enforces a 1 minute minimum
private Duration interruptStuckSplitTasksDetectionInterval = new Duration(2, TimeUnit.MINUTES);

private int writerCount = 1;
// cap task concurrency to 32 in order to avoid small pages produced by local partitioning exchanges
private int taskConcurrency = min(nextPowerOfTwo(getAvailablePhysicalProcessorCount()), 32);
Expand Down Expand Up @@ -463,4 +468,58 @@ public TaskManagerConfig setTaskYieldThreads(int taskYieldThreads)
this.taskYieldThreads = taskYieldThreads;
return this;
}

/**
 * @return the current value of the {@code task.interrupt-stuck-split-tasks-enabled} setting
 */
public boolean isInterruptStuckSplitTasksEnabled()
{
    return this.interruptStuckSplitTasksEnabled;
}

/**
 * Sets the value bound to the {@code task.interrupt-stuck-split-tasks-enabled} config property.
 *
 * @param interruptStuckSplitTasksEnabled the new value of the flag
 * @return this config object, so calls can be chained
 */
@Config("task.interrupt-stuck-split-tasks-enabled")
public TaskManagerConfig setInterruptStuckSplitTasksEnabled(boolean interruptStuckSplitTasksEnabled)
{
this.interruptStuckSplitTasksEnabled = interruptStuckSplitTasksEnabled;
return this;
}

/**
 * @return the warning threshold for long running splits; annotated with a minimum of one minute
 */
@MinDuration("1m")
public Duration getInterruptStuckSplitTasksWarningThreshold()
{
    return this.interruptStuckSplitTasksWarningThreshold;
}

/**
 * Sets the value bound to the {@code task.interrupt-stuck-split-tasks-warning-threshold} config property.
 *
 * @param interruptStuckSplitTasksWarningThreshold the new threshold; stored as given, without a null check here
 * @return this config object, so calls can be chained
 */
@Config("task.interrupt-stuck-split-tasks-warning-threshold")
@ConfigDescription("Print out call stacks and generate JMX metrics for splits running longer than the threshold")
public TaskManagerConfig setInterruptStuckSplitTasksWarningThreshold(Duration interruptStuckSplitTasksWarningThreshold)
{
this.interruptStuckSplitTasksWarningThreshold = interruptStuckSplitTasksWarningThreshold;
return this;
}

/**
 * @return the timeout after which stuck split tasks are interrupted; annotated with a minimum of three minutes
 */
@MinDuration("3m")
public Duration getInterruptStuckSplitTasksTimeout()
{
    return this.interruptStuckSplitTasksTimeout;
}

/**
 * Sets the value bound to the {@code task.interrupt-stuck-split-tasks-timeout} config property.
 *
 * @param interruptStuckSplitTasksTimeout the new timeout; stored as given, without a null check here
 * @return this config object, so calls can be chained
 */
@Config("task.interrupt-stuck-split-tasks-timeout")
@ConfigDescription("Interrupt task processing thread after this timeout if the thread is stuck in certain external libraries used by Trino functions")
public TaskManagerConfig setInterruptStuckSplitTasksTimeout(Duration interruptStuckSplitTasksTimeout)
{
this.interruptStuckSplitTasksTimeout = interruptStuckSplitTasksTimeout;
return this;
}

/**
 * @return the interval between stuck split detection runs; annotated with a minimum of one minute
 */
@MinDuration("1m")
public Duration getInterruptStuckSplitTasksDetectionInterval()
{
    return this.interruptStuckSplitTasksDetectionInterval;
}

/**
 * Sets the value bound to the {@code task.interrupt-stuck-split-tasks-detection-interval} config property.
 *
 * @param interruptStuckSplitTasksDetectionInterval the new interval; stored as given, without a null check here
 * @return this config object, so calls can be chained
 */
@Config("task.interrupt-stuck-split-tasks-detection-interval")
@ConfigDescription("Interval between detecting stuck split")
public TaskManagerConfig setInterruptStuckSplitTasksDetectionInterval(Duration interruptStuckSplitTasksDetectionInterval)
{
this.interruptStuckSplitTasksDetectionInterval = interruptStuckSplitTasksDetectionInterval;
return this;
}
}
Loading

0 comments on commit 89bc6af

Please sign in to comment.