Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove hash collisions reporting #15333

Merged
merged 1 commit into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import static io.trino.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.type.TypeUtils.NULL_HASH_CODE;
import static io.trino.util.HashCollisionsEstimator.estimateNumberOfHashCollisions;
import static it.unimi.dsi.fastutil.HashCommon.arraySize;
import static it.unimi.dsi.fastutil.HashCommon.murmurHash3;
import static java.lang.Math.min;
Expand Down Expand Up @@ -75,8 +74,6 @@ public class BigintGroupByHash

private int nextGroupId;
private DictionaryLookBack dictionaryLookBack;
private long hashCollisions;
private double expectedHashCollisions;

// reserve enough memory before rehash
private final UpdateMemory updateMemory;
Expand Down Expand Up @@ -116,18 +113,6 @@ public long getEstimatedSize()
preallocatedMemoryInBytes;
}

@Override
public long getHashCollisions()
{
return hashCollisions;
}

@Override
public double getExpectedHashCollisions()
{
return expectedHashCollisions + estimateNumberOfHashCollisions(getGroupCount(), hashCapacity);
}

@Override
public List<Type> getTypes()
{
Expand Down Expand Up @@ -259,7 +244,6 @@ private int putIfAbsent(int position, Block block)

// increment position and mask to handle wrap around
hashPosition = (hashPosition + 1) & mask;
hashCollisions++;
}

return addNewGroup(hashPosition, value);
Expand Down Expand Up @@ -298,8 +282,6 @@ private boolean tryRehash()
}
preallocatedMemoryInBytes = 0;

expectedHashCollisions += estimateNumberOfHashCollisions(getGroupCount(), hashCapacity);

int newMask = newCapacity - 1;
long[] newValues = new long[newCapacity];
int[] newGroupIds = new int[newCapacity];
Expand All @@ -315,7 +297,6 @@ private boolean tryRehash()
// find an empty slot for the address
while (newGroupIds[hashPosition] != -1) {
hashPosition = (hashPosition + 1) & newMask;
hashCollisions++;
}

// record the mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@ static GroupByHash createGroupByHash(

long getEstimatedSize();

long getHashCollisions();

double getExpectedHashCollisions();

List<Type> getTypes();

int getGroupCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,6 @@ public OperatorFactory duplicate()
private final BlockTypeOperators blockTypeOperators;

private final List<Type> types;
private final HashCollisionsCounter hashCollisionsCounter;

private HashAggregationBuilder aggregationBuilder;
private final LocalMemoryContext memoryContext;
Expand Down Expand Up @@ -339,8 +338,6 @@ private HashAggregationOperator(
this.spillerFactory = requireNonNull(spillerFactory, "spillerFactory is null");
this.joinCompiler = requireNonNull(joinCompiler, "joinCompiler is null");
this.blockTypeOperators = requireNonNull(blockTypeOperators, "blockTypeOperators is null");
this.hashCollisionsCounter = new HashCollisionsCounter(operatorContext);
operatorContext.setInfoSupplier(hashCollisionsCounter);

this.memoryContext = operatorContext.localUserMemoryContext();
}
Expand Down Expand Up @@ -534,7 +531,6 @@ private void closeAggregationBuilder()
{
outputPages = null;
if (aggregationBuilder != null) {
aggregationBuilder.recordHashCollisions(hashCollisionsCounter);
aggregationBuilder.close();
// aggregationBuilder.close() will release all memory reserved in memory accounting.
// The reference must be set to null afterwards to avoid unaccounted memory.
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import static io.trino.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.sql.gen.JoinCompiler.PagesHashStrategyFactory;
import static io.trino.util.HashCollisionsEstimator.estimateNumberOfHashCollisions;
import static it.unimi.dsi.fastutil.HashCommon.arraySize;
import static it.unimi.dsi.fastutil.HashCommon.murmurHash3;
import static java.lang.Math.min;
Expand Down Expand Up @@ -89,8 +88,6 @@ public class MultiChannelGroupByHash

private int nextGroupId;
private DictionaryLookBack dictionaryLookBack;
private long hashCollisions;
private double expectedHashCollisions;

// reserve enough memory before rehash
private final UpdateMemory updateMemory;
Expand Down Expand Up @@ -178,18 +175,6 @@ public long getEstimatedSize()
(dictionaryLookBack != null ? dictionaryLookBack.getRetainedSizeInBytes() : 0);
}

@Override
public long getHashCollisions()
{
return hashCollisions;
}

@Override
public double getExpectedHashCollisions()
{
return expectedHashCollisions + estimateNumberOfHashCollisions(getGroupCount(), hashCapacity);
}

@Override
public List<Type> getTypes()
{
Expand Down Expand Up @@ -297,7 +282,6 @@ private int putIfAbsent(int position, Page page, long rawHash)
}
// increment position and mask to handle wrap around
hashPosition = (hashPosition + 1) & mask;
hashCollisions++;
}

// did we find an existing group?
Expand Down Expand Up @@ -381,8 +365,6 @@ private boolean tryRehash()
}
preallocatedMemoryInBytes = 0;

expectedHashCollisions += estimateNumberOfHashCollisions(getGroupCount(), hashCapacity);

int newMask = newCapacity - 1;
byte[] rawHashes = new byte[newCapacity];
int[] newGroupIdByHash = new int[newCapacity];
Expand All @@ -400,7 +382,6 @@ private boolean tryRehash()
int pos = getHashPosition(rawHash, newMask);
while (newGroupIdByHash[pos] != -1) {
pos = (pos + 1) & newMask;
hashCollisions++;
}

// record the mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,6 @@ public long getEstimatedSize()
return INSTANCE_SIZE;
}

@Override
public long getHashCollisions()
{
return 0;
}

@Override
public double getExpectedHashCollisions()
{
return 0;
}

@Override
public List<Type> getTypes()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
@JsonSubTypes.Type(value = LocalExchangeBufferInfo.class, name = "localExchangeBuffer"),
@JsonSubTypes.Type(value = TableFinishInfo.class, name = "tableFinish"),
@JsonSubTypes.Type(value = SplitOperatorInfo.class, name = "splitOperator"),
@JsonSubTypes.Type(value = HashCollisionsInfo.class, name = "hashCollisionsInfo"),
@JsonSubTypes.Type(value = PartitionedOutputInfo.class, name = "partitionedOutput"),
@JsonSubTypes.Type(value = JoinOperatorInfo.class, name = "joinOperatorInfo"),
@JsonSubTypes.Type(value = WindowInfo.class, name = "windowInfo"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package io.trino.operator.aggregation.builder;

import com.google.common.util.concurrent.ListenableFuture;
import io.trino.operator.HashCollisionsCounter;
import io.trino.operator.Work;
import io.trino.operator.WorkProcessor;
import io.trino.spi.Page;
Expand All @@ -30,8 +29,6 @@ public interface HashAggregationBuilder

void updateMemory();

void recordHashCollisions(HashCollisionsCounter hashCollisionsCounter);

@Override
void close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.airlift.units.DataSize;
import io.trino.array.IntBigArray;
import io.trino.operator.GroupByHash;
import io.trino.operator.HashCollisionsCounter;
import io.trino.operator.OperatorContext;
import io.trino.operator.TransformWork;
import io.trino.operator.UpdateMemory;
Expand Down Expand Up @@ -161,22 +160,6 @@ public boolean isFull()
return full;
}

@Override
public void recordHashCollisions(HashCollisionsCounter hashCollisionsCounter)
{
hashCollisionsCounter.recordHashCollision(groupByHash.getHashCollisions(), groupByHash.getExpectedHashCollisions());
}

public long getHashCollisions()
{
return groupByHash.getHashCollisions();
}

public double getExpectedHashCollisions()
{
return groupByHash.getExpectedHashCollisions();
}

@Override
public ListenableFuture<Void> startMemoryRevoke()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.trino.memory.context.LocalMemoryContext;
import io.trino.operator.HashCollisionsCounter;
import io.trino.operator.MergeHashSort;
import io.trino.operator.OperatorContext;
import io.trino.operator.Work;
Expand Down Expand Up @@ -70,8 +69,6 @@ public class SpillableHashAggregationBuilder
// todo get rid of that and only use revocable memory
private long emptyHashAggregationBuilderSize;

private long hashCollisions;
private double expectedHashCollisions;
private boolean producingOutput;

public SpillableHashAggregationBuilder(
Expand Down Expand Up @@ -131,14 +128,6 @@ public void updateMemory()
}
}

@Override
public void recordHashCollisions(HashCollisionsCounter hashCollisionsCounter)
{
hashCollisionsCounter.recordHashCollision(hashCollisions, expectedHashCollisions);
hashCollisions = 0;
expectedHashCollisions = 0;
}

@Override
public boolean isFull()
{
Expand Down Expand Up @@ -321,8 +310,6 @@ private WorkProcessor<Page> mergeSortedPages(WorkProcessor<Page> sortedPages, lo
private void rebuildHashAggregationBuilder()
{
if (hashAggregationBuilder != null) {
hashCollisions += hashAggregationBuilder.getHashCollisions();
expectedHashCollisions += hashAggregationBuilder.getExpectedHashCollisions();
hashAggregationBuilder.close();
}

Expand Down
Loading