diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManager.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManager.java
deleted file mode 100644
index 4bfd834bdd7..00000000000
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManager.java
+++ /dev/null
@@ -1,1791 +0,0 @@
-/**
- * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
- */
-package io.deephaven.engine.table.impl.by;
-
-import io.deephaven.base.verify.Require;
-import io.deephaven.base.verify.Assert;
-import io.deephaven.chunk.*;
-import io.deephaven.chunk.attributes.Any;
-import io.deephaven.chunk.attributes.ChunkPositions;
-import io.deephaven.chunk.attributes.HashCodes;
-import io.deephaven.chunk.attributes.Values;
-import io.deephaven.engine.rowset.*;
-import io.deephaven.engine.table.*;
-import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
-import io.deephaven.engine.rowset.chunkattributes.RowKeys;
-import io.deephaven.util.QueryConstants;
-import io.deephaven.chunk.util.hashing.*;
-// this is ugly to have twice, but we do need it twice for replication
-// @StateChunkIdentityName@ from \QIntChunk\E
-import io.deephaven.chunk.util.hashing.IntChunkEquals;
-import io.deephaven.engine.table.impl.sort.permute.PermuteKernel;
-import io.deephaven.engine.table.impl.sort.timsort.LongIntTimsortKernel;
-import io.deephaven.engine.table.impl.sources.*;
-import io.deephaven.engine.table.impl.util.*;
-
-// mixin rehash
-import java.util.Arrays;
-import io.deephaven.engine.table.impl.sort.permute.IntPermuteKernel;
-// @StateChunkTypeEnum@ from \QInt\E
-import io.deephaven.engine.table.impl.sort.permute.IntPermuteKernel;
-import io.deephaven.engine.table.impl.util.compact.IntCompactKernel;
-import io.deephaven.engine.table.impl.util.compact.LongCompactKernel;
-// endmixin rehash
-
-import io.deephaven.util.SafeCloseableArray;
-import org.jetbrains.annotations.NotNull;
-
-// region extra imports
-import io.deephaven.engine.table.impl.HashTableAnnotations;
-import io.deephaven.util.SafeCloseable;
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.Objects;
-// endregion extra imports
-
-import static io.deephaven.util.SafeCloseable.closeArray;
-
-// region class visibility
-public
-// endregion class visibility
-class IncrementalChunkedOperatorAggregationStateManager
- // region extensions
- implements IncrementalOperatorAggregationStateManager
- // endregion extensions
-{
- // region constants
- public static final int CHUNK_SIZE = ChunkedOperatorAggregationHelper.CHUNK_SIZE;
- public static final int MINIMUM_INITIAL_HASH_SIZE = CHUNK_SIZE;
- private static final long MAX_TABLE_SIZE = HashTableColumnSource.MINIMUM_OVERFLOW_HASH_SLOT;
- // endregion constants
-
- // mixin rehash
- static final double DEFAULT_MAX_LOAD_FACTOR = 0.75;
- static final double DEFAULT_TARGET_LOAD_FACTOR = 0.70;
- // endmixin rehash
-
- // region preamble variables
- // endregion preamble variables
-
- @HashTableAnnotations.EmptyStateValue
- // @NullStateValue@ from \QQueryConstants.NULL_INT\E, @StateValueType@ from \Qint\E
- private static final int EMPTY_RIGHT_VALUE = QueryConstants.NULL_INT;
-
- // mixin getStateValue
- // region overflow pivot
- // endregion overflow pivot
- // endmixin getStateValue
-
- // the number of slots in our table
- // mixin rehash
- private int tableSize;
- // endmixin rehash
- // altmixin rehash: private final int tableSize;
-
- // how many key columns we have
- private final int keyColumnCount;
-
- // mixin rehash
- private long numEntries = 0;
-
- /** Our table size must be 2^L (i.e. a power of two), and the pivot is between 2^(L-1) and 2^L.
- *
- * When hashing a value, if hashCode % 2^L < tableHashPivot, then the destination location is hashCode % 2^L.
- * If hashCode % 2^L >= tableHashPivot, then the destination location is hashCode % 2^(L-1). Once the pivot reaches
- * the table size, we can simply double the table size and repeat the process.
- *
- * This has the effect of only using hash table locations < tableHashPivot. When we want to expand the table
- * we can move some of the entries from the location {@code tableHashPivot - 2^(L-1)} to tableHashPivot. This
- * provides for incremental expansion of the hash table, without the need for a full rehash.
- */
- private int tableHashPivot;
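(As a concrete illustration of the pivot scheme described above, the bucket selection reduces to the following sketch. This snippet is illustrative only, not part of the original file; it assumes hash is the combined 32-bit key hash and relies on tableSize being a power of two, which the constructor enforces below.)

    int location = hash & (tableSize - 1);            // hash % 2^L
    if (location >= tableHashPivot) {
        location = hash & ((tableSize >> 1) - 1);     // hash % 2^(L-1)
    }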
-
- // the table will be rehashed to a load factor of targetLoadFactor if our load factor exceeds maximumLoadFactor;
- // if it falls below the minimum load factor, we will instead contract the table
- private double targetLoadFactor = DEFAULT_TARGET_LOAD_FACTOR;
- private double maximumLoadFactor = DEFAULT_MAX_LOAD_FACTOR;
- // TODO: We do not yet support contraction
- // private final double minimumLoadFactor = 0.5;
-
- private final IntegerArraySource freeOverflowLocations = new IntegerArraySource();
- private int freeOverflowCount = 0;
- // endmixin rehash
-
- // the keys for our hash entries
- private final ArrayBackedColumnSource<?>[] keySources;
- // the location of any overflow entry in this bucket
- private final IntegerArraySource overflowLocationSource = new IntegerArraySource();
-
- // we are going to also reuse this for our state entry, so that we do not need additional storage
- @HashTableAnnotations.StateColumnSource
- // @StateColumnSourceType@ from \QIntegerArraySource\E
- private final IntegerArraySource stateSource
- // @StateColumnSourceConstructor@ from \QIntegerArraySource()\E
- = new IntegerArraySource();
-
- // the keys for overflow
- private int nextOverflowLocation = 0;
- private final ArrayBackedColumnSource<?>[] overflowKeySources;
- // the location of the next key in an overflow bucket
- private final IntegerArraySource overflowOverflowLocationSource = new IntegerArraySource();
- // the overflow buckets for the state source
- @HashTableAnnotations.OverflowStateColumnSource
- // @StateColumnSourceType@ from \QIntegerArraySource\E
- private final IntegerArraySource overflowStateSource
- // @StateColumnSourceConstructor@ from \QIntegerArraySource()\E
- = new IntegerArraySource();
-
- // the type of each of our key chunks
- private final ChunkType[] keyChunkTypes;
-
- // the operators for hashing and various equality methods
- private final ChunkHasher[] chunkHashers;
- private final ChunkEquals[] chunkEquals;
- private final PermuteKernel[] chunkCopiers;
-
- // mixin rehash
- // If we have objects in our key columns, then we should null them out if we delete an overflow row. This only
- // applies to ObjectArraySources; for primitives we are content to leave the dead entries in the tables, because
- // they will not affect GC.
- private final ObjectArraySource<?>[] overflowKeyColumnsToNull;
- // endmixin rehash
-
- // region extra variables
- // in position space
- private final LongArraySource rowCountSource = new LongArraySource();
-
- private final IntegerArraySource outputPositionToHashSlot = new IntegerArraySource();
- private final WritableRowRedirection resultIndexToHashSlot = new IntColumnSourceWritableRowRedirection(outputPositionToHashSlot);
- // endregion extra variables
-
- // region constructor visibility
- // endregion constructor visibility
- IncrementalChunkedOperatorAggregationStateManager(ColumnSource<?>[] tableKeySources
- , int tableSize
- // region constructor arguments
- , double maximumLoadFactor
- , double targetLoadFactor
- // endregion constructor arguments
- ) {
- // region super
- // endregion super
- keyColumnCount = tableKeySources.length;
-
- this.tableSize = tableSize;
- Require.leq(tableSize, "tableSize", MAX_TABLE_SIZE);
- Require.gtZero(tableSize, "tableSize");
- Require.eq(Integer.bitCount(tableSize), "Integer.bitCount(tableSize)", 1);
- // mixin rehash
- this.tableHashPivot = tableSize;
- // endmixin rehash
-
- overflowKeySources = new ArrayBackedColumnSource[keyColumnCount];
- keySources = new ArrayBackedColumnSource[keyColumnCount];
-
- keyChunkTypes = new ChunkType[keyColumnCount];
- chunkHashers = new ChunkHasher[keyColumnCount];
- chunkEquals = new ChunkEquals[keyColumnCount];
- chunkCopiers = new PermuteKernel[keyColumnCount];
-
- for (int ii = 0; ii < keyColumnCount; ++ii) {
- // the sources that we will use to store our hash table
- keySources[ii] = ArrayBackedColumnSource.getMemoryColumnSource(tableSize, tableKeySources[ii].getType());
- keyChunkTypes[ii] = tableKeySources[ii].getChunkType();
-
- overflowKeySources[ii] = ArrayBackedColumnSource.getMemoryColumnSource(CHUNK_SIZE, tableKeySources[ii].getType());
-
- chunkHashers[ii] = ChunkHasher.makeHasher(keyChunkTypes[ii]);
- chunkEquals[ii] = ChunkEquals.makeEqual(keyChunkTypes[ii]);
- chunkCopiers[ii] = PermuteKernel.makePermuteKernel(keyChunkTypes[ii]);
- }
-
- // mixin rehash
- overflowKeyColumnsToNull = Arrays.stream(overflowKeySources).filter(x -> x instanceof ObjectArraySource).map(x -> (ObjectArraySource)x).toArray(ObjectArraySource[]::new);
- // endmixin rehash
-
- // region constructor
- this.maximumLoadFactor = maximumLoadFactor;
- this.targetLoadFactor = targetLoadFactor;
- // endregion constructor
-
- ensureCapacity(tableSize);
- }
-
- private void ensureCapacity(int tableSize) {
- stateSource.ensureCapacity(tableSize);
- overflowLocationSource.ensureCapacity(tableSize);
- for (int ii = 0; ii < keyColumnCount; ++ii) {
- keySources[ii].ensureCapacity(tableSize);
- }
- // region ensureCapacity
- // endregion ensureCapacity
- }
-
- private void ensureOverflowCapacity(WritableIntChunk chunkPositionsToInsertInOverflow) {
- final int locationsToAllocate = chunkPositionsToInsertInOverflow.size();
- // mixin rehash
- if (freeOverflowCount >= locationsToAllocate) {
- return;
- }
- final int newCapacity = nextOverflowLocation + locationsToAllocate - freeOverflowCount;
- // endmixin rehash
- // altmixin rehash: final int newCapacity = nextOverflowLocation + locationsToAllocate;
- overflowOverflowLocationSource.ensureCapacity(newCapacity);
- overflowStateSource.ensureCapacity(newCapacity);
- //noinspection ForLoopReplaceableByForEach
- for (int ii = 0; ii < overflowKeySources.length; ++ii) {
- overflowKeySources[ii].ensureCapacity(newCapacity);
- }
- // region ensureOverflowCapacity
- // endregion ensureOverflowCapacity
- }
-
- // region build wrappers
- @Override
- public void beginUpdateCycle() {
- }
-
- @Override
- public void add(final SafeCloseable bc, RowSequence rowSequence, ColumnSource<?>[] sources, MutableInt nextOutputPosition, WritableIntChunk outputPositions) {
- if (rowSequence.isEmpty()) {
- return;
- }
- buildTable((BuildContext) bc, rowSequence, sources, nextOutputPosition, outputPositions, null);
- }
-
- @Override
- public void addForUpdate(final SafeCloseable bc, RowSequence rowSequence, ColumnSource<?>[] sources, MutableInt nextOutputPosition, WritableIntChunk outputPositions, WritableIntChunk reincarnatedPositions) {
- if (rowSequence.isEmpty()) {
- return;
- }
- buildTable((BuildContext) bc, rowSequence, sources, nextOutputPosition, outputPositions, reincarnatedPositions);
- }
-
- @Override
- public BuildContext makeAggregationStateBuildContext(ColumnSource<?>[] buildSources, long maxSize) {
- return makeBuildContext(buildSources, maxSize);
- }
- // endregion build wrappers
-
- class BuildContext implements Context {
- final int chunkSize;
-
- final LongIntTimsortKernel.LongIntSortKernelContext sortContext;
- final ColumnSource.FillContext stateSourceFillContext;
- // mixin rehash
- final ColumnSource.FillContext overflowStateSourceFillContext;
- // endmixin rehash
- final ColumnSource.FillContext overflowFillContext;
- final ColumnSource.FillContext overflowOverflowFillContext;
-
- // the chunk of hashcodes
- final WritableIntChunk hashChunk;
- // the chunk of positions within our table
- final WritableLongChunk tableLocationsChunk;
-
- final ResettableWritableChunk[] writeThroughChunks = getResettableWritableKeyChunks();
- final WritableIntChunk sourcePositions;
- final WritableIntChunk destinationLocationPositionInWriteThrough;
-
- final WritableBooleanChunk filledValues;
- final WritableBooleanChunk equalValues;
-
- // the overflow locations that we need to get from the overflowLocationSource (or overflowOverflowLocationSource)
- final WritableLongChunk overflowLocationsToFetch;
- // the overflow position in the working key chunks, parallel to the overflowLocationsToFetch
- final WritableIntChunk overflowPositionInSourceChunk;
-
- // the position within our hash table that we should insert a value into
- final WritableLongChunk insertTableLocations;
- // the position in our chunk, parallel to insertTableLocations
- final WritableIntChunk insertPositionsInSourceChunk;
-
- // we sometimes need to check two positions within a single chunk for equality, this contains those positions as pairs
- final WritableIntChunk chunkPositionsToCheckForEquality;
- // While processing overflow insertions, parallel to the chunkPositions to check for equality, the overflow location that
- // is represented by the first of the pairs in chunkPositionsToCheckForEquality
- final WritableLongChunk overflowLocationForEqualityCheck;
-
- // the chunk of state values that we read from the hash table
- // @WritableStateChunkType@ from \QWritableIntChunk\E
- final WritableIntChunk workingStateEntries;
-
- // the chunks for getting key values from the hash table
- final WritableChunk[] workingKeyChunks;
- final WritableChunk[] overflowKeyChunks;
-
- // when fetching from the overflow, we record which chunk position we are fetching for
- final WritableIntChunk chunkPositionsForFetches;
- // which positions in the chunk we are inserting into the overflow
- final WritableIntChunk chunkPositionsToInsertInOverflow;
- // which table locations we are inserting into the overflow
- final WritableLongChunk tableLocationsToInsertInOverflow;
-
- // values we have read from the overflow locations sources
- final WritableIntChunk overflowLocations;
-
- // mixin rehash
- final WritableLongChunk rehashLocations;
- final WritableIntChunk overflowLocationsToMigrate;
- final WritableLongChunk overflowLocationsAsKeyIndices;
- final WritableBooleanChunk shouldMoveBucket;
-
- final ResettableWritableLongChunk overflowLocationForPromotionLoop = ResettableWritableLongChunk.makeResettableChunk();
-
- // mixin allowUpdateWriteThroughState
- // @WritableStateChunkType@ from \QWritableIntChunk\E, @WritableStateChunkName@ from \QWritableIntChunk\E
- final ResettableWritableIntChunk writeThroughState = ResettableWritableIntChunk.makeResettableChunk();
- // endmixin allowUpdateWriteThroughState
- final ResettableWritableIntChunk writeThroughOverflowLocations = ResettableWritableIntChunk.makeResettableChunk();
- // endmixin rehash
-
- final SharedContext sharedFillContext;
- final ColumnSource.FillContext[] workingFillContexts;
- final SharedContext sharedOverflowContext;
- final ColumnSource.FillContext[] overflowContexts;
- final SharedContext sharedBuildContext;
- final ChunkSource.GetContext[] buildContexts;
-
- // region build context
- final WritableIntChunk duplicatePositions;
- final WritableLongChunk addedSlotsByPosition;
- // endregion build context
-
- final boolean haveSharedContexts;
-
- private BuildContext(ColumnSource<?>[] buildSources,
- int chunkSize
- // region build context constructor args
- // endregion build context constructor args
- ) {
- Assert.gtZero(chunkSize, "chunkSize");
- this.chunkSize = chunkSize;
- haveSharedContexts = buildSources.length > 1;
- if (haveSharedContexts) {
- sharedFillContext = SharedContext.makeSharedContext();
- sharedOverflowContext = SharedContext.makeSharedContext();
- sharedBuildContext = SharedContext.makeSharedContext();
- } else {
- // no point in the additional work implied by these not being null.
- sharedFillContext = null;
- sharedOverflowContext = null;
- sharedBuildContext = null;
- }
- workingFillContexts = makeFillContexts(keySources, sharedFillContext, chunkSize);
- overflowContexts = makeFillContexts(overflowKeySources, sharedOverflowContext, chunkSize);
- buildContexts = makeGetContexts(buildSources, sharedBuildContext, chunkSize);
- // region build context constructor
- duplicatePositions = WritableIntChunk.makeWritableChunk(chunkSize * 2);
- addedSlotsByPosition = WritableLongChunk.makeWritableChunk(chunkSize);
- // endregion build context constructor
- sortContext = LongIntTimsortKernel.createContext(chunkSize);
- stateSourceFillContext = stateSource.makeFillContext(chunkSize);
- overflowFillContext = overflowLocationSource.makeFillContext(chunkSize);
- overflowOverflowFillContext = overflowOverflowLocationSource.makeFillContext(chunkSize);
- hashChunk = WritableIntChunk.makeWritableChunk(chunkSize);
- tableLocationsChunk = WritableLongChunk.makeWritableChunk(chunkSize);
- sourcePositions = WritableIntChunk.makeWritableChunk(chunkSize);
- destinationLocationPositionInWriteThrough = WritableIntChunk.makeWritableChunk(chunkSize);
- filledValues = WritableBooleanChunk.makeWritableChunk(chunkSize);
- equalValues = WritableBooleanChunk.makeWritableChunk(chunkSize);
- overflowLocationsToFetch = WritableLongChunk.makeWritableChunk(chunkSize);
- overflowPositionInSourceChunk = WritableIntChunk.makeWritableChunk(chunkSize);
- insertTableLocations = WritableLongChunk.makeWritableChunk(chunkSize);
- insertPositionsInSourceChunk = WritableIntChunk.makeWritableChunk(chunkSize);
- chunkPositionsToCheckForEquality = WritableIntChunk.makeWritableChunk(chunkSize * 2);
- overflowLocationForEqualityCheck = WritableLongChunk.makeWritableChunk(chunkSize);
- // @WritableStateChunkName@ from \QWritableIntChunk\E
- workingStateEntries = WritableIntChunk.makeWritableChunk(chunkSize);
- workingKeyChunks = getWritableKeyChunks(chunkSize);
- overflowKeyChunks = getWritableKeyChunks(chunkSize);
- chunkPositionsForFetches = WritableIntChunk.makeWritableChunk(chunkSize);
- chunkPositionsToInsertInOverflow = WritableIntChunk.makeWritableChunk(chunkSize);
- tableLocationsToInsertInOverflow = WritableLongChunk.makeWritableChunk(chunkSize);
- overflowLocations = WritableIntChunk.makeWritableChunk(chunkSize);
- // mixin rehash
- rehashLocations = WritableLongChunk.makeWritableChunk(chunkSize);
- overflowStateSourceFillContext = overflowStateSource.makeFillContext(chunkSize);
- overflowLocationsToMigrate = WritableIntChunk.makeWritableChunk(chunkSize);
- overflowLocationsAsKeyIndices = WritableLongChunk.makeWritableChunk(chunkSize);
- shouldMoveBucket = WritableBooleanChunk.makeWritableChunk(chunkSize);
- // endmixin rehash
- }
-
- private void resetSharedContexts() {
- if (!haveSharedContexts) {
- return;
- }
- sharedFillContext.reset();
- sharedOverflowContext.reset();
- sharedBuildContext.reset();
- }
-
- private void closeSharedContexts() {
- if (!haveSharedContexts) {
- return;
- }
- sharedFillContext.close();
- sharedOverflowContext.close();
- sharedBuildContext.close();
- }
-
- @Override
- public void close() {
- sortContext.close();
- stateSourceFillContext.close();
- // mixin rehash
- overflowStateSourceFillContext.close();
- // endmixin rehash
- overflowFillContext.close();
- overflowOverflowFillContext.close();
- closeArray(workingFillContexts);
- closeArray(overflowContexts);
- closeArray(buildContexts);
-
- hashChunk.close();
- tableLocationsChunk.close();
- closeArray(writeThroughChunks);
-
- sourcePositions.close();
- destinationLocationPositionInWriteThrough.close();
- filledValues.close();
- equalValues.close();
- overflowLocationsToFetch.close();
- overflowPositionInSourceChunk.close();
- insertTableLocations.close();
- insertPositionsInSourceChunk.close();
- chunkPositionsToCheckForEquality.close();
- overflowLocationForEqualityCheck.close();
- workingStateEntries.close();
- closeArray(workingKeyChunks);
- closeArray(overflowKeyChunks);
- chunkPositionsForFetches.close();
- chunkPositionsToInsertInOverflow.close();
- tableLocationsToInsertInOverflow.close();
- overflowLocations.close();
- // mixin rehash
- rehashLocations.close();
- overflowLocationsToMigrate.close();
- overflowLocationsAsKeyIndices.close();
- shouldMoveBucket.close();
- overflowLocationForPromotionLoop.close();
- // mixin allowUpdateWriteThroughState
- writeThroughState.close();
- // endmixin allowUpdateWriteThroughState
- writeThroughOverflowLocations.close();
- // endmixin rehash
- // region build context close
- duplicatePositions.close();
- addedSlotsByPosition.close();
- // endregion build context close
- closeSharedContexts();
- }
-
- }
-
- public BuildContext makeBuildContext(ColumnSource<?>[] buildSources,
- long maxSize
- // region makeBuildContext args
- // endregion makeBuildContext args
- ) {
- return new BuildContext(buildSources, (int)Math.min(CHUNK_SIZE, maxSize)
- // region makeBuildContext arg pass
- // endregion makeBuildContext arg pass
- );
- }
-
- private void buildTable(final BuildContext bc,
- final RowSequence buildIndex,
- ColumnSource<?>[] buildSources
- // region extra build arguments
- , final MutableInt nextOutputPosition
- , final WritableIntChunk outputPositions
- , @Nullable final WritableIntChunk reincarnatedPositions
- // endregion extra build arguments
- ) {
- long hashSlotOffset = 0;
- // region build start
-
- outputPositions.setSize(buildIndex.intSize());
- int maxAddedPosition = -1;
- bc.addedSlotsByPosition.setSize(outputPositions.size());
- bc.addedSlotsByPosition.fillWithValue(0, bc.addedSlotsByPosition.size(), RowSequence.NULL_ROW_KEY);
- bc.duplicatePositions.setSize(0);
-
- if (reincarnatedPositions != null) {
- reincarnatedPositions.setSize(0);
- }
-
- // endregion build start
-
- try (final RowSequence.Iterator rsIt = buildIndex.getRowSequenceIterator();
- // region build initialization try
- // endregion build initialization try
- ) {
- // region build initialization
- // endregion build initialization
-
- // chunks to write through to the table key sources
-
-
- //noinspection unchecked
- final Chunk<Values>[] sourceKeyChunks = new Chunk[buildSources.length];
-
- while (rsIt.hasMore()) {
- // we reset early to avoid carrying around state for old RowSequence which can't be reused.
- bc.resetSharedContexts();
-
- final RowSequence chunkOk = rsIt.getNextRowSequenceWithLength(bc.chunkSize);
-
- getKeyChunks(buildSources, bc.buildContexts, sourceKeyChunks, chunkOk);
- hashKeyChunks(bc.hashChunk, sourceKeyChunks);
-
- // region build loop initialization
- rowCountSource.ensureCapacity(nextOutputPosition.intValue() + chunkOk.size());
- // endregion build loop initialization
-
- // turn hash codes into indices within our table
- convertHashToTableLocations(bc.hashChunk, bc.tableLocationsChunk);
-
- // now fetch the values from the table, note that we do not order these fetches
- fillKeys(bc.workingFillContexts, bc.workingKeyChunks, bc.tableLocationsChunk);
-
- // and the corresponding states, if a value is null, we've found our insertion point
- stateSource.fillChunkUnordered(bc.stateSourceFillContext, bc.workingStateEntries, bc.tableLocationsChunk);
-
- // find things that exist
- // @StateChunkIdentityName@ from \QIntChunk\E
- IntChunkEquals.notEqual(bc.workingStateEntries, EMPTY_RIGHT_VALUE, bc.filledValues);
-
- // to be equal, the location must exist; and each of the keyChunks must match
- bc.equalValues.setSize(bc.filledValues.size());
- bc.equalValues.copyFromChunk(bc.filledValues, 0, 0, bc.filledValues.size());
- checkKeyEquality(bc.equalValues, bc.workingKeyChunks, sourceKeyChunks);
-
- bc.overflowPositionInSourceChunk.setSize(0);
- bc.overflowLocationsToFetch.setSize(0);
- bc.insertPositionsInSourceChunk.setSize(0);
- bc.insertTableLocations.setSize(0);
-
- for (int ii = 0; ii < bc.equalValues.size(); ++ii) {
- final long tableLocation = bc.tableLocationsChunk.get(ii);
- if (bc.equalValues.get(ii)) {
- // region build found main
- final int foundPosition = bc.workingStateEntries.get(ii);
- outputPositions.set(ii, foundPosition);
-
- final long oldRowCount = rowCountSource.getUnsafe(foundPosition);
- Assert.geqZero(oldRowCount, "oldRowCount");
- if (reincarnatedPositions != null && oldRowCount == 0) {
- reincarnatedPositions.add(foundPosition);
- }
- rowCountSource.set(foundPosition, oldRowCount + 1);
- // endregion build found main
- } else if (bc.filledValues.get(ii)) {
- // we must handle this as part of the overflow bucket
- bc.overflowPositionInSourceChunk.add(ii);
- bc.overflowLocationsToFetch.add(tableLocation);
- } else {
- // for the values that are empty, we record them in the insert chunks
- bc.insertPositionsInSourceChunk.add(ii);
- bc.insertTableLocations.add(tableLocation);
- }
- }
-
- // we first sort by table location, so that we'll not insert things into the table twice or overwrite
- // collisions
- LongIntTimsortKernel.sort(bc.sortContext, bc.insertPositionsInSourceChunk, bc.insertTableLocations);
-
- // the first and last valid table location in our writeThroughChunks
- long firstBackingChunkLocation = -1;
- long lastBackingChunkLocation = -1;
-
- bc.chunkPositionsToCheckForEquality.setSize(0);
- bc.destinationLocationPositionInWriteThrough.setSize(0);
- bc.sourcePositions.setSize(0);
-
- for (int ii = 0; ii < bc.insertPositionsInSourceChunk.size(); ) {
- final int firstChunkPositionForHashLocation = bc.insertPositionsInSourceChunk.get(ii);
- final long currentHashLocation = bc.insertTableLocations.get(ii);
-
- // region main insert
- stateSource.set(currentHashLocation, chunkPositionToPendingState(firstChunkPositionForHashLocation));
-
- bc.addedSlotsByPosition.set(firstChunkPositionForHashLocation, currentHashLocation);
- maxAddedPosition = Math.max(maxAddedPosition, firstChunkPositionForHashLocation);
- // endregion main insert
- // mixin rehash
- numEntries++;
- // endmixin rehash
-
- if (currentHashLocation > lastBackingChunkLocation) {
- flushWriteThrough(bc.sourcePositions, sourceKeyChunks, bc.destinationLocationPositionInWriteThrough, bc.writeThroughChunks);
- firstBackingChunkLocation = updateWriteThroughChunks(bc.writeThroughChunks, currentHashLocation, keySources);
- lastBackingChunkLocation = firstBackingChunkLocation + bc.writeThroughChunks[0].size() - 1;
- }
-
- bc.sourcePositions.add(firstChunkPositionForHashLocation);
- bc.destinationLocationPositionInWriteThrough.add((int)(currentHashLocation - firstBackingChunkLocation));
-
- final int currentHashValue = bc.hashChunk.get(firstChunkPositionForHashLocation);
-
- while (++ii < bc.insertTableLocations.size() && bc.insertTableLocations.get(ii) == currentHashLocation) {
- // if this thing is equal to the first one, we should mark the appropriate slot; we don't
- // know the types and don't want to make the virtual calls, so we need to just accumulate
- // the things to check for equality afterwards
- final int chunkPosition = bc.insertPositionsInSourceChunk.get(ii);
- if (bc.hashChunk.get(chunkPosition) != currentHashValue) {
- // we must be an overflow
- bc.overflowPositionInSourceChunk.add(chunkPosition);
- bc.overflowLocationsToFetch.add(currentHashLocation);
- } else {
- // we need to check equality, equal things are the same slot; unequal things are overflow
- bc.chunkPositionsToCheckForEquality.add(firstChunkPositionForHashLocation);
- bc.chunkPositionsToCheckForEquality.add(chunkPosition);
- }
- }
- }
-
- flushWriteThrough(bc.sourcePositions, sourceKeyChunks, bc.destinationLocationPositionInWriteThrough, bc.writeThroughChunks);
-
- checkPairEquality(bc.chunkPositionsToCheckForEquality, sourceKeyChunks, bc.equalValues);
-
- for (int ii = 0; ii < bc.equalValues.size(); ii++) {
- final int chunkPosition = bc.chunkPositionsToCheckForEquality.get(ii * 2 + 1);
- final long tableLocation = bc.tableLocationsChunk.get(chunkPosition);
-
- if (bc.equalValues.get(ii)) {
- // region build main duplicate
- bc.duplicatePositions.add(chunkPosition);
- bc.duplicatePositions.add(bc.chunkPositionsToCheckForEquality.get(ii * 2));
- // endregion build main duplicate
- } else {
- // we are an overflow element
- bc.overflowPositionInSourceChunk.add(chunkPosition);
- bc.overflowLocationsToFetch.add(tableLocation);
- }
- }
-
- // now handle overflow
- if (bc.overflowPositionInSourceChunk.size() > 0) {
- // on the first pass we fill from the table's locations
- overflowLocationSource.fillChunkUnordered(bc.overflowFillContext, bc.overflowLocations, bc.overflowLocationsToFetch);
- bc.chunkPositionsToInsertInOverflow.setSize(0);
- bc.tableLocationsToInsertInOverflow.setSize(0);
-
- // overflow slots now contains the positions in the overflow columns
-
- while (bc.overflowPositionInSourceChunk.size() > 0) {
- // now we have the overflow slot for each of the things we are interested in.
- // if the slot is null, then we can insert it and we are complete.
-
- bc.overflowLocationsToFetch.setSize(0);
- bc.chunkPositionsForFetches.setSize(0);
-
- // TODO: Crunch it down
- for (int ii = 0; ii < bc.overflowLocations.size(); ++ii) {
- final int overflowLocation = bc.overflowLocations.get(ii);
- final int chunkPosition = bc.overflowPositionInSourceChunk.get(ii);
- if (overflowLocation == QueryConstants.NULL_INT) {
- // insert me into overflow in the next free overflow slot
- bc.chunkPositionsToInsertInOverflow.add(chunkPosition);
- bc.tableLocationsToInsertInOverflow.add(bc.tableLocationsChunk.get(chunkPosition));
- } else {
- // add to the key positions to fetch
- bc.chunkPositionsForFetches.add(chunkPosition);
- bc.overflowLocationsToFetch.add(overflowLocation);
- }
- }
-
- // if the slot is non-null, then we need to fetch the overflow values for comparison
- fillOverflowKeys(bc.overflowContexts, bc.overflowKeyChunks, bc.overflowLocationsToFetch);
-
- // now compare the value in our overflowKeyChunk to the value in the sourceChunk
- checkLhsPermutedEquality(bc.chunkPositionsForFetches, sourceKeyChunks, bc.overflowKeyChunks, bc.equalValues);
-
- int writePosition = 0;
- for (int ii = 0; ii < bc.equalValues.size(); ++ii) {
- final int chunkPosition = bc.chunkPositionsForFetches.get(ii);
- final long overflowLocation = bc.overflowLocationsToFetch.get(ii);
- if (bc.equalValues.get(ii)) {
- // region build overflow found
- final int position = overflowStateSource.getUnsafe(overflowLocation);
- outputPositions.set(chunkPosition, position);
-
- final long oldRowCount = rowCountSource.getUnsafe(position);
- Assert.geqZero(oldRowCount, "oldRowCount");
- if (reincarnatedPositions != null && oldRowCount == 0) {
- reincarnatedPositions.add(position);
- }
- rowCountSource.set(position, oldRowCount + 1);
- // endregion build overflow found
- } else {
- // otherwise, we need to repeat the overflow calculation, with our next overflow fetch
- bc.overflowLocationsToFetch.set(writePosition, overflowLocation);
- bc.overflowPositionInSourceChunk.set(writePosition++, chunkPosition);
- }
- }
- bc.overflowLocationsToFetch.setSize(writePosition);
- bc.overflowPositionInSourceChunk.setSize(writePosition);
-
- // on subsequent iterations, we are following the overflow chains, so we fill from the overflowOverflowLocationSource
- if (bc.overflowPositionInSourceChunk.size() > 0) {
- overflowOverflowLocationSource.fillChunkUnordered(bc.overflowOverflowFillContext, bc.overflowLocations, bc.overflowLocationsToFetch);
- }
- }
-
- // make sure we actually have enough room to insert stuff where we would like
- ensureOverflowCapacity(bc.chunkPositionsToInsertInOverflow);
-
- firstBackingChunkLocation = -1;
- lastBackingChunkLocation = -1;
- bc.destinationLocationPositionInWriteThrough.setSize(0);
- bc.sourcePositions.setSize(0);
-
- // do the overflow insertions, one per table position at a time; until we have no insertions left
- while (bc.chunkPositionsToInsertInOverflow.size() > 0) {
- // sort by table position
- LongIntTimsortKernel.sort(bc.sortContext, bc.chunkPositionsToInsertInOverflow, bc.tableLocationsToInsertInOverflow);
-
- bc.chunkPositionsToCheckForEquality.setSize(0);
- bc.overflowLocationForEqualityCheck.setSize(0);
-
- for (int ii = 0; ii < bc.chunkPositionsToInsertInOverflow.size(); ) {
- final long tableLocation = bc.tableLocationsToInsertInOverflow.get(ii);
- final int chunkPosition = bc.chunkPositionsToInsertInOverflow.get(ii);
-
- final int allocatedOverflowLocation = allocateOverflowLocation();
-
- // we are inserting into the head of the list, so we move the existing overflow into our overflow
- overflowOverflowLocationSource.set(allocatedOverflowLocation, overflowLocationSource.getUnsafe(tableLocation));
- // and we point the overflow at our slot
- overflowLocationSource.set(tableLocation, allocatedOverflowLocation);
-
- // region build overflow insert
- overflowStateSource.set(allocatedOverflowLocation, chunkPositionToPendingState(chunkPosition));
- bc.addedSlotsByPosition.set(chunkPosition, overflowLocationToHashLocation(allocatedOverflowLocation));
- maxAddedPosition = Math.max(maxAddedPosition, chunkPosition);
- // endregion build overflow insert
-
- // mixin rehash
- numEntries++;
- // endmixin rehash
-
- // get the backing chunk from the overflow keys
- if (allocatedOverflowLocation > lastBackingChunkLocation || allocatedOverflowLocation < firstBackingChunkLocation) {
- flushWriteThrough(bc.sourcePositions, sourceKeyChunks, bc.destinationLocationPositionInWriteThrough, bc.writeThroughChunks);
- firstBackingChunkLocation = updateWriteThroughChunks(bc.writeThroughChunks, allocatedOverflowLocation, overflowKeySources);
- lastBackingChunkLocation = firstBackingChunkLocation + bc.writeThroughChunks[0].size() - 1;
- }
-
- // now we must set all of our key values in the overflow
- bc.sourcePositions.add(chunkPosition);
- bc.destinationLocationPositionInWriteThrough.add((int)(allocatedOverflowLocation - firstBackingChunkLocation));
-
- while (++ii < bc.tableLocationsToInsertInOverflow.size() && bc.tableLocationsToInsertInOverflow.get(ii) == tableLocation) {
- bc.overflowLocationForEqualityCheck.add(allocatedOverflowLocation);
- bc.chunkPositionsToCheckForEquality.add(chunkPosition);
- bc.chunkPositionsToCheckForEquality.add(bc.chunkPositionsToInsertInOverflow.get(ii));
- }
- }
-
- // now we need to do the equality check; so that we can mark things appropriately
- int remainingInserts = 0;
-
- checkPairEquality(bc.chunkPositionsToCheckForEquality, sourceKeyChunks, bc.equalValues);
- for (int ii = 0; ii < bc.equalValues.size(); ii++) {
- final int chunkPosition = bc.chunkPositionsToCheckForEquality.get(ii * 2 + 1);
- final long tableLocation = bc.tableLocationsChunk.get(chunkPosition);
-
- if (bc.equalValues.get(ii)) {
- final long insertedOverflowLocation = bc.overflowLocationForEqualityCheck.get(ii);
- // region build overflow duplicate
- bc.duplicatePositions.add(chunkPosition);
- bc.duplicatePositions.add(bc.chunkPositionsToCheckForEquality.get(ii * 2));
- // endregion build overflow duplicate
- } else {
- // we need to try this element again in the next round
- bc.chunkPositionsToInsertInOverflow.set(remainingInserts, chunkPosition);
- bc.tableLocationsToInsertInOverflow.set(remainingInserts++, tableLocation);
- }
- }
-
- bc.chunkPositionsToInsertInOverflow.setSize(remainingInserts);
- bc.tableLocationsToInsertInOverflow.setSize(remainingInserts);
- }
- flushWriteThrough(bc.sourcePositions, sourceKeyChunks, bc.destinationLocationPositionInWriteThrough, bc.writeThroughChunks);
- // mixin rehash
- // region post-build rehash
- doRehash(bc);
- // endregion post-build rehash
- // endmixin rehash
- }
-
- // region copy hash slots
- outputPositionToHashSlot.ensureCapacity(nextOutputPosition.intValue() + maxAddedPosition + 1);
- for (int ii = 0; ii <= maxAddedPosition; ++ii) {
- final long longSlot = bc.addedSlotsByPosition.get(ii);
- if (longSlot != RowSequence.NULL_ROW_KEY) {
- final int intSlot = (int) longSlot;
-
- outputPositions.set(ii, nextOutputPosition.intValue());
- if (isOverflowLocation(intSlot)) {
- overflowStateSource.set(hashLocationToOverflowLocation(intSlot), nextOutputPosition.intValue());
- } else {
- stateSource.set(intSlot, nextOutputPosition.intValue());
- }
- rowCountSource.set(nextOutputPosition.intValue(), 1L);
-
- outputPositionToHashSlot.set(nextOutputPosition.intValue(), intSlot);
- nextOutputPosition.increment();
- }
- }
-
- for (int ii = 0; ii < bc.duplicatePositions.size(); ii += 2) {
- final int position = outputPositions.get(bc.duplicatePositions.get(ii + 1));
- outputPositions.set(bc.duplicatePositions.get(ii), position);
- rowCountSource.set(position, rowCountSource.getUnsafe(position) + 1L);
- }
- // endregion copy hash slots
- hashSlotOffset += chunkOk.size();
- }
- // region post build loop
- // endregion post build loop
- }
- }
-
- // mixin rehash
- public void doRehash(BuildContext bc
- // region extra rehash arguments
- // endregion extra rehash arguments
- ) {
- long firstBackingChunkLocation;
- long lastBackingChunkLocation;// mixin rehash
- // region rehash start
- // endregion rehash start
- while (rehashRequired()) {
- // region rehash loop start
- // endregion rehash loop start
- if (tableHashPivot == tableSize) {
- tableSize *= 2;
- ensureCapacity(tableSize);
- // region rehash ensure capacity
- // endregion rehash ensure capacity
- }
-
- final long targetBuckets = Math.min(MAX_TABLE_SIZE, (long)(numEntries / targetLoadFactor));
- final int bucketsToAdd = Math.max(1, (int)Math.min(Math.min(targetBuckets, tableSize) - tableHashPivot, bc.chunkSize));
-
- initializeRehashLocations(bc.rehashLocations, bucketsToAdd);
-
- // fill the overflow bucket locations
- overflowLocationSource.fillChunk(bc.overflowFillContext, bc.overflowLocations, RowSequenceFactory.wrapRowKeysChunkAsRowSequence(LongChunk.downcast(bc.rehashLocations)));
- // null out the overflow locations in the table
- setOverflowLocationsToNull(tableHashPivot - (tableSize >> 1), bucketsToAdd);
-
- while (bc.overflowLocations.size() > 0) {
- // figure out which table location each overflow location maps to
- compactOverflowLocations(bc.overflowLocations, bc.overflowLocationsToFetch);
- if (bc.overflowLocationsToFetch.size() == 0) {
- break;
- }
-
- fillOverflowKeys(bc.overflowContexts, bc.workingKeyChunks, bc.overflowLocationsToFetch);
- hashKeyChunks(bc.hashChunk, bc.workingKeyChunks);
- convertHashToTableLocations(bc.hashChunk, bc.tableLocationsChunk, tableHashPivot + bucketsToAdd);
-
- // read the next chunk of overflow locations, which we will be overwriting in the next step
- overflowOverflowLocationSource.fillChunkUnordered(bc.overflowOverflowFillContext, bc.overflowLocations, bc.overflowLocationsToFetch);
-
- // swap the table's overflow pointer with our location
- swapOverflowPointers(bc.tableLocationsChunk, bc.overflowLocationsToFetch);
- }
-
- // now rehash the main entries
-
- stateSource.fillChunkUnordered(bc.stateSourceFillContext, bc.workingStateEntries, bc.rehashLocations);
- // @StateChunkIdentityName@ from \QIntChunk\E
- IntChunkEquals.notEqual(bc.workingStateEntries, EMPTY_RIGHT_VALUE, bc.shouldMoveBucket);
-
- // crush down things that don't exist
- LongCompactKernel.compact(bc.rehashLocations, bc.shouldMoveBucket);
-
- // get the keys from the table
- fillKeys(bc.workingFillContexts, bc.workingKeyChunks, bc.rehashLocations);
- hashKeyChunks(bc.hashChunk, bc.workingKeyChunks);
- convertHashToTableLocations(bc.hashChunk, bc.tableLocationsChunk, tableHashPivot + bucketsToAdd);
-
- // figure out which ones must move
- LongChunkEquals.notEqual(bc.tableLocationsChunk, bc.rehashLocations, bc.shouldMoveBucket);
-
- firstBackingChunkLocation = -1;
- lastBackingChunkLocation = -1;
- // flushWriteThrough will have zero-ed out the sourcePositions and destinationLocationPositionInWriteThrough size
-
- int moves = 0;
- for (int ii = 0; ii < bc.shouldMoveBucket.size(); ++ii) {
- if (bc.shouldMoveBucket.get(ii)) {
- moves++;
- final long newHashLocation = bc.tableLocationsChunk.get(ii);
- final long oldHashLocation = bc.rehashLocations.get(ii);
-
- if (newHashLocation > lastBackingChunkLocation) {
- flushWriteThrough(bc.sourcePositions, bc.workingKeyChunks, bc.destinationLocationPositionInWriteThrough, bc.writeThroughChunks);
- firstBackingChunkLocation = updateWriteThroughChunks(bc.writeThroughChunks, newHashLocation, keySources);
- lastBackingChunkLocation = firstBackingChunkLocation + bc.writeThroughChunks[0].size() - 1;
- }
-
- // @StateValueType@ from \Qint\E
- final int stateValueToMove = stateSource.getUnsafe(oldHashLocation);
- stateSource.set(newHashLocation, stateValueToMove);
- stateSource.set(oldHashLocation, EMPTY_RIGHT_VALUE);
- // region rehash move values
- if (isPendingState(stateValueToMove)) {
- bc.addedSlotsByPosition.set(pendingStateToChunkPosition(stateValueToMove), newHashLocation);
- } else {
- outputPositionToHashSlot.set(stateValueToMove, (int) newHashLocation);
- }
- // endregion rehash move values
-
- bc.sourcePositions.add(ii);
- bc.destinationLocationPositionInWriteThrough.add((int)(newHashLocation - firstBackingChunkLocation));
- }
- }
- flushWriteThrough(bc.sourcePositions, bc.workingKeyChunks, bc.destinationLocationPositionInWriteThrough, bc.writeThroughChunks);
-
- // everything has been rehashed now, but we have some table locations that might have an overflow,
- // without actually having a main entry. We walk through the empty main entries, pulling non-empty
- // overflow locations into the main table
-
- // figure out which of the two possible locations is empty, because (1) we moved something from it
- // or (2) we did not move something to it
- bc.overflowLocationsToFetch.setSize(bc.shouldMoveBucket.size());
- final int totalPromotionsToProcess = bc.shouldMoveBucket.size();
- createOverflowPartitions(bc.overflowLocationsToFetch, bc.rehashLocations, bc.shouldMoveBucket, moves);
-
- for (int loop = 0; loop < 2; loop++) {
- final boolean firstLoop = loop == 0;
-
- if (firstLoop) {
- bc.overflowLocationForPromotionLoop.resetFromTypedChunk(bc.overflowLocationsToFetch, 0, moves);
- } else {
- bc.overflowLocationForPromotionLoop.resetFromTypedChunk(bc.overflowLocationsToFetch, moves, totalPromotionsToProcess - moves);
- }
-
- overflowLocationSource.fillChunk(bc.overflowFillContext, bc.overflowLocations, RowSequenceFactory.wrapRowKeysChunkAsRowSequence(bc.overflowLocationForPromotionLoop));
- IntChunkEquals.notEqual(bc.overflowLocations, QueryConstants.NULL_INT, bc.shouldMoveBucket);
-
- // crunch the chunk down to relevant locations
- LongCompactKernel.compact(bc.overflowLocationForPromotionLoop, bc.shouldMoveBucket);
- IntCompactKernel.compact(bc.overflowLocations, bc.shouldMoveBucket);
-
- IntToLongCast.castInto(IntChunk.downcast(bc.overflowLocations), bc.overflowLocationsAsKeyIndices);
-
- // now fetch the overflow key values
- fillOverflowKeys(bc.overflowContexts, bc.workingKeyChunks, bc.overflowLocationsAsKeyIndices);
- // and their state values
- overflowStateSource.fillChunkUnordered(bc.overflowStateSourceFillContext, bc.workingStateEntries, bc.overflowLocationsAsKeyIndices);
- // and where their next pointer is
- overflowOverflowLocationSource.fillChunkUnordered(bc.overflowOverflowFillContext, bc.overflowLocationsToMigrate, bc.overflowLocationsAsKeyIndices);
-
- // we'll have two sorted regions intermingled in the overflowLocationsToFetch, one of them is before the pivot, the other is after the pivot
- // so that we can use our write through chunks, we first process the things before the pivot; then have a separate loop for those
- // that go after
- firstBackingChunkLocation = -1;
- lastBackingChunkLocation = -1;
-
- for (int ii = 0; ii < bc.overflowLocationForPromotionLoop.size(); ++ii) {
- final long tableLocation = bc.overflowLocationForPromotionLoop.get(ii);
- if ((firstLoop && tableLocation < tableHashPivot) || (!firstLoop && tableLocation >= tableHashPivot)) {
- if (tableLocation > lastBackingChunkLocation) {
- if (bc.sourcePositions.size() > 0) {
- // the permutes here are flushing the write through for the state and overflow locations
-
- // mixin allowUpdateWriteThroughState
- // @StateChunkTypeEnum@ from \QInt\E
- IntPermuteKernel.permute(bc.sourcePositions, bc.workingStateEntries, bc.destinationLocationPositionInWriteThrough, bc.writeThroughState);
- // endmixin allowUpdateWriteThroughState
- IntPermuteKernel.permute(bc.sourcePositions, bc.overflowLocationsToMigrate, bc.destinationLocationPositionInWriteThrough, bc.writeThroughOverflowLocations);
- flushWriteThrough(bc.sourcePositions, bc.workingKeyChunks, bc.destinationLocationPositionInWriteThrough, bc.writeThroughChunks);
- }
-
- firstBackingChunkLocation = updateWriteThroughChunks(bc.writeThroughChunks, tableLocation, keySources);
- lastBackingChunkLocation = firstBackingChunkLocation + bc.writeThroughChunks[0].size() - 1;
- // mixin allowUpdateWriteThroughState
- updateWriteThroughState(bc.writeThroughState, firstBackingChunkLocation, lastBackingChunkLocation);
- // endmixin allowUpdateWriteThroughState
- updateWriteThroughOverflow(bc.writeThroughOverflowLocations, firstBackingChunkLocation, lastBackingChunkLocation);
- }
- bc.sourcePositions.add(ii);
- bc.destinationLocationPositionInWriteThrough.add((int)(tableLocation - firstBackingChunkLocation));
- // region promotion move
- final long overflowLocation = bc.overflowLocationsAsKeyIndices.get(ii);
- final int positionForSlot = overflowStateSource.getUnsafe(overflowLocation);
- if (isPendingState(positionForSlot)) {
- bc.addedSlotsByPosition.set(pendingStateToChunkPosition(positionForSlot), tableLocation);
- } else {
- outputPositionToHashSlot.set(positionForSlot, (int) tableLocation);
- }
- // endregion promotion move
- }
- }
-
- // the permutes are completing the state and overflow promotions write through
- // mixin allowUpdateWriteThroughState
- // @StateChunkTypeEnum@ from \QInt\E
- IntPermuteKernel.permute(bc.sourcePositions, bc.workingStateEntries, bc.destinationLocationPositionInWriteThrough, bc.writeThroughState);
- // endmixin allowUpdateWriteThroughState
- IntPermuteKernel.permute(bc.sourcePositions, bc.overflowLocationsToMigrate, bc.destinationLocationPositionInWriteThrough, bc.writeThroughOverflowLocations);
- flushWriteThrough(bc.sourcePositions, bc.workingKeyChunks, bc.destinationLocationPositionInWriteThrough, bc.writeThroughChunks);
-
- // now mark these overflow locations as free, so that we can reuse them
- freeOverflowLocations.ensureCapacity(freeOverflowCount + bc.overflowLocations.size());
- // by sorting them, they will be more likely to be in the same write through chunk when we pull them from the free list
- bc.overflowLocations.sort();
- for (int ii = 0; ii < bc.overflowLocations.size(); ++ii) {
- freeOverflowLocations.set(freeOverflowCount++, bc.overflowLocations.get(ii));
- }
- nullOverflowObjectSources(bc.overflowLocations);
- }
-
- tableHashPivot += bucketsToAdd;
- // region rehash loop end
- // endregion rehash loop end
- }
- // region rehash final
- // endregion rehash final
- }
-
- public boolean rehashRequired() {
- return numEntries > (tableHashPivot * maximumLoadFactor) && tableHashPivot < MAX_TABLE_SIZE;
- }
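(A worked example of the load-factor check above, using the defaults: with tableHashPivot = 4096 and maximumLoadFactor = 0.75, rehashRequired() first returns true when numEntries reaches 3073. doRehash() then grows toward targetBuckets = numEntries / targetLoadFactor, i.e. roughly 3073 / 0.70 ≈ 4390 buckets, adding at most bc.chunkSize buckets per pass.)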
-
- /**
- * This function can be called while debugging, if you suspect you are breaking the table, to make sure each slot
- * still corresponds to the correct location.
- */
- @SuppressWarnings({"unused", "unchecked"})
- private void verifyKeyHashes() {
- final int maxSize = tableHashPivot;
-
- final ChunkSource.FillContext [] keyFillContext = makeFillContexts(keySources, SharedContext.makeSharedContext(), maxSize);
- final WritableChunk [] keyChunks = getWritableKeyChunks(maxSize);
-
- try (final WritableLongChunk positions = WritableLongChunk.makeWritableChunk(maxSize);
- final WritableBooleanChunk exists = WritableBooleanChunk.makeWritableChunk(maxSize);
- final WritableIntChunk hashChunk = WritableIntChunk.makeWritableChunk(maxSize);
- final WritableLongChunk tableLocationsChunk = WritableLongChunk.makeWritableChunk(maxSize);
- final SafeCloseableArray ignored = new SafeCloseableArray<>(keyFillContext);
- final SafeCloseableArray ignored2 = new SafeCloseableArray<>(keyChunks);
- // @StateChunkName@ from \QIntChunk\E
- final WritableIntChunk stateChunk = WritableIntChunk.makeWritableChunk(maxSize);
- final ChunkSource.FillContext fillContext = stateSource.makeFillContext(maxSize)) {
-
- stateSource.fillChunk(fillContext, stateChunk, RowSetFactory.flat(tableHashPivot));
-
- ChunkUtils.fillInOrder(positions);
-
- // @StateChunkIdentityName@ from \QIntChunk\E
- IntChunkEquals.notEqual(stateChunk, EMPTY_RIGHT_VALUE, exists);
-
- // crush down things that don't exist
- LongCompactKernel.compact(positions, exists);
-
- // get the keys from the table
- fillKeys(keyFillContext, keyChunks, positions);
- hashKeyChunks(hashChunk, keyChunks);
- convertHashToTableLocations(hashChunk, tableLocationsChunk, tableHashPivot);
-
- for (int ii = 0; ii < positions.size(); ++ii) {
- if (tableLocationsChunk.get(ii) != positions.get(ii)) {
- throw new IllegalStateException();
- }
- }
- }
- }
-
- void setTargetLoadFactor(final double targetLoadFactor) {
- this.targetLoadFactor = targetLoadFactor;
- }
-
- void setMaximumLoadFactor(final double maximumLoadFactor) {
- this.maximumLoadFactor = maximumLoadFactor;
- }
-
- private void createOverflowPartitions(WritableLongChunk overflowLocationsToFetch, WritableLongChunk rehashLocations, WritableBooleanChunk shouldMoveBucket, int moves) {
- int startWritePosition = 0;
- int endWritePosition = moves;
- for (int ii = 0; ii < shouldMoveBucket.size(); ++ii) {
- if (shouldMoveBucket.get(ii)) {
- final long oldHashLocation = rehashLocations.get(ii);
- // this needs to be promoted, because we moved it
- overflowLocationsToFetch.set(startWritePosition++, oldHashLocation);
- } else {
- // we didn't move anything into the destination slot; so we need to promote its overflow
- final long newEmptyHashLocation = rehashLocations.get(ii) + (tableSize >> 1);
- overflowLocationsToFetch.set(endWritePosition++, newEmptyHashLocation);
- }
- }
- }
-
- private void setOverflowLocationsToNull(long start, int count) {
- for (int ii = 0; ii < count; ++ii) {
- overflowLocationSource.set(start + ii, QueryConstants.NULL_INT);
- }
- }
-
- private void initializeRehashLocations(WritableLongChunk rehashLocations, int bucketsToAdd) {
- rehashLocations.setSize(bucketsToAdd);
- for (int ii = 0; ii < bucketsToAdd; ++ii) {
- rehashLocations.set(ii, tableHashPivot + ii - (tableSize >> 1));
- }
- }
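(To make the arithmetic above concrete: with tableSize = 16 and tableHashPivot = 8, the rehash locations are 8 + ii - 8 = 0, 1, 2, ... In other words, each new bucket tableHashPivot + ii is populated by splitting the old bucket tableHashPivot + ii - tableSize/2, which is exactly the pairing described in the tableHashPivot comment at the top of the class.)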
-
- private void compactOverflowLocations(IntChunk overflowLocations, WritableLongChunk overflowLocationsToFetch) {
- overflowLocationsToFetch.setSize(0);
- for (int ii = 0; ii < overflowLocations.size(); ++ii) {
- final int overflowLocation = overflowLocations.get(ii);
- if (overflowLocation != QueryConstants.NULL_INT) {
- overflowLocationsToFetch.add(overflowLocation);
- }
- }
- }
-
- private void swapOverflowPointers(LongChunk tableLocationsChunk, LongChunk overflowLocationsToFetch) {
- for (int ii = 0; ii < overflowLocationsToFetch.size(); ++ii) {
- final long newLocation = tableLocationsChunk.get(ii);
- final int existingOverflow = overflowLocationSource.getUnsafe(newLocation);
- final long overflowLocation = overflowLocationsToFetch.get(ii);
- overflowOverflowLocationSource.set(overflowLocation, existingOverflow);
- overflowLocationSource.set(newLocation, (int)overflowLocation);
- }
- }
-
- // mixin allowUpdateWriteThroughState
- // @WritableStateChunkType@ from \QWritableIntChunk\E
- private void updateWriteThroughState(ResettableWritableIntChunk writeThroughState, long firstPosition, long expectedLastPosition) {
- final long firstBackingChunkPosition = stateSource.resetWritableChunkToBackingStore(writeThroughState, firstPosition);
- if (firstBackingChunkPosition != firstPosition) {
- throw new IllegalStateException("ArrayBackedColumnSources have different block sizes!");
- }
- if (firstBackingChunkPosition + writeThroughState.size() - 1 != expectedLastPosition) {
- throw new IllegalStateException("ArrayBackedColumnSources have different block sizes!");
- }
- }
- // endmixin allowUpdateWriteThroughState
-
- private void updateWriteThroughOverflow(ResettableWritableIntChunk writeThroughOverflow, long firstPosition, long expectedLastPosition) {
- final long firstBackingChunkPosition = overflowLocationSource.resetWritableChunkToBackingStore(writeThroughOverflow, firstPosition);
- if (firstBackingChunkPosition != firstPosition) {
- throw new IllegalStateException("ArrayBackedColumnSources have different block sizes!");
- }
- if (firstBackingChunkPosition + writeThroughOverflow.size() - 1 != expectedLastPosition) {
- throw new IllegalStateException("ArrayBackedColumnSources have different block sizes!");
- }
- }
-
- // endmixin rehash
-
- private int allocateOverflowLocation() {
- // mixin rehash
- if (freeOverflowCount > 0) {
- return freeOverflowLocations.getUnsafe(--freeOverflowCount);
- }
- // endmixin rehash
- return nextOverflowLocation++;
- }
-
- private static long updateWriteThroughChunks(ResettableWritableChunk[] writeThroughChunks, long currentHashLocation, ArrayBackedColumnSource<?>[] sources) {
- final long firstBackingChunkPosition = sources[0].resetWritableChunkToBackingStore(writeThroughChunks[0], currentHashLocation);
- for (int jj = 1; jj < sources.length; ++jj) {
- if (sources[jj].resetWritableChunkToBackingStore(writeThroughChunks[jj], currentHashLocation) != firstBackingChunkPosition) {
- throw new IllegalStateException("ArrayBackedColumnSources have different block sizes!");
- }
- if (writeThroughChunks[jj].size() != writeThroughChunks[0].size()) {
- throw new IllegalStateException("ArrayBackedColumnSources have different block sizes!");
- }
- }
- return firstBackingChunkPosition;
- }
-
- private void flushWriteThrough(WritableIntChunk sourcePositions, Chunk[] sourceKeyChunks, WritableIntChunk destinationLocationPositionInWriteThrough, WritableChunk[] writeThroughChunks) {
- if (sourcePositions.size() == 0) {
- return;
- }
- for (int jj = 0; jj < keySources.length; ++jj) {
- chunkCopiers[jj].permute(sourcePositions, sourceKeyChunks[jj], destinationLocationPositionInWriteThrough, writeThroughChunks[jj]);
- }
- sourcePositions.setSize(0);
- destinationLocationPositionInWriteThrough.setSize(0);
- }
-
- // mixin rehash
- private void nullOverflowObjectSources(IntChunk locationsToNull) {
- for (ObjectArraySource<?> objectArraySource : overflowKeyColumnsToNull) {
- for (int ii = 0; ii < locationsToNull.size(); ++ii) {
- objectArraySource.set(locationsToNull.get(ii), null);
- }
- }
- // region nullOverflowObjectSources
- // endregion nullOverflowObjectSources
- }
- // endmixin rehash
-
- private void checkKeyEquality(WritableBooleanChunk equalValues, WritableChunk[] workingKeyChunks, Chunk[] sourceKeyChunks) {
- for (int ii = 0; ii < sourceKeyChunks.length; ++ii) {
- chunkEquals[ii].andEqual(workingKeyChunks[ii], sourceKeyChunks[ii], equalValues);
- }
- }
-
- private void checkLhsPermutedEquality(WritableIntChunk chunkPositionsForFetches, Chunk[] sourceKeyChunks, WritableChunk[] overflowKeyChunks, WritableBooleanChunk equalValues) {
- chunkEquals[0].equalLhsPermuted(chunkPositionsForFetches, sourceKeyChunks[0], overflowKeyChunks[0], equalValues);
- for (int ii = 1; ii < overflowKeySources.length; ++ii) {
- chunkEquals[ii].andEqualLhsPermuted(chunkPositionsForFetches, sourceKeyChunks[ii], overflowKeyChunks[ii], equalValues);
- }
- }
-
- private void checkPairEquality(WritableIntChunk chunkPositionsToCheckForEquality, Chunk[] sourceKeyChunks, WritableBooleanChunk equalPairs) {
- chunkEquals[0].equalPairs(chunkPositionsToCheckForEquality, sourceKeyChunks[0], equalPairs);
- for (int ii = 1; ii < keyColumnCount; ++ii) {
- chunkEquals[ii].andEqualPairs(chunkPositionsToCheckForEquality, sourceKeyChunks[ii], equalPairs);
- }
- }
-
- private void fillKeys(ColumnSource.FillContext[] fillContexts, WritableChunk[] keyChunks, WritableLongChunk tableLocationsChunk) {
- fillKeys(keySources, fillContexts, keyChunks, tableLocationsChunk);
- }
-
- private void fillOverflowKeys(ColumnSource.FillContext[] fillContexts, WritableChunk[] keyChunks, WritableLongChunk overflowLocationsChunk) {
- fillKeys(overflowKeySources, fillContexts, keyChunks, overflowLocationsChunk);
- }
-
- private static void fillKeys(ArrayBackedColumnSource<?>[] keySources, ColumnSource.FillContext[] fillContexts, WritableChunk[] keyChunks, WritableLongChunk keyIndices) {
- for (int ii = 0; ii < keySources.length; ++ii) {
- keySources[ii].fillChunkUnordered(fillContexts[ii], keyChunks[ii], keyIndices);
- }
- }
-
- private void hashKeyChunks(WritableIntChunk hashChunk, Chunk[] sourceKeyChunks) {
- chunkHashers[0].hashInitial(sourceKeyChunks[0], hashChunk);
- for (int ii = 1; ii < sourceKeyChunks.length; ++ii) {
- chunkHashers[ii].hashUpdate(sourceKeyChunks[ii], hashChunk);
- }
- }
-
- private void getKeyChunks(ColumnSource<?>[] sources, ColumnSource.GetContext[] contexts, Chunk<? extends Values>[] chunks, RowSequence rowSequence) {
- for (int ii = 0; ii < chunks.length; ++ii) {
- chunks[ii] = sources[ii].getChunk(contexts[ii], rowSequence);
- }
- }
-
- // mixin prev
- private void getPrevKeyChunks(ColumnSource<?>[] sources, ColumnSource.GetContext[] contexts, Chunk<? extends Values>[] chunks, RowSequence rowSequence) {
- for (int ii = 0; ii < chunks.length; ++ii) {
- chunks[ii] = sources[ii].getPrevChunk(contexts[ii], rowSequence);
- }
- }
- // endmixin prev
-
- // region probe wrappers
- @Override
- public void remove(final SafeCloseable pc, RowSequence rowSequence, ColumnSource<?>[] sources, WritableIntChunk<RowKeys> outputPositions, WritableIntChunk<RowKeys> emptiedPositions) {
- if (rowSequence.isEmpty()) {
- return;
- }
- decorationProbe((ProbeContext)pc, rowSequence, sources, true, true, outputPositions, emptiedPositions);
- }
-
- @Override
- public void findModifications(final SafeCloseable pc, RowSequence rowSequence, ColumnSource<?>[] sources, WritableIntChunk<RowKeys> outputPositions) {
- if (rowSequence.isEmpty()) {
- return;
- }
- decorationProbe((ProbeContext)pc, rowSequence, sources, false, false, outputPositions, null);
- }
- // endregion probe wrappers
-
- // mixin decorationProbe
- class ProbeContext implements Context {
- final int chunkSize;
-
- final ColumnSource.FillContext stateSourceFillContext;
- final ColumnSource.FillContext overflowFillContext;
- final ColumnSource.FillContext overflowOverflowFillContext;
-
- final SharedContext sharedFillContext;
- final ColumnSource.FillContext[] workingFillContexts;
- final SharedContext sharedOverflowContext;
- final ColumnSource.FillContext[] overflowContexts;
-
- // the chunk of hashcodes
- final WritableIntChunk<HashCodes> hashChunk;
- // the chunk of positions within our table
- final WritableLongChunk<RowKeys> tableLocationsChunk;
-
- // the chunk of right indices that we read from the hash table, the empty right index is used as a sentinel that the
- // state exists; otherwise when building from the left it is always null
- // @WritableStateChunkType@ from \QWritableIntChunk<Values>\E
- final WritableIntChunk<Values> workingStateEntries;
-
- // the overflow locations that we need to get from the overflowLocationSource (or overflowOverflowLocationSource)
- final WritableLongChunk<RowKeys> overflowLocationsToFetch;
- // the overflow position in the working key chunks, parallel to the overflowLocationsToFetch
- final WritableIntChunk<ChunkPositions> overflowPositionInWorkingChunk;
- // values we have read from the overflow locations sources
- final WritableIntChunk<Values> overflowLocations;
- // when fetching from the overflow, we record which chunk position we are fetching for
- final WritableIntChunk<ChunkPositions> chunkPositionsForFetches;
-
- final WritableBooleanChunk<Any> equalValues;
- final WritableChunk<Values>[] workingKeyChunks;
-
- final SharedContext sharedProbeContext;
- // the contexts for filling from our key columns
- final ChunkSource.GetContext[] probeContexts;
-
- // region probe context
- // endregion probe context
- final boolean haveSharedContexts;
-
- private ProbeContext(ColumnSource<?>[] probeSources,
- int chunkSize
- // region probe context constructor args
- // endregion probe context constructor args
- ) {
- Assert.gtZero(chunkSize, "chunkSize");
- this.chunkSize = chunkSize;
- haveSharedContexts = probeSources.length > 1;
- if (haveSharedContexts) {
- sharedFillContext = SharedContext.makeSharedContext();
- sharedOverflowContext = SharedContext.makeSharedContext();
- sharedProbeContext = SharedContext.makeSharedContext();
- } else {
- // No point in the additional work implied by these being non null.
- sharedFillContext = null;
- sharedOverflowContext = null;
- sharedProbeContext = null;
- }
- workingFillContexts = makeFillContexts(keySources, sharedFillContext, chunkSize);
- overflowContexts = makeFillContexts(overflowKeySources, sharedOverflowContext, chunkSize);
- probeContexts = makeGetContexts(probeSources, sharedProbeContext, chunkSize);
- // region probe context constructor
- // endregion probe context constructor
- stateSourceFillContext = stateSource.makeFillContext(chunkSize);
- overflowFillContext = overflowLocationSource.makeFillContext(chunkSize);
- overflowOverflowFillContext = overflowOverflowLocationSource.makeFillContext(chunkSize);
- hashChunk = WritableIntChunk.makeWritableChunk(chunkSize);
- tableLocationsChunk = WritableLongChunk.makeWritableChunk(chunkSize);
- // @WritableStateChunkName@ from \QWritableIntChunk\E
- workingStateEntries = WritableIntChunk.makeWritableChunk(chunkSize);
- overflowLocationsToFetch = WritableLongChunk.makeWritableChunk(chunkSize);
- overflowPositionInWorkingChunk = WritableIntChunk.makeWritableChunk(chunkSize);
- overflowLocations = WritableIntChunk.makeWritableChunk(chunkSize);
- chunkPositionsForFetches = WritableIntChunk.makeWritableChunk(chunkSize);
- equalValues = WritableBooleanChunk.makeWritableChunk(chunkSize);
- workingKeyChunks = getWritableKeyChunks(chunkSize);
- }
-
- private void resetSharedContexts() {
- if (!haveSharedContexts) {
- return;
- }
- sharedFillContext.reset();
- sharedOverflowContext.reset();
- sharedProbeContext.reset();
- }
-
- private void closeSharedContexts() {
- if (!haveSharedContexts) {
- return;
- }
- sharedFillContext.close();
- sharedOverflowContext.close();
- sharedProbeContext.close();
- }
-
- @Override
- public void close() {
- stateSourceFillContext.close();
- overflowFillContext.close();
- overflowOverflowFillContext.close();
- closeArray(workingFillContexts);
- closeArray(overflowContexts);
- closeArray(probeContexts);
- hashChunk.close();
- tableLocationsChunk.close();
- workingStateEntries.close();
- overflowLocationsToFetch.close();
- overflowPositionInWorkingChunk.close();
- overflowLocations.close();
- chunkPositionsForFetches.close();
- equalValues.close();
- closeArray(workingKeyChunks);
- closeSharedContexts();
- // region probe context close
- // endregion probe context close
- }
- }
-
- public ProbeContext makeProbeContext(ColumnSource<?>[] probeSources,
- long maxSize
- // region makeProbeContext args
- // endregion makeProbeContext args
- ) {
- return new ProbeContext(probeSources, (int)Math.min(maxSize, CHUNK_SIZE)
- // region makeProbeContext arg pass
- // endregion makeProbeContext arg pass
- );
- }
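Since ProbeContext implements Context (a SafeCloseable), callers are expected to scope it with try-with-resources around one or more probe operations; a hypothetical caller sketch (stateManager, removedRows, outputPositions, and emptiedPositions are illustrative names):

try (final ProbeContext pc = stateManager.makeProbeContext(keySources, removedRows.size())) {
    // remove() casts the SafeCloseable back to ProbeContext internally
    stateManager.remove(pc, removedRows, keySources, outputPositions, emptiedPositions);
}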
-
- private void decorationProbe(ProbeContext pc
- , RowSequence probeIndex
- , final ColumnSource<?>[] probeSources
- // mixin prev
- , boolean usePrev
- // endmixin prev
- // region additional probe arguments
- , boolean remove
- , WritableIntChunk<RowKeys> outputPositions
- , WritableIntChunk<RowKeys> emptiedPositions
- // endregion additional probe arguments
- ) {
- // region probe start
- outputPositions.setSize(probeIndex.intSize());
- if (remove) {
- emptiedPositions.setSize(0);
- }
- // endregion probe start
- long hashSlotOffset = 0;
-
- try (final RowSequence.Iterator rsIt = probeIndex.getRowSequenceIterator();
- // region probe additional try resources
- // endregion probe additional try resources
- ) {
- //noinspection unchecked
- final Chunk<Values>[] sourceKeyChunks = new Chunk[keyColumnCount];
-
- // region probe initialization
- // endregion probe initialization
-
- while (rsIt.hasMore()) {
- // we reset shared contexts early to avoid carrying around state that can't be reused.
- pc.resetSharedContexts();
- final RowSequence chunkOk = rsIt.getNextRowSequenceWithLength(pc.chunkSize);
- final int chunkSize = chunkOk.intSize();
-
- // region probe loop initialization
- // endregion probe loop initialization
-
- // get our keys, hash them, and convert them to table locations
- // mixin prev
- if (usePrev) {
- getPrevKeyChunks(probeSources, pc.probeContexts, sourceKeyChunks, chunkOk);
- } else {
- // endmixin prev
- getKeyChunks(probeSources, pc.probeContexts, sourceKeyChunks, chunkOk);
- // mixin prev
- }
- // endmixin prev
- hashKeyChunks(pc.hashChunk, sourceKeyChunks);
- convertHashToTableLocations(pc.hashChunk, pc.tableLocationsChunk);
-
- // get the keys from the table
- fillKeys(pc.workingFillContexts, pc.workingKeyChunks, pc.tableLocationsChunk);
-
- // and the corresponding states
- // - if a value is empty; we don't care about it
- // - otherwise we check for equality; if we are equal, we have found our thing to set
- // (or to complain if we are already set)
- // - if we are not equal, then we are an overflow block
- stateSource.fillChunkUnordered(pc.stateSourceFillContext, pc.workingStateEntries, pc.tableLocationsChunk);
-
- // @StateChunkIdentityName@ from \QIntChunk\E
- IntChunkEquals.notEqual(pc.workingStateEntries, EMPTY_RIGHT_VALUE, pc.equalValues);
- checkKeyEquality(pc.equalValues, pc.workingKeyChunks, sourceKeyChunks);
-
- pc.overflowPositionInWorkingChunk.setSize(0);
- pc.overflowLocationsToFetch.setSize(0);
-
- for (int ii = 0; ii < pc.equalValues.size(); ++ii) {
- if (pc.equalValues.get(ii)) {
- // region probe main found
- final long tableLocation = pc.tableLocationsChunk.get(ii);
- final int outputPosition = pc.workingStateEntries.get(ii);
- outputPositions.set(ii, outputPosition);
- if (remove) {
- // decrement the row count
- final long oldRowCount = rowCountSource.getUnsafe(outputPosition);
- Assert.gtZero(oldRowCount, "oldRowCount");
- if (oldRowCount == 1) {
- emptiedPositions.add(outputPosition);
- }
- rowCountSource.set(outputPosition, oldRowCount - 1);
- }
- // endregion probe main found
- } else if (pc.workingStateEntries.get(ii) != EMPTY_RIGHT_VALUE) {
- // we must handle this as part of the overflow bucket
- pc.overflowPositionInWorkingChunk.add(ii);
- pc.overflowLocationsToFetch.add(pc.tableLocationsChunk.get(ii));
- } else {
- // region probe main not found
- throw new IllegalStateException("Failed to find main aggregation slot for key " + ChunkUtils.extractKeyStringFromChunks(keyChunkTypes, sourceKeyChunks, ii));
- // endregion probe main not found
- }
- }
-
- overflowLocationSource.fillChunkUnordered(pc.overflowFillContext, pc.overflowLocations, pc.overflowLocationsToFetch);
-
- while (pc.overflowLocationsToFetch.size() > 0) {
- pc.overflowLocationsToFetch.setSize(0);
- pc.chunkPositionsForFetches.setSize(0);
- for (int ii = 0; ii < pc.overflowLocations.size(); ++ii) {
- final int overflowLocation = pc.overflowLocations.get(ii);
- final int chunkPosition = pc.overflowPositionInWorkingChunk.get(ii);
-
- // if the overflow slot is null, this state is not responsive to the join so we can ignore it
- if (overflowLocation != QueryConstants.NULL_INT) {
- pc.overflowLocationsToFetch.add(overflowLocation);
- pc.chunkPositionsForFetches.add(chunkPosition);
- } else {
- // region probe overflow not found
- throw new IllegalStateException("Failed to find overflow aggregation slot for key " + ChunkUtils.extractKeyStringFromChunks(keyChunkTypes, sourceKeyChunks, chunkPosition));
- // endregion probe overflow not found
- }
- }
-
- // if the slot is non-null, then we need to fetch the overflow values for comparison
- fillOverflowKeys(pc.overflowContexts, pc.workingKeyChunks, pc.overflowLocationsToFetch);
-
- // region probe overflow state source fill
- // endregion probe overflow state source fill
-
- // now compare the value in our workingKeyChunks to the value in the sourceChunk
- checkLhsPermutedEquality(pc.chunkPositionsForFetches, sourceKeyChunks, pc.workingKeyChunks, pc.equalValues);
-
- // we write back into the overflowLocationsToFetch, so we can't set its size to zero. Instead
- // we overwrite the elements in the front of the chunk referenced by a position cursor
- int overflowPosition = 0;
- for (int ii = 0; ii < pc.equalValues.size(); ++ii) {
- final long overflowLocation = pc.overflowLocationsToFetch.get(ii);
- final int chunkPosition = pc.chunkPositionsForFetches.get(ii);
-
- if (pc.equalValues.get(ii)) {
- // region probe overflow found
- final int outputPosition = overflowStateSource.getUnsafe(overflowLocation);
- outputPositions.set(chunkPosition, outputPosition);
- if (remove) {
- final long oldRowCount = rowCountSource.getUnsafe(outputPosition);
- Assert.gtZero(oldRowCount, "oldRowCount");
- if (oldRowCount == 1) {
- emptiedPositions.add(outputPosition);
- }
- rowCountSource.set(outputPosition, oldRowCount - 1);
- }
- // endregion probe overflow found
- } else {
- // otherwise, we need to repeat the overflow calculation, with our next overflow fetch
- pc.overflowLocationsToFetch.set(overflowPosition, overflowLocation);
- pc.overflowPositionInWorkingChunk.set(overflowPosition, chunkPosition);
- overflowPosition++;
- }
- }
- pc.overflowLocationsToFetch.setSize(overflowPosition);
- pc.overflowPositionInWorkingChunk.setSize(overflowPosition);
-
- overflowOverflowLocationSource.fillChunkUnordered(pc.overflowOverflowFillContext, pc.overflowLocations, pc.overflowLocationsToFetch);
- }
-
- // region probe complete
- // endregion probe complete
- hashSlotOffset += chunkSize;
- }
-
- // region probe cleanup
- // endregion probe cleanup
- }
- // region probe final
- // endregion probe final
- }
- // endmixin decorationProbe
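The overflow loop above compacts survivors to the front of the chunk it is iterating, rather than allocating a second chunk; the same idiom in isolation (a sketch, not library code):

static int compactFront(long[] chunk, int size, java.util.function.LongPredicate keep) {
    int cursor = 0;
    for (int ii = 0; ii < size; ++ii) {
        if (keep.test(chunk[ii])) {
            chunk[cursor++] = chunk[ii]; // cursor <= ii, so unread entries are never clobbered
        }
    }
    return cursor; // new logical size, analogous to setSize(overflowPosition) above
}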
-
- private void convertHashToTableLocations(WritableIntChunk<HashCodes> hashChunk, WritableLongChunk<RowKeys> tablePositionsChunk) {
- // mixin rehash
- // NOTE that this mixin section is a bit ugly, we are spanning the two functions so that we can avoid using tableHashPivot and having the unused pivotPoint parameter
- convertHashToTableLocations(hashChunk, tablePositionsChunk, tableHashPivot);
- }
-
- private void convertHashToTableLocations(WritableIntChunk<HashCodes> hashChunk, WritableLongChunk<RowKeys> tablePositionsChunk, int pivotPoint) {
- // endmixin rehash
-
- // turn hash codes into indices within our table
- for (int ii = 0; ii < hashChunk.size(); ++ii) {
- final int hash = hashChunk.get(ii);
- // mixin rehash
- final int location = hashToTableLocation(pivotPoint, hash);
- // endmixin rehash
- // altmixin rehash: final int location = hashToTableLocation(hash);
- tablePositionsChunk.set(ii, location);
- }
- tablePositionsChunk.setSize(hashChunk.size());
- }
-
- private int hashToTableLocation(
- // mixin rehash
- int pivotPoint,
- // endmixin rehash
- int hash) {
- // altmixin rehash: final \
- int location = hash & (tableSize - 1);
- // mixin rehash
- if (location >= pivotPoint) {
- location -= (tableSize >> 1);
- }
- // endmixin rehash
- return location;
- }
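A worked example of the pivot mapping, with illustrative values: with tableSize = 16 and pivotPoint = 12, only slots [0, 12) are live, and any hash that masks into 12..15 folds back into the lower half.

// hash & 15 == 11 -> location 11 (below the pivot, used directly)
// hash & 15 == 12 -> 12 - 8 == 4 (tableSize >> 1 == 8)
// hash & 15 == 15 -> 15 - 8 == 7
// When the pivot reaches tableSize, the table doubles and the process repeats.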
-
- // region extraction functions
- @Override
- public ColumnSource[] getKeyHashTableSources() {
- final ColumnSource[] keyHashTableSources = new ColumnSource[keyColumnCount];
- for (int kci = 0; kci < keyColumnCount; ++kci) {
- // noinspection unchecked
- keyHashTableSources[kci] = new RedirectedColumnSource(resultIndexToHashSlot, new HashTableColumnSource(keySources[kci], overflowKeySources[kci]));
- }
- return keyHashTableSources;
- }
-
- @Override
- public int findPositionForKey(Object key) {
- int hash;
- if (chunkHashers.length == 1) {
- hash = chunkHashers[0].hashInitial(key);
- } else {
- final Object [] values = (Object[])key;
- hash = chunkHashers[0].hashInitial(values[0]);
- for (int ii = 1; ii < chunkHashers.length; ++ii) {
- hash = chunkHashers[ii].hashUpdate(hash, values[ii]);
- }
- }
-
- final int location = hashToTableLocation(tableHashPivot, hash);
-
- final int positionValue = stateSource.getUnsafe(location);
- if (positionValue == EMPTY_RIGHT_VALUE) {
- return -1;
- }
-
- if (checkKeyEquality(keySources, key, location)) {
- return positionValue;
- }
-
- int overflowLocation = overflowLocationSource.getUnsafe(location);
- while (overflowLocation != QueryConstants.NULL_INT) {
- if (checkKeyEquality(overflowKeySources, key, overflowLocation)) {
- return overflowStateSource.getUnsafe(overflowLocation);
- }
- overflowLocation = overflowOverflowLocationSource.getUnsafe(overflowLocation);
- }
-
- return -1;
- }
-
- private boolean checkKeyEquality(ArrayBackedColumnSource<?>[] keySources, Object key, int location) {
- if (keySources.length == 1) {
- return Objects.equals(key, keySources[0].get(location));
- }
- final Object [] keyValues = (Object[]) key;
- for (int ii = 0; ii < keySources.length; ++ii) {
- if (!Objects.equals(keyValues[ii], keySources[ii].get(location))) {
- return false;
- }
- }
- return true;
- }
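Usage sketch for findPositionForKey (hypothetical values): single-column keys are passed directly, multi-column keys as an Object[] in key-column order, matching the hashing branch above.

final int single = stateManager.findPositionForKey("AAPL");                      // one key column
final int compound = stateManager.findPositionForKey(new Object[] {"AAPL", 42}); // two key columns
// Either call returns -1 when no aggregation state exists for the key.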
-
- @Override
- public void startTrackingPrevValues() {
- resultIndexToHashSlot.startTrackingPrevValues();
- }
-
- @Override
- public void setRowSize(int outputPosition, long size) {
- rowCountSource.set(outputPosition, size);
- }
- // endregion extraction functions
-
- @NotNull
- private static ColumnSource.FillContext[] makeFillContexts(ColumnSource<?>[] keySources, final SharedContext sharedContext, int chunkSize) {
- final ColumnSource.FillContext[] workingFillContexts = new ColumnSource.FillContext[keySources.length];
- for (int ii = 0; ii < keySources.length; ++ii) {
- workingFillContexts[ii] = keySources[ii].makeFillContext(chunkSize, sharedContext);
- }
- return workingFillContexts;
- }
-
- private static ColumnSource.GetContext[] makeGetContexts(ColumnSource<?>[] sources, final SharedContext sharedState, int chunkSize) {
- final ColumnSource.GetContext[] contexts = new ColumnSource.GetContext[sources.length];
- for (int ii = 0; ii < sources.length; ++ii) {
- contexts[ii] = sources[ii].makeGetContext(chunkSize, sharedState);
- }
- return contexts;
- }
-
- @NotNull
- private WritableChunk<Values>[] getWritableKeyChunks(int chunkSize) {
- //noinspection unchecked
- final WritableChunk<Values>[] workingKeyChunks = new WritableChunk[keyChunkTypes.length];
- for (int ii = 0; ii < keyChunkTypes.length; ++ii) {
- workingKeyChunks[ii] = keyChunkTypes[ii].makeWritableChunk(chunkSize);
- }
- return workingKeyChunks;
- }
-
- @NotNull
- private ResettableWritableChunk<Values>[] getResettableWritableKeyChunks() {
- //noinspection unchecked
- final ResettableWritableChunk<Values>[] workingKeyChunks = new ResettableWritableChunk[keyChunkTypes.length];
- for (int ii = 0; ii < keyChunkTypes.length; ++ii) {
- workingKeyChunks[ii] = keyChunkTypes[ii].makeResettableWritableChunk();
- }
- return workingKeyChunks;
- }
-
- // region getStateValue
- // endregion getStateValue
-
- // region overflowLocationToHashLocation
- private static int overflowLocationToHashLocation(final int overflowSlot) {
- return HashTableColumnSource.overflowLocationToHashLocation(overflowSlot);
- }
-
- private static int hashLocationToOverflowLocation(final int hashLocation) {
- return HashTableColumnSource.hashLocationToOverflowLocation(hashLocation);
- }
-
- private static boolean isOverflowLocation(final long slot) {
- return HashTableColumnSource.isOverflowLocation(slot);
- }
-
- private static int chunkPositionToPendingState(final int position) {
- return -position - 1;
- }
-
- private static int pendingStateToChunkPosition(final int pendingState) {
- return -pendingState - 1;
- }
-
- private static boolean isPendingState(final int position) {
- return position < 0;
- }
- // endregion overflowLocationToHashLocation
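A quick illustrative check that the negative encoding above round-trips and that any negative value is recognizable as pending:

for (int p = 0; p < 4; ++p) {
    final int pending = -p - 1;   // chunkPositionToPendingState(p): 0 -> -1, 1 -> -2, ...
    assert pending < 0;           // isPendingState(pending)
    assert -pending - 1 == p;     // pendingStateToChunkPosition(pending) round-trips to p
}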
-
-
- static int hashTableSize(long initialCapacity) {
- return (int)Math.max(MINIMUM_INITIAL_HASH_SIZE, Math.min(MAX_TABLE_SIZE, Long.highestOneBit(initialCapacity) * 2));
- }
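Illustrative arithmetic for hashTableSize, ignoring the MINIMUM_INITIAL_HASH_SIZE floor and MAX_TABLE_SIZE cap: Long.highestOneBit rounds down to a power of two, so doubling it always yields a power of two strictly greater than initialCapacity and at most twice it.

// initialCapacity = 1000 -> highestOneBit == 512  -> table size 1024
// initialCapacity = 1024 -> highestOneBit == 1024 -> table size 2048
// initialCapacity = 1    -> highestOneBit == 1    -> table size 2 (then floored to the minimum)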
-
-}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase.java
index ff4b059500e..8c6f5fd55a2 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase.java
@@ -11,7 +11,6 @@
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.impl.by.alternatingcolumnsource.AlternatingColumnSource;
import io.deephaven.engine.table.impl.sources.IntegerArraySource;
-import io.deephaven.engine.table.impl.sources.LongArraySource;
import io.deephaven.engine.table.impl.sources.RedirectedColumnSource;
import io.deephaven.engine.table.impl.sources.immutable.ImmutableIntArraySource;
import io.deephaven.engine.table.impl.util.IntColumnSourceWritableRowRedirection;
@@ -20,39 +19,47 @@
import io.deephaven.util.QueryConstants;
import io.deephaven.util.SafeCloseable;
import org.apache.commons.lang3.mutable.MutableInt;
+import org.jetbrains.annotations.NotNull;
public abstract class IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase
extends OperatorAggregationStateManagerOpenAddressedAlternateBase
implements IncrementalOperatorAggregationStateManager {
- // our state value used when nothing is there
+
+ /** Our state value used when nothing is there. */
protected static final int EMPTY_OUTPUT_POSITION = QueryConstants.NULL_INT;
- // the state value for the bucket, parallel to mainKeySources (the state is an output row key for the aggregation)
+ /**
+ * The state value for the bucket, parallel to mainKeySources (the state is an output row key for the aggregation).
+ */
protected ImmutableIntArraySource mainOutputPosition = new ImmutableIntArraySource();
- // the state value for the bucket, parallel to alternateKeySources (the state is an output row key for the
- // aggregation)
+ /**
+ * The state value for the bucket, parallel to alternateKeySources (the state is an output row key for the
+ * aggregation).
+ */
protected ImmutableIntArraySource alternateOutputPosition;
- // used as a row redirection for the output key sources, updated using the mainInsertMask to identify the main vs.
- // alternate values
+ /**
+ * Used as a row redirection for the output key sources, updated using the mainInsertMask to identify the main vs.
+ * alternate values.
+ */
protected final IntegerArraySource outputPositionToHashSlot = new IntegerArraySource();
- // how many values are in each state, addressed by output row key
- protected final LongArraySource rowCountSource = new LongArraySource();
-
- // state variables that exist as part of the update
+ /** State variables that exist as part of the update. */
protected MutableInt nextOutputPosition;
protected WritableIntChunk<RowKeys> outputPositions;
- // output alternating column sources
+ /** Output alternating column sources. */
protected AlternatingColumnSource[] alternatingColumnSources;
- // the mask for insertion into the main table (this tells our alternating column sources which of the two sources
- // to access for a given key)
+ /**
+ * The mask for insertion into the main table (this tells our alternating column sources which of the two sources to
+ * access for a given key).
+ */
protected int mainInsertMask = 0;
- protected IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase(ColumnSource<?>[] tableKeySources,
+ protected IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase(
+ ColumnSource<?>[] tableKeySources,
int tableSize,
double maximumLoadFactor) {
super(tableKeySources, tableSize, maximumLoadFactor);
@@ -93,15 +100,19 @@ public SafeCloseable makeAggregationStateBuildContext(ColumnSource<?>[] buildSou
}
@Override
- public void add(final SafeCloseable bc, RowSequence rowSequence, ColumnSource<?>[] sources,
- MutableInt nextOutputPosition, WritableIntChunk<RowKeys> outputPositions) {
+ public void add(
+ @NotNull final SafeCloseable bc,
+ @NotNull final RowSequence rowSequence,
+ @NotNull final ColumnSource<?>[] sources,
+ @NotNull final MutableInt nextOutputPosition,
+ @NotNull final WritableIntChunk<RowKeys> outputPositions) {
outputPositions.setSize(rowSequence.intSize());
if (rowSequence.isEmpty()) {
return;
}
this.nextOutputPosition = nextOutputPosition;
this.outputPositions = outputPositions;
- buildTable((BuildContext) bc, rowSequence, sources, true, this::build);
+ buildTable((BuildContext) bc, rowSequence, sources, this::build);
this.outputPositions = null;
this.nextOutputPosition = null;
}
@@ -109,7 +120,6 @@ public void add(final SafeCloseable bc, RowSequence rowSequence, ColumnSource<?>
@Override
public void onNextChunk(int size) {
outputPositionToHashSlot.ensureCapacity(nextOutputPosition.intValue() + size, false);
- rowCountSource.ensureCapacity(nextOutputPosition.intValue() + size, false);
}
@Override
@@ -138,71 +148,47 @@ public ColumnSource[] getKeyHashTableSources() {
@Override
public void beginUpdateCycle() {
- // at the beginning of the update cycle, we always want to do some rehash work so that we can eventually ditch
- // the alternate table
+ // Once we're past initial state processing, we want to rehash incrementally.
+ fullRehash = false;
+ // At the beginning of the update cycle, we always want to do some rehash work so that we can eventually ditch
+ // the alternate table.
if (rehashPointer > 0) {
rehashInternalPartial(CHUNK_SIZE);
}
}
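Lifecycle sketch of the fullRehash flag (hypothetical driver code, arguments elided): initial-state processing may rehash all at once, and every cycle after the first beginUpdateCycle() rehashes incrementally, draining the alternate table by up to CHUNK_SIZE entries per cycle.

stateManager.add(bc, initialRows, sources, nextPosition, outputPositions); // fullRehash == true
stateManager.beginUpdateCycle(); // sets fullRehash = false, partially rehashes the alternate table
stateManager.add(bc, addedRows, sources, nextPosition, outputPositions);  // incremental rehash only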
- @Override
- public void addForUpdate(final SafeCloseable bc, RowSequence rowSequence, ColumnSource<?>[] sources,
- MutableInt nextOutputPosition, WritableIntChunk<RowKeys> outputPositions,
- final WritableIntChunk<RowKeys> reincarnatedPositions) {
- outputPositions.setSize(rowSequence.intSize());
- reincarnatedPositions.setSize(0);
- if (rowSequence.isEmpty()) {
- return;
- }
- this.nextOutputPosition = nextOutputPosition;
- this.outputPositions = outputPositions;
- buildTable((BuildContext) bc, rowSequence, sources, false,
- ((chunkOk, sourceKeyChunks) -> buildForUpdate(chunkOk, sourceKeyChunks, reincarnatedPositions)));
- this.outputPositions = null;
- this.nextOutputPosition = null;
- }
-
- protected abstract void buildForUpdate(RowSequence chunkOk, Chunk<Values>[] sourceKeyChunks,
- WritableIntChunk<RowKeys> reincarnatedPositions);
+ protected abstract void probe(RowSequence chunkOk, Chunk<Values>[] sourceKeyChunks);
@Override
- public void remove(final SafeCloseable pc, RowSequence rowSequence, ColumnSource<?>[] sources,
- WritableIntChunk<RowKeys> outputPositions, final WritableIntChunk<RowKeys> emptiedPositions) {
+ public void remove(
+ @NotNull final SafeCloseable pc,
+ @NotNull final RowSequence rowSequence,
+ @NotNull final ColumnSource<?>[] sources,
+ @NotNull final WritableIntChunk<RowKeys> outputPositions) {
outputPositions.setSize(rowSequence.intSize());
- emptiedPositions.setSize(0);
if (rowSequence.isEmpty()) {
return;
}
this.outputPositions = outputPositions;
- probeTable((ProbeContext) pc, rowSequence, true, sources,
- (chunkOk, sourceKeyChunks) -> IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase.this
- .doRemoveProbe(chunkOk, sourceKeyChunks, emptiedPositions));
+ probeTable((ProbeContext) pc, rowSequence, true, sources, this::probe);
this.outputPositions = null;
}
- protected abstract void doRemoveProbe(RowSequence chunkOk, Chunk<Values>[] sourceKeyChunks,
- WritableIntChunk<RowKeys> emptiedPositions);
-
@Override
- public void findModifications(final SafeCloseable pc, RowSequence rowSequence, ColumnSource<?>[] sources,
- WritableIntChunk<RowKeys> outputPositions) {
+ public void findModifications(
+ @NotNull final SafeCloseable pc,
+ @NotNull final RowSequence rowSequence,
+ @NotNull final ColumnSource<?>[] sources,
+ @NotNull final WritableIntChunk<RowKeys> outputPositions) {
outputPositions.setSize(rowSequence.intSize());
if (rowSequence.isEmpty()) {
return;
}
this.outputPositions = outputPositions;
- probeTable((ProbeContext) pc, rowSequence, false, sources,
- IncrementalChunkedOperatorAggregationStateManagerOpenAddressedBase.this::doModifyProbe);
+ probeTable((ProbeContext) pc, rowSequence, false, sources, this::probe);
this.outputPositions = null;
}
- protected abstract void doModifyProbe(RowSequence chunkOk, Chunk<Values>[] sourceKeyChunks);
-
@Override
public void startTrackingPrevValues() {}
-
- @Override
- public void setRowSize(int outputPosition, long size) {
- rowCountSource.set(outputPosition, size);
- }
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManagerTypedBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManagerTypedBase.java
index f923e32e346..f6ce5d37929 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManagerTypedBase.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalChunkedOperatorAggregationStateManagerTypedBase.java
@@ -3,13 +3,11 @@
*/
package io.deephaven.engine.table.impl.by;
-import io.deephaven.base.verify.Assert;
import io.deephaven.chunk.WritableIntChunk;
import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.impl.sources.IntegerArraySource;
-import io.deephaven.engine.table.impl.sources.LongArraySource;
import io.deephaven.engine.table.impl.sources.RedirectedColumnSource;
import io.deephaven.engine.table.impl.util.IntColumnSourceWritableRowRedirection;
import io.deephaven.engine.table.impl.util.RowRedirection;
@@ -18,6 +16,7 @@
import io.deephaven.util.QueryConstants;
import io.deephaven.util.SafeCloseable;
import org.apache.commons.lang3.mutable.MutableInt;
+import org.jetbrains.annotations.NotNull;
/**
* Incremental aggregation state manager that is extended by code generated typed hashers.
@@ -38,12 +37,8 @@ public abstract class IncrementalChunkedOperatorAggregationStateManagerTypedBase
// used as a row redirection for the output key sources
private final IntegerArraySource outputPositionToHashSlot = new IntegerArraySource();
- // how many values are in each state, addressed by output row key
- private final LongArraySource rowCountSource = new LongArraySource();
-
- // handlers for use during updates
- private final AddInitialHandler addInitialHandler = new AddInitialHandler();
- private final AddUpdateHandler addUpdateHandler = new AddUpdateHandler();
+ // handlers for use during initialization and updates
+ private final AddHandler addHandler = new AddHandler();
private final RemoveHandler removeHandler = new RemoveHandler();
private final ModifyHandler modifyHandler = new ModifyHandler();
@@ -59,15 +54,19 @@ public SafeCloseable makeAggregationStateBuildContext(ColumnSource<?>[] buildSou
}
@Override
- public void add(final SafeCloseable bc, RowSequence rowSequence, ColumnSource<?>[] sources,
- MutableInt nextOutputPosition, WritableIntChunk<RowKeys> outputPositions) {
+ public void add(
+ @NotNull final SafeCloseable bc,
+ @NotNull final RowSequence rowSequence,
+ @NotNull final ColumnSource<?>[] sources,
+ @NotNull final MutableInt nextOutputPosition,
+ @NotNull final WritableIntChunk<RowKeys> outputPositions) {
outputPositions.setSize(rowSequence.intSize());
if (rowSequence.isEmpty()) {
return;
}
- addInitialHandler.reset(nextOutputPosition, outputPositions);
- buildTable(addInitialHandler, (BuildContext) bc, rowSequence, sources);
- addInitialHandler.reset();
+ addHandler.reset(nextOutputPosition, outputPositions);
+ buildTable(addHandler, (BuildContext) bc, rowSequence, sources);
+ addHandler.reset();
}
@Override
@@ -90,42 +89,26 @@ public void startTrackingPrevValues() {}
public void beginUpdateCycle() {}
@Override
- public void setRowSize(int outputPosition, long size) {
- rowCountSource.set(outputPosition, size);
- }
-
- @Override
- public void addForUpdate(final SafeCloseable bc, RowSequence rowSequence, ColumnSource<?>[] sources,
- MutableInt nextOutputPosition, WritableIntChunk<RowKeys> outputPositions,
- WritableIntChunk<RowKeys> reincarnatedPositions) {
- outputPositions.setSize(rowSequence.intSize());
- reincarnatedPositions.setSize(0);
- if (rowSequence.isEmpty()) {
- return;
- }
- addUpdateHandler.reset(nextOutputPosition, outputPositions, reincarnatedPositions);
- buildTable(addUpdateHandler, (BuildContext) bc,
- rowSequence, sources);
- addUpdateHandler.reset();
- }
-
- @Override
- public void remove(final SafeCloseable pc, RowSequence rowSequence, ColumnSource<?>[] sources,
- WritableIntChunk<RowKeys> outputPositions, WritableIntChunk<RowKeys> emptiedPositions) {
+ public void remove(
+ @NotNull final SafeCloseable pc,
+ @NotNull final RowSequence rowSequence,
+ @NotNull final ColumnSource<?>[] sources,
+ @NotNull final WritableIntChunk<RowKeys> outputPositions) {
outputPositions.setSize(rowSequence.intSize());
- emptiedPositions.setSize(0);
if (rowSequence.isEmpty()) {
return;
}
- removeHandler.reset(outputPositions, emptiedPositions);
- probeTable(removeHandler, (ProbeContext) pc, rowSequence, true,
- sources);
+ removeHandler.reset(outputPositions);
+ probeTable(removeHandler, (ProbeContext) pc, rowSequence, true, sources);
removeHandler.reset();
}
@Override
- public void findModifications(final SafeCloseable pc, RowSequence rowSequence, ColumnSource<?>[] sources,
- WritableIntChunk<RowKeys> outputPositions) {
+ public void findModifications(
+ @NotNull final SafeCloseable pc,
+ @NotNull final RowSequence rowSequence,
+ @NotNull final ColumnSource<?>[] sources,
+ @NotNull final WritableIntChunk<RowKeys> outputPositions) {
outputPositions.setSize(rowSequence.intSize());
if (rowSequence.isEmpty()) {
return;
@@ -136,20 +119,23 @@ public void findModifications(final SafeCloseable pc, RowSequence rowSequence, C
}
@Override
- protected void ensureMainState(int tableSize) {
+ protected void ensureMainState(final int tableSize) {
mainOutputPosition.ensureCapacity(tableSize);
}
@Override
- protected void ensureOverflowState(int newCapacity) {
+ protected void ensureOverflowState(final int newCapacity) {
overflowOutputPosition.ensureCapacity(newCapacity);
}
- private abstract class AddHandler extends HashHandler.BuildHandler {
+ private class AddHandler extends HashHandler.BuildHandler {
+
MutableInt outputPosition;
WritableIntChunk<RowKeys> outputPositions;
- void reset(MutableInt nextOutputPosition, WritableIntChunk<RowKeys> outputPositions) {
+ void reset(
+ @NotNull final MutableInt nextOutputPosition,
+ @NotNull final WritableIntChunk<RowKeys> outputPositions) {
this.outputPosition = nextOutputPosition;
this.outputPositions = outputPositions;
}
@@ -160,159 +146,89 @@ void reset() {
}
@Override
- public void doMainInsert(int tableLocation, int chunkPosition) {
+ public void doMainInsert(final int tableLocation, final int chunkPosition) {
final int nextOutputPosition = outputPosition.getAndIncrement();
outputPositions.set(chunkPosition, nextOutputPosition);
mainOutputPosition.set(tableLocation, nextOutputPosition);
outputPositionToHashSlot.set(nextOutputPosition, tableLocation);
- rowCountSource.set(nextOutputPosition, 1L);
}
@Override
- public void doMoveMain(int oldTableLocation, int newTableLocation) {
+ public void doMainFound(final int tableLocation, final int chunkPosition) {
+ final int outputPosition = mainOutputPosition.getUnsafe(tableLocation);
+ outputPositions.set(chunkPosition, outputPosition);
+ }
+
+ @Override
+ public void doMoveMain(final int oldTableLocation, final int newTableLocation) {
final int outputPosition = mainOutputPosition.getUnsafe(newTableLocation);
outputPositionToHashSlot.set(outputPosition, newTableLocation);
}
@Override
- public void doPromoteOverflow(int overflowLocation, int mainInsertLocation) {
+ public void doPromoteOverflow(final int overflowLocation, final int mainInsertLocation) {
outputPositionToHashSlot.set(mainOutputPosition.getUnsafe(mainInsertLocation), mainInsertLocation);
}
@Override
- public void onNextChunk(int size) {
+ public void onNextChunk(final int size) {
outputPositionToHashSlot.ensureCapacity(outputPosition.intValue() + size);
- rowCountSource.ensureCapacity(outputPosition.intValue() + size);
}
@Override
- public void doOverflowInsert(int overflowLocation, int chunkPosition) {
+ public void doOverflowInsert(final int overflowLocation, final int chunkPosition) {
final int nextOutputPosition = outputPosition.getAndIncrement();
outputPositions.set(chunkPosition, nextOutputPosition);
overflowOutputPosition.set(overflowLocation, nextOutputPosition);
outputPositionToHashSlot.set(nextOutputPosition,
HashTableColumnSource.overflowLocationToHashLocation(overflowLocation));
- rowCountSource.set(nextOutputPosition, 1L);
- }
- }
-
- class AddInitialHandler extends AddHandler {
- @Override
- public void doMainFound(int tableLocation, int chunkPosition) {
- final int outputPosition = mainOutputPosition.getUnsafe(tableLocation);
- outputPositions.set(chunkPosition, outputPosition);
-
- final long oldRowCount = rowCountSource.getUnsafe(outputPosition);
- Assert.gtZero(oldRowCount, "oldRowCount");
- rowCountSource.set(outputPosition, oldRowCount + 1);
}
@Override
- public void doOverflowFound(int overflowLocation, int chunkPosition) {
+ public void doOverflowFound(final int overflowLocation, final int chunkPosition) {
final int outputPosition = overflowOutputPosition.getUnsafe(overflowLocation);
outputPositions.set(chunkPosition, outputPosition);
-
- final long oldRowCount = rowCountSource.getUnsafe(outputPosition);
- Assert.gtZero(oldRowCount, "oldRowCount");
- rowCountSource.set(outputPosition, oldRowCount + 1);
- }
- }
-
- class AddUpdateHandler extends AddHandler {
- private WritableIntChunk<RowKeys> reincarnatedPositions;
-
- public void reset(MutableInt nextOutputPosition, WritableIntChunk<RowKeys> outputPositions,
- WritableIntChunk<RowKeys> reincarnatedPositions) {
- super.reset(nextOutputPosition, outputPositions);
- this.reincarnatedPositions = reincarnatedPositions;
- }
-
- void reset() {
- super.reset();
- reincarnatedPositions = null;
- }
-
- @Override
- public void doMainFound(int tableLocation, int chunkPosition) {
- final int outputPosition = mainOutputPosition.getUnsafe(tableLocation);
- outputPositions.set(chunkPosition, outputPosition);
-
- final long oldRowCount = rowCountSource.getUnsafe(outputPosition);
- Assert.geqZero(oldRowCount, "oldRowCount");
- if (oldRowCount == 0) {
- reincarnatedPositions.add(outputPosition);
- }
- rowCountSource.set(outputPosition, oldRowCount + 1);
- }
-
- @Override
- public void doOverflowFound(int overflowLocation, int chunkPosition) {
- final int outputPosition = overflowOutputPosition.getUnsafe(overflowLocation);
- outputPositions.set(chunkPosition, outputPosition);
-
- final long oldRowCount = rowCountSource.getUnsafe(outputPosition);
- Assert.geqZero(oldRowCount, "oldRowCount");
- if (oldRowCount == 0) {
- reincarnatedPositions.add(outputPosition);
- }
- rowCountSource.set(outputPosition, oldRowCount + 1);
}
}
class RemoveHandler extends HashHandler.ProbeHandler {
+
private WritableIntChunk<RowKeys> outputPositions;
- private WritableIntChunk<RowKeys> emptiedPositions;
- public void reset(WritableIntChunk<RowKeys> outputPositions, WritableIntChunk<RowKeys> emptiedPositions) {
+ public void reset(@NotNull final WritableIntChunk<RowKeys> outputPositions) {
this.outputPositions = outputPositions;
- this.emptiedPositions = emptiedPositions;
}
public void reset() {
this.outputPositions = null;
- this.emptiedPositions = null;
}
@Override
- public void doMainFound(int tableLocation, int chunkPosition) {
+ public void doMainFound(final int tableLocation, final int chunkPosition) {
final int outputPosition = mainOutputPosition.getUnsafe(tableLocation);
outputPositions.set(chunkPosition, outputPosition);
-
- // decrement the row count
- final long oldRowCount = rowCountSource.getUnsafe(outputPosition);
- Assert.gtZero(oldRowCount, "oldRowCount");
- if (oldRowCount == 1) {
- emptiedPositions.add(outputPosition);
- }
- rowCountSource.set(outputPosition, oldRowCount - 1);
}
@Override
- public void doOverflowFound(int overflowLocation, int chunkPosition) {
+ public void doOverflowFound(final int overflowLocation, final int chunkPosition) {
final int outputPosition = overflowOutputPosition.getUnsafe(overflowLocation);
outputPositions.set(chunkPosition, outputPosition);
-
- final long oldRowCount = rowCountSource.getUnsafe(outputPosition);
- Assert.gtZero(oldRowCount, "oldRowCount");
- if (oldRowCount == 1) {
- emptiedPositions.add(outputPosition);
- }
- rowCountSource.set(outputPosition, oldRowCount - 1);
}
@Override
- public void onNextChunk(int size) {}
+ public void onNextChunk(final int size) {}
@Override
- public void doMissing(int chunkPosition) {
+ public void doMissing(final int chunkPosition) {
throw new IllegalStateException();
}
}
class ModifyHandler extends HashHandler.ProbeHandler {
+
private WritableIntChunk<RowKeys> outputPositions;
- public void reset(WritableIntChunk<RowKeys> outputPositions) {
+ public void reset(@NotNull final WritableIntChunk<RowKeys> outputPositions) {
this.outputPositions = outputPositions;
}
@@ -321,22 +237,22 @@ public void reset() {
}
@Override
- public void doMainFound(int tableLocation, int chunkPosition) {
+ public void doMainFound(final int tableLocation, final int chunkPosition) {
final int outputPosition = mainOutputPosition.getUnsafe(tableLocation);
outputPositions.set(chunkPosition, outputPosition);
}
@Override
- public void doOverflowFound(int overflowLocation, int chunkPosition) {
+ public void doOverflowFound(final int overflowLocation, final int chunkPosition) {
final int outputPosition = overflowOutputPosition.getUnsafe(overflowLocation);
outputPositions.set(chunkPosition, outputPosition);
}
@Override
- public void onNextChunk(int size) {}
+ public void onNextChunk(final int size) {}
@Override
- public void doMissing(int chunkPosition) {
+ public void doMissing(final int chunkPosition) {
throw new IllegalStateException();
}
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalOperatorAggregationStateManager.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalOperatorAggregationStateManager.java
index 350c8dc1c2b..366e46214cb 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalOperatorAggregationStateManager.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/IncrementalOperatorAggregationStateManager.java
@@ -8,7 +8,6 @@
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.util.SafeCloseable;
-import org.apache.commons.lang3.mutable.MutableInt;
/**
* Interface for ChunkedOperatorAggregationHelper to process incremental updates.
@@ -25,11 +24,9 @@ public interface IncrementalOperatorAggregationStateManager extends OperatorAggr
void startTrackingPrevValues();
- void setRowSize(int outputPosition, long size);
+ void remove(SafeCloseable pc, RowSequence rowSequence, ColumnSource<?>[] sources,
+ WritableIntChunk<RowKeys> outputPositions);
- void addForUpdate(final SafeCloseable bc, RowSequence rowSequence, ColumnSource<?>[] sources, MutableInt nextOutputPosition, WritableIntChunk<RowKeys> outputPositions, WritableIntChunk<RowKeys> reincarnatedPositions);
-
- void remove(final SafeCloseable pc, RowSequence rowSequence, ColumnSource<?>[] sources, WritableIntChunk<RowKeys> outputPositions, WritableIntChunk<RowKeys> emptiedPositions);
-
- void findModifications(final SafeCloseable pc, RowSequence rowSequence, ColumnSource<?>[] sources, WritableIntChunk<RowKeys> outputPositions);
+ void findModifications(SafeCloseable pc, RowSequence rowSequence, ColumnSource<?>[] sources,
+ WritableIntChunk<RowKeys> outputPositions);
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/NoopStateChangeRecorder.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/NoopStateChangeRecorder.java
new file mode 100644
index 00000000000..ca0ba2a349f
--- /dev/null
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/NoopStateChangeRecorder.java
@@ -0,0 +1,18 @@
+package io.deephaven.engine.table.impl.by;
+
+import java.util.function.LongConsumer;
+
+/**
+ * Re-usable support for not recording reincarnated and emptied states in incremental aggregation processing,
+ * for operators that never process any removes.
+ */
+class NoopStateChangeRecorder implements StateChangeRecorder {
+
+ @Override
+ public final void startRecording(
+ final LongConsumer reincarnatedDestinationCallback,
+ final LongConsumer emptiedDestinationCallback) {}
+
+ @Override
+ public final void finishRecording() {}
+}
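A hypothetical wiring sketch: a no-op recorder satisfies the same contract as a real one, so the aggregation helper can treat add-only operators uniformly.

final StateChangeRecorder recorder = new NoopStateChangeRecorder();
recorder.startRecording(
        reincarnated -> System.out.println("reincarnated " + reincarnated),
        emptied -> System.out.println("emptied " + emptied));
// ... process an update cycle; a NoopStateChangeRecorder never invokes either callback ...
recorder.finishRecording();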
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/OperatorAggregationStateManager.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/OperatorAggregationStateManager.java
index a6853eb3973..a41ff6edcd4 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/OperatorAggregationStateManager.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/OperatorAggregationStateManager.java
@@ -12,6 +12,8 @@
interface OperatorAggregationStateManager {
+ int maxTableSize();
+
SafeCloseable makeAggregationStateBuildContext(ColumnSource<?>[] buildSources, long maxSize);
void add(final SafeCloseable bc, RowSequence rowSequence, ColumnSource<?>[] sources, MutableInt nextOutputPosition, WritableIntChunk<RowKeys> outputPositions);
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/OperatorAggregationStateManagerOpenAddressedAlternateBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/OperatorAggregationStateManagerOpenAddressedAlternateBase.java
index 230c95d7041..9ee4eac83b8 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/OperatorAggregationStateManagerOpenAddressedAlternateBase.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/OperatorAggregationStateManagerOpenAddressedAlternateBase.java
@@ -14,32 +14,39 @@
import org.apache.commons.lang3.mutable.MutableInt;
import static io.deephaven.engine.table.impl.util.TypedHasherUtil.*;
-import static io.deephaven.util.SafeCloseable.closeArray;
public abstract class OperatorAggregationStateManagerOpenAddressedAlternateBase
implements OperatorAggregationStateManager {
public static final int CHUNK_SIZE = ChunkedOperatorAggregationHelper.CHUNK_SIZE;
private static final long MAX_TABLE_SIZE = 1 << 30; // maximum array size
- // the number of slots in our table
+ /** The number of slots in our table. */
protected int tableSize;
- // the number of slots in our alternate table, to start with "1" is a lie, but rehashPointer is zero; so our
- // location value is positive and can be compared against rehashPointer safely
+
+ /**
+ * The number of slots in our alternate table. Starting at "1" is a lie, but rehashPointer is zero, so our
+ * location value is positive and can be compared against rehashPointer safely.
+ */
protected int alternateTableSize = 1;
- // how much of the alternate sources are necessary to rehash?
+ /** Should we rehash the entire table fully ({@code true}) or incrementally ({@code false})? */
+ protected boolean fullRehash = true;
+
+ /** How much of the alternate sources are necessary to rehash? */
protected int rehashPointer = 0;
protected long numEntries = 0;
- // the table will be rehashed to a load factor of targetLoadFactor if our loadFactor exceeds maximumLoadFactor
- // or if it falls below minimum load factor we will instead contract the table
+ /**
+ * The table will be rehashed to a load factor of targetLoadFactor if our loadFactor exceeds maximumLoadFactor;
+ * if it falls below the minimum load factor, we will instead contract the table.
+ */
private final double maximumLoadFactor;
- // the keys for our hash entries
+ /** The keys for our hash entries. */
protected final WritableColumnSource[] mainKeySources;
- // the keys for our hash entries, for the old alternative smaller table
+ /** The keys for our hash entries, for the old alternative smaller table. */
protected final ColumnSource[] alternateKeySources;
protected OperatorAggregationStateManagerOpenAddressedAlternateBase(ColumnSource<?>[] tableKeySources,
@@ -61,6 +68,11 @@ protected OperatorAggregationStateManagerOpenAddressedAlternateBase(ColumnSource
this.maximumLoadFactor = maximumLoadFactor;
}
+ @Override
+ public final int maxTableSize() {
+ return Math.toIntExact(MAX_TABLE_SIZE);
+ }
+
protected abstract void build(RowSequence rowSequence, Chunk<Values>[] sourceKeyChunks);
public static class BuildContext extends BuildOrProbeContext {
@@ -85,7 +97,6 @@ protected void buildTable(
final BuildContext bc,
final RowSequence buildRows,
final ColumnSource<?>[] buildSources,
- final boolean fullRehash,
final BuildHandler buildHandler) {
try (final RowSequence.Iterator rsIt = buildRows.getRowSequenceIterator()) {
// noinspection unchecked
@@ -95,7 +106,7 @@ protected void buildTable(
final RowSequence chunkOk = rsIt.getNextRowSequenceWithLength(bc.chunkSize);
final int nextChunkSize = chunkOk.intSize();
onNextChunk(nextChunkSize);
- while (doRehash(fullRehash, bc.rehashCredits, nextChunkSize)) {
+ while (doRehash(bc.rehashCredits, nextChunkSize)) {
migrateFront();
}
@@ -152,12 +163,11 @@ public interface BuildHandler {
}
/**
- * @param fullRehash should we rehash the entire table (if false, we rehash incrementally)
* @param rehashCredits the number of entries this operation has rehashed (input/output)
* @param nextChunkSize the size of the chunk we are processing
* @return true if a front migration is required
*/
- public boolean doRehash(boolean fullRehash, MutableInt rehashCredits, int nextChunkSize) {
+ public boolean doRehash(MutableInt rehashCredits, int nextChunkSize) {
if (rehashPointer > 0) {
final int requiredRehash = nextChunkSize - rehashCredits.intValue();
if (requiredRehash <= 0) {
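Sketch of the credit contract the buildTable loop above implies (assumed, not verbatim): rehashCredits banks rehash work already performed, so a chunk of size N only forces the rehashing it has not yet paid for.

final MutableInt rehashCredits = new MutableInt(0);
final int nextChunkSize = 4096; // illustrative chunk size
while (stateManager.doRehash(rehashCredits, nextChunkSize)) {
    stateManager.migrateFront(); // true means front entries must migrate before more rehashing
}
// Any surplus left in rehashCredits carries forward to the next chunk's doRehash call.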
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/OperatorAggregationStateManagerOpenAddressedBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/OperatorAggregationStateManagerOpenAddressedBase.java
index 0cb7b3bcc99..aa363f5fe02 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/OperatorAggregationStateManagerOpenAddressedBase.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/OperatorAggregationStateManagerOpenAddressedBase.java
@@ -48,6 +48,11 @@ protected OperatorAggregationStateManagerOpenAddressedBase(ColumnSource<?>[] tab
this.maximumLoadFactor = maximumLoadFactor;
}
+ @Override
+ public final int maxTableSize() {
+ return Math.toIntExact(MAX_TABLE_SIZE);
+ }
+
protected abstract void build(RowSequence rowSequence, Chunk<Values>[] sourceKeyChunks);
BuildContext makeBuildContext(ColumnSource<?>[] buildSources, long maxSize) {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/OperatorAggregationStateManagerTypedBase.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/OperatorAggregationStateManagerTypedBase.java
index 211249f3944..125572d1796 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/OperatorAggregationStateManagerTypedBase.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/OperatorAggregationStateManagerTypedBase.java
@@ -20,6 +20,11 @@ public abstract class OperatorAggregationStateManagerTypedBase
public static final int CHUNK_SIZE = ChunkedOperatorAggregationHelper.CHUNK_SIZE;
private static final long MAX_TABLE_SIZE = HashTableColumnSource.MINIMUM_OVERFLOW_HASH_SLOT;
+ @Override
+ public final int maxTableSize() {
+ return Math.toIntExact(MAX_TABLE_SIZE);
+ }
+
// the number of slots in our table
private int tableSize;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/SortedFirstOrLastChunkedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/SortedFirstOrLastChunkedOperator.java
index d3520401a1b..daa30528726 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/SortedFirstOrLastChunkedOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/SortedFirstOrLastChunkedOperator.java
@@ -6,6 +6,7 @@
import io.deephaven.chunk.attributes.ChunkLengths;
import io.deephaven.chunk.attributes.ChunkPositions;
import io.deephaven.chunk.attributes.Values;
+import io.deephaven.engine.rowset.RowSequence;
import io.deephaven.engine.table.impl.SortingOrder;
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.MatchPair;
@@ -26,7 +27,9 @@
import java.util.Map;
import java.util.function.Supplier;
-public class SortedFirstOrLastChunkedOperator implements IterativeChunkedAggregationOperator {
+public class SortedFirstOrLastChunkedOperator
+ extends BasicStateChangeRecorder
+ implements IterativeChunkedAggregationOperator {
private final ChunkType chunkType;
private final boolean isFirst;
private final Supplier<SegmentedSortedArray> ssaFactory;
@@ -416,6 +419,9 @@ private boolean addSortedChunk(Chunk values, LongChunk indices,
ssa.insert(values, indices);
final long newValue = isFirst ? ssa.getFirst() : ssa.getLast();
final long oldValue = redirections.getAndSetUnsafe(destination, newValue);
+ if (oldValue == RowSequence.NULL_ROW_KEY && newValue != RowSequence.NULL_ROW_KEY) {
+ onReincarnated(destination);
+ }
return oldValue != newValue;
}
@@ -432,6 +438,9 @@ private boolean removeSortedChunk(Chunk values, LongChunk indic
ssa.remove(values, indices);
final long newValue = isFirst ? ssa.getFirst() : ssa.getLast();
final long oldValue = redirections.getAndSetUnsafe(destination, newValue);
+ if (oldValue != RowSequence.NULL_ROW_KEY && newValue == RowSequence.NULL_ROW_KEY) {
+ onEmptied(destination);
+ }
return oldValue != newValue;
}
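The two guards added above implement empty/non-empty transition detection around the redirection update; schematically (NULL_ROW_KEY is the "no rows in this bucket" sentinel):

// oldValue == NULL_ROW_KEY && newValue != NULL_ROW_KEY -> onReincarnated(destination): first row arrived
// oldValue != NULL_ROW_KEY && newValue == NULL_ROW_KEY -> onEmptied(destination): last row removed
// otherwise -> occupancy unchanged; only the exemplar row key may have moved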
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StateChangeRecorder.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StateChangeRecorder.java
new file mode 100644
index 00000000000..7e55434ba74
--- /dev/null
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StateChangeRecorder.java
@@ -0,0 +1,24 @@
+package io.deephaven.engine.table.impl.by;
+
+import java.util.function.LongConsumer;
+
+/**
+ * Interface for recording reincarnated and emptied states in incremental aggregation processing.
+ */
+interface StateChangeRecorder {
+
+ /**
+ * Set {@link LongConsumer callbacks} that should be used to record destinations that have transitioned from empty
+ * to non-empty ({@code reincarnatedDestinationCallback}) or non-empty to empty
+ * ({@code emptiedDestinationCallback}).
+ *
+ * @param reincarnatedDestinationCallback Consumer for destinations that have gone from empty to non-empty
+ * @param emptiedDestinationCallback Consumer for destinations that have gone from non-empty to empty
+ */
+ void startRecording(LongConsumer reincarnatedDestinationCallback, LongConsumer emptiedDestinationCallback);
+
+ /**
+ * Remove callbacks and stop state change recording.
+ */
+ void finishRecording();
+}
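BasicStateChangeRecorder itself is not shown in this diff; a plausible sketch of the recording side, consistent with how SortedFirstOrLastChunkedOperator calls onReincarnated/onEmptied above (assumes java.util.function.LongConsumer; everything beyond the interface methods is a guess):

class BasicStateChangeRecorderSketch implements StateChangeRecorder {

    private LongConsumer reincarnatedDestinationCallback;
    private LongConsumer emptiedDestinationCallback;

    @Override
    public void startRecording(
            final LongConsumer reincarnatedDestinationCallback,
            final LongConsumer emptiedDestinationCallback) {
        this.reincarnatedDestinationCallback = reincarnatedDestinationCallback;
        this.emptiedDestinationCallback = emptiedDestinationCallback;
    }

    @Override
    public void finishRecording() {
        reincarnatedDestinationCallback = null;
        emptiedDestinationCallback = null;
    }

    // Operators report transitions; both are no-ops when recording is not active.
    protected void onReincarnated(final long destination) {
        if (reincarnatedDestinationCallback != null) {
            reincarnatedDestinationCallback.accept(destination);
        }
    }

    protected void onEmptied(final long destination) {
        if (emptiedDestinationCallback != null) {
            emptiedDestinationCallback.accept(destination);
        }
    }
}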
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StaticChunkedOperatorAggregationStateManager.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StaticChunkedOperatorAggregationStateManager.java
deleted file mode 100644
index 7f6f1f7352a..00000000000
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/by/StaticChunkedOperatorAggregationStateManager.java
+++ /dev/null
@@ -1,1414 +0,0 @@
-/**
- * Copyright (c) 2016-2022 Deephaven Data Labs and Patent Pending
- */
-package io.deephaven.engine.table.impl.by;
-
-import io.deephaven.base.verify.Require;
-import io.deephaven.base.verify.Assert;
-import io.deephaven.chunk.*;
-import io.deephaven.chunk.attributes.Any;
-import io.deephaven.chunk.attributes.ChunkPositions;
-import io.deephaven.chunk.attributes.HashCodes;
-import io.deephaven.chunk.attributes.Values;
-import io.deephaven.engine.rowset.*;
-import io.deephaven.engine.table.*;
-import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
-import io.deephaven.engine.rowset.chunkattributes.RowKeys;
-import io.deephaven.util.QueryConstants;
-import io.deephaven.chunk.util.hashing.*;
-// this is ugly to have twice, but we do need it twice for replication
-// @StateChunkIdentityName@ from \QIntChunk\E
-import io.deephaven.chunk.util.hashing.IntChunkEquals;
-import io.deephaven.engine.table.impl.sort.permute.PermuteKernel;
-import io.deephaven.engine.table.impl.sort.timsort.LongIntTimsortKernel;
-import io.deephaven.engine.table.impl.sources.*;
-import io.deephaven.engine.table.impl.util.*;
-
-// mixin rehash
-import java.util.Arrays;
-import io.deephaven.engine.table.impl.sort.permute.IntPermuteKernel;
-// @StateChunkTypeEnum@ from \QInt\E
-import io.deephaven.engine.table.impl.sort.permute.IntPermuteKernel;
-import io.deephaven.engine.table.impl.util.compact.IntCompactKernel;
-import io.deephaven.engine.table.impl.util.compact.LongCompactKernel;
-// endmixin rehash
-
-import io.deephaven.util.SafeCloseableArray;
-import org.jetbrains.annotations.NotNull;
-
-// region extra imports
-import io.deephaven.engine.table.impl.HashTableAnnotations;
-import io.deephaven.util.SafeCloseable;
-import org.apache.commons.lang3.mutable.MutableInt;
-
-import java.util.Objects;
-// endregion extra imports
-
-import static io.deephaven.util.SafeCloseable.closeArray;
-
-// region class visibility
-public
-// endregion class visibility
-class StaticChunkedOperatorAggregationStateManager
- // region extensions
- implements OperatorAggregationStateManager
- // endregion extensions
-{
- // region constants
- public static final int CHUNK_SIZE = ChunkedOperatorAggregationHelper.CHUNK_SIZE;
- private static final int MINIMUM_INITIAL_HASH_SIZE = CHUNK_SIZE;
- private static final long MAX_TABLE_SIZE = HashTableColumnSource.MINIMUM_OVERFLOW_HASH_SLOT;
- // endregion constants
-
- // mixin rehash
- static final double DEFAULT_MAX_LOAD_FACTOR = 0.75;
- static final double DEFAULT_TARGET_LOAD_FACTOR = 0.70;
- // endmixin rehash
-
- // region preamble variables
- // endregion preamble variables
-
- @HashTableAnnotations.EmptyStateValue
- // @NullStateValue@ from \QQueryConstants.NULL_INT\E, @StateValueType@ from \Qint\E
- private static final int EMPTY_RIGHT_VALUE = QueryConstants.NULL_INT;
-
- // mixin getStateValue
- // region overflow pivot
- // endregion overflow pivot
- // endmixin getStateValue
-
- // the number of slots in our table
- // mixin rehash
- private int tableSize;
- // endmixin rehash
- // altmixin rehash: private final int tableSize;
-
- // how many key columns we have
- private final int keyColumnCount;
-
- // mixin rehash
- private long numEntries = 0;
-
- /** Our table size must be 2^L (i.e. a power of two), and the pivot is between 2^(L-1) and 2^L.
- *
- * When hashing a value, if hashCode % 2^L < tableHashPivot, then the destination location is hashCode % 2^L.
- * If hashCode % 2^L >= tableHashPivot, then the destination location is hashCode % 2^(L-1). Once the pivot reaches
- * the table size, we can simply double the table size and repeat the process.
- *
- * This has the effect of only using hash table locations < tableHashPivot. When we want to expand the table,
- * we can move some of the entries from location {@code tableHashPivot - 2^(L-1)} to location tableHashPivot. This
- * provides for incremental expansion of the hash table, without the need for a full rehash.
- */
- private int tableHashPivot;
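
The javadoc above describes standard linear hashing. A self-contained sketch of the slot computation, assuming tableSize == 2^L is a power of two and 2^(L-1) <= tableHashPivot <= tableSize (this helper is illustrative, not part of the deleted class):

```java
// Destination bucket under the pivot scheme described above.
static int destinationSlot(final int hashCode, final int tableSize, final int tableHashPivot) {
    final int slot = hashCode & (tableSize - 1); // hashCode % 2^L; masking also handles negative hashes
    if (slot < tableHashPivot) {
        return slot; // this bucket has already been opened
    }
    return hashCode & ((tableSize >> 1) - 1); // fall back to hashCode % 2^(L-1)
}
```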
-
- // the table will be rehashed to a load factor of targetLoadFactor if our load factor exceeds maximumLoadFactor;
- // if it falls below the minimum load factor, we will instead contract the table
- private double targetLoadFactor = DEFAULT_TARGET_LOAD_FACTOR;
- private double maximumLoadFactor = DEFAULT_MAX_LOAD_FACTOR;
- // TODO: We do not yet support contraction
- // private final double minimumLoadFactor = 0.5;
-
- private final IntegerArraySource freeOverflowLocations = new IntegerArraySource();
- private int freeOverflowCount = 0;
- // endmixin rehash
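
For intuition, the two load factors form a hysteresis pair: rehashing is triggered when the load exceeds maximumLoadFactor and grows the table until the load reaches targetLoadFactor. A hedged sketch (method names are illustrative, not the class's actual private methods):

```java
// Trigger: the open portion of the table is overloaded.
static boolean shouldRehash(final long numEntries, final int tableHashPivot, final double maximumLoadFactor) {
    return (double) numEntries / tableHashPivot > maximumLoadFactor;
}

// Growth target: the pivot that brings the load back down to targetLoadFactor.
static long desiredPivot(final long numEntries, final double targetLoadFactor) {
    return (long) Math.ceil(numEntries / targetLoadFactor);
}
```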
-
- // the keys for our hash entries
- private final ArrayBackedColumnSource<?>[] keySources;
- // the location of any overflow entry in this bucket
- private final IntegerArraySource overflowLocationSource = new IntegerArraySource();
-
- // we are going to also reuse this for our state entry, so that we do not need additional storage
- @HashTableAnnotations.StateColumnSource
- // @StateColumnSourceType@ from \QIntegerArraySource\E
- private final IntegerArraySource stateSource
- // @StateColumnSourceConstructor@ from \QIntegerArraySource()\E
- = new IntegerArraySource();
-
- // the keys for overflow
- private int nextOverflowLocation = 0;
- private final ArrayBackedColumnSource<?>[] overflowKeySources;
- // the location of the next key in an overflow bucket
- private final IntegerArraySource overflowOverflowLocationSource = new IntegerArraySource();
- // the overflow buckets for the state source
- @HashTableAnnotations.OverflowStateColumnSource
- // @StateColumnSourceType@ from \QIntegerArraySource\E
- private final IntegerArraySource overflowStateSource
- // @StateColumnSourceConstructor@ from \QIntegerArraySource()\E
- = new IntegerArraySource();
-
- // the type of each of our key chunks
- private final ChunkType[] keyChunkTypes;
-
- // the operators for hashing and various equality methods
- private final ChunkHasher[] chunkHashers;
- private final ChunkEquals[] chunkEquals;
- private final PermuteKernel[] chunkCopiers;
-
- // mixin rehash
- // If we have objects in our key columns, then we should null them out if we delete an overflow row. This only
- // applies to ObjectArraySources; for primitives we are content to leave the dead entries in the tables, because
- // they will not affect GC.
- private final ObjectArraySource<?>[] overflowKeyColumnsToNull;
- // endmixin rehash
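
A hedged sketch of the nulling described above (illustrative loop and variable, not the class's actual removal path): when an overflow row is freed, object-backed key sources have their references cleared so dead keys do not pin heap objects, while primitive sources are left as-is:

```java
for (final ObjectArraySource<?> source : overflowKeyColumnsToNull) {
    source.set(freedOverflowLocation, null); // drop the reference so it can be collected
}
```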
-
- // region extra variables
- private final IntegerArraySource outputPositionToHashSlot = new IntegerArraySource();
- // endregion extra variables
-
- // region constructor visibility
- // endregion constructor visibility
- StaticChunkedOperatorAggregationStateManager(ColumnSource<?>[] tableKeySources
- , int tableSize
- // region constructor arguments
- , double maximumLoadFactor
- , double targetLoadFactor
- // endregion constructor arguments
- ) {
- // region super
- // endregion super
- keyColumnCount = tableKeySources.length;
-
- this.tableSize = tableSize;
- Require.leq(tableSize, "tableSize", MAX_TABLE_SIZE);
- Require.gtZero(tableSize, "tableSize");
- Require.eq(Integer.bitCount(tableSize), "Integer.bitCount(tableSize)", 1);
- // mixin rehash
- this.tableHashPivot = tableSize;
- // endmixin rehash
-
- overflowKeySources = new ArrayBackedColumnSource[keyColumnCount];
- keySources = new ArrayBackedColumnSource[keyColumnCount];
-
- keyChunkTypes = new ChunkType[keyColumnCount];
- chunkHashers = new ChunkHasher[keyColumnCount];
- chunkEquals = new ChunkEquals[keyColumnCount];
- chunkCopiers = new PermuteKernel[keyColumnCount];
-
- for (int ii = 0; ii < keyColumnCount; ++ii) {
- // the sources that we will use to store our hash table
- keySources[ii] = ArrayBackedColumnSource.getMemoryColumnSource(tableSize, tableKeySources[ii].getType());
- keyChunkTypes[ii] = tableKeySources[ii].getChunkType();
-
- overflowKeySources[ii] = ArrayBackedColumnSource.getMemoryColumnSource(CHUNK_SIZE, tableKeySources[ii].getType());
-
- chunkHashers[ii] = ChunkHasher.makeHasher(keyChunkTypes[ii]);
- chunkEquals[ii] = ChunkEquals.makeEqual(keyChunkTypes[ii]);
- chunkCopiers[ii] = PermuteKernel.makePermuteKernel(keyChunkTypes[ii]);
- }
-
- // mixin rehash
- overflowKeyColumnsToNull = Arrays.stream(overflowKeySources).filter(x -> x instanceof ObjectArraySource).map(x -> (ObjectArraySource)x).toArray(ObjectArraySource[]::new);
- // endmixin rehash
-
- // region constructor
- this.maximumLoadFactor = maximumLoadFactor;
- this.targetLoadFactor = targetLoadFactor;
- // endregion constructor
-
- ensureCapacity(tableSize);
- }
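
The constructor requires tableSize to be a power of two (Integer.bitCount(tableSize) == 1) and at most MAX_TABLE_SIZE. A caller-side helper to normalize a requested size, included only as an illustration (it is not part of this class):

```java
// Smallest power of two >= requested, valid for 1 <= requested <= 2^30.
static int roundUpToPowerOfTwo(final int requested) {
    return Integer.highestOneBit(Math.max(requested, 1) * 2 - 1);
}
```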
-
- private void ensureCapacity(int tableSize) {
- stateSource.ensureCapacity(tableSize);
- overflowLocationSource.ensureCapacity(tableSize);
- for (int ii = 0; ii < keyColumnCount; ++ii) {
- keySources[ii].ensureCapacity(tableSize);
- }
- // region ensureCapacity
- // endregion ensureCapacity
- }
-
- private void ensureOverflowCapacity(WritableIntChunk chunkPositionsToInsertInOverflow) {
- final int locationsToAllocate = chunkPositionsToInsertInOverflow.size();
- // mixin rehash
- if (freeOverflowCount >= locationsToAllocate) {
- return;
- }
- final int newCapacity = nextOverflowLocation + locationsToAllocate - freeOverflowCount;
- // endmixin rehash
- // altmixin rehash: final int newCapacity = nextOverflowLocation + locationsToAllocate;
- overflowOverflowLocationSource.ensureCapacity(newCapacity);
- overflowStateSource.ensureCapacity(newCapacity);
- //noinspection ForLoopReplaceableByForEach
- for (int ii = 0; ii < overflowKeySources.length; ++ii) {
- overflowKeySources[ii].ensureCapacity(newCapacity);
- }
- // region ensureOverflowCapacity
- // endregion ensureOverflowCapacity
- }
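
The capacity arithmetic above reflects free-list reuse: only locationsToAllocate - freeOverflowCount slots need fresh backing storage, because freed overflow locations are recycled first. A hedged sketch of the allocation side (illustrative method, not the class's exact code):

```java
int allocateOverflowLocation() {
    if (freeOverflowCount > 0) {
        return freeOverflowLocations.getInt(--freeOverflowCount); // pop a recycled slot
    }
    return nextOverflowLocation++; // otherwise append a brand-new slot
}
```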
-
- // region build wrappers
-
- @Override
- public void add(final SafeCloseable bc, RowSequence rowSequence, ColumnSource<?>[] sources, MutableInt nextOutputPosition, WritableIntChunk outputPositions) {
- if (rowSequence.isEmpty()) {
- return;
- }
- buildTable((BuildContext)bc, rowSequence, sources, nextOutputPosition, outputPositions);
- }
-
- @Override
- public SafeCloseable makeAggregationStateBuildContext(ColumnSource<?>[] buildSources, long maxSize) {
- return makeBuildContext(buildSources, maxSize);
- }
-
- // endregion build wrappers
-
- class BuildContext implements Context {
- final int chunkSize;
-
- final LongIntTimsortKernel.LongIntSortKernelContext sortContext;
- final ColumnSource.FillContext stateSourceFillContext;
- // mixin rehash
- final ColumnSource.FillContext overflowStateSourceFillContext;
- // endmixin rehash
- final ColumnSource.FillContext overflowFillContext;
- final ColumnSource.FillContext overflowOverflowFillContext;
-
- // the chunk of hashcodes
- final WritableIntChunk hashChunk;
- // the chunk of positions within our table
- final WritableLongChunk tableLocationsChunk;
-
- final ResettableWritableChunk[] writeThroughChunks = getResettableWritableKeyChunks();
- final WritableIntChunk sourcePositions;
- final WritableIntChunk destinationLocationPositionInWriteThrough;
-
- final WritableBooleanChunk filledValues;
- final WritableBooleanChunk equalValues;
-
- // the overflow locations that we need to get from the overflowLocationSource (or overflowOverflowLocationSource)
- final WritableLongChunk overflowLocationsToFetch;
- // the overflow position in the working key chunks, parallel to the overflowLocationsToFetch
- final WritableIntChunk overflowPositionInSourceChunk;
-
- // the position with our hash table that we should insert a value into
- final WritableLongChunk insertTableLocations;
- // the position in our chunk, parallel to the workingChunkInsertTablePositions
- final WritableIntChunk insertPositionsInSourceChunk;
-
- // we sometimes need to check two positions within a single chunk for equality; this contains those positions as pairs
- final WritableIntChunk chunkPositionsToCheckForEquality;
- // While processing overflow insertions, parallel to the chunkPositions to check for equality, the overflow location that
- // is represented by the first of the pairs in chunkPositionsToCheckForEquality
- final WritableLongChunk overflowLocationForEqualityCheck;
-
- // the chunk of state values that we read from the hash table
- // @WritableStateChunkType@ from \QWritableIntChunk\E
- final WritableIntChunk workingStateEntries;
-
- // the chunks for getting key values from the hash table
- final WritableChunk[] workingKeyChunks;
- final WritableChunk[] overflowKeyChunks;
-
- // when fetching from the overflow, we record which chunk position we are fetching for
- final WritableIntChunk chunkPositionsForFetches;
- // which positions in the chunk we are inserting into the overflow
- final WritableIntChunk chunkPositionsToInsertInOverflow;
- // which table locations we are inserting into the overflow
- final WritableLongChunk tableLocationsToInsertInOverflow;
-
- // values we have read from the overflow locations sources
- final WritableIntChunk overflowLocations;
-
- // mixin rehash
- final WritableLongChunk rehashLocations;
- final WritableIntChunk overflowLocationsToMigrate;
- final WritableLongChunk overflowLocationsAsKeyIndices;
- final WritableBooleanChunk shouldMoveBucket;
-
- final ResettableWritableLongChunk overflowLocationForPromotionLoop = ResettableWritableLongChunk.makeResettableChunk();
-
- // mixin allowUpdateWriteThroughState
- // @WritableStateChunkType@ from \QWritableIntChunk\E, @WritableStateChunkName@ from \QWritableIntChunk\E
- final ResettableWritableIntChunk writeThroughState = ResettableWritableIntChunk.makeResettableChunk();
- // endmixin allowUpdateWriteThroughState
- final ResettableWritableIntChunk writeThroughOverflowLocations = ResettableWritableIntChunk.makeResettableChunk();
- // endmixin rehash
-
- final SharedContext sharedFillContext;
- final ColumnSource.FillContext[] workingFillContexts;
- final SharedContext sharedOverflowContext;
- final ColumnSource.FillContext[] overflowContexts;
- final SharedContext sharedBuildContext;
- final ChunkSource.GetContext[] buildContexts;
-
- // region build context
- final WritableIntChunk duplicatePositions;
- final WritableLongChunk addedSlotsByPosition;
- // endregion build context
-
- final boolean haveSharedContexts;
-
- private BuildContext(ColumnSource<?>[] buildSources,
- int chunkSize
- // region build context constructor args
- // endregion build context constructor args
- ) {
- Assert.gtZero(chunkSize, "chunkSize");
- this.chunkSize = chunkSize;
- haveSharedContexts = buildSources.length > 1;
- if (haveSharedContexts) {
- sharedFillContext = SharedContext.makeSharedContext();
- sharedOverflowContext = SharedContext.makeSharedContext();
- sharedBuildContext = SharedContext.makeSharedContext();
- } else {
- // no point in the additional work implied by these not being null.
- sharedFillContext = null;
- sharedOverflowContext = null;
- sharedBuildContext = null;
- }
- workingFillContexts = makeFillContexts(keySources, sharedFillContext, chunkSize);
- overflowContexts = makeFillContexts(overflowKeySources, sharedOverflowContext, chunkSize);
- buildContexts = makeGetContexts(buildSources, sharedBuildContext, chunkSize);
- // region build context constructor
- duplicatePositions = WritableIntChunk.makeWritableChunk(chunkSize * 2);
- addedSlotsByPosition = WritableLongChunk.makeWritableChunk(chunkSize);
- // endregion build context constructor
- sortContext = LongIntTimsortKernel.createContext(chunkSize);
- stateSourceFillContext = stateSource.makeFillContext(chunkSize);
- overflowFillContext = overflowLocationSource.makeFillContext(chunkSize);
- overflowOverflowFillContext = overflowOverflowLocationSource.makeFillContext(chunkSize);
- hashChunk = WritableIntChunk.makeWritableChunk(chunkSize);
- tableLocationsChunk = WritableLongChunk.makeWritableChunk(chunkSize);
- sourcePositions = WritableIntChunk.makeWritableChunk(chunkSize);
- destinationLocationPositionInWriteThrough = WritableIntChunk.makeWritableChunk(chunkSize);
- filledValues = WritableBooleanChunk.makeWritableChunk(chunkSize);
- equalValues = WritableBooleanChunk.makeWritableChunk(chunkSize);
- overflowLocationsToFetch = WritableLongChunk.makeWritableChunk(chunkSize);
- overflowPositionInSourceChunk = WritableIntChunk.makeWritableChunk(chunkSize);
- insertTableLocations = WritableLongChunk.makeWritableChunk(chunkSize);
- insertPositionsInSourceChunk = WritableIntChunk.makeWritableChunk(chunkSize);
- chunkPositionsToCheckForEquality = WritableIntChunk.makeWritableChunk(chunkSize * 2);
- overflowLocationForEqualityCheck = WritableLongChunk.makeWritableChunk(chunkSize);
- // @WritableStateChunkName@ from \QWritableIntChunk\E
- workingStateEntries = WritableIntChunk.makeWritableChunk(chunkSize);
- workingKeyChunks = getWritableKeyChunks(chunkSize);
- overflowKeyChunks = getWritableKeyChunks(chunkSize);
- chunkPositionsForFetches = WritableIntChunk.makeWritableChunk(chunkSize);
- chunkPositionsToInsertInOverflow = WritableIntChunk.makeWritableChunk(chunkSize);
- tableLocationsToInsertInOverflow = WritableLongChunk.makeWritableChunk(chunkSize);
- overflowLocations = WritableIntChunk.makeWritableChunk(chunkSize);
- // mixin rehash
- rehashLocations = WritableLongChunk.makeWritableChunk(chunkSize);
- overflowStateSourceFillContext = overflowStateSource.makeFillContext(chunkSize);
- overflowLocationsToMigrate = WritableIntChunk.makeWritableChunk(chunkSize);
- overflowLocationsAsKeyIndices = WritableLongChunk.makeWritableChunk(chunkSize);
- shouldMoveBucket = WritableBooleanChunk.makeWritableChunk(chunkSize);
- // endmixin rehash
- }
-
- private void resetSharedContexts() {
- if (!haveSharedContexts) {
- return;
- }
- sharedFillContext.reset();
- sharedOverflowContext.reset();
- sharedBuildContext.reset();
- }
-
- private void closeSharedContexts() {
- if (!haveSharedContexts) {
- return;
- }
- sharedFillContext.close();
- sharedOverflowContext.close();
- sharedBuildContext.close();
- }
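
The shared contexts exist so that multiple key sources can reuse cached lookups within a single chunk; with one source there is nothing to share, hence the nulls. A hedged usage sketch mirroring how a chunk loop would interleave with resetSharedContexts() (all names illustrative):

```java
try (final RowSequence.Iterator rsIt = rowSequence.getRowSequenceIterator()) {
    while (rsIt.hasMore()) {
        final RowSequence chunkRows = rsIt.getNextRowSequenceWithLength(bc.chunkSize);
        // ... fill and compare key chunks for chunkRows ...
        bc.resetSharedContexts(); // invalidate cached state before the next chunk
    }
}
```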
-
- @Override
- public void close() {
- sortContext.close();
- stateSourceFillContext.close();
- // mixin rehash
- overflowStateSourceFillContext.close();
- // endmixin rehash
- overflowFillContext.close();
- overflowOverflowFillContext.close();
- closeArray(workingFillContexts);
- closeArray(overflowContexts);
- closeArray(buildContexts);
-
- hashChunk.close();
- tableLocationsChunk.close();
- closeArray(writeThroughChunks);
-
- sourcePositions.close();
- destinationLocationPositionInWriteThrough.close();
- filledValues.close();
- equalValues.close();
- overflowLocationsToFetch.close();
- overflowPositionInSourceChunk.close();
- insertTableLocations.close();
- insertPositionsInSourceChunk.close();
- chunkPositionsToCheckForEquality.close();
- overflowLocationForEqualityCheck.close();
- workingStateEntries.close();
- closeArray(workingKeyChunks);
- closeArray(overflowKeyChunks);
- chunkPositionsForFetches.close();
- chunkPositionsToInsertInOverflow.close();
- tableLocationsToInsertInOverflow.close();
- overflowLocations.close();
- // mixin rehash
- rehashLocations.close();
- overflowLocationsToMigrate.close();
- overflowLocationsAsKeyIndices.close();
- shouldMoveBucket.close();
- overflowLocationForPromotionLoop.close();
- // mixin allowUpdateWriteThroughState
- writeThroughState.close();
- // endmixin allowUpdateWriteThroughState
- writeThroughOverflowLocations.close();
- // endmixin rehash
- // region build context close
- duplicatePositions.close();
- addedSlotsByPosition.close();
- // endregion build context close
- closeSharedContexts();
- }
-
- }
-
- public BuildContext makeBuildContext(ColumnSource<?>[] buildSources,
- long maxSize
- // region makeBuildContext args
- // endregion makeBuildContext args
- ) {
- return new BuildContext(buildSources, (int)Math.min(CHUNK_SIZE, maxSize)
- // region makeBuildContext arg pass
- // endregion makeBuildContext arg pass
- );
- }
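
Usage-wise, the context is sized to min(CHUNK_SIZE, maxSize), so small inputs allocate proportionally small working chunks. A hedged caller sketch built on the two public entry points above (manager, keySources, rowSet, and the output arguments are illustrative):

```java
try (final SafeCloseable bc = manager.makeAggregationStateBuildContext(keySources, rowSet.size())) {
    manager.add(bc, rowSet, keySources, outputPosition, outputPositions);
}
```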
-
- private void buildTable(final BuildContext bc,
- final RowSequence buildIndex,
- ColumnSource<?>[] buildSources
- // region extra build arguments
- , final MutableInt outputPosition
- , final WritableIntChunk