Skip to content

Commit

Permalink
Handle initial row set population for key initializers + preserve empty
Browse files Browse the repository at this point in the history
  • Loading branch information
rcaudy committed Aug 7, 2022
1 parent f535183 commit 492dd73
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,14 @@ private static QueryTable aggregation(
stateManagerSupplier);
}

final RowSetBuilderRandom initialRowsBuilder = initialKeys != null && !preserveEmpty ?
new BitmapRandomBuilder(stateManager.maxTableSize() - 1) : null;
if (useGrouping) {
initialGroupedKeyAddition(input, reinterpretedKeySources, ac, stateManager, outputPosition, usePrev);
initialGroupedKeyAddition(input, reinterpretedKeySources, ac, stateManager, outputPosition,
initialRowsBuilder, usePrev);
} else {
initialBucketedKeyAddition(input, reinterpretedKeySources, ac, permuteKernels, stateManager,
outputPosition, usePrev);
outputPosition, initialRowsBuilder, usePrev);
}

// Construct and return result table
Expand Down Expand Up @@ -185,8 +188,10 @@ private static QueryTable aggregation(
}
ac.getResultColumns(resultColumnSourceMap);

final TrackingWritableRowSet resultRowSet =
RowSetFactory.flat(outputPosition.intValue()).toTracking();
final TrackingWritableRowSet resultRowSet = (initialRowsBuilder == null
? RowSetFactory.flat(outputPosition.intValue())
: initialRowsBuilder.build()
).toTracking();
if (input.isRefreshing()) {
copyKeyColumns(keyColumnsRaw, keyColumnsCopied, resultRowSet);
}
Expand Down Expand Up @@ -1712,12 +1717,13 @@ private static OperatorAggregationStateManager makeInitializedStateManager(
}

private static void initialBucketedKeyAddition(QueryTable input,
ColumnSource<?>[] reinterpretedKeySources,
AggregationContext ac,
PermuteKernel[] permuteKernels,
OperatorAggregationStateManager stateManager,
MutableInt outputPosition,
boolean usePrev) {
ColumnSource<?>[] reinterpretedKeySources,
AggregationContext ac,
PermuteKernel[] permuteKernels,
OperatorAggregationStateManager stateManager,
MutableInt outputPosition,
RowSetBuilderRandom initialRowsBuilder,
boolean usePrev) {
final boolean findRuns = ac.requiresRunFinds(SKIP_RUN_FIND);

final ChunkSource.GetContext[] getContexts = new ChunkSource.GetContext[ac.size()];
Expand Down Expand Up @@ -1773,6 +1779,9 @@ private static void initialBucketedKeyAddition(QueryTable input,
sharedContext.reset();

stateManager.add(bc, chunkOk, buildSources, outputPosition, outputPositions);
if (initialRowsBuilder != null) {
initialRowsBuilder.addRowKeysChunk(outputPositions);
}

ac.ensureCapacity(outputPosition.intValue());

Expand Down Expand Up @@ -1811,11 +1820,12 @@ private static void initialBucketedKeyAddition(QueryTable input,
}

private static void initialGroupedKeyAddition(QueryTable input,
ColumnSource<?>[] reinterpretedKeySources,
AggregationContext ac,
OperatorAggregationStateManager stateManager,
MutableInt outputPosition,
boolean usePrev) {
ColumnSource<?>[] reinterpretedKeySources,
AggregationContext ac,
OperatorAggregationStateManager stateManager,
MutableInt outputPosition,
RowSetBuilderRandom initialRowsBuilder,
boolean usePrev) {
final Pair<ArrayBackedColumnSource, ObjectArraySource<RowSet>> groupKeyIndexTable;
final RowSetIndexer indexer = RowSetIndexer.of(input.getRowSet());
final Map<Object, RowSet> grouping = usePrev ? indexer.getPrevGrouping(reinterpretedKeySources[0])
Expand All @@ -1842,6 +1852,9 @@ private static void initialGroupedKeyAddition(QueryTable input,
while (rsIt.hasMore()) {
final RowSequence chunkOk = rsIt.getNextRowSequenceWithLength(CHUNK_SIZE);
stateManager.add(bc, chunkOk, groupedFlatKeySource, outputPosition, outputPositions);
if (initialRowsBuilder != null) {
initialRowsBuilder.addRowKeysChunk(outputPositions);
}
}
Assert.eq(outputPosition.intValue(), "outputPosition.intValue()", responsiveGroups, "responsiveGroups");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

interface OperatorAggregationStateManager {

int maxTableSize();

SafeCloseable makeAggregationStateBuildContext(ColumnSource<?>[] buildSources, long maxSize);

void add(final SafeCloseable bc, RowSequence rowSequence, ColumnSource<?>[] sources, MutableInt nextOutputPosition, WritableIntChunk<RowKeys> outputPositions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ protected OperatorAggregationStateManagerOpenAddressedAlternateBase(ColumnSource
this.maximumLoadFactor = maximumLoadFactor;
}

@Override
public final int maxTableSize() {
return Math.toIntExact(MAX_TABLE_SIZE);
}

protected abstract void build(RowSequence rowSequence, Chunk<Values>[] sourceKeyChunks);

public static class BuildContext extends BuildOrProbeContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ protected OperatorAggregationStateManagerOpenAddressedBase(ColumnSource<?>[] tab
this.maximumLoadFactor = maximumLoadFactor;
}

@Override
public final int maxTableSize() {
return Math.toIntExact(MAX_TABLE_SIZE);
}

protected abstract void build(RowSequence rowSequence, Chunk<Values>[] sourceKeyChunks);

BuildContext makeBuildContext(ColumnSource<?>[] buildSources, long maxSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ public abstract class OperatorAggregationStateManagerTypedBase
public static final int CHUNK_SIZE = ChunkedOperatorAggregationHelper.CHUNK_SIZE;
private static final long MAX_TABLE_SIZE = HashTableColumnSource.MINIMUM_OVERFLOW_HASH_SLOT;

@Override
public final int maxTableSize() {
return Math.toIntExact(MAX_TABLE_SIZE);
}

// the number of slots in our table
private int tableSize;

Expand Down

0 comments on commit 492dd73

Please sign in to comment.