Skip to content

Commit

Permalink
Prefer killing tasks in TOTAL_RESERVATION_ON_BLOCKED_NODES OOM killer
Browse files Browse the repository at this point in the history
  • Loading branch information
losipiuk committed Mar 8, 2022
1 parent 507724e commit 14f01f2
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,59 @@

package io.trino.memory;

import com.google.common.collect.ImmutableSet;
import io.trino.TaskMemoryInfo;
import io.trino.execution.TaskId;
import io.trino.spi.QueryId;
import io.trino.spi.memory.MemoryPoolInfo;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static java.util.Comparator.comparing;
import static java.util.Comparator.comparingLong;

public class TotalReservationOnBlockedNodesLowMemoryKiller
implements LowMemoryKiller
{
@Override
public Optional<KillTarget> chooseQueryToKill(List<QueryMemoryInfo> runningQueries, List<MemoryInfo> nodes)
{
Optional<KillTarget> killTarget = chooseTasksToKill(nodes);
if (killTarget.isEmpty()) {
killTarget = chooseWholeQueryToKill(nodes);
}
return killTarget;
}

private Optional<KillTarget> chooseTasksToKill(List<MemoryInfo> nodes)
{
ImmutableSet.Builder<TaskId> tasksToKillBuilder = ImmutableSet.builder();
for (MemoryInfo node : nodes) {
MemoryPoolInfo memoryPool = node.getPool();
if (memoryPool == null) {
continue;
}
if (memoryPool.getFreeBytes() + memoryPool.getReservedRevocableBytes() > 0) {
continue;
}

node.getTasksMemoryInfo().values().stream()
.max(comparing(TaskMemoryInfo::getMemoryReservation))
.map(TaskMemoryInfo::getTaskId)
.ifPresent(tasksToKillBuilder::add);
}
Set<TaskId> tasksToKill = tasksToKillBuilder.build();
if (tasksToKill.isEmpty()) {
return Optional.empty();
}
return Optional.of(KillTarget.selectedTasks(tasksToKill));
}

private Optional<KillTarget> chooseWholeQueryToKill(List<MemoryInfo> nodes)
{
Map<QueryId, Long> memoryReservationOnBlockedNodes = new HashMap<>();
for (MemoryInfo node : nodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
package io.trino.memory;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ListMultimap;
import io.trino.TaskMemoryInfo;
import io.trino.client.NodeVersion;
import io.trino.execution.TaskId;
import io.trino.metadata.InternalNode;
import io.trino.spi.QueryId;
import io.trino.spi.memory.MemoryPoolInfo;
Expand All @@ -31,6 +35,11 @@ public final class LowMemoryKillerTestingUtils
private LowMemoryKillerTestingUtils() {}

static List<MemoryInfo> toNodeMemoryInfoList(long memoryPoolMaxBytes, Map<String, Map<String, Long>> queries)
{
return toNodeMemoryInfoList(memoryPoolMaxBytes, queries, ImmutableMap.of());
}

static List<MemoryInfo> toNodeMemoryInfoList(long memoryPoolMaxBytes, Map<String, Map<String, Long>> queries, Map<String, Map<String, Map<String, Long>>> tasks)
{
Map<InternalNode, NodeReservation> nodeReservations = new HashMap<>();

Expand Down Expand Up @@ -58,7 +67,27 @@ static List<MemoryInfo> toNodeMemoryInfoList(long memoryPoolMaxBytes, Map<String
nodeReservation.getReservationByQuery(),
ImmutableMap.of(),
ImmutableMap.of());
result.add(new MemoryInfo(7, memoryPoolInfo));
result.add(new MemoryInfo(7, memoryPoolInfo, tasksMemoryInfoForNode(entry.getKey().getNodeIdentifier(), tasks)));
}
return result.build();
}

private static ListMultimap<QueryId, TaskMemoryInfo> tasksMemoryInfoForNode(String nodeIdentifier, Map<String, Map<String, Map<String, Long>>> tasks)
{
ImmutableListMultimap.Builder<QueryId, TaskMemoryInfo> result = ImmutableListMultimap.builder();
for (Map.Entry<String, Map<String, Map<String, Long>>> queryNodesEntry : tasks.entrySet()) {
QueryId query = QueryId.valueOf(queryNodesEntry.getKey());
for (Map.Entry<String, Map<String, Long>> nodeTasksEntry : queryNodesEntry.getValue().entrySet()) {
if (!nodeIdentifier.equals(nodeTasksEntry.getKey())) {
continue;
}

for (Map.Entry<String, Long> taskReservationEntry : nodeTasksEntry.getValue().entrySet()) {
TaskId taskId = TaskId.valueOf(taskReservationEntry.getKey());
long taskReservation = taskReservationEntry.getValue();
result.put(query, new TaskMemoryInfo(taskId, taskReservation));
}
}
}
return result.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package io.trino.memory;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.trino.execution.TaskId;
import io.trino.spi.QueryId;
import org.testng.annotations.Test;

Expand All @@ -36,6 +38,7 @@ public void testMemoryPoolHasNoReservation()
Map<String, Map<String, Long>> queries = ImmutableMap.<String, Map<String, Long>>builder()
.put("q_1", ImmutableMap.of("n1", 0L, "n2", 0L, "n3", 0L, "n4", 0L, "n5", 0L))
.buildOrThrow();

assertEquals(
lowMemoryKiller.chooseQueryToKill(
toQueryMemoryInfoList(queries),
Expand Down Expand Up @@ -75,4 +78,58 @@ public void testSkewedQuery()
toNodeMemoryInfoList(memoryPool, queries)),
Optional.of(KillTarget.wholeQuery(new QueryId("q_1"))));
}

@Test
public void testPreferKillingTasks()
{
int memoryPool = 12;
Map<String, Map<String, Long>> queries = ImmutableMap.<String, Map<String, Long>>builder()
.put("q_1", ImmutableMap.of("n1", 0L, "n2", 8L, "n3", 0L, "n4", 0L, "n5", 0L))
.put("q_2", ImmutableMap.of("n1", 3L, "n2", 5L, "n3", 2L, "n4", 4L, "n5", 0L))
.put("q_3", ImmutableMap.of("n1", 0L, "n2", 0L, "n3", 11L, "n4", 0L, "n5", 0L))
.buildOrThrow();

Map<String, Map<String, Map<String, Long>>> tasks = ImmutableMap.<String, Map<String, Map<String, Long>>>builder()
.put("q_2", ImmutableMap.of(
"n1", ImmutableMap.of("t1", 1L, "t2", 3L),
"n2", ImmutableMap.of("t3", 3L, "t4", 1L, "t5", 1L),
"n3", ImmutableMap.of("t6", 2L),
"n4", ImmutableMap.of("t7", 2L, "t8", 2L),
"n5", ImmutableMap.of())
).buildOrThrow();

assertEquals(
lowMemoryKiller.chooseQueryToKill(
toQueryMemoryInfoList(queries),
toNodeMemoryInfoList(memoryPool, queries, tasks)),
Optional.of(KillTarget.selectedTasks(ImmutableSet.of(TaskId.valueOf("t3"), TaskId.valueOf("t6")))));
}

@Test
public void testKillsBiggestTasks()
{
int memoryPool = 12;
Map<String, Map<String, Long>> queries = ImmutableMap.<String, Map<String, Long>>builder()
.put("q_1", ImmutableMap.of("n1", 0L, "n2", 8L, "n3", 0L, "n4", 0L, "n5", 0L))
.put("q_2", ImmutableMap.of("n1", 3L, "n2", 5L, "n3", 2L, "n4", 4L, "n5", 0L))
.put("q_3", ImmutableMap.of("n1", 0L, "n2", 0L, "n3", 11L, "n4", 0L, "n5", 0L))
.buildOrThrow();

Map<String, Map<String, Map<String, Long>>> tasks = ImmutableMap.<String, Map<String, Map<String, Long>>>builder()
.put("q_1", ImmutableMap.of(
"n2", ImmutableMap.of("t1_1", 8L)))
.put("q_2", ImmutableMap.of(
"n1", ImmutableMap.of("t2_1", 1L, "t2_2", 3L),
"n2", ImmutableMap.of("t2_3", 3L, "t2_4", 1L, "t2_5", 1L),
"n3", ImmutableMap.of("t2_6", 2L),
"n4", ImmutableMap.of("t2_7", 2L, "t2_8", 2L),
"n5", ImmutableMap.of()))
.buildOrThrow();

assertEquals(
lowMemoryKiller.chooseQueryToKill(
toQueryMemoryInfoList(queries),
toNodeMemoryInfoList(memoryPool, queries, tasks)),
Optional.of(KillTarget.selectedTasks(ImmutableSet.of(TaskId.valueOf("t1_1"), TaskId.valueOf("t2_6")))));
}
}

0 comments on commit 14f01f2

Please sign in to comment.