Skip to content

Commit

Permalink
Add extra information on memory pool leaks
Browse files Browse the repository at this point in the history
When we notice that at the there is memory leak from
DistributedQueryRunner after finishing up tests we print extra
information regarding memory pool and queries from the cluster, so it
is easier to pinpoint the problem in code.

Example output:
```
java.lang.AssertionError: Expected memory reservation on server_0(worker)to be 0 but was 5663866; detailed memory usage:
 20220304_183721_00001_2dtqj:
   SQL: SELECT count(*) FROM tpch.tiny.customer
   memoryReservation: 7428
   taggedMemoryReservaton: {AggregationOperator=408, ExchangeOperator=5184, LazyOutputBuffer=1836}
 20220304_183726_00029_2dtqj:
   SQL: SELECT TIMESTAMP '2017-01-02 09:12:34.123456789 Europe/Paris'
   memoryReservation: 113
   taggedMemoryReservaton: {LazyOutputBuffer=113}
```
  • Loading branch information
losipiuk committed Mar 7, 2022
1 parent 98babf1 commit 4e13342
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,13 @@ private synchronized void updateTaggedMemoryAllocations(QueryId queryId, String
}

@VisibleForTesting
synchronized Map<QueryId, Map<String, Long>> getTaggedMemoryAllocations()
public synchronized Map<QueryId, Long> getQueryMemoryReservations()
{
return ImmutableMap.copyOf(queryMemoryReservations);
}

@VisibleForTesting
public synchronized Map<QueryId, Map<String, Long>> getTaggedMemoryAllocations()
{
return ImmutableMap.copyOf(taggedMemoryAllocations);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,20 @@
package io.trino.testing;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.MoreCollectors;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.airlift.units.Duration;
import io.trino.Session;
import io.trino.execution.QueryStats;
import io.trino.execution.warnings.WarningCollector;
import io.trino.memory.LocalMemoryManager;
import io.trino.memory.MemoryPool;
import io.trino.metadata.QualifiedObjectName;
import io.trino.metadata.TableHandle;
import io.trino.metadata.TableMetadata;
import io.trino.operator.OperatorStats;
import io.trino.server.BasicQueryInfo;
import io.trino.server.DynamicFilterService.DynamicFiltersStats;
import io.trino.server.testing.TestingTrinoServer;
import io.trino.spi.QueryId;
Expand All @@ -50,6 +54,7 @@
import org.testng.annotations.Test;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.function.Consumer;
Expand All @@ -71,6 +76,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;

public abstract class AbstractTestQueryFramework
{
Expand Down Expand Up @@ -124,10 +130,7 @@ private void checkQueryMemoryReleased()
List<TestingTrinoServer> servers = distributedQueryRunner.getServers();
for (int serverId = 0; serverId < servers.size(); ++serverId) {
TestingTrinoServer server = servers.get(serverId);
String serverName = format("server_%d(%s)", serverId, server.isCoordinator() ? "coordinator" : "worker");
assertThat(server.getLocalMemoryManager().getMemoryPool().getReservedBytes())
.describedAs("memory reservation on " + serverName)
.isZero();
assertMemoryPoolReleased(distributedQueryRunner.getCoordinator(), server, serverId);
}

assertThat(distributedQueryRunner.getCoordinator().getClusterMemoryManager().getClusterTotalMemoryReservation())
Expand All @@ -136,6 +139,43 @@ private void checkQueryMemoryReleased()
});
}

private void assertMemoryPoolReleased(TestingTrinoServer coordinator, TestingTrinoServer server, long serverId)
{
String serverName = format("server_%d(%s)", serverId, server.isCoordinator() ? "coordinator" : "worker");
long reservedBytes = server.getLocalMemoryManager().getMemoryPool().getReservedBytes();

if (reservedBytes != 0) {
fail("Expected memory reservation on " + serverName + "to be 0 but was " + reservedBytes + "; detailed memory usage:\n" + describeMemoryPool(coordinator, server));
}
}

private String describeMemoryPool(TestingTrinoServer coordinator, TestingTrinoServer server)
{
LocalMemoryManager memoryManager = server.getLocalMemoryManager();
MemoryPool memoryPool = memoryManager.getMemoryPool();
Map<QueryId, Long> queryReservations = memoryPool.getQueryMemoryReservations();
Map<QueryId, Map<String, Long>> queryTaggedReservations = memoryPool.getTaggedMemoryAllocations();

List<BasicQueryInfo> queriesWithMemory = coordinator.getQueryManager().getQueries().stream()
.filter(query -> queryReservations.keySet().contains(query.getQueryId()))
.collect(toImmutableList());

StringBuilder result = new StringBuilder();
queriesWithMemory.forEach(queryInfo -> {
QueryId queryId = queryInfo.getQueryId();
String querySql = queryInfo.getQuery();
Long memoryReservation = queryReservations.getOrDefault(queryId, 0L);
Map<String, Long> taggedMemoryReservation = queryTaggedReservations.getOrDefault(queryId, ImmutableMap.of());

result.append(" " + queryId + ":\n");
result.append(" SQL: " + querySql + "\n");
result.append(" memoryReservation: " + memoryReservation + "\n");
result.append(" taggedMemoryReservaton: " + taggedMemoryReservation + "\n");
});

return result.toString();
}

@Test
public void ensureTestNamingConvention()
{
Expand Down

0 comments on commit 4e13342

Please sign in to comment.