From 6c5eb59250b0db0ba880c7d9d9cf7d296b1530dd Mon Sep 17 00:00:00 2001 From: skrzypo987 Date: Thu, 28 Apr 2022 14:59:15 +0300 Subject: [PATCH] Change temporary data storage in MultiChannelGroupByHash Temporary output data used to be kept in a list of variable-size pages. That required using a two-level address to point an exact row. Since the hash table can only store 2^30 positions, the number of groups can be stored in a 4-byte int variable, given that the page size is fixed. This way we can get rid of some data structures used to map between group id and output row position skipping some calculations and reducing memory footprint from 8+8+4+1 bytes per hash bucket to 4+1 bytes which improves memory locality. Since the memory footprint of MultiChannelGroupByHash is now smaller, tests using testMemoryReservationYield needed to change because we yield less often due to QueryContext.GUARANTEED_MEMORY threshold and reserve less memory. --- .../operator/MultiChannelGroupByHash.java | 95 ++++++++----------- .../operator/GroupByHashYieldAssertion.java | 20 +++- .../operator/TestDistinctLimitOperator.java | 8 +- .../operator/TestHashAggregationOperator.java | 7 +- .../operator/TestHashSemiJoinOperator.java | 7 +- .../operator/TestMarkDistinctOperator.java | 8 +- .../trino/operator/TestRowNumberOperator.java | 8 +- .../operator/TestTopNRankingOperator.java | 2 +- .../io/trino/memory/TestMemoryManager.java | 3 +- 9 files changed, 77 insertions(+), 81 deletions(-) diff --git a/core/trino-main/src/main/java/io/trino/operator/MultiChannelGroupByHash.java b/core/trino-main/src/main/java/io/trino/operator/MultiChannelGroupByHash.java index 0d9cda0bedca..b6a437d1b675 100644 --- a/core/trino-main/src/main/java/io/trino/operator/MultiChannelGroupByHash.java +++ b/core/trino-main/src/main/java/io/trino/operator/MultiChannelGroupByHash.java @@ -16,7 +16,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import io.trino.array.LongBigArray; import io.trino.spi.Page; import io.trino.spi.PageBuilder; import io.trino.spi.TrinoException; @@ -41,8 +40,6 @@ import static com.google.common.base.Preconditions.checkState; import static com.google.common.base.Verify.verify; import static io.airlift.slice.SizeOf.sizeOf; -import static io.trino.operator.SyntheticAddress.decodePosition; -import static io.trino.operator.SyntheticAddress.decodeSliceIndex; import static io.trino.operator.SyntheticAddress.encodeSyntheticAddress; import static io.trino.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES; import static io.trino.spi.type.BigintType.BIGINT; @@ -62,6 +59,9 @@ public class MultiChannelGroupByHash private static final float FILL_RATIO = 0.75f; // Max (page value count / cumulative dictionary size) to trigger the low cardinality case private static final double SMALL_DICTIONARIES_MAX_CARDINALITY_RATIO = .25; + private static final int VALUES_PAGE_BITS = 14; // 16k positions + private static final int VALUES_PAGE_MAX_ROW_COUNT = 1 << VALUES_PAGE_BITS; + private static final int VALUES_PAGE_MASK = VALUES_PAGE_MAX_ROW_COUNT - 1; private final List types; private final List hashTypes; @@ -80,12 +80,11 @@ public class MultiChannelGroupByHash private int hashCapacity; private int maxFill; private int mask; - private long[] groupAddressByHash; + // Group ids are assigned incrementally. Therefore, since values page size is constant and power of two, + // the group id is also an address (slice index and position within slice) to group row in channelBuilders. private int[] groupIdsByHash; private byte[] rawHashByHashPosition; - private final LongBigArray groupAddressByGroupId; - private int nextGroupId; private DictionaryLookBack dictionaryLookBack; private long hashCollisions; @@ -147,15 +146,9 @@ public MultiChannelGroupByHash( maxFill = calculateMaxFill(hashCapacity); mask = hashCapacity - 1; - groupAddressByHash = new long[hashCapacity]; - Arrays.fill(groupAddressByHash, -1); - rawHashByHashPosition = new byte[hashCapacity]; - groupIdsByHash = new int[hashCapacity]; - - groupAddressByGroupId = new LongBigArray(); - groupAddressByGroupId.ensureCapacity(maxFill); + Arrays.fill(groupIdsByHash, -1); // This interface is used for actively reserving memory (push model) for rehash. // The caller can also query memory usage on this object (pull model) @@ -165,9 +158,8 @@ public MultiChannelGroupByHash( @Override public long getRawHash(int groupId) { - long address = groupAddressByGroupId.get(groupId); - int blockIndex = decodeSliceIndex(address); - int position = decodePosition(address); + int blockIndex = groupId >> VALUES_PAGE_BITS; + int position = groupId & VALUES_PAGE_MASK; return hashStrategy.hashPosition(blockIndex, position); } @@ -178,9 +170,7 @@ public long getEstimatedSize() (sizeOf(channelBuilders.get(0).elements()) * channelBuilders.size()) + completedPagesMemorySize + currentPageBuilder.getRetainedSizeInBytes() + - sizeOf(groupAddressByHash) + sizeOf(groupIdsByHash) + - groupAddressByGroupId.sizeOf() + sizeOf(rawHashByHashPosition) + preallocatedMemoryInBytes; } @@ -212,9 +202,8 @@ public int getGroupCount() @Override public void appendValuesTo(int groupId, PageBuilder pageBuilder) { - long address = groupAddressByGroupId.get(groupId); - int blockIndex = decodeSliceIndex(address); - int position = decodePosition(address); + int blockIndex = groupId >> VALUES_PAGE_BITS; + int position = groupId & VALUES_PAGE_MASK; hashStrategy.appendTo(blockIndex, position, pageBuilder, 0); } @@ -265,8 +254,8 @@ public boolean contains(int position, Page page, int[] hashChannels, long rawHas int hashPosition = getHashPosition(rawHash, mask); // look for a slot containing this key - while (groupAddressByHash[hashPosition] != -1) { - if (positionNotDistinctFromCurrentRow(groupAddressByHash[hashPosition], hashPosition, position, page, (byte) rawHash, hashChannels)) { + while (groupIdsByHash[hashPosition] != -1) { + if (positionNotDistinctFromCurrentRow(groupIdsByHash[hashPosition], hashPosition, position, page, (byte) rawHash, hashChannels)) { // found an existing slot for this key return true; } @@ -296,8 +285,8 @@ private int putIfAbsent(int position, Page page, long rawHash) // look for an empty slot or a slot containing this key int groupId = -1; - while (groupAddressByHash[hashPosition] != -1) { - if (positionNotDistinctFromCurrentRow(groupAddressByHash[hashPosition], hashPosition, position, page, (byte) rawHash, channels)) { + while (groupIdsByHash[hashPosition] != -1) { + if (positionNotDistinctFromCurrentRow(groupIdsByHash[hashPosition], hashPosition, position, page, (byte) rawHash, channels)) { // found an existing slot for this key groupId = groupIdsByHash[hashPosition]; @@ -336,13 +325,11 @@ private int addNewGroup(int hashPosition, int position, Page page, long rawHash) // record group id in hash int groupId = nextGroupId++; - groupAddressByHash[hashPosition] = address; rawHashByHashPosition[hashPosition] = (byte) rawHash; groupIdsByHash[hashPosition] = groupId; - groupAddressByGroupId.set(groupId, address); // create new page builder if this page is full - if (currentPageBuilder.isFull()) { + if (currentPageBuilder.getPositionCount() == VALUES_PAGE_MAX_ROW_COUNT) { startNewPage(); } @@ -362,6 +349,7 @@ private void startNewPage() { if (currentPageBuilder != null) { completedPagesMemorySize += currentPageBuilder.getRetainedSizeInBytes(); + // TODO: (https://github.com/trinodb/trino/issues/12484) pre-size new PageBuilder to OUTPUT_PAGE_SIZE currentPageBuilder = currentPageBuilder.newPageBuilderLike(); } else { @@ -382,10 +370,9 @@ private boolean tryRehash() int newCapacity = toIntExact(newCapacityLong); // An estimate of how much extra memory is needed before we can go ahead and expand the hash table. - // This includes the new capacity for groupAddressByHash, rawHashByHashPosition, groupIdsByHash, and groupAddressByGroupId as well as the size of the current page - preallocatedMemoryInBytes = (newCapacity - hashCapacity) * (long) (Long.BYTES + Integer.BYTES + Byte.BYTES) + - (long) (calculateMaxFill(newCapacity) - maxFill) * Long.BYTES + - currentPageSizeInBytes; + // This includes the new capacity for rawHashByHashPosition, groupIdsByHash as well as the size of the current page + preallocatedMemoryInBytes = (newCapacity - hashCapacity) * (long) (Integer.BYTES + Byte.BYTES) + + currentPageSizeInBytes; if (!updateMemory.update()) { // reserved memory but has exceeded the limit return false; @@ -395,54 +382,46 @@ private boolean tryRehash() expectedHashCollisions += estimateNumberOfHashCollisions(getGroupCount(), hashCapacity); int newMask = newCapacity - 1; - long[] newKey = new long[newCapacity]; byte[] rawHashes = new byte[newCapacity]; - Arrays.fill(newKey, -1); - int[] newValue = new int[newCapacity]; + int[] newGroupIdByHash = new int[newCapacity]; + Arrays.fill(newGroupIdByHash, -1); - int oldIndex = 0; - for (int groupId = 0; groupId < nextGroupId; groupId++) { + for (int i = 0; i < hashCapacity; i++) { // seek to the next used slot - while (groupAddressByHash[oldIndex] == -1) { - oldIndex++; + int groupId = groupIdsByHash[i]; + if (groupId == -1) { + continue; } - // get the address for this slot - long address = groupAddressByHash[oldIndex]; - - long rawHash = hashPosition(address); + long rawHash = hashPosition(groupId); // find an empty slot for the address int pos = getHashPosition(rawHash, newMask); - while (newKey[pos] != -1) { + while (newGroupIdByHash[pos] != -1) { pos = (pos + 1) & newMask; hashCollisions++; } // record the mapping - newKey[pos] = address; rawHashes[pos] = (byte) rawHash; - newValue[pos] = groupIdsByHash[oldIndex]; - oldIndex++; + newGroupIdByHash[pos] = groupId; } this.mask = newMask; this.hashCapacity = newCapacity; this.maxFill = calculateMaxFill(newCapacity); - this.groupAddressByHash = newKey; this.rawHashByHashPosition = rawHashes; - this.groupIdsByHash = newValue; - groupAddressByGroupId.ensureCapacity(maxFill); + this.groupIdsByHash = newGroupIdByHash; return true; } - private long hashPosition(long sliceAddress) + private long hashPosition(int groupId) { - int sliceIndex = decodeSliceIndex(sliceAddress); - int position = decodePosition(sliceAddress); + int blockIndex = groupId >> VALUES_PAGE_BITS; + int blockPosition = groupId & VALUES_PAGE_MASK; if (precomputedHashChannel.isPresent()) { - return getRawHash(sliceIndex, position, precomputedHashChannel.getAsInt()); + return getRawHash(blockIndex, blockPosition, precomputedHashChannel.getAsInt()); } - return hashStrategy.hashPosition(sliceIndex, position); + return hashStrategy.hashPosition(blockIndex, blockPosition); } private long getRawHash(int sliceIndex, int position, int hashChannel) @@ -450,12 +429,14 @@ private long getRawHash(int sliceIndex, int position, int hashChannel) return channelBuilders.get(hashChannel).get(sliceIndex).getLong(position, 0); } - private boolean positionNotDistinctFromCurrentRow(long address, int hashPosition, int position, Page page, byte rawHash, int[] hashChannels) + private boolean positionNotDistinctFromCurrentRow(int groupId, int hashPosition, int position, Page page, byte rawHash, int[] hashChannels) { if (rawHashByHashPosition[hashPosition] != rawHash) { return false; } - return hashStrategy.positionNotDistinctFromRow(decodeSliceIndex(address), decodePosition(address), position, page, hashChannels); + int blockIndex = groupId >> VALUES_PAGE_BITS; + int blockPosition = groupId & VALUES_PAGE_MASK; + return hashStrategy.positionNotDistinctFromRow(blockIndex, blockPosition, position, page, hashChannels); } private static int getHashPosition(long rawHash, int mask) diff --git a/core/trino-main/src/test/java/io/trino/operator/GroupByHashYieldAssertion.java b/core/trino-main/src/test/java/io/trino/operator/GroupByHashYieldAssertion.java index bbc0734761f3..e1f8f85ce0c4 100644 --- a/core/trino-main/src/test/java/io/trino/operator/GroupByHashYieldAssertion.java +++ b/core/trino-main/src/test/java/io/trino/operator/GroupByHashYieldAssertion.java @@ -167,8 +167,8 @@ public static GroupByHashYieldResult finishOperatorWithYieldingGroupByHash(List< expectedReservedExtraBytes = oldCapacity * (long) (Long.BYTES * 1.75 + Integer.BYTES) + page.getRetainedSizeInBytes(); } else { - // groupAddressByHash, groupIdsByHash, and rawHashByHashPosition double by hashCapacity; while groupAddressByGroupId double by maxFill = hashCapacity / 0.75 - expectedReservedExtraBytes = oldCapacity * (long) (Long.BYTES * 1.75 + Integer.BYTES + Byte.BYTES) + page.getRetainedSizeInBytes(); + // groupIdsByHash, and rawHashByHashPosition double by hashCapacity + expectedReservedExtraBytes = oldCapacity * (long) (Integer.BYTES + Byte.BYTES); } assertBetweenInclusive(actualIncreasedMemory, expectedReservedExtraBytes, expectedReservedExtraBytes + additionalMemoryInBytes); @@ -190,10 +190,24 @@ public static GroupByHashYieldResult finishOperatorWithYieldingGroupByHash(List< // Assert the estimated reserved memory before rehash is very close to the one after rehash long rehashedMemoryUsage = operator.getOperatorContext().getDriverContext().getMemoryUsage(); - assertBetweenInclusive(rehashedMemoryUsage * 1.0 / newMemoryUsage, 0.99, 1.01); + double memoryUsageErrorUpperBound = 1.01; + double memoryUsageError = rehashedMemoryUsage * 1.0 / newMemoryUsage; + if (memoryUsageError > memoryUsageErrorUpperBound) { + // Usually the error is < 1%, but since MultiChannelGroupByHash.getEstimatedSize + // accounts for changes in completedPagesMemorySize, which is increased if new page is + // added by addNewGroup (an even that cannot be predicted as it depends on the number of unique groups + // in the current page being processed), the difference includes size of the added new page. + // Lower bound is 1% lower than normal because additionalMemoryInBytes includes also aggregator state. + assertBetweenInclusive(rehashedMemoryUsage * 1.0 / (newMemoryUsage + additionalMemoryInBytes), 0.98, memoryUsageErrorUpperBound, + "rehashedMemoryUsage " + rehashedMemoryUsage + ", newMemoryUsage: " + newMemoryUsage); + } + else { + assertBetweenInclusive(memoryUsageError, 0.99, memoryUsageErrorUpperBound); + } // unblocked assertTrue(operator.needsInput()); + assertTrue(operator.getOperatorContext().isWaitingForMemory().isDone()); } } diff --git a/core/trino-main/src/test/java/io/trino/operator/TestDistinctLimitOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestDistinctLimitOperator.java index e5ff25fa33bd..c02353d65aee 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestDistinctLimitOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestDistinctLimitOperator.java @@ -34,7 +34,7 @@ import java.util.concurrent.ScheduledExecutorService; import static io.airlift.concurrent.Threads.daemonThreadsNamed; -import static io.airlift.testing.Assertions.assertGreaterThan; +import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; import static io.trino.RowPagesBuilder.rowPagesBuilder; import static io.trino.SessionTestUtils.TEST_SESSION; import static io.trino.operator.GroupByHashYieldAssertion.createPagesWithDistinctHashKeys; @@ -167,9 +167,9 @@ public void testMemoryReservationYield(Type type) joinCompiler, blockTypeOperators); - GroupByHashYieldAssertion.GroupByHashYieldResult result = finishOperatorWithYieldingGroupByHash(input, type, operatorFactory, operator -> ((DistinctLimitOperator) operator).getCapacity(), 1_400_000); - assertGreaterThan(result.getYieldCount(), 5); - assertGreaterThan(result.getMaxReservedBytes(), 20L << 20); + GroupByHashYieldAssertion.GroupByHashYieldResult result = finishOperatorWithYieldingGroupByHash(input, type, operatorFactory, operator -> ((DistinctLimitOperator) operator).getCapacity(), 450_000); + assertGreaterThanOrEqual(result.getYieldCount(), 5); + assertGreaterThanOrEqual(result.getMaxReservedBytes(), 20L << 20); assertEquals(result.getOutput().stream().mapToInt(Page::getPositionCount).sum(), 6_000 * 600); } } diff --git a/core/trino-main/src/test/java/io/trino/operator/TestHashAggregationOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestHashAggregationOperator.java index 496b6eb8d066..3f05b637aaf2 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestHashAggregationOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestHashAggregationOperator.java @@ -62,6 +62,7 @@ import static io.airlift.slice.SizeOf.SIZE_OF_LONG; import static io.airlift.testing.Assertions.assertEqualsIgnoreOrder; import static io.airlift.testing.Assertions.assertGreaterThan; +import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; import static io.airlift.units.DataSize.Unit.KILOBYTE; import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.airlift.units.DataSize.succinctBytes; @@ -410,9 +411,9 @@ public void testMemoryReservationYield(Type type) // get result with yield; pick a relatively small buffer for aggregator's memory usage GroupByHashYieldResult result; - result = finishOperatorWithYieldingGroupByHash(input, type, operatorFactory, this::getHashCapacity, 1_400_000); - assertGreaterThan(result.getYieldCount(), 5); - assertGreaterThan(result.getMaxReservedBytes(), 20L << 20); + result = finishOperatorWithYieldingGroupByHash(input, type, operatorFactory, this::getHashCapacity, 450_000); + assertGreaterThanOrEqual(result.getYieldCount(), 5); + assertGreaterThanOrEqual(result.getMaxReservedBytes(), 20L << 20); int count = 0; for (Page page : result.getOutput()) { diff --git a/core/trino-main/src/test/java/io/trino/operator/TestHashSemiJoinOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestHashSemiJoinOperator.java index 984bd4453044..92697c462537 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestHashSemiJoinOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestHashSemiJoinOperator.java @@ -38,7 +38,6 @@ import static com.google.common.collect.Iterables.concat; import static io.airlift.concurrent.Threads.daemonThreadsNamed; -import static io.airlift.testing.Assertions.assertGreaterThan; import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; import static io.trino.RowPagesBuilder.rowPagesBuilder; import static io.trino.SessionTestUtils.TEST_SESSION; @@ -244,10 +243,10 @@ public void testSemiJoinMemoryReservationYield(Type type) type, setBuilderOperatorFactory, operator -> ((SetBuilderOperator) operator).getCapacity(), - 1_400_000); + 450_000); - assertGreaterThanOrEqual(result.getYieldCount(), 5); - assertGreaterThan(result.getMaxReservedBytes(), 20L << 20); + assertGreaterThanOrEqual(result.getYieldCount(), 4); + assertGreaterThanOrEqual(result.getMaxReservedBytes(), 20L << 19); assertEquals(result.getOutput().stream().mapToInt(Page::getPositionCount).sum(), 0); } diff --git a/core/trino-main/src/test/java/io/trino/operator/TestMarkDistinctOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestMarkDistinctOperator.java index 3ee3e6f9d816..a5807719e758 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestMarkDistinctOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestMarkDistinctOperator.java @@ -38,7 +38,7 @@ import static com.google.common.base.Throwables.throwIfUnchecked; import static io.airlift.concurrent.Threads.daemonThreadsNamed; -import static io.airlift.testing.Assertions.assertGreaterThan; +import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; import static io.airlift.testing.Assertions.assertInstanceOf; import static io.trino.RowPagesBuilder.rowPagesBuilder; import static io.trino.SessionTestUtils.TEST_SESSION; @@ -176,9 +176,9 @@ public void testMemoryReservationYield(Type type) OperatorFactory operatorFactory = new MarkDistinctOperatorFactory(0, new PlanNodeId("test"), ImmutableList.of(type), ImmutableList.of(0), Optional.of(1), joinCompiler, blockTypeOperators); // get result with yield; pick a relatively small buffer for partitionRowCount's memory usage - GroupByHashYieldAssertion.GroupByHashYieldResult result = finishOperatorWithYieldingGroupByHash(input, type, operatorFactory, operator -> ((MarkDistinctOperator) operator).getCapacity(), 1_400_000); - assertGreaterThan(result.getYieldCount(), 5); - assertGreaterThan(result.getMaxReservedBytes(), 20L << 20); + GroupByHashYieldAssertion.GroupByHashYieldResult result = finishOperatorWithYieldingGroupByHash(input, type, operatorFactory, operator -> ((MarkDistinctOperator) operator).getCapacity(), 450_000); + assertGreaterThanOrEqual(result.getYieldCount(), 5); + assertGreaterThanOrEqual(result.getMaxReservedBytes(), 20L << 20); int count = 0; for (Page page : result.getOutput()) { diff --git a/core/trino-main/src/test/java/io/trino/operator/TestRowNumberOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestRowNumberOperator.java index 5e2a62569ad0..2c8bad1809b4 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestRowNumberOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestRowNumberOperator.java @@ -40,7 +40,7 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.airlift.testing.Assertions.assertEqualsIgnoreOrder; -import static io.airlift.testing.Assertions.assertGreaterThan; +import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; import static io.trino.RowPagesBuilder.rowPagesBuilder; import static io.trino.SessionTestUtils.TEST_SESSION; import static io.trino.operator.GroupByHashYieldAssertion.createPagesWithDistinctHashKeys; @@ -171,9 +171,9 @@ public void testMemoryReservationYield(Type type) blockTypeOperators); // get result with yield; pick a relatively small buffer for partitionRowCount's memory usage - GroupByHashYieldAssertion.GroupByHashYieldResult result = finishOperatorWithYieldingGroupByHash(input, type, operatorFactory, operator -> ((RowNumberOperator) operator).getCapacity(), 1_400_000); - assertGreaterThan(result.getYieldCount(), 5); - assertGreaterThan(result.getMaxReservedBytes(), 20L << 20); + GroupByHashYieldAssertion.GroupByHashYieldResult result = finishOperatorWithYieldingGroupByHash(input, type, operatorFactory, operator -> ((RowNumberOperator) operator).getCapacity(), 280_000); + assertGreaterThanOrEqual(result.getYieldCount(), 5); + assertGreaterThanOrEqual(result.getMaxReservedBytes(), 20L << 20); int count = 0; for (Page page : result.getOutput()) { diff --git a/core/trino-main/src/test/java/io/trino/operator/TestTopNRankingOperator.java b/core/trino-main/src/test/java/io/trino/operator/TestTopNRankingOperator.java index c71f50c97233..714bdb047c29 100644 --- a/core/trino-main/src/test/java/io/trino/operator/TestTopNRankingOperator.java +++ b/core/trino-main/src/test/java/io/trino/operator/TestTopNRankingOperator.java @@ -302,7 +302,7 @@ public void testMemoryReservationYield() type, operatorFactory, operator -> ((TopNRankingOperator) operator).getGroupedTopNBuilder() == null ? 0 : ((GroupedTopNRowNumberBuilder) ((TopNRankingOperator) operator).getGroupedTopNBuilder()).getGroupByHash().getCapacity(), - 1_000_000); + 450_000); assertGreaterThan(result.getYieldCount(), 3); assertGreaterThan(result.getMaxReservedBytes(), 5L << 20); diff --git a/testing/trino-tests/src/test/java/io/trino/memory/TestMemoryManager.java b/testing/trino-tests/src/test/java/io/trino/memory/TestMemoryManager.java index 2bbe55477b09..5c0dedb5220b 100644 --- a/testing/trino-tests/src/test/java/io/trino/memory/TestMemoryManager.java +++ b/testing/trino-tests/src/test/java/io/trino/memory/TestMemoryManager.java @@ -234,7 +234,8 @@ public void testClusterPools() List> queryFutures = new ArrayList<>(); for (int i = 0; i < 2; i++) { - queryFutures.add(executor.submit(() -> queryRunner.execute("SELECT COUNT(*), clerk FROM orders GROUP BY clerk"))); + // for this test to work, the query has to have enough groups for HashAggregationOperator to go over QueryContext.GUARANTEED_MEMORY + queryFutures.add(executor.submit(() -> queryRunner.execute("SELECT COUNT(*), cast(orderkey as varchar), partkey FROM lineitem GROUP BY cast(orderkey as varchar), partkey"))); } ClusterMemoryManager memoryManager = queryRunner.getCoordinator().getClusterMemoryManager();