Skip to content

Commit

Permalink
Drop task allocations informatin from MemoryInfo
Browse files Browse the repository at this point in the history
The information is now present as part of MemoryPoolInfo which is
wrapped by MemoryInfo
  • Loading branch information
losipiuk committed Apr 9, 2022
1 parent 0fe06cf commit 1d430f9
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ private synchronized void callOomKiller(Iterable<QueryExecution> runningQueries)
}

@GuardedBy("this")
private boolean isLastKillTargetGone(Iterable<QueryExecution> runningQueries)
private boolean isLastKillTargetGone()
{
if (lastKillTarget.isEmpty()) {
return true;
Expand Down Expand Up @@ -394,7 +394,7 @@ private String formatKillScenario(Map<String, MemoryInfo> nodes)
stringBuilder.append("Queries ");
Joiner.on(",").withKeyValueSeparator("=").appendTo(stringBuilder, memoryPoolInfo.getQueryMemoryReservations()).append((' '));
stringBuilder.append("Tasks ");
Joiner.on(",").withKeyValueSeparator("=").appendTo(stringBuilder, nodeMemoryInfo.getTasksMemoryInfo().asMap());
Joiner.on(",").withKeyValueSeparator("=").appendTo(stringBuilder, memoryPoolInfo.getTaskMemoryReservations());
stringBuilder.append('\n');
}
return stringBuilder.toString();
Expand Down
32 changes: 0 additions & 32 deletions core/trino-main/src/main/java/io/trino/memory/LowMemoryKiller.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

package io.trino.memory;

import io.trino.execution.TaskId;
import io.trino.operator.RetryPolicy;
import io.trino.spi.QueryId;

Expand Down Expand Up @@ -65,36 +64,5 @@ public String toString()
.add("retryPolicy", retryPolicy)
.toString();
}

public static class TaskMemoryInfo
{
private final TaskId taskId;
private final long memoryReservation;

public TaskMemoryInfo(TaskId taskId, long memoryReservation)
{
this.taskId = requireNonNull(taskId, "taskId is null");
this.memoryReservation = memoryReservation;
}

public TaskId getTaskId()
{
return taskId;
}

public long getMemoryReservation()
{
return memoryReservation;
}

@Override
public String toString()
{
return toStringHelper(this)
.add("taskId", taskId)
.add("memoryReservation", memoryReservation)
.toString();
}
}
}
}
28 changes: 1 addition & 27 deletions core/trino-main/src/main/java/io/trino/memory/MemoryInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ListMultimap;
import io.trino.TaskMemoryInfo;
import io.trino.spi.QueryId;
import io.trino.spi.memory.MemoryPoolInfo;

import static com.google.common.base.MoreObjects.toStringHelper;
Expand All @@ -28,24 +24,14 @@ public class MemoryInfo
{
private final int availableProcessors;
private final MemoryPoolInfo pool;
private final ListMultimap<QueryId, TaskMemoryInfo> tasksMemoryInfo;

public MemoryInfo(
int availableProcessors,
MemoryPoolInfo pool)
{
this(availableProcessors, pool, ImmutableListMultimap.of());
}

@JsonCreator
public MemoryInfo(
@JsonProperty("availableProcessors") int availableProcessors,
@JsonProperty("pool") MemoryPoolInfo pool,
@JsonProperty("tasksMemoryInfo") ListMultimap<QueryId, TaskMemoryInfo> tasksMemoryInfo)
@JsonProperty("pool") MemoryPoolInfo pool)
{
this.availableProcessors = availableProcessors;
this.pool = requireNonNull(pool, "pool is null");
this.tasksMemoryInfo = ImmutableListMultimap.copyOf(requireNonNull(tasksMemoryInfo, "tasksMemoryInfo is null"));
}

@JsonProperty
Expand All @@ -60,24 +46,12 @@ public MemoryPoolInfo getPool()
return pool;
}

@JsonProperty
public ListMultimap<QueryId, TaskMemoryInfo> getTasksMemoryInfo()
{
return tasksMemoryInfo;
}

@Override
public String toString()
{
return toStringHelper(this)
.add("availableProcessors", availableProcessors)
.add("pool", pool)
.add("tasksMemoryInfo", tasksMemoryInfo)
.toString();
}

public MemoryInfo withTasksMemoryInfo(ListMultimap<QueryId, TaskMemoryInfo> tasksMemoryInfo)
{
return new MemoryInfo(availableProcessors, pool, tasksMemoryInfo);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,15 @@
*/
package io.trino.memory;

import com.google.common.collect.ListMultimap;
import io.trino.TaskMemoryInfo;
import io.trino.execution.SqlTask;
import io.trino.execution.SqlTaskManager;
import io.trino.server.security.ResourceSecurity;
import io.trino.spi.QueryId;
import io.trino.spi.memory.MemoryPoolInfo;

import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import java.util.List;

import static com.google.common.collect.ImmutableListMultimap.toImmutableListMultimap;
import static io.trino.SystemSessionProperties.getRetryPolicy;
import static io.trino.operator.RetryPolicy.TASK;
import static io.trino.server.security.ResourceSecurity.AccessType.INTERNAL_ONLY;
import static java.util.Objects.requireNonNull;

Expand All @@ -43,46 +32,18 @@
public class MemoryResource
{
private final LocalMemoryManager memoryManager;
private final SqlTaskManager taskManager;

@Inject
public MemoryResource(LocalMemoryManager memoryManager, SqlTaskManager taskManager)
{
this.memoryManager = requireNonNull(memoryManager, "memoryManager is null");
this.taskManager = requireNonNull(taskManager, "taskManager is null");
}

@ResourceSecurity(INTERNAL_ONLY)
@GET
@Produces(MediaType.APPLICATION_JSON)
public MemoryInfo getMemoryInfo()
{
return memoryManager.getInfo().withTasksMemoryInfo(buildTasksMemoryInfo());
}

private ListMultimap<QueryId, TaskMemoryInfo> buildTasksMemoryInfo()
{
List<SqlTask> tasks = taskManager.getAllTasks();
return tasks.stream()
.filter(task -> !task.getTaskState().isDone())
// we are providing task memory information only for queries which are run with task-level retries.
// task memory information is consumed by low memory killer and it does not make sense to kill individual tasks
// for queries which does not allow task retries.
.filter(task -> task.getTaskContext().map(context -> getRetryPolicy(context.getSession()) == TASK).orElse(false))
.collect(toImmutableListMultimap(
task -> task.getTaskId().getQueryId(),
task -> new TaskMemoryInfo(
task.getTaskId(),
task.getTaskContext()
.map(taskContext -> taskContext.getMemoryReservation().toBytes())
// task context may no longer be available if task completes
.orElse(0L))));
}

private Response toSuccessfulResponse(MemoryPoolInfo memoryInfo)
{
return Response.ok()
.entity(memoryInfo)
.build();
return memoryManager.getInfo();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import io.trino.TaskMemoryInfo;
import io.trino.execution.TaskId;
import io.trino.operator.RetryPolicy;
import io.trino.spi.QueryId;
import io.trino.spi.memory.MemoryPoolInfo;

import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static java.util.Comparator.comparing;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Comparator.comparingLong;

public class TotalReservationOnBlockedNodesLowMemoryKiller
Expand All @@ -37,15 +37,24 @@ public class TotalReservationOnBlockedNodesLowMemoryKiller
@Override
public Optional<KillTarget> chooseQueryToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes)
{
Optional<KillTarget> killTarget = chooseTasksToKill(nodes);
Optional<KillTarget> killTarget = chooseTasksToKill(runningQueries, nodes);
if (killTarget.isEmpty()) {
killTarget = chooseWholeQueryToKill(runningQueries, nodes);
}
return killTarget;
}

private Optional<KillTarget> chooseTasksToKill(List<MemoryInfo> nodes)
private Optional<KillTarget> chooseTasksToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes)
{
Set<QueryId> queriesWithTaskRetryPolicy = runningQueries.stream()
.filter(query -> query.getRetryPolicy() == RetryPolicy.TASK)
.map(QueryMemoryInfo::getQueryId)
.collect(toImmutableSet());

if (queriesWithTaskRetryPolicy.isEmpty()) {
return Optional.empty();
}

ImmutableSet.Builder<TaskId> tasksToKillBuilder = ImmutableSet.builder();
for (MemoryInfo node : nodes) {
MemoryPoolInfo memoryPool = node.getPool();
Expand All @@ -56,9 +65,12 @@ private Optional<KillTarget> chooseTasksToKill(List<MemoryInfo> nodes)
continue;
}

node.getTasksMemoryInfo().values().stream()
.max(comparing(TaskMemoryInfo::getMemoryReservation))
.map(TaskMemoryInfo::getTaskId)
memoryPool.getTaskMemoryReservations().entrySet().stream()
// consider only tasks from queries with task retries enabled
.map(entry -> new SimpleEntry<>(TaskId.valueOf(entry.getKey()), entry.getValue()))
.filter(entry -> queriesWithTaskRetryPolicy.contains(entry.getKey().getQueryId()))
.max(Map.Entry.comparingByValue())
.map(SimpleEntry::getKey)
.ifPresent(tasksToKillBuilder::add);
}
Set<TaskId> tasksToKill = tasksToKillBuilder.build();
Expand All @@ -82,7 +94,8 @@ private Optional<KillTarget> chooseWholeQueryToKill(List<QueryMemoryInfo> runnin
}
Map<QueryId, Long> queryMemoryReservations = memoryPool.getQueryMemoryReservations();
queryMemoryReservations.forEach((queryId, memoryReservation) -> {
if (queriesById.containsKey(queryId) && queriesById.get(queryId).getRetryPolicy() == RetryPolicy.TASK) {
QueryMemoryInfo queryMemoryInfo = queriesById.get(queryId);
if (queryMemoryInfo != null && queryMemoryInfo.getRetryPolicy() == RetryPolicy.TASK) {
// Do not kill whole queries which run with task retries enabled
// Most of the time if query with task retries enabled is a root cause of cluster out-of-memory error
// individual tasks should be already picked for killing by `chooseTasksToKill`. Yet sometimes there is a discrepancy between
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
package io.trino.memory;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import io.trino.TaskMemoryInfo;
import io.trino.client.NodeVersion;
import io.trino.execution.StageId;
import io.trino.execution.TaskId;
import io.trino.metadata.InternalNode;
import io.trino.operator.RetryPolicy;
Expand All @@ -42,7 +40,7 @@ static List<MemoryInfo> toNodeMemoryInfoList(long memoryPoolMaxBytes, Map<String
return toNodeMemoryInfoList(memoryPoolMaxBytes, queries, ImmutableMap.of());
}

static List<MemoryInfo> toNodeMemoryInfoList(long memoryPoolMaxBytes, Map<String, Map<String, Long>> queries, Map<String, Map<String, Map<String, Long>>> tasks)
static List<MemoryInfo> toNodeMemoryInfoList(long memoryPoolMaxBytes, Map<String, Map<String, Long>> queries, Map<String, Map<String, Map<Integer, Long>>> tasks)
{
Map<InternalNode, NodeReservation> nodeReservations = new HashMap<>();

Expand Down Expand Up @@ -70,31 +68,33 @@ static List<MemoryInfo> toNodeMemoryInfoList(long memoryPoolMaxBytes, Map<String
nodeReservation.getReservationByQuery(),
ImmutableMap.of(),
ImmutableMap.of(),
ImmutableMap.of(),
tasksMemoryInfoForNode(entry.getKey().getNodeIdentifier(), tasks),
ImmutableMap.of());
result.add(new MemoryInfo(7, memoryPoolInfo, tasksMemoryInfoForNode(entry.getKey().getNodeIdentifier(), tasks)));
result.add(new MemoryInfo(7, memoryPoolInfo));
}
return result.build();
}

private static ListMultimap<QueryId, TaskMemoryInfo> tasksMemoryInfoForNode(String nodeIdentifier, Map<String, Map<String, Map<String, Long>>> tasks)
private static Map<String, Long> tasksMemoryInfoForNode(String nodeIdentifier, Map<String, Map<String, Map<Integer, Long>>> tasks)
{
ImmutableListMultimap.Builder<QueryId, TaskMemoryInfo> result = ImmutableListMultimap.builder();
for (Map.Entry<String, Map<String, Map<String, Long>>> queryNodesEntry : tasks.entrySet()) {
QueryId query = QueryId.valueOf(queryNodesEntry.getKey());
for (Map.Entry<String, Map<String, Long>> nodeTasksEntry : queryNodesEntry.getValue().entrySet()) {
ImmutableMap.Builder<String, Long> result = ImmutableMap.builder();
for (Map.Entry<String, Map<String, Map<Integer, Long>>> queryNodesEntry : tasks.entrySet()) {
for (Map.Entry<String, Map<Integer, Long>> nodeTasksEntry : queryNodesEntry.getValue().entrySet()) {
if (!nodeIdentifier.equals(nodeTasksEntry.getKey())) {
continue;
}

for (Map.Entry<String, Long> taskReservationEntry : nodeTasksEntry.getValue().entrySet()) {
TaskId taskId = TaskId.valueOf(taskReservationEntry.getKey());
long taskReservation = taskReservationEntry.getValue();
result.put(query, new TaskMemoryInfo(taskId, taskReservation));
for (Map.Entry<Integer, Long> partitionReservationEntry : nodeTasksEntry.getValue().entrySet()) {
result.put(taskId(queryNodesEntry.getKey(), partitionReservationEntry.getKey()).toString(), partitionReservationEntry.getValue());
}
}
}
return result.build();
return result.buildOrThrow();
}

static TaskId taskId(String query, int partition)
{
return new TaskId(new StageId(QueryId.valueOf(query), 0), partition, 0);
}

static List<LowMemoryKiller.QueryMemoryInfo> toQueryMemoryInfoList(Map<String, Map<String, Long>> queries)
Expand Down
Loading

0 comments on commit 1d430f9

Please sign in to comment.