Skip to content

Commit

Permalink
Report top memory consumers when local memory limit is exceeded
Browse files Browse the repository at this point in the history
Extracted-From: https://github.com/prestodb/presto
Original-Author: Nezih Yigitbasi <nezihy@fb.com>
  • Loading branch information
findepi committed Jan 29, 2019
1 parent 96cee95 commit 3fef5b6
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,16 @@ public static ExceededMemoryLimitException exceededGlobalTotalLimit(DataSize max
return new ExceededMemoryLimitException(EXCEEDED_GLOBAL_MEMORY_LIMIT, format("Query exceeded distributed total memory limit of %s", maxMemory));
}

public static ExceededMemoryLimitException exceededLocalUserMemoryLimit(DataSize maxMemory, DataSize allocated, DataSize delta)
public static ExceededMemoryLimitException exceededLocalUserMemoryLimit(DataSize maxMemory, String additionalFailureInfo)
{
return new ExceededMemoryLimitException(EXCEEDED_LOCAL_MEMORY_LIMIT, format(
"Query exceeded per-node user memory limit of %s when increasing allocation of %s by %s",
maxMemory,
allocated,
delta));
return new ExceededMemoryLimitException(EXCEEDED_LOCAL_MEMORY_LIMIT,
format("Query exceeded per-node user memory limit of %s [%s]", maxMemory, additionalFailureInfo));
}

public static ExceededMemoryLimitException exceededLocalTotalMemoryLimit(DataSize maxMemory, DataSize allocated, DataSize delta)
public static ExceededMemoryLimitException exceededLocalTotalMemoryLimit(DataSize maxMemory, String additionalFailureInfo)
{
return new ExceededMemoryLimitException(EXCEEDED_LOCAL_MEMORY_LIMIT, format(
"Query exceeded per-node total memory limit of %s when increasing allocation of %s by %s",
maxMemory,
allocated,
delta));
return new ExceededMemoryLimitException(EXCEEDED_LOCAL_MEMORY_LIMIT,
format("Query exceeded per-node total memory limit of %s [%s]", maxMemory, additionalFailureInfo));
}

private ExceededMemoryLimitException(StandardErrorCode errorCode, String message)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand All @@ -40,6 +41,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.airlift.units.DataSize.succinctBytes;
Expand All @@ -48,6 +50,9 @@
import static io.prestosql.ExceededSpillLimitException.exceededPerQueryLocalLimit;
import static io.prestosql.memory.context.AggregatedMemoryContext.newRootAggregatedMemoryContext;
import static io.prestosql.operator.Operator.NOT_BLOCKED;
import static java.lang.String.format;
import static java.util.Comparator.reverseOrder;
import static java.util.Map.Entry.comparingByValue;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

Expand Down Expand Up @@ -320,17 +325,40 @@ private boolean tryReserveMemoryNotSupported(String allocationTag, long bytes)
throw new UnsupportedOperationException("tryReserveMemory is not supported");
}

private static void enforceUserMemoryLimit(long allocated, long delta, long maxMemory)
@GuardedBy("this")
private void enforceUserMemoryLimit(long allocated, long delta, long maxMemory)
{
if (allocated + delta > maxMemory) {
throw exceededLocalUserMemoryLimit(succinctBytes(maxMemory), succinctBytes(allocated), succinctBytes(delta));
throw exceededLocalUserMemoryLimit(succinctBytes(maxMemory), getAdditionalFailureInfo(allocated, delta));
}
}

private static void enforceTotalMemoryLimit(long allocated, long delta, long maxMemory)
@GuardedBy("this")
private void enforceTotalMemoryLimit(long allocated, long delta, long maxMemory)
{
if (allocated + delta > maxMemory) {
throw exceededLocalTotalMemoryLimit(succinctBytes(maxMemory), succinctBytes(allocated), succinctBytes(delta));
throw exceededLocalTotalMemoryLimit(succinctBytes(maxMemory), getAdditionalFailureInfo(allocated, delta));
}
}

@GuardedBy("this")
private String getAdditionalFailureInfo(long allocated, long delta)
{
Map<String, Long> queryAllocations = memoryPool.getTaggedMemoryAllocations().get(queryId);

String additionalInfo = format("Allocated: %s, Delta: %s", succinctBytes(allocated), succinctBytes(delta));

// It's possible that a query tries allocating more than the available memory
// failing immediately before any allocation of that query is tagged
if (queryAllocations == null) {
return additionalInfo;
}

Map<String, DataSize> topConsumers = queryAllocations.entrySet().stream()
.sorted(comparingByValue(reverseOrder()))
.limit(3)
.collect(toImmutableMap(Entry::getKey, e -> succinctBytes(e.getValue())));

return format("%s, Top Consumers: %s", additionalInfo, topConsumers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
Expand All @@ -39,13 +40,17 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.airlift.units.DataSize.succinctBytes;
import static io.prestosql.ExceededMemoryLimitException.exceededLocalUserMemoryLimit;
import static io.prestosql.ExceededSpillLimitException.exceededPerQueryLocalLimit;
import static io.prestosql.memory.context.AggregatedMemoryContext.newRootAggregatedMemoryContext;
import static io.prestosql.operator.Operator.NOT_BLOCKED;
import static java.lang.String.format;
import static java.util.Comparator.reverseOrder;
import static java.util.Map.Entry.comparingByValue;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

Expand Down Expand Up @@ -290,10 +295,32 @@ private boolean tryReserveMemoryNotSupported(String allocationTag, long bytes)
throw new UnsupportedOperationException("tryReserveMemory is not supported");
}

private static void enforceUserMemoryLimit(long allocated, long delta, long maxMemory)
@GuardedBy("this")
private void enforceUserMemoryLimit(long allocated, long delta, long maxMemory)
{
if (allocated + delta > maxMemory) {
throw exceededLocalUserMemoryLimit(succinctBytes(maxMemory), succinctBytes(allocated), succinctBytes(delta));
throw exceededLocalUserMemoryLimit(succinctBytes(maxMemory), getAdditionalFailureInfo(allocated, delta));
}
}

@GuardedBy("this")
private String getAdditionalFailureInfo(long allocated, long delta)
{
Map<String, Long> queryAllocations = memoryPool.getTaggedMemoryAllocations().get(queryId);

String additionalInfo = format("Allocated: %s, Delta: %s", succinctBytes(allocated), succinctBytes(delta));

// It's possible that a query tries allocating more than the available memory
// failing immediately before any allocation of that query is tagged
if (queryAllocations == null) {
return additionalInfo;
}

Map<String, DataSize> topConsumers = queryAllocations.entrySet().stream()
.sorted(comparingByValue(reverseOrder()))
.limit(3)
.collect(toImmutableMap(Entry::getKey, e -> succinctBytes(e.getValue())));

return format("%s, Top Consumers: %s", additionalInfo, topConsumers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public void testLocalTotalMemoryLimitExceeded()
fail("allocation should hit the per-node total memory limit");
}
catch (ExceededMemoryLimitException e) {
assertEquals(e.getMessage(), format("Query exceeded per-node total memory limit of %1$s when increasing allocation of %1$s by 1B", queryMaxTotalMemory));
assertEquals(e.getMessage(), format("Query exceeded per-node total memory limit of %1$s [Allocated: %1$s, Delta: 1B, Top Consumers: {test=%1$s}]", queryMaxTotalMemory));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public void testHashAggregationMemoryReservation(boolean hashEnabled, boolean sp
assertEquals(operator.getOperatorContext().getOperatorStats().getUserMemoryReservation().toBytes(), 0);
}

@Test(dataProvider = "hashEnabled", expectedExceptions = ExceededMemoryLimitException.class, expectedExceptionsMessageRegExp = "Query exceeded per-node user memory limit of 10B when increasing allocation of 0B by .*")
@Test(dataProvider = "hashEnabled", expectedExceptions = ExceededMemoryLimitException.class, expectedExceptionsMessageRegExp = "Query exceeded per-node user memory limit of 10B.*")
public void testMemoryLimit(boolean hashEnabled)
{
MetadataManager metadata = MetadataManager.createTestMetadataManager();
Expand Down Expand Up @@ -425,7 +425,7 @@ public void testMemoryReservationYield(Type type)
assertEquals(count, 6_000 * 600);
}

@Test(dataProvider = "hashEnabled", expectedExceptions = ExceededMemoryLimitException.class, expectedExceptionsMessageRegExp = "Query exceeded per-node user memory limit of 3MB when increasing allocation of 0B by .*")
@Test(dataProvider = "hashEnabled", expectedExceptions = ExceededMemoryLimitException.class, expectedExceptionsMessageRegExp = "Query exceeded per-node user memory limit of 3MB.*")
public void testHashBuilderResizeLimit(boolean hashEnabled)
{
BlockBuilder builder = VARCHAR.createBlockBuilder(null, 1, MAX_BLOCK_SIZE_IN_BYTES);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public void testReverseOrder()
assertOperatorEquals(operatorFactory, driverContext, input, expected);
}

@Test(expectedExceptions = ExceededMemoryLimitException.class, expectedExceptionsMessageRegExp = "Query exceeded per-node user memory limit of 10B when increasing allocation of 0B by .*")
@Test(expectedExceptions = ExceededMemoryLimitException.class, expectedExceptionsMessageRegExp = "Query exceeded per-node user memory limit of 10B.*")
public void testMemoryLimit()
{
List<Page> input = rowPagesBuilder(BIGINT, DOUBLE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void testRowNumberArbitrary()
assertOperatorEquals(operatorFactory, driverContext, input, expected);
}

@Test(expectedExceptions = ExceededMemoryLimitException.class, expectedExceptionsMessageRegExp = "Query exceeded per-node user memory limit of 10B when increasing allocation of 0B by .*")
@Test(expectedExceptions = ExceededMemoryLimitException.class, expectedExceptionsMessageRegExp = "Query exceeded per-node user memory limit of 10B.*")
public void testMemoryLimit()
{
List<Page> input = rowPagesBuilder(BIGINT, DOUBLE)
Expand Down

0 comments on commit 3fef5b6

Please sign in to comment.