From 326f0a43c5dd6b119af0d1d98b4d8429427d3996 Mon Sep 17 00:00:00 2001
From: Larry Booker
Date: Thu, 4 Aug 2022 15:25:28 -0700
Subject: [PATCH 001/123] added UpdateByCumulativeOperator class, tests still
pass
---
.../engine/table/impl/UpdateByOperator.java | 43 ++++---
.../engine/table/impl/ZeroKeyUpdateBy.java | 106 ++++++++++--------
.../impl/updateby/ema/ByteEMAOperator.java | 2 +-
.../impl/updateby/ema/IntEMAOperator.java | 2 +-
.../impl/updateby/ema/LongEMAOperator.java | 2 +-
.../updateby/hashing/UpdateBySlotTracker.java | 2 +-
.../internal/BaseByteUpdateByOperator.java | 5 +-
.../internal/BaseCharUpdateByOperator.java | 5 +-
.../internal/BaseDoubleUpdateByOperator.java | 5 +-
.../internal/BaseFloatUpdateByOperator.java | 5 +-
.../internal/BaseIntUpdateByOperator.java | 5 +-
.../internal/BaseLongUpdateByOperator.java | 5 +-
.../internal/BaseObjectUpdateByOperator.java | 5 +-
.../internal/BaseShortUpdateByOperator.java | 5 +-
.../LongRecordingUpdateByOperator.java | 5 +-
15 files changed, 118 insertions(+), 84 deletions(-)
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperator.java
index 158ad91cab4..4655c6d4114 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperator.java
@@ -89,20 +89,6 @@ static boolean isAppendOnly(@NotNull final TableUpdate update, final long lastKn
update.added().firstRowKey() > lastKnownKey;
}
- /**
- * Find the smallest valued key that participated in the upstream {@link TableUpdate}.
- *
- * @param upstream the update
- * @return the smallest key that participated in any part of the update.
- */
- static long determineSmallestVisitedKey(@NotNull final TableUpdate upstream, @NotNull final RowSet affected) {
- return determineSmallestVisitedKey(upstream.added(),
- upstream.modified(),
- upstream.removed(),
- upstream.shifted(),
- affected);
- }
-
/**
* Find the smallest valued key that participated in the upstream {@link TableUpdate}.
*
@@ -113,7 +99,7 @@ static long determineSmallestVisitedKey(@NotNull final TableUpdate upstream, @No
*
* @return the smallest key that participated in any part of the update.
*/
- static long determineSmallestVisitedKey(@NotNull final RowSet added,
+ static long smallestAffectedKey(@NotNull final RowSet added,
@NotNull final RowSet modified,
@NotNull final RowSet removed,
@NotNull final RowSetShiftData shifted,
@@ -181,6 +167,33 @@ static long determineSmallestVisitedKey(@NotNull final RowSet added,
* updates.
*/
interface UpdateContext extends SafeCloseable {
+ /**
+ * Determine all the rows affected by the {@link TableUpdate} that need to be recomputed
+ *
+ * @param upstream the update
+ * @param source the rowset of the parent table (affected rows will be a subset)
+ */
+ default void determineAffectedRows(@NotNull final TableUpdate upstream, @NotNull final RowSet source,
+ final boolean upstreamAppendOnly) {
+ throw (new IllegalStateException("A raw UpdateContext cannot execute determineAffectedRows()"));
+ }
+
+ /**
+     * Return the rows computed by {@code determineAffectedRows()}
+ */
+ default RowSet getAffectedRows() {
+ throw (new IllegalStateException("A raw UpdateContext cannot execute getAffectedRows()"));
+ }
+
+ /**
+ * Set all the rows that need to be computed by this operator
+ *
+     * @param affected the rowset of the parent table (the affected rows are a subset)
+ */
+ default void setAffectedRows(@NotNull final RowSet affected) {
+ throw (new IllegalStateException("A raw UpdateContext cannot execute setAffectedRows()"));
+
+ }
}
/**
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/ZeroKeyUpdateBy.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/ZeroKeyUpdateBy.java
index 9a15d6260c0..377cd9f303b 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/ZeroKeyUpdateBy.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/ZeroKeyUpdateBy.java
@@ -127,7 +127,7 @@ private class UpdateContext implements SafeCloseable {
/** A Long Chunk for previous keys */
WritableLongChunk prevKeyChunk;
- final long smallestModifiedKey;
+ final RowSet affectedRows;
@SuppressWarnings("resource")
UpdateContext(@NotNull final TableUpdate upstream,
@@ -158,8 +158,6 @@ private class UpdateContext implements SafeCloseable {
final boolean upstreamAppendOnly =
isInitializeStep || UpdateByOperator.isAppendOnly(upstream, source.getRowSet().lastRowKeyPrev());
- smallestModifiedKey = upstreamAppendOnly ? Long.MAX_VALUE
- : UpdateByOperator.determineSmallestVisitedKey(upstream, source.getRowSet());
// noinspection unchecked
this.postWorkingChunks = new SizedSafeCloseable[operators.length];
@@ -198,6 +196,20 @@ private class UpdateContext implements SafeCloseable {
operators[opIdx].initializeForUpdate(opContext[opIdx], upstream, source.getRowSet(), false,
upstreamAppendOnly);
}
+
+ // retrieve the affected rows from all operator update contexts
+ WritableRowSet tmp = RowSetFactory.empty();
+ for (int opIdx = 0; opIdx < operators.length; opIdx++) {
+ if (!opAffected[opIdx]) {
+ continue;
+ }
+ // trigger the operator to determine its own set of affected rows (window-specific)
+ opContext[opIdx].determineAffectedRows(upstream, source.getRowSet(), upstreamAppendOnly);
+
+ // union the operator rowsets together to get a global set
+ tmp.insert(opContext[opIdx].getAffectedRows());
+ }
+ affectedRows = tmp;
}
public SharedContext getSharedContext() {
@@ -267,6 +279,7 @@ void finishFor(@NotNull final UpdateType type) {
public void close() {
sharedContext.close();
keyChunk.close();
+ affectedRows.close();
if (prevKeyChunk != null) {
prevKeyChunk.close();
@@ -380,23 +393,23 @@ void doUpdate(@NotNull final RowSet updateRowSet,
postWorkingChunks[slotPosition].get(),
0);
} else if (type == UpdateType.Reprocess) {
- // TODO: When we reprocess rows, we are basically re-adding the entire table starting at the
- // lowest key.
- // Since every operator might start at a different key, we could try to be efficient and not
- // replay
- // chunks of rows to operators that don't actually need them.
- //
- // At the time of writing, any op that reprocesses uses the same logic to decide when,
- // so there is no need for fancyness deciding if we need to push this particular set
- // of RowSequence through.
- prepareValuesChunkFor(opIdx, slotPosition, false, true, chunkOk, null,
- null, postWorkingChunks[slotPosition].get(),
- null, fillContexts[slotPosition].get());
- currentOp.reprocessChunk(opContext[opIdx],
- chunkOk,
- keyChunk.get(),
- postWorkingChunks[slotPosition].get(),
- source.getRowSet());
+ // is this chunk relevant to this operator? If so, then intersect and process only the
+ // relevant rows
+ if (chunkOk.firstRowKey() <= opContext[opIdx].getAffectedRows().lastRowKey()
+ && chunkOk.lastRowKey() >= opContext[opIdx].getAffectedRows().firstRowKey()) {
+ try (final RowSet rs = chunkOk.asRowSet();
+ final RowSet intersect = rs.intersect(opContext[opIdx].getAffectedRows())) {
+
+ prepareValuesChunkFor(opIdx, slotPosition, false, true, intersect, intersect,
+ null, postWorkingChunks[slotPosition].get(),
+ null, fillContexts[slotPosition].get());
+ currentOp.reprocessChunk(opContext[opIdx],
+ intersect,
+ keyChunk.get(),
+ postWorkingChunks[slotPosition].get(),
+ source.getRowSet());
+ }
+ }
}
}
}
@@ -409,43 +422,42 @@ void doUpdate(@NotNull final RowSet updateRowSet,
* Locate the smallest key that requires reprocessing and then replay the table from that point
*/
private void reprocessRows(RowSetShiftData shifted) {
+            // Each operator has already determined its own affected rows; reprocess from the union of those sets.
// Get a sub-index of the source from that minimum reprocessing index and make sure we update our
// contextual chunks and FillContexts to an appropriate size for this step.
final RowSet sourceRowSet = source.getRowSet();
- try (final RowSet indexToReprocess =
- sourceRowSet.subSetByKeyRange(smallestModifiedKey, sourceRowSet.lastRowKey())) {
- final int newChunkSize = (int) Math.min(control.chunkCapacityOrDefault(), indexToReprocess.size());
- setChunkSize(newChunkSize);
-
- final long keyBefore;
- try (final RowSet.SearchIterator sit = sourceRowSet.searchIterator()) {
- keyBefore = sit.binarySearchValue(
- (compareTo, ignored) -> Long.compare(smallestModifiedKey - 1, compareTo), 1);
- }
- for (int opRowSet = 0; opRowSet < operators.length; opRowSet++) {
- if (opAffected[opRowSet]) {
- operators[opRowSet].resetForReprocess(opContext[opRowSet], sourceRowSet, keyBefore);
+ final int newChunkSize = (int) Math.min(control.chunkCapacityOrDefault(), affectedRows.size());
+ setChunkSize(newChunkSize);
+
+ for (int opIndex = 0; opIndex < operators.length; opIndex++) {
+ if (opAffected[opIndex]) {
+ final long keyStart = opContext[opIndex].getAffectedRows().firstRowKey();
+ final long keyBefore;
+ try (final RowSet.SearchIterator sit = sourceRowSet.searchIterator()) {
+ keyBefore = sit.binarySearchValue(
+ (compareTo, ignored) -> Long.compare(keyStart - 1, compareTo), 1);
}
+ operators[opIndex].resetForReprocess(opContext[opIndex], sourceRowSet, keyBefore);
}
+ }
- // We will not mess with shifts if we are using a redirection because we'll have applied the shift
- // to the redirection index already by now.
- if (rowRedirection == null && shifted.nonempty()) {
- try (final RowSet prevIdx = source.getRowSet().copyPrev()) {
- shifted.apply((begin, end, delta) -> {
- try (final RowSet subRowSet = prevIdx.subSetByKeyRange(begin, end)) {
- for (int opIdx = 0; opIdx < operators.length; opIdx++) {
- operators[opIdx].applyOutputShift(opContext[opIdx], subRowSet, delta);
- }
+ // We will not mess with shifts if we are using a redirection because we'll have applied the shift
+ // to the redirection index already by now.
+ if (rowRedirection == null && shifted.nonempty()) {
+ try (final RowSet prevIdx = source.getRowSet().copyPrev()) {
+ shifted.apply((begin, end, delta) -> {
+ try (final RowSet subRowSet = prevIdx.subSetByKeyRange(begin, end)) {
+ for (int opIdx = 0; opIdx < operators.length; opIdx++) {
+ operators[opIdx].applyOutputShift(opContext[opIdx], subRowSet, delta);
}
- });
- }
+ }
+ });
}
-
- // Now iterate index to reprocess.
- doUpdate(indexToReprocess, indexToReprocess, UpdateType.Reprocess);
}
+
+ // Now iterate rowset to reprocess.
+ doUpdate(affectedRows, affectedRows, UpdateType.Reprocess);
}
/**
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/ByteEMAOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/ByteEMAOperator.java
index 4067dad979c..f8ac5847f43 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/ByteEMAOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/ByteEMAOperator.java
@@ -5,12 +5,12 @@
*/
package io.deephaven.engine.table.impl.updateby.ema;
+import io.deephaven.api.updateby.OperationControl;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.ByteChunk;
import io.deephaven.chunk.WritableDoubleChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.table.ColumnSource;
-import io.deephaven.api.updateby.OperationControl;
import io.deephaven.engine.table.MatchPair;
import io.deephaven.engine.table.impl.updateby.internal.LongRecordingUpdateByOperator;
import io.deephaven.engine.table.impl.util.RowRedirection;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/IntEMAOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/IntEMAOperator.java
index 5f391dde775..87620966e84 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/IntEMAOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/IntEMAOperator.java
@@ -5,12 +5,12 @@
*/
package io.deephaven.engine.table.impl.updateby.ema;
+import io.deephaven.api.updateby.OperationControl;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.IntChunk;
import io.deephaven.chunk.WritableDoubleChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.table.ColumnSource;
-import io.deephaven.api.updateby.OperationControl;
import io.deephaven.engine.table.MatchPair;
import io.deephaven.engine.table.impl.updateby.internal.LongRecordingUpdateByOperator;
import io.deephaven.engine.table.impl.util.RowRedirection;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/LongEMAOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/LongEMAOperator.java
index d0e35e2dba0..c183adf706c 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/LongEMAOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/LongEMAOperator.java
@@ -5,12 +5,12 @@
*/
package io.deephaven.engine.table.impl.updateby.ema;
+import io.deephaven.api.updateby.OperationControl;
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.WritableDoubleChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.table.ColumnSource;
-import io.deephaven.api.updateby.OperationControl;
import io.deephaven.engine.table.MatchPair;
import io.deephaven.engine.table.impl.updateby.internal.LongRecordingUpdateByOperator;
import io.deephaven.engine.table.impl.util.RowRedirection;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/hashing/UpdateBySlotTracker.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/hashing/UpdateBySlotTracker.java
index a59f361de5f..eca00aca813 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/hashing/UpdateBySlotTracker.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/hashing/UpdateBySlotTracker.java
@@ -174,7 +174,7 @@ private void applyTo(@NotNull final WritableRowSet slotindex, @NotNull final Row
wasAppendOnly = false;
}
- this.smallestModifiedKey = UpdateByOperator.determineSmallestVisitedKey(added,
+ this.smallestModifiedKey = UpdateByOperator.smallestAffectedKey(added,
modified,
removed,
shifts,
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseByteUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseByteUpdateByOperator.java
index 1fd0c385d46..d9682e0e7c7 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseByteUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseByteUpdateByOperator.java
@@ -17,6 +17,7 @@
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.UpdateBy;
+import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.*;
import io.deephaven.engine.table.impl.util.RowRedirection;
@@ -30,7 +31,7 @@
import static io.deephaven.engine.rowset.RowSequence.NULL_ROW_KEY;
-public abstract class BaseByteUpdateByOperator implements UpdateByOperator {
+public abstract class BaseByteUpdateByOperator extends UpdateByCumulativeOperator {
protected final WritableColumnSource outputSource;
protected final WritableColumnSource maybeInnerSource;
protected final MatchPair pair;
@@ -49,7 +50,7 @@ public abstract class BaseByteUpdateByOperator implements UpdateByOperator {
final byte nullValue;
// endregion extra-fields
- protected class Context implements UpdateContext {
+ protected class Context extends UpdateCumulativeContext {
public final SizedSafeCloseable fillContext;
public final SizedByteChunk outputValues;
public boolean canProcessDirectly;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseCharUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseCharUpdateByOperator.java
index 0e21db78eed..fe6c7f1780e 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseCharUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseCharUpdateByOperator.java
@@ -12,6 +12,7 @@
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.UpdateBy;
+import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.*;
import io.deephaven.engine.table.impl.util.RowRedirection;
@@ -25,7 +26,7 @@
import static io.deephaven.engine.rowset.RowSequence.NULL_ROW_KEY;
-public abstract class BaseCharUpdateByOperator implements UpdateByOperator {
+public abstract class BaseCharUpdateByOperator extends UpdateByCumulativeOperator {
protected final WritableColumnSource outputSource;
protected final WritableColumnSource maybeInnerSource;
protected final MatchPair pair;
@@ -43,7 +44,7 @@ public abstract class BaseCharUpdateByOperator implements UpdateByOperator {
// region extra-fields
// endregion extra-fields
- protected class Context implements UpdateContext {
+ protected class Context extends UpdateCumulativeContext {
public final SizedSafeCloseable fillContext;
public final SizedCharChunk outputValues;
public boolean canProcessDirectly;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseDoubleUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseDoubleUpdateByOperator.java
index 354860af0ee..3a0b7e6d278 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseDoubleUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseDoubleUpdateByOperator.java
@@ -17,6 +17,7 @@
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.UpdateBy;
+import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.DoubleArraySource;
import io.deephaven.engine.table.impl.sources.DoubleSparseArraySource;
@@ -33,7 +34,7 @@
import static io.deephaven.util.QueryConstants.NULL_DOUBLE;
import static io.deephaven.util.QueryConstants.NULL_LONG;
-public abstract class BaseDoubleUpdateByOperator implements UpdateByOperator {
+public abstract class BaseDoubleUpdateByOperator extends UpdateByCumulativeOperator {
protected final WritableColumnSource outputSource;
protected final WritableColumnSource maybeInnerSource;
@@ -49,7 +50,7 @@ public abstract class BaseDoubleUpdateByOperator implements UpdateByOperator {
protected boolean initialized = false;
- protected class Context implements UpdateContext {
+ protected class Context extends UpdateCumulativeContext {
public final SizedSafeCloseable fillContext;
public final SizedDoubleChunk outputValues;
public boolean canProcessDirectly;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseFloatUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseFloatUpdateByOperator.java
index 60ea52d2548..8e3dc96020d 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseFloatUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseFloatUpdateByOperator.java
@@ -12,6 +12,7 @@
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.UpdateBy;
+import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.FloatArraySource;
import io.deephaven.engine.table.impl.sources.FloatSparseArraySource;
@@ -28,7 +29,7 @@
import static io.deephaven.util.QueryConstants.NULL_FLOAT;
import static io.deephaven.util.QueryConstants.NULL_LONG;
-public abstract class BaseFloatUpdateByOperator implements UpdateByOperator {
+public abstract class BaseFloatUpdateByOperator extends UpdateByCumulativeOperator {
protected final WritableColumnSource outputSource;
protected final WritableColumnSource maybeInnerSource;
@@ -44,7 +45,7 @@ public abstract class BaseFloatUpdateByOperator implements UpdateByOperator {
protected boolean initialized = false;
- protected class Context implements UpdateContext {
+ protected class Context extends UpdateCumulativeContext {
public final SizedSafeCloseable fillContext;
public final SizedFloatChunk outputValues;
public boolean canProcessDirectly;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseIntUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseIntUpdateByOperator.java
index 49871e2dad3..4b5c875a0f2 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseIntUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseIntUpdateByOperator.java
@@ -17,6 +17,7 @@
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.UpdateBy;
+import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.*;
import io.deephaven.engine.table.impl.util.RowRedirection;
@@ -30,7 +31,7 @@
import static io.deephaven.engine.rowset.RowSequence.NULL_ROW_KEY;
-public abstract class BaseIntUpdateByOperator implements UpdateByOperator {
+public abstract class BaseIntUpdateByOperator extends UpdateByCumulativeOperator {
protected final WritableColumnSource outputSource;
protected final WritableColumnSource maybeInnerSource;
protected final MatchPair pair;
@@ -48,7 +49,7 @@ public abstract class BaseIntUpdateByOperator implements UpdateByOperator {
// region extra-fields
// endregion extra-fields
- protected class Context implements UpdateContext {
+ protected class Context extends UpdateCumulativeContext {
public final SizedSafeCloseable fillContext;
public final SizedIntChunk outputValues;
public boolean canProcessDirectly;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseLongUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseLongUpdateByOperator.java
index cad71cddca9..d1b13b1f1ec 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseLongUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseLongUpdateByOperator.java
@@ -17,6 +17,7 @@
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.UpdateBy;
+import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.*;
import io.deephaven.engine.table.impl.util.RowRedirection;
@@ -30,7 +31,7 @@
import static io.deephaven.engine.rowset.RowSequence.NULL_ROW_KEY;
-public abstract class BaseLongUpdateByOperator implements UpdateByOperator {
+public abstract class BaseLongUpdateByOperator extends UpdateByCumulativeOperator {
protected final WritableColumnSource outputSource;
protected final WritableColumnSource maybeInnerSource;
protected final MatchPair pair;
@@ -48,7 +49,7 @@ public abstract class BaseLongUpdateByOperator implements UpdateByOperator {
// region extra-fields
// endregion extra-fields
- protected class Context implements UpdateContext {
+ protected class Context extends UpdateCumulativeContext {
public final SizedSafeCloseable fillContext;
public final SizedLongChunk outputValues;
public boolean canProcessDirectly;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseObjectUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseObjectUpdateByOperator.java
index 477c356fe30..e9a9c6fe1f5 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseObjectUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseObjectUpdateByOperator.java
@@ -17,6 +17,7 @@
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.UpdateBy;
+import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.*;
import io.deephaven.engine.table.impl.util.RowRedirection;
@@ -30,7 +31,7 @@
import static io.deephaven.engine.rowset.RowSequence.NULL_ROW_KEY;
-public abstract class BaseObjectUpdateByOperator implements UpdateByOperator {
+public abstract class BaseObjectUpdateByOperator extends UpdateByCumulativeOperator {
protected final WritableColumnSource outputSource;
protected final WritableColumnSource maybeInnerSource;
protected final MatchPair pair;
@@ -49,7 +50,7 @@ public abstract class BaseObjectUpdateByOperator implements UpdateByOperator
private final Class colType;
// endregion extra-fields
- protected class Context implements UpdateContext {
+ protected class Context extends UpdateCumulativeContext {
public final SizedSafeCloseable fillContext;
public final SizedObjectChunk outputValues;
public boolean canProcessDirectly;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseShortUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseShortUpdateByOperator.java
index 073cf9360eb..233124ffbee 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseShortUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseShortUpdateByOperator.java
@@ -17,6 +17,7 @@
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.UpdateBy;
+import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.*;
import io.deephaven.engine.table.impl.util.RowRedirection;
@@ -30,7 +31,7 @@
import static io.deephaven.engine.rowset.RowSequence.NULL_ROW_KEY;
-public abstract class BaseShortUpdateByOperator implements UpdateByOperator {
+public abstract class BaseShortUpdateByOperator extends UpdateByCumulativeOperator {
protected final WritableColumnSource outputSource;
protected final WritableColumnSource maybeInnerSource;
protected final MatchPair pair;
@@ -48,7 +49,7 @@ public abstract class BaseShortUpdateByOperator implements UpdateByOperator {
// region extra-fields
// endregion extra-fields
- protected class Context implements UpdateContext {
+ protected class Context extends UpdateCumulativeContext {
public final SizedSafeCloseable fillContext;
public final SizedShortChunk outputValues;
public boolean canProcessDirectly;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/LongRecordingUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/LongRecordingUpdateByOperator.java
index 08081ee57f3..13ff966d45c 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/LongRecordingUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/LongRecordingUpdateByOperator.java
@@ -17,6 +17,7 @@
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.TableUpdate;
import io.deephaven.engine.table.impl.UpdateBy;
+import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.ReinterpretUtils;
import org.jetbrains.annotations.NotNull;
@@ -28,7 +29,7 @@
/**
* An operator that simply remembers the current chunks during the add and reprocess phases.
*/
-public class LongRecordingUpdateByOperator implements UpdateByOperator {
+public class LongRecordingUpdateByOperator extends UpdateByCumulativeOperator {
private final String inputColumnName;
private final String[] affectingColumns;
private final ColumnSource> columnSource;
@@ -42,7 +43,7 @@ public LongRecordingUpdateByOperator(@NotNull final String inputColumnName,
this.columnSource = ReinterpretUtils.maybeConvertToPrimitive(columnSource);
}
- private class RecordingContext implements UpdateContext {
+ private class RecordingContext extends UpdateCumulativeContext {
private LongChunk addedChunk;
@Override
From d8f941ce85a36287c531fa798fd81cf618dbaa15 Mon Sep 17 00:00:00 2001
From: Larry Booker
Date: Thu, 4 Aug 2022 16:23:54 -0700
Subject: [PATCH 002/123] adding in windowed operators, tests still passing
---
.../table/impl/UpdateByOperatorFactory.java | 74 +++++++++++++++++++
.../api/updateby/spec/UpdateBySpec.java | 2 +
2 files changed, 76 insertions(+)
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperatorFactory.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperatorFactory.java
index 429cf2644d5..32d7ef0856c 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperatorFactory.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperatorFactory.java
@@ -205,6 +205,29 @@ public Void visit(CumProdSpec p) {
return null;
}
+ @Override
+ public Void visit(@NotNull final RollingSumSpec rs) {
+ final LongRecordingUpdateByOperator timeStampRecorder;
+ final boolean isTimeBased = rs.prevTimeScale().isTimeBased();
+ final String timestampCol = rs.prevTimeScale().timestampCol();
+
+ if (isTimeBased) {
+ timeStampRecorder = makeLongRecordingOperator(source, timestampCol);
+ ops.add(timeStampRecorder);
+ } else {
+ timeStampRecorder = null;
+ }
+
+ Arrays.stream(pairs)
+ .filter(p -> !isTimeBased || !p.rightColumn().equals(timestampCol))
+ .map(fc -> makeRollingSumOperator(fc,
+ source,
+ timeStampRecorder,
+ rs))
+ .forEach(ops::add);
+ return null;
+ }
+
@SuppressWarnings("unchecked")
private UpdateByOperator makeEmaOperator(@NotNull final MatchPair pair,
@NotNull final TableWithDefaults source,
@@ -364,5 +387,56 @@ private UpdateByOperator makeForwardFillOperator(MatchPair fc, TableWithDefaults
return new ObjectFillByOperator<>(fc, rowRedirection, csType);
}
}
+
+ private UpdateByOperator makeRollingSumOperator(@NotNull final MatchPair pair,
+ @NotNull final TableWithDefaults source,
+ @Nullable final LongRecordingUpdateByOperator recorder,
+ @NotNull final RollingSumSpec rs) {
+ // noinspection rawtypes
+ final ColumnSource columnSource = source.getColumnSource(pair.rightColumn);
+ final Class> csType = columnSource.getType();
+
+ final String[] affectingColumns;
+ if (recorder == null) {
+ affectingColumns = new String[] {pair.rightColumn};
+ } else {
+ affectingColumns = new String[] {rs.prevTimeScale().timestampCol(), pair.rightColumn};
+ }
+
+ // use the correct units from the EmaSpec (depending on was Time or Tick based)
+ final long prevTimeScaleUnits = rs.prevTimeScale().timescaleUnits();
+ final long fwdTimeScaleUnits = rs.fwdTimeScale().timescaleUnits();
+
+// if (csType == Boolean.class || csType == boolean.class) {
+// return new ByteRollingSumOperator(pair, affectingColumns, rs.controlOrDefault(), recorder, prevTimeScaleUnits, fwdTimeScaleUnits,
+// columnSource, rowRedirection, NULL_BOOLEAN_AS_BYTE);
+// } else if (csType == byte.class || csType == Byte.class) {
+// return new ByteRollingSumOperator(pair, affectingColumns, rs.controlOrDefault(), recorder, prevTimeScaleUnits, fwdTimeScaleUnits,
+// columnSource, rowRedirection, NULL_BYTE);
+// } else if (csType == short.class || csType == Short.class) {
+// return new ShortRollingSumOperator(pair, affectingColumns, rs.controlOrDefault(), recorder, prevTimeScaleUnits, fwdTimeScaleUnits,
+// columnSource, rowRedirection);
+// } else if (csType == int.class || csType == Integer.class) {
+// return new IntRollingSumOperator(pair, affectingColumns, rs.controlOrDefault(), recorder, prevTimeScaleUnits, fwdTimeScaleUnits,
+// columnSource, rowRedirection);
+// } else if (csType == long.class || csType == Long.class) {
+// return new LongRollingSumOperator(pair, affectingColumns, rs.controlOrDefault(), recorder, prevTimeScaleUnits, fwdTimeScaleUnits,
+// columnSource, rowRedirection);
+// } else if (csType == float.class || csType == Float.class) {
+// return new FloatRollingSumOperator(pair, affectingColumns, rs.controlOrDefault(), recorder, prevTimeScaleUnits, fwdTimeScaleUnits,
+// columnSource, rowRedirection);
+// } else if (csType == double.class || csType == Double.class) {
+// return new DoubleRollingSumOperator(pair, affectingColumns, rs.controlOrDefault(), recorder,
+// prevTimeScaleUnits, fwdTimeScaleUnits, columnSource, rowRedirection);
+// } else if (csType == BigDecimal.class) {
+// return new BigDecimalRollingSumOperator(pair, affectingColumns, rs.controlOrDefault(), recorder,
+// prevTimeScaleUnits, fwdTimeScaleUnits, columnSource, rowRedirection, control.mathContextOrDefault());
+// } else if (csType == BigInteger.class) {
+// return new BigIntegerRollingSumOperator(pair, affectingColumns, rs.controlOrDefault(), recorder,
+// prevTimeScaleUnits, fwdTimeScaleUnits, columnSource, rowRedirection);
+// }
+
+ throw new IllegalArgumentException("Can not perform RollingSum on type " + csType);
+ }
}
}
diff --git a/table-api/src/main/java/io/deephaven/api/updateby/spec/UpdateBySpec.java b/table-api/src/main/java/io/deephaven/api/updateby/spec/UpdateBySpec.java
index e8fb67759f5..633a1fbd415 100644
--- a/table-api/src/main/java/io/deephaven/api/updateby/spec/UpdateBySpec.java
+++ b/table-api/src/main/java/io/deephaven/api/updateby/spec/UpdateBySpec.java
@@ -71,6 +71,8 @@ interface Visitor {
T visit(CumMinMaxSpec m);
T visit(CumProdSpec p);
+
+ T visit(RollingSumSpec p);
}
// endregion
}
From 6ddc5ed27755536e817e880253bcd795def353c6 Mon Sep 17 00:00:00 2001
From: Larry Booker
Date: Fri, 5 Aug 2022 15:14:24 -0700
Subject: [PATCH 003/123] wip
---
.../table/impl/UpdateByOperatorFactory.java | 54 ++++++++++++++++---
.../LongRecordingUpdateByOperator.java | 4 +-
.../replicators/ReplicateUpdateBy.java | 26 +++++++++
.../api/updateby/UpdateByOperation.java | 25 +++++++++
4 files changed, 99 insertions(+), 10 deletions(-)
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperatorFactory.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperatorFactory.java
index 32d7ef0856c..6bd1bf379b4 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperatorFactory.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperatorFactory.java
@@ -11,8 +11,10 @@
import io.deephaven.engine.table.impl.updateby.ema.*;
import io.deephaven.engine.table.impl.updateby.fill.*;
import io.deephaven.engine.table.impl.updateby.internal.LongRecordingUpdateByOperator;
+import io.deephaven.engine.table.impl.updateby.internal.WindowMangerUpdateByOperator;
import io.deephaven.engine.table.impl.updateby.minmax.*;
import io.deephaven.engine.table.impl.updateby.prod.*;
+import io.deephaven.engine.table.impl.updateby.rollingsum.ShortRollingSumOperator;
import io.deephaven.engine.table.impl.updateby.sum.*;
import io.deephaven.engine.table.impl.util.WritableRowRedirection;
import io.deephaven.time.DateTime;
@@ -207,22 +209,28 @@ public Void visit(CumProdSpec p) {
@Override
public Void visit(@NotNull final RollingSumSpec rs) {
- final LongRecordingUpdateByOperator timeStampRecorder;
+ final WindowMangerUpdateByOperator windowManager;
final boolean isTimeBased = rs.prevTimeScale().isTimeBased();
- final String timestampCol = rs.prevTimeScale().timestampCol();
+ final String timestampCol = rs.prevTimeScale().timestampCol();
if (isTimeBased) {
- timeStampRecorder = makeLongRecordingOperator(source, timestampCol);
- ops.add(timeStampRecorder);
+ windowManager = makeWindowManagerUpdateByOperator(source,
+ timestampCol,
+ rs.prevTimeScale().timescaleUnits(),
+ rs.fwdTimeScale().timescaleUnits());
} else {
- timeStampRecorder = null;
+ windowManager = makeWindowManagerUpdateByOperator(source,
+ null,
+ rs.prevTimeScale().timescaleUnits(),
+ rs.fwdTimeScale().timescaleUnits());
}
+ ops.add(windowManager);
Arrays.stream(pairs)
.filter(p -> !isTimeBased || !p.rightColumn().equals(timestampCol))
.map(fc -> makeRollingSumOperator(fc,
source,
- timeStampRecorder,
+ windowManager,
rs))
.forEach(ops::add);
return null;
@@ -388,16 +396,42 @@ private UpdateByOperator makeForwardFillOperator(MatchPair fc, TableWithDefaults
}
}
+ private WindowMangerUpdateByOperator makeWindowManagerUpdateByOperator(TableWithDefaults source,
+ String colName,
+ final long reverseTimeScaleUnits,
+ final long forwardTimeScaleUnits) {
+ if (colName == null) {
+ // not using a time source, this will manage our fixed offset calcs
+ final String[] inputColumns = Arrays.stream(pairs).map(MatchPair::rightColumn).toArray(String[]::new);
+ return new WindowMangerUpdateByOperator(colName, inputColumns, reverseTimeScaleUnits, forwardTimeScaleUnits, null);
+ } else {
+ final ColumnSource> columnSource = source.getColumnSource(colName);
+ final Class> colType = columnSource.getType();
+ if (colType != long.class &&
+ colType != Long.class &&
+ colType != DateTime.class &&
+ colType != Instant.class &&
+ !columnSource.allowsReinterpret(long.class)) {
+ throw new IllegalArgumentException("Column " + colName + " cannot be interpreted as a long");
+ }
+
+ final String[] inputColumns = Stream.concat(Stream.of(colName),
+ Arrays.stream(pairs).map(MatchPair::rightColumn)).toArray(String[]::new);
+
+ return new WindowMangerUpdateByOperator(colName, inputColumns, reverseTimeScaleUnits, forwardTimeScaleUnits, columnSource);
+ }
+ }
+
private UpdateByOperator makeRollingSumOperator(@NotNull final MatchPair pair,
@NotNull final TableWithDefaults source,
- @Nullable final LongRecordingUpdateByOperator recorder,
+ @NotNull final WindowMangerUpdateByOperator windowManager,
@NotNull final RollingSumSpec rs) {
// noinspection rawtypes
final ColumnSource columnSource = source.getColumnSource(pair.rightColumn);
final Class> csType = columnSource.getType();
final String[] affectingColumns;
- if (recorder == null) {
+ if (!windowManager.isTimeBased()) {
affectingColumns = new String[] {pair.rightColumn};
} else {
affectingColumns = new String[] {rs.prevTimeScale().timestampCol(), pair.rightColumn};
@@ -407,6 +441,10 @@ private UpdateByOperator makeRollingSumOperator(@NotNull final MatchPair pair,
final long prevTimeScaleUnits = rs.prevTimeScale().timescaleUnits();
final long fwdTimeScaleUnits = rs.fwdTimeScale().timescaleUnits();
+ if (csType == short.class || csType == Short.class) {
+ return new ShortRollingSumOperator(pair, affectingColumns, rs.controlOrDefault(), windowManager, prevTimeScaleUnits, fwdTimeScaleUnits,
+ columnSource, rowRedirection);
+ }
// if (csType == Boolean.class || csType == boolean.class) {
// return new ByteRollingSumOperator(pair, affectingColumns, rs.controlOrDefault(), recorder, prevTimeScaleUnits, fwdTimeScaleUnits,
// columnSource, rowRedirection, NULL_BOOLEAN_AS_BYTE);
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/LongRecordingUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/LongRecordingUpdateByOperator.java
index 13ff966d45c..8a74ba83326 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/LongRecordingUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/LongRecordingUpdateByOperator.java
@@ -29,7 +29,7 @@
/**
* An operator that simply remembers the current chunks during the add and reprocess phases.
*/
-public class LongRecordingUpdateByOperator extends UpdateByCumulativeOperator {
+public class LongRecordingUpdateByOperator implements UpdateByOperator {
private final String inputColumnName;
private final String[] affectingColumns;
private final ColumnSource> columnSource;
@@ -43,7 +43,7 @@ public LongRecordingUpdateByOperator(@NotNull final String inputColumnName,
this.columnSource = ReinterpretUtils.maybeConvertToPrimitive(columnSource);
}
- private class RecordingContext extends UpdateCumulativeContext {
+ private class RecordingContext implements UpdateContext {
private LongChunk addedChunk;
@Override
diff --git a/replication/static/src/main/java/io/deephaven/replicators/ReplicateUpdateBy.java b/replication/static/src/main/java/io/deephaven/replicators/ReplicateUpdateBy.java
index 7a8961d3a65..911ba17acfe 100644
--- a/replication/static/src/main/java/io/deephaven/replicators/ReplicateUpdateBy.java
+++ b/replication/static/src/main/java/io/deephaven/replicators/ReplicateUpdateBy.java
@@ -80,6 +80,32 @@ public static void main(String[] args) throws IOException {
replicateNumericOperator(
"engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/ShortEMAOperator.java",
"engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/FloatEMAOperator.java");
+
+
+
+ // Replicate the rolling versions of UpdateByOperations
+ files = ReplicatePrimitiveCode.charToAllButBooleanAndFloats(
+ "engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseWindowedCharUpdateByOperator.java",
+ exemptions);
+ for (final String f : files) {
+ if (f.contains("Int")) {
+ fixupInteger(f);
+ }
+
+ if (f.contains("Byte")) {
+ fixupByteBase(f);
+ }
+ }
+// objectResult = ReplicatePrimitiveCode.charToObject(
+// "engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseWindowedCharUpdateByOperator.java");
+// fixupStandardObject(objectResult, "BaseWindowedObjectUpdateByOperator", true);
+// ReplicatePrimitiveCode.floatToAllFloatingPoints(
+// "engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseWindowedFloatUpdateByOperator.java",
+// exemptions);
+//
+// replicateNumericOperator(
+// "engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/rollingsum/ShortRollingSumOperator.java",
+// "engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/rollingsum/FloatRollingSumOperator.java");
}
private static void replicateNumericOperator(@NotNull final String shortClass, @NotNull final String floatClass)
diff --git a/table-api/src/main/java/io/deephaven/api/updateby/UpdateByOperation.java b/table-api/src/main/java/io/deephaven/api/updateby/UpdateByOperation.java
index 6269c471059..5441048abe7 100644
--- a/table-api/src/main/java/io/deephaven/api/updateby/UpdateByOperation.java
+++ b/table-api/src/main/java/io/deephaven/api/updateby/UpdateByOperation.java
@@ -215,6 +215,31 @@ static UpdateByOperation Ema(OperationControl control, String timestampColumn, D
return EmaSpec.ofTime(control, timestampColumn, emaDuration).clause(pairs);
}
+ /**
+ * Create an {@link RollingSumSpec rolling sum} for the supplied column name pairs, using ticks as
+ * the windowing unit. Uses the default OperationControl settings.
+ *
+ * @param prevTimeTicks the look-behind window size (in rows/ticks)
+ * @param pairs The input/output column name pairs
+ * @return The aggregation
+ */
+ static UpdateByOperation RollingSum(long prevTimeTicks, String... pairs) {
+ return RollingSumSpec.ofTicks(prevTimeTicks).clause(pairs);
+ }
+
+ /**
+ * Create an {@link RollingSumSpec rolling sum} for the supplied column name pairs, using ticks as
+ * the windowing unit. Uses the default OperationControl settings.
+ *
+ * @param prevTimeTicks the look-behind window size (in rows/ticks)
+ * @param fwdTimeTicks the look-ahead window size (in rows/ticks)
+ * @param pairs The input/output column name pairs
+ * @return The aggregation
+ */
+ static UpdateByOperation RollingSum(long prevTimeTicks, long fwdTimeTicks, String... pairs) {
+ return RollingSumSpec.ofTicks(prevTimeTicks, fwdTimeTicks).clause(pairs);
+ }
+
T walk(Visitor visitor);
interface Visitor {
From c8d322460c086cb8804817df17c7256159cff5cd Mon Sep 17 00:00:00 2001
From: Larry Booker
Date: Wed, 10 Aug 2022 12:15:19 -0700
Subject: [PATCH 004/123] WIP, good save point. ZeroKey working, bucketed not
---
.../engine/table/impl/BucketedUpdateBy.java | 7 +-
.../deephaven/engine/table/impl/UpdateBy.java | 4 +-
.../impl/UpdateByCumulativeOperator.java | 42 ++
.../engine/table/impl/UpdateByOperator.java | 84 ++-
.../table/impl/UpdateByOperatorFactory.java | 53 +-
.../table/impl/UpdateByWindowedOperator.java | 221 ++++++++
.../engine/table/impl/ZeroKeyUpdateBy.java | 25 +-
.../ema/BasePrimitiveEMAOperator.java | 24 +-
.../updateby/ema/BigNumberEMAOperator.java | 24 +-
.../updateby/fill/BooleanFillByOperator.java | 12 +-
.../updateby/fill/ByteFillByOperator.java | 12 +-
.../updateby/fill/CharFillByOperator.java | 12 +-
.../updateby/fill/DoubleFillByOperator.java | 12 +-
.../updateby/fill/FloatFillByOperator.java | 12 +-
.../impl/updateby/fill/IntFillByOperator.java | 12 +-
.../updateby/fill/LongFillByOperator.java | 12 +-
.../updateby/fill/ObjectFillByOperator.java | 12 +-
.../updateby/fill/ShortFillByOperator.java | 12 +-
.../internal/BaseByteUpdateByOperator.java | 48 +-
.../internal/BaseCharUpdateByOperator.java | 43 +-
.../internal/BaseDoubleUpdateByOperator.java | 41 +-
.../internal/BaseFloatUpdateByOperator.java | 41 +-
.../internal/BaseIntUpdateByOperator.java | 43 +-
.../internal/BaseLongUpdateByOperator.java | 43 +-
.../internal/BaseObjectBinaryOperator.java | 12 +-
.../internal/BaseObjectUpdateByOperator.java | 43 +-
.../internal/BaseShortUpdateByOperator.java | 43 +-
.../BaseWindowedByteUpdateByOperator.java | 412 ++++++++++++++
.../BaseWindowedCharUpdateByOperator.java | 388 ++++++++++++++
.../BaseWindowedIntUpdateByOperator.java | 393 ++++++++++++++
.../BaseWindowedLongUpdateByOperator.java | 393 ++++++++++++++
.../BaseWindowedShortUpdateByOperator.java | 393 ++++++++++++++
.../LongRecordingUpdateByOperator.java | 44 +-
.../minmax/ByteCumMinMaxOperator.java | 12 +-
.../minmax/DoubleCumMinMaxOperator.java | 12 +-
.../minmax/FloatCumMinMaxOperator.java | 12 +-
.../updateby/minmax/IntCumMinMaxOperator.java | 12 +-
.../minmax/LongCumMinMaxOperator.java | 12 +-
.../minmax/ShortCumMinMaxOperator.java | 12 +-
.../updateby/prod/ByteCumProdOperator.java | 12 +-
.../updateby/prod/DoubleCumProdOperator.java | 12 +-
.../updateby/prod/FloatCumProdOperator.java | 12 +-
.../updateby/prod/IntCumProdOperator.java | 12 +-
.../updateby/prod/LongCumProdOperator.java | 12 +-
.../updateby/prod/ShortCumProdOperator.java | 12 +-
.../rollingsum/ShortRollingSumOperator.java | 225 ++++++++
.../impl/updateby/sum/ByteCumSumOperator.java | 12 +-
.../updateby/sum/DoubleCumSumOperator.java | 12 +-
.../updateby/sum/FloatCumSumOperator.java | 12 +-
.../impl/updateby/sum/IntCumSumOperator.java | 12 +-
.../impl/updateby/sum/LongCumSumOperator.java | 12 +-
.../updateby/sum/ShortCumSumOperator.java | 12 +-
.../table/impl/updateby/TestRollingSum.java | 504 ++++++++++++++++++
.../replicators/ReplicateUpdateBy.java | 6 +
.../api/updateby/spec/RollingSumSpec.java | 125 +++++
55 files changed, 3549 insertions(+), 499 deletions(-)
create mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByCumulativeOperator.java
create mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByWindowedOperator.java
create mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseWindowedByteUpdateByOperator.java
create mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseWindowedCharUpdateByOperator.java
create mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseWindowedIntUpdateByOperator.java
create mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseWindowedLongUpdateByOperator.java
create mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseWindowedShortUpdateByOperator.java
create mode 100644 engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/rollingsum/ShortRollingSumOperator.java
create mode 100644 engine/table/src/test/java/io/deephaven/engine/table/impl/updateby/TestRollingSum.java
create mode 100644 table-api/src/main/java/io/deephaven/api/updateby/spec/RollingSumSpec.java
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/BucketedUpdateBy.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/BucketedUpdateBy.java
index 1bd6653c2a2..9e84f421231 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/BucketedUpdateBy.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/BucketedUpdateBy.java
@@ -9,7 +9,6 @@
import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.sized.SizedIntChunk;
import io.deephaven.chunk.sized.SizedLongChunk;
-import io.deephaven.configuration.Configuration;
import io.deephaven.engine.rowset.*;
import io.deephaven.engine.rowset.chunkattributes.OrderedRowKeys;
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
@@ -690,7 +689,7 @@ private void accumulateIndexToReprocess(@NotNull final UpdateBySlotTracker.Updat
for (int opIdx = 0; opIdx < operators.length; opIdx++) {
opAffected[opIdx] |= bucketHasAddedOrRemoved;
if (opAffected[opIdx]) {
- operators[opIdx].resetForReprocess(opContext[opIdx], slotIndex, tracker.getSlot(), keyBefore);
+ operators[opIdx].resetForReprocessBucketed(opContext[opIdx], slotIndex, tracker.getSlot(), keyBefore);
}
}
}
@@ -808,7 +807,7 @@ public void processBucketedUpdates() {
localPermutedWorkingChunk,
fillContexts[slotPosition].get());
- operators[opIdx].reprocessChunk(opContext[opIdx],
+ operators[opIdx].reprocessChunkBucketed(opContext[opIdx],
chunkOk,
permuteRequired ? localPermutedWorkingChunk : localPostWorkingChunk,
permuteRequired ? localPermutedKeyChunk : localKeyChunk,
@@ -903,7 +902,7 @@ private void doAppendOnlyAdds(final boolean initialBuild, @NotNull final RowSet
localPermutedPostWorkingChunk,
fillContexts[slotPosition].get());
- operators[opIdx].addChunk(opContext[opIdx],
+ operators[opIdx].addChunkBucketed(opContext[opIdx],
permuteRequired ? localPermutedPostWorkingChunk : localPostWorkingChunk,
permuteRequired ? localPermutedKeys : localKeys,
localOutputPositions,
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateBy.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateBy.java
index 7a92f970842..7c6fa0428f7 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateBy.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateBy.java
@@ -2,8 +2,6 @@
import gnu.trove.map.hash.TObjectIntHashMap;
import io.deephaven.api.ColumnName;
-import io.deephaven.api.Selectable;
-import io.deephaven.api.Strings;
import io.deephaven.api.updateby.UpdateByOperation;
import io.deephaven.api.updateby.UpdateByControl;
import io.deephaven.chunk.Chunk;
@@ -243,7 +241,7 @@ private boolean shiftRedirectedKey(@NotNull final RowSet.SearchIterator iterator
public enum UpdateType {
/**
* Indicates that rows are being
- * {@link UpdateByOperator#addChunk(UpdateByOperator.UpdateContext, RowSequence, LongChunk, Chunk, long)} added}
+ * {@link UpdateByOperator#addChunkBucketed(UpdateByOperator.UpdateContext, RowSequence, LongChunk, Chunk, long)} added}
* to the operator.
*/
Add,
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByCumulativeOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByCumulativeOperator.java
new file mode 100644
index 00000000000..7e283e1a352
--- /dev/null
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByCumulativeOperator.java
@@ -0,0 +1,42 @@
+package io.deephaven.engine.table.impl;
+
+import io.deephaven.engine.rowset.RowSet;
+import io.deephaven.engine.rowset.RowSetFactory;
+import io.deephaven.engine.table.TableUpdate;
+import org.jetbrains.annotations.NotNull;
+
+public abstract class UpdateByCumulativeOperator implements UpdateByOperator {
+
+ public abstract class UpdateCumulativeContext implements UpdateContext {
+ // store the current subset of rows that need computation
+ protected RowSet affectedRows = RowSetFactory.empty();
+
+ public RowSet determineAffectedRows(@NotNull final TableUpdate upstream, @NotNull final RowSet source, final boolean upstreamAppendOnly) {
+ if (upstreamAppendOnly) {
+ // cumulative operators do not need to reprocess any rows on append-only updates
+ try (final RowSet ignored = affectedRows) {
+ affectedRows = RowSetFactory.empty();
+ }
+ return affectedRows;
+ }
+
+ long smallestModifiedKey = UpdateByOperator.smallestAffectedKey(upstream.added(), upstream.modified(), upstream.removed(), upstream.shifted(), source);
+
+ try (final RowSet ignored = affectedRows) {
+ affectedRows = smallestModifiedKey == Long.MAX_VALUE
+ ? RowSetFactory.empty()
+ : source.subSetByKeyRange(smallestModifiedKey, source.lastRowKey());
+ }
+ return affectedRows;
+ }
+
+ public RowSet getAffectedRows() {
+ return affectedRows;
+ }
+
+ @Override
+ public void close() {
+ affectedRows.close();
+ }
+ }
+}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperator.java
index 4655c6d4114..0063f85cc69 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperator.java
@@ -51,7 +51,7 @@
* Adds
*
* - {@link #initializeFor(UpdateContext, RowSet, UpdateBy.UpdateType)}
- * - {@link #addChunk(UpdateContext, RowSequence, LongChunk, Chunk, long)}
+ * - {@link #addChunkBucketed(UpdateContext, RowSequence, LongChunk, Chunk, long)}
* - {@link #finishFor(UpdateContext, UpdateBy.UpdateType)}
*
*
@@ -60,7 +60,7 @@
*
* - {@link #resetForReprocess(UpdateContext, RowSet, long)}
* - {@link #initializeFor(UpdateContext, RowSet, UpdateBy.UpdateType)}
- * - {@link #reprocessChunk(UpdateContext, RowSequence, Chunk, LongChunk, IntChunk, IntChunk, IntChunk)}
+ * - {@link #reprocessChunkBucketed(UpdateContext, RowSequence, Chunk, LongChunk, IntChunk, IntChunk, IntChunk)}
* - {@link #finishFor(UpdateContext, UpdateBy.UpdateType)}
*
*
@@ -168,32 +168,18 @@ static long smallestAffectedKey(@NotNull final RowSet added,
*/
interface UpdateContext extends SafeCloseable {
/**
- * Determine all the rows affected by the {@link TableUpdate} that need to be recomputed
+ * Determine all the rows affected by the {@link TableUpdate} that need to be reprocessed
*
* @param upstream the update
* @param source the rowset of the parent table (affected rows will be a subset)
*/
- default void determineAffectedRows(@NotNull final TableUpdate upstream, @NotNull final RowSet source,
- final boolean upstreamAppendOnly) {
- throw (new IllegalStateException("A raw UpdateContext cannot execute determineAffectedRows()"));
- }
+ RowSet determineAffectedRows(@NotNull final TableUpdate upstream, @NotNull final RowSet source,
+ final boolean upstreamAppendOnly);
/**
* Return the rows computed by the {@Code determineAffectedRows()}
*/
- default RowSet getAffectedRows() {
- throw (new IllegalStateException("A raw UpdateContext cannot execute getAffectedRows()"));
- }
-
- /**
- * Set all the rows that need to be computed by this operator
- *
- * @param affected the rowset of the parent table (affected is subset)
- */
- default void setAffectedRows(@NotNull final RowSet affected) {
- throw (new IllegalStateException("A raw UpdateContext cannot execute setAffectedRows()"));
-
- }
+ RowSet getAffectedRows();
}
/**
@@ -271,11 +257,11 @@ void initializeForUpdate(@NotNull final UpdateContext context,
*
*
* @param context the context object
- * @param updateIndex the index of rows associated with the update.
+ * @param updateRowSet the index of rows associated with the update.
* @param type the type of update being applied
*/
void initializeFor(@NotNull final UpdateContext context,
- @NotNull final RowSet updateIndex,
+ @NotNull final RowSet updateRowSet,
@NotNull final UpdateBy.UpdateType type);
/**
@@ -362,10 +348,10 @@ void initializeFor(@NotNull final UpdateContext context,
* @param bucketPosition the group position
*/
void addChunk(@NotNull final UpdateContext context,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk values,
- final long bucketPosition);
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk values,
+ final long bucketPosition);
/**
* Add a chunk of bucketed items to the operation.
@@ -377,12 +363,12 @@ void addChunk(@NotNull final UpdateContext context,
* @param runLengths the runLengths of each run of bucket values
* @param startPositions the start position of a run within the chunk
*/
- void addChunk(@NotNull final UpdateContext context,
- @NotNull final Chunk values,
- @NotNull final LongChunk extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk startPositions,
- @NotNull final IntChunk runLengths);
+ void addChunkBucketed(@NotNull final UpdateContext context,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk startPositions,
+ @NotNull final IntChunk runLengths);
/**
* Modify a chunk of values with the operation.
@@ -444,10 +430,10 @@ void applyOutputShift(@NotNull final UpdateContext context,
* @param postUpdateSourceIndex the resulting source index af
*/
void reprocessChunk(@NotNull final UpdateContext context,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk valuesChunk,
- @NotNull final RowSet postUpdateSourceIndex);
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk valuesChunk,
+ @NotNull final RowSet postUpdateSourceIndex);
/**
* Reprocess a chunk of data for a bucketed table.
@@ -461,17 +447,17 @@ void reprocessChunk(@NotNull final UpdateContext context,
* @param runStartPositions the starting positions of each run within the key and value chunk
* @param runLengths the run runLengths of each run in the key and value chunks. Parallel to `runStartPositions`
*/
- void reprocessChunk(@NotNull final UpdateContext context,
- @NotNull final RowSequence inputKeys,
- @NotNull final Chunk values,
- @NotNull final LongChunk extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk runStartPositions,
- @NotNull final IntChunk runLengths);
+ void reprocessChunkBucketed(@NotNull final UpdateContext context,
+ @NotNull final RowSequence inputKeys,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk runStartPositions,
+ @NotNull final IntChunk runLengths);
/**
* Reset the operator to the state at the `firstModifiedKey` for non-bucketed operation. This is invoked immediately
- * prior to calls to {@link #reprocessChunk(UpdateContext, RowSequence, LongChunk, Chunk, RowSet)}.
+ * prior to calls to {@link #resetForReprocess(UpdateContext, RowSet, long)}.
*
* A `firstUnmodifiedKey` of {@link RowSet#NULL_ROW_KEY} indicates that the entire table needs to be recomputed.
*
@@ -486,14 +472,14 @@ void resetForReprocess(@NotNull final UpdateContext context,
/**
* Reset the operator to the state at the `firstModifiedKey` for the specified bucket. This is invoked immediately
* prior to calls to
- * {@link #reprocessChunk(UpdateContext, RowSequence, Chunk, LongChunk, IntChunk, IntChunk, IntChunk)}.
+ * {@link #reprocessChunkBucketed(UpdateContext, RowSequence, Chunk, LongChunk, IntChunk, IntChunk, IntChunk)}.
*
* @param context the context object
* @param bucketIndex the current index of the specified bucket
* @param firstUnmodifiedKey the first unmodified key in the bucket after which we will reprocess rows.
*/
- void resetForReprocess(@NotNull final UpdateContext context,
- @NotNull final RowSet bucketIndex,
- final long bucketPosition,
- final long firstUnmodifiedKey);
+ void resetForReprocessBucketed(@NotNull final UpdateContext context,
+ @NotNull final RowSet bucketIndex,
+ final long bucketPosition,
+ final long firstUnmodifiedKey);
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperatorFactory.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperatorFactory.java
index 6bd1bf379b4..b8003373386 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperatorFactory.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByOperatorFactory.java
@@ -11,7 +11,6 @@
import io.deephaven.engine.table.impl.updateby.ema.*;
import io.deephaven.engine.table.impl.updateby.fill.*;
import io.deephaven.engine.table.impl.updateby.internal.LongRecordingUpdateByOperator;
-import io.deephaven.engine.table.impl.updateby.internal.WindowMangerUpdateByOperator;
import io.deephaven.engine.table.impl.updateby.minmax.*;
import io.deephaven.engine.table.impl.updateby.prod.*;
import io.deephaven.engine.table.impl.updateby.rollingsum.ShortRollingSumOperator;
@@ -209,28 +208,22 @@ public Void visit(CumProdSpec p) {
@Override
public Void visit(@NotNull final RollingSumSpec rs) {
- final WindowMangerUpdateByOperator windowManager;
+ final LongRecordingUpdateByOperator timeStampRecorder;
final boolean isTimeBased = rs.prevTimeScale().isTimeBased();
-
final String timestampCol = rs.prevTimeScale().timestampCol();
+
if (isTimeBased) {
- windowManager = makeWindowManagerUpdateByOperator(source,
- timestampCol,
- rs.prevTimeScale().timescaleUnits(),
- rs.fwdTimeScale().timescaleUnits());
+ timeStampRecorder = makeLongRecordingOperator(source, timestampCol);
+ ops.add(timeStampRecorder);
} else {
- windowManager = makeWindowManagerUpdateByOperator(source,
- null,
- rs.prevTimeScale().timescaleUnits(),
- rs.fwdTimeScale().timescaleUnits());
+ timeStampRecorder = null;
}
- ops.add(windowManager);
Arrays.stream(pairs)
.filter(p -> !isTimeBased || !p.rightColumn().equals(timestampCol))
.map(fc -> makeRollingSumOperator(fc,
source,
- windowManager,
+ timeStampRecorder,
rs))
.forEach(ops::add);
return null;
@@ -396,54 +389,26 @@ private UpdateByOperator makeForwardFillOperator(MatchPair fc, TableWithDefaults
}
}
- private WindowMangerUpdateByOperator makeWindowManagerUpdateByOperator(TableWithDefaults source,
- String colName,
- final long reverseTimeScaleUnits,
- final long forwardTimeScaleUnits) {
- if (colName == null) {
- // not using a time source, this will manage our fixed offset calcs
- final String[] inputColumns = Arrays.stream(pairs).map(MatchPair::rightColumn).toArray(String[]::new);
- return new WindowMangerUpdateByOperator(colName, inputColumns, reverseTimeScaleUnits, forwardTimeScaleUnits, null);
- } else {
- final ColumnSource> columnSource = source.getColumnSource(colName);
- final Class> colType = columnSource.getType();
- if (colType != long.class &&
- colType != Long.class &&
- colType != DateTime.class &&
- colType != Instant.class &&
- !columnSource.allowsReinterpret(long.class)) {
- throw new IllegalArgumentException("Column " + colName + " cannot be interpreted as a long");
- }
-
- final String[] inputColumns = Stream.concat(Stream.of(colName),
- Arrays.stream(pairs).map(MatchPair::rightColumn)).toArray(String[]::new);
-
- return new WindowMangerUpdateByOperator(colName, inputColumns, reverseTimeScaleUnits, forwardTimeScaleUnits, columnSource);
- }
- }
-
private UpdateByOperator makeRollingSumOperator(@NotNull final MatchPair pair,
@NotNull final TableWithDefaults source,
- @NotNull final WindowMangerUpdateByOperator windowManager,
+ @Nullable final LongRecordingUpdateByOperator recorder,
@NotNull final RollingSumSpec rs) {
// noinspection rawtypes
final ColumnSource columnSource = source.getColumnSource(pair.rightColumn);
final Class> csType = columnSource.getType();
final String[] affectingColumns;
- if (!windowManager.isTimeBased()) {
+ if (recorder == null) {
affectingColumns = new String[] {pair.rightColumn};
} else {
affectingColumns = new String[] {rs.prevTimeScale().timestampCol(), pair.rightColumn};
}
- // use the correct units from the EmaSpec (depending on was Time or Tick based)
final long prevTimeScaleUnits = rs.prevTimeScale().timescaleUnits();
final long fwdTimeScaleUnits = rs.fwdTimeScale().timescaleUnits();
if (csType == short.class || csType == Short.class) {
- return new ShortRollingSumOperator(pair, affectingColumns, rs.controlOrDefault(), windowManager, prevTimeScaleUnits, fwdTimeScaleUnits,
- columnSource, rowRedirection);
+ return new ShortRollingSumOperator(pair, affectingColumns, rs.controlOrDefault(), recorder, prevTimeScaleUnits, fwdTimeScaleUnits, columnSource, rowRedirection);
}
// if (csType == Boolean.class || csType == boolean.class) {
// return new ByteRollingSumOperator(pair, affectingColumns, rs.controlOrDefault(), recorder, prevTimeScaleUnits, fwdTimeScaleUnits,
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByWindowedOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByWindowedOperator.java
new file mode 100644
index 00000000000..0760b44c8f8
--- /dev/null
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/UpdateByWindowedOperator.java
@@ -0,0 +1,221 @@
+package io.deephaven.engine.table.impl;
+
+import io.deephaven.api.updateby.OperationControl;
+import io.deephaven.engine.rowset.RowSet;
+import io.deephaven.engine.rowset.RowSetBuilderRandom;
+import io.deephaven.engine.rowset.RowSetFactory;
+import io.deephaven.engine.rowset.WritableRowSet;
+import io.deephaven.engine.table.MatchPair;
+import io.deephaven.engine.table.TableUpdate;
+import io.deephaven.engine.table.impl.updateby.internal.LongRecordingUpdateByOperator;
+import io.deephaven.engine.table.impl.util.RowRedirection;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Arrays;
+import java.util.List;
+
+public abstract class UpdateByWindowedOperator implements UpdateByOperator {
+ protected final OperationControl control;
+ protected final LongRecordingUpdateByOperator recorder;
+ protected final long reverseTimeScaleUnits;
+ protected final long forwardTimeScaleUnits;
+
+ protected final MatchPair pair;
+ protected final String[] affectingColumns;
+
+ protected final boolean isRedirected;
+
+
+ public abstract class UpdateWindowedContext implements UpdateContext {
+ // store the current subset of rows that need computation
+ protected RowSet affectedRows = RowSetFactory.empty();
+
+ /**
+ * This function is only correct if the proper {@code source} rowset is provided. If using buckets, then the
+ * provided rowset must be limited to the rows in the current bucket
+ * only.
+ *
+ * @param upstream the update
+ * @param source the rowset of the parent table (affected rows will be a subset)
+ * @param upstreamAppendOnly true if the upstream update consists only of appended rows
+ */
+ public RowSet determineAffectedRows(@NotNull final TableUpdate upstream, @NotNull final RowSet source,
+ final boolean upstreamAppendOnly) {
+
+ // NOTE: this is fast rather than bounding to the smallest set possible (i.e. unaware of buckets and
+ // over-represents sparse data). This will result in computing far more rows than actually necessary.
+
+ // TODO: return the minimal set of data for this update
+
+ RowSetBuilderRandom builder = RowSetFactory.builderRandom();
+
+ if (upstream.removed().isNonempty()) {
+ // need removes in post-shift space to determine rows to recompute
+ try (final WritableRowSet shiftedRemoves = upstream.removed().copy()) {
+ upstream.shifted().apply(shiftedRemoves);
+
+ builder.addRange(computeFirstAffectedKey(shiftedRemoves.firstRowKey(), source),
+ computeLastAffectedKey(shiftedRemoves.lastRowKey(), source));
+ }
+ }
+
+ if (upstream.added().isNonempty()) {
+ // all the new rows need to be computed
+ builder.addRowSet(upstream.added());
+
+ // add the rows affected by the adds
+ builder.addRange(computeFirstAffectedKey(upstream.added().firstRowKey(), source),
+ computeLastAffectedKey(upstream.added().lastRowKey(), source));
+ }
+
+ if (upstream.modified().isNonempty()) {
+ // TODO: make this more efficient
+ final List cols = List.of(affectingColumns);
+ boolean modifiedAffectingColumn = Arrays.stream(upstream.modifiedColumnSet().dirtyColumnNames()).anyMatch(cols::contains);
+
+ if (modifiedAffectingColumn) {
+ // add the rows affected by the mods
+ builder.addRange(computeFirstAffectedKey(upstream.modified().firstRowKey(), source),
+ computeLastAffectedKey(upstream.modified().lastRowKey(), source));
+ }
+ }
+
+ try (final RowSet ignored = affectedRows) {
+ affectedRows = builder.build();
+ }
+ return affectedRows;
+ }
+
+ public RowSet getAffectedRows() {
+ return affectedRows;
+ }
+
+ @Override
+ public void close() {
+ affectedRows.close();
+ }
+ }
+
+ /**
+ * An operator that computes a windowed operation from a column
+ *
+ * @param pair the {@link MatchPair} that defines the input/output for this operation
+ * @param affectingColumns the names of the columns that affect this operation
+ * @param control the control parameters for operation
+ * @param timeRecorder an optional recorder for a timestamp column. If this is null, it will be assumed time is
+ * measured in integer ticks.
+ * @param rowRedirection the row redirection to use for the operation
+ */
+ public UpdateByWindowedOperator(@NotNull final MatchPair pair,
+ @NotNull final String[] affectingColumns,
+ @NotNull final OperationControl control,
+ @Nullable final LongRecordingUpdateByOperator timeRecorder,
+ final long reverseTimeScaleUnits,
+ final long forwardTimeScaleUnits,
+ @Nullable final RowRedirection rowRedirection) {
+ this.pair = pair;
+ this.affectingColumns = affectingColumns;
+ this.control = control;
+ this.recorder = timeRecorder;
+ this.reverseTimeScaleUnits = reverseTimeScaleUnits;
+ this.forwardTimeScaleUnits = forwardTimeScaleUnits;
+ this.isRedirected = rowRedirection != null;
+ }
+
+ // return the first row that affects this key
+ public long computeFirstAffectingKey(long key, @NotNull final RowSet source) {
+
+ if (recorder == null) {
+ // ticks
+ final long keyPos = source.find(key);
+ final long idx = keyPos - (long) reverseTimeScaleUnits + 1;
+ if (idx < 0) {
+ return source.firstRowKey();
+ }
+ return source.get(idx);
+ }
+ return -1;
+ }
+
+ // return the last row that affects this key
+ public long computeLastAffectingKey(long key, @NotNull final RowSet source) {
+ if (recorder == null) {
+ // ticks
+ final long keyPos = source.find(key);
+ final long idx = keyPos + (long)forwardTimeScaleUnits;
+ if (idx >= source.size()) {
+ return source.lastRowKey();
+ }
+ return source.get(idx);
+ }
+ return -1;
+ }
+
+ // return the first row affected by this key
+ public long computeFirstAffectedKey(long key, @NotNull final RowSet source) {
+ if (recorder == null) {
+ // ticks
+ final long keyPos = source.find(key);
+ final long idx = keyPos - (long) forwardTimeScaleUnits;
+ if (idx >= source.size()) {
+ return source.lastRowKey();
+ }
+ return source.get(idx);
+ }
+ return -1;
+ }
+
+ // return the last row affected by this key
+ public long computeLastAffectedKey(long key, @NotNull final RowSet source) {
+ if (recorder == null) {
+ // ticks
+ final long keyPos = source.find(key);
+ final long idx = keyPos + (long) reverseTimeScaleUnits;
+ if (idx >= source.size()) {
+ return source.lastRowKey();
+ }
+ return source.get(idx);
+ }
+ return -1;
+ }
+
+ /**
+ * Windowed operators must always reprocess.
+ */
+ @Override
+ public boolean canProcessNormalUpdate(@NotNull UpdateContext context) {
+ return false;
+ }
+
+ @NotNull
+ @Override
+ public String getInputColumnName() {
+ return pair.rightColumn;
+ }
+
+ @NotNull
+ @Override
+ public String[] getAffectingColumnNames() {
+ return affectingColumns;
+ }
+
+ @NotNull
+ @Override
+ public String[] getOutputColumnNames() {
+ return new String[] { pair.leftColumn };
+ }
+
+ @Override
+ public boolean requiresKeys() {
+ return true;
+ }
+
+ @Override
+ public void setBucketCapacity(int capacity) {
+ }
+
+ @Override
+ public void onBucketsRemoved(@NotNull final RowSet removedBuckets) {
+ }
+}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/ZeroKeyUpdateBy.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/ZeroKeyUpdateBy.java
index 377cd9f303b..28412dfb88c 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/ZeroKeyUpdateBy.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/ZeroKeyUpdateBy.java
@@ -183,8 +183,7 @@ private class UpdateContext implements SafeCloseable {
postWorkingChunks[slotPosition].ensureCapacity(chunkSize);
// Note that these don't participate in setChunkSize() because nothing will use them. If that
- // changes
- // then these must also become SizedSafeCloseables.
+ // changes then these must also become SizedSafeCloseables.
if (hasModifies) {
prevFillContexts[slotPosition] =
inputSources[opIdx].makeFillContext(chunkSize, prevSharedContext);
@@ -203,11 +202,12 @@ private class UpdateContext implements SafeCloseable {
if (!opAffected[opIdx]) {
continue;
}
- // trigger the operator to determine its own set of affected rows (window-specific)
- opContext[opIdx].determineAffectedRows(upstream, source.getRowSet(), upstreamAppendOnly);
+ // trigger the operator to determine its own set of affected rows (window-specific), do not close()
+ // since this is managed by the operator context
+ final RowSet rs = opContext[opIdx].determineAffectedRows(upstream, source.getRowSet(), upstreamAppendOnly);
// union the operator rowsets together to get a global set
- tmp.insert(opContext[opIdx].getAffectedRows());
+ tmp.insert(rs);
}
affectedRows = tmp;
}
@@ -422,7 +422,6 @@ void doUpdate(@NotNull final RowSet updateRowSet,
* Locate the smallest key that requires reprocessing and then replay the table from that point
*/
private void reprocessRows(RowSetShiftData shifted) {
- // Get a sub-index of the source from that minimum reprocessing index and make sure we update our
// Get a sub-index of the source from that minimum reprocessing index and make sure we update our
// contextual chunks and FillContexts to an appropriate size for this step.
final RowSet sourceRowSet = source.getRowSet();
@@ -463,12 +462,12 @@ private void reprocessRows(RowSetShiftData shifted) {
/**
* Prepare the specified chunk for use.
*
- * @param opRowSet the operator index
+ * @param opIdx the operator index
* @param usePrev if previous values should be fetched
* @param chunkOk the {@link RowSequence} for current values
* @param prevChunkOk the {@link RowSequence} for previous values.
*/
- private void prepareValuesChunkFor(final int opRowSet,
+ private void prepareValuesChunkFor(final int opIdx,
final int inputSlot,
final boolean usePrev,
final boolean useCurrent,
@@ -478,20 +477,20 @@ private void prepareValuesChunkFor(final int opRowSet,
final WritableChunk postWorkingChunk,
final ChunkSource.FillContext prevFillContext,
final ChunkSource.FillContext postFillContext) {
- if (!operators[opRowSet].requiresValues(opContext[opRowSet])) {
+ if (!operators[opIdx].requiresValues(opContext[opIdx])) {
return;
}
if (!inputChunkPopulated[inputSlot]) {
- // Using opRowSet below is OK, because if we are sharing an input slot, we are referring to the same
- // input source. Instead of maintaining another array of sourced by slot, we'll just use the opRowSet
+ // Using opIdx below is OK, because if we are sharing an input slot, we are referring to the same
+ // input source. Instead of maintaining another array of sources by slot, we'll just use the opIdx
inputChunkPopulated[inputSlot] = true;
if (usePrev) {
- inputSources[opRowSet].fillPrevChunk(prevFillContext, prevWorkingChunk, prevChunkOk);
+ inputSources[opIdx].fillPrevChunk(prevFillContext, prevWorkingChunk, prevChunkOk);
}
if (useCurrent) {
- inputSources[opRowSet].fillChunk(postFillContext, postWorkingChunk, chunkOk);
+ inputSources[opIdx].fillChunk(postFillContext, postWorkingChunk, chunkOk);
}
}
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/BasePrimitiveEMAOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/BasePrimitiveEMAOperator.java
index 5dec7ff97bd..273d71da9d8 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/BasePrimitiveEMAOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/BasePrimitiveEMAOperator.java
@@ -115,9 +115,9 @@ public void initializeForUpdate(@NotNull final UpdateByOperator.UpdateContext co
@Override
public void initializeFor(@NotNull final UpdateByOperator.UpdateContext updateContext,
- @NotNull final RowSet updateIndex,
+ @NotNull final RowSet updateRowSet,
@NotNull final UpdateBy.UpdateType type) {
- super.initializeFor(updateContext, updateIndex, type);
+ super.initializeFor(updateContext, updateRowSet, type);
((EmaContext) updateContext).lastStamp = NULL_LONG;
}
@@ -159,12 +159,12 @@ protected void doAddChunk(@NotNull final Context context,
}
@Override
- public void addChunk(@NotNull final UpdateByOperator.UpdateContext context,
- @NotNull final Chunk values,
- @NotNull final LongChunk extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk startPositions,
- @NotNull final IntChunk runLengths) {
+ public void addChunkBucketed(@NotNull final UpdateByOperator.UpdateContext context,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk startPositions,
+ @NotNull final IntChunk runLengths) {
final EmaContext ctx = (EmaContext) context;
for (int runIdx = 0; runIdx < startPositions.size(); runIdx++) {
final int runStart = startPositions.get(runIdx);
@@ -220,10 +220,10 @@ public void resetForReprocess(@NotNull final UpdateByOperator.UpdateContext cont
}
@Override
- public void resetForReprocess(@NotNull final UpdateByOperator.UpdateContext ctx,
- @NotNull final RowSet bucketIndex,
- final long bucketPosition,
- final long firstUnmodifiedKey) {
+ public void resetForReprocessBucketed(@NotNull final UpdateByOperator.UpdateContext ctx,
+ @NotNull final RowSet bucketIndex,
+ final long bucketPosition,
+ final long firstUnmodifiedKey) {
final double previousVal = firstUnmodifiedKey == NULL_ROW_KEY ? QueryConstants.NULL_DOUBLE
: outputSource.getDouble(firstUnmodifiedKey);
bucketLastVal.set(bucketPosition, previousVal);
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/BigNumberEMAOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/BigNumberEMAOperator.java
index a0b93df7f53..747934a1d75 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/BigNumberEMAOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/ema/BigNumberEMAOperator.java
@@ -126,9 +126,9 @@ public void initializeForUpdate(@NotNull UpdateContext context,
@SuppressWarnings("unchecked")
@Override
public void initializeFor(@NotNull final UpdateContext updateContext,
- @NotNull final RowSet updateIndex,
+ @NotNull final RowSet updateRowSet,
@NotNull final UpdateBy.UpdateType type) {
- super.initializeFor(updateContext, updateIndex, type);
+ super.initializeFor(updateContext, updateRowSet, type);
((EmaContext) updateContext).lastStamp = NULL_LONG;
}
@@ -161,12 +161,12 @@ protected void doAddChunk(@NotNull final Context updateContext,
}
@Override
- public void addChunk(@NotNull final UpdateContext context,
- @NotNull final Chunk values,
- @NotNull final LongChunk extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk startPositions,
- @NotNull final IntChunk runLengths) {
+ public void addChunkBucketed(@NotNull final UpdateContext context,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk startPositions,
+ @NotNull final IntChunk runLengths) {
final ObjectChunk asObjects = values.asObjectChunk();
// noinspection unchecked
final EmaContext ctx = (EmaContext) context;
@@ -223,10 +223,10 @@ public void resetForReprocess(@NotNull final UpdateContext context,
}
@Override
- public void resetForReprocess(@NotNull final UpdateContext ctx,
- @NotNull final RowSet bucketIndex,
- final long bucketPosition,
- final long firstUnmodifiedKey) {
+ public void resetForReprocessBucketed(@NotNull final UpdateContext ctx,
+ @NotNull final RowSet bucketIndex,
+ final long bucketPosition,
+ final long firstUnmodifiedKey) {
final BigDecimal previousVal = firstUnmodifiedKey == NULL_ROW_KEY ? null : outputSource.get(firstUnmodifiedKey);
bucketLastVal.set(bucketPosition, previousVal);
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/BooleanFillByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/BooleanFillByOperator.java
index 8caedbe4f9e..b6525a5c469 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/BooleanFillByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/BooleanFillByOperator.java
@@ -63,12 +63,12 @@ public Map> getOutputColumns() {
// endregion extra-methods
@Override
- public void addChunk(@NotNull final UpdateContext context,
- @NotNull final Chunk values,
- @NotNull final LongChunk extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk startPositions,
- @NotNull final IntChunk runLengths) {
+ public void addChunkBucketed(@NotNull final UpdateContext context,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk startPositions,
+ @NotNull final IntChunk runLengths) {
final ByteChunk asBooleans = values.asByteChunk();
final Context ctx = (Context) context;
for(int runIdx = 0; runIdx < startPositions.size(); runIdx++) {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/ByteFillByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/ByteFillByOperator.java
index 918886a02ac..ccf2dfbc3f7 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/ByteFillByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/ByteFillByOperator.java
@@ -37,12 +37,12 @@ public ByteFillByOperator(@NotNull final MatchPair fillPair,
// endregion extra-methods
@Override
- public void addChunk(@NotNull final UpdateContext context,
- @NotNull final Chunk values,
- @NotNull final LongChunk extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk startPositions,
- @NotNull final IntChunk runLengths) {
+ public void addChunkBucketed(@NotNull final UpdateContext context,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk startPositions,
+ @NotNull final IntChunk runLengths) {
final ByteChunk asBytes = values.asByteChunk();
final Context ctx = (Context) context;
for(int runIdx = 0; runIdx < startPositions.size(); runIdx++) {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/CharFillByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/CharFillByOperator.java
index c5d03ac7cde..8e78172c311 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/CharFillByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/CharFillByOperator.java
@@ -32,12 +32,12 @@ public CharFillByOperator(@NotNull final MatchPair fillPair,
// endregion extra-methods
@Override
- public void addChunk(@NotNull final UpdateContext context,
- @NotNull final Chunk values,
- @NotNull final LongChunk extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk startPositions,
- @NotNull final IntChunk runLengths) {
+ public void addChunkBucketed(@NotNull final UpdateContext context,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk startPositions,
+ @NotNull final IntChunk runLengths) {
final CharChunk asChars = values.asCharChunk();
final Context ctx = (Context) context;
for(int runIdx = 0; runIdx < startPositions.size(); runIdx++) {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/DoubleFillByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/DoubleFillByOperator.java
index 9d836da5ab2..9664c60eab9 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/DoubleFillByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/DoubleFillByOperator.java
@@ -37,12 +37,12 @@ public DoubleFillByOperator(@NotNull final MatchPair fillPair,
// endregion extra-methods
@Override
- public void addChunk(@NotNull final UpdateContext context,
- @NotNull final Chunk values,
- @NotNull final LongChunk extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk startPositions,
- @NotNull final IntChunk runLengths) {
+ public void addChunkBucketed(@NotNull final UpdateContext context,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk startPositions,
+ @NotNull final IntChunk runLengths) {
final DoubleChunk asDoubles = values.asDoubleChunk();
final Context ctx = (Context) context;
for(int runIdx = 0; runIdx < startPositions.size(); runIdx++) {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/FloatFillByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/FloatFillByOperator.java
index c2dd0a4303a..5c91e5596fc 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/FloatFillByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/FloatFillByOperator.java
@@ -37,12 +37,12 @@ public FloatFillByOperator(@NotNull final MatchPair fillPair,
// endregion extra-methods
@Override
- public void addChunk(@NotNull final UpdateContext context,
- @NotNull final Chunk values,
- @NotNull final LongChunk extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk startPositions,
- @NotNull final IntChunk runLengths) {
+ public void addChunkBucketed(@NotNull final UpdateContext context,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk startPositions,
+ @NotNull final IntChunk runLengths) {
final FloatChunk asFloats = values.asFloatChunk();
final Context ctx = (Context) context;
for(int runIdx = 0; runIdx < startPositions.size(); runIdx++) {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/IntFillByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/IntFillByOperator.java
index 6b14668b53e..0a508c13678 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/IntFillByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/IntFillByOperator.java
@@ -37,12 +37,12 @@ public IntFillByOperator(@NotNull final MatchPair fillPair,
// endregion extra-methods
@Override
- public void addChunk(@NotNull final UpdateContext context,
- @NotNull final Chunk values,
- @NotNull final LongChunk extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk startPositions,
- @NotNull final IntChunk runLengths) {
+ public void addChunkBucketed(@NotNull final UpdateContext context,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk startPositions,
+ @NotNull final IntChunk runLengths) {
final IntChunk asInts = values.asIntChunk();
final Context ctx = (Context) context;
for(int runIdx = 0; runIdx < startPositions.size(); runIdx++) {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/LongFillByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/LongFillByOperator.java
index c53d0a64217..366c7d718e7 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/LongFillByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/LongFillByOperator.java
@@ -58,12 +58,12 @@ public Map> getOutputColumns() {
// endregion extra-methods
@Override
- public void addChunk(@NotNull final UpdateContext context,
- @NotNull final Chunk values,
- @NotNull final LongChunk extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk startPositions,
- @NotNull final IntChunk runLengths) {
+ public void addChunkBucketed(@NotNull final UpdateContext context,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk startPositions,
+ @NotNull final IntChunk runLengths) {
final LongChunk asLongs = values.asLongChunk();
final Context ctx = (Context) context;
for(int runIdx = 0; runIdx < startPositions.size(); runIdx++) {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/ObjectFillByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/ObjectFillByOperator.java
index 8daf5b4e2b2..4b530b4b863 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/ObjectFillByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/ObjectFillByOperator.java
@@ -37,12 +37,12 @@ public ObjectFillByOperator(@NotNull final MatchPair fillPair,
// endregion extra-methods
@Override
- public void addChunk(@NotNull final UpdateContext context,
- @NotNull final Chunk values,
- @NotNull final LongChunk extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk startPositions,
- @NotNull final IntChunk runLengths) {
+ public void addChunkBucketed(@NotNull final UpdateContext context,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk startPositions,
+ @NotNull final IntChunk runLengths) {
final ObjectChunk asObjects = values.asObjectChunk();
final Context ctx = (Context) context;
for(int runIdx = 0; runIdx < startPositions.size(); runIdx++) {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/ShortFillByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/ShortFillByOperator.java
index d895abda7e0..28be54315bb 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/ShortFillByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/fill/ShortFillByOperator.java
@@ -37,12 +37,12 @@ public ShortFillByOperator(@NotNull final MatchPair fillPair,
// endregion extra-methods
@Override
- public void addChunk(@NotNull final UpdateContext context,
- @NotNull final Chunk values,
- @NotNull final LongChunk extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk startPositions,
- @NotNull final IntChunk runLengths) {
+ public void addChunkBucketed(@NotNull final UpdateContext context,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk startPositions,
+ @NotNull final IntChunk runLengths) {
final ShortChunk asShorts = values.asShortChunk();
final Context ctx = (Context) context;
for(int runIdx = 0; runIdx < startPositions.size(); runIdx++) {
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseByteUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseByteUpdateByOperator.java
index d9682e0e7c7..132298c3a96 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseByteUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseByteUpdateByOperator.java
@@ -5,6 +5,11 @@
*/
package io.deephaven.engine.table.impl.updateby.internal;
+import io.deephaven.util.QueryConstants;
+import io.deephaven.engine.table.impl.sources.ByteArraySource;
+import io.deephaven.engine.table.impl.sources.ByteSparseArraySource;
+import io.deephaven.engine.table.WritableColumnSource;
+
import io.deephaven.chunk.Chunk;
import io.deephaven.chunk.IntChunk;
import io.deephaven.chunk.LongChunk;
@@ -18,7 +23,6 @@
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.UpdateBy;
import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
-import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.*;
import io.deephaven.engine.table.impl.util.RowRedirection;
import io.deephaven.engine.table.impl.util.SizedSafeCloseable;
@@ -205,7 +209,7 @@ public void initializeForUpdate(@NotNull final UpdateContext context,
@Override
public void initializeFor(@NotNull final UpdateContext updateContext,
- @NotNull final RowSet updateIndex,
+ @NotNull final RowSet updateRowSet,
@NotNull final UpdateBy.UpdateType type) {
((Context)updateContext).currentUpdateType = type;
((Context)updateContext).curVal = nullValue;
@@ -268,10 +272,10 @@ public boolean canProcessNormalUpdate(@NotNull UpdateContext context) {
// region Addition
@Override
public void addChunk(@NotNull final UpdateContext updateContext,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk values,
- long bucketPosition) {
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk values,
+ long bucketPosition) {
final Context ctx = (Context) updateContext;
if (ctx.canProcessDirectly) {
doAddChunk(ctx, inputKeys, values, bucketPosition);
@@ -319,34 +323,34 @@ public void resetForReprocess(@NotNull final UpdateContext context,
@Override
- public void resetForReprocess(@NotNull final UpdateContext ctx,
- @NotNull final RowSet bucketIndex,
- final long bucketPosition,
- final long firstUnmodifiedKey) {
+ public void resetForReprocessBucketed(@NotNull final UpdateContext ctx,
+ @NotNull final RowSet bucketIndex,
+ final long bucketPosition,
+ final long firstUnmodifiedKey) {
final byte previousVal = firstUnmodifiedKey == NULL_ROW_KEY ? nullValue : outputSource.getByte(firstUnmodifiedKey);
bucketLastVal.set(bucketPosition, previousVal);
}
@Override
public void reprocessChunk(@NotNull final UpdateContext updateContext,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk valuesChunk,
- @NotNull final RowSet postUpdateSourceIndex) {
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk valuesChunk,
+ @NotNull final RowSet postUpdateSourceIndex) {
final Context ctx = (Context) updateContext;
doAddChunk(ctx, inputKeys, valuesChunk, 0);
ctx.getModifiedBuilder().appendRowSequence(inputKeys);
}
@Override
- public void reprocessChunk(@NotNull UpdateContext updateContext,
- @NotNull final RowSequence chunkOk,
- @NotNull final Chunk values,
- @NotNull final LongChunk<? extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk runStartPositions,
- @NotNull final IntChunk runLengths) {
- addChunk(updateContext, values, keyChunk, bucketPositions, runStartPositions, runLengths);
+ public void reprocessChunkBucketed(@NotNull UpdateContext updateContext,
+ @NotNull final RowSequence chunkOk,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk<? extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk runStartPositions,
+ @NotNull final IntChunk runLengths) {
+ addChunkBucketed(updateContext, values, keyChunk, bucketPositions, runStartPositions, runLengths);
((Context)updateContext).getModifiedBuilder().appendRowSequence(chunkOk);
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseCharUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseCharUpdateByOperator.java
index fe6c7f1780e..217e2324016 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseCharUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseCharUpdateByOperator.java
@@ -13,7 +13,6 @@
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.UpdateBy;
import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
-import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.*;
import io.deephaven.engine.table.impl.util.RowRedirection;
import io.deephaven.engine.table.impl.util.SizedSafeCloseable;
@@ -186,7 +185,7 @@ public void initializeForUpdate(@NotNull final UpdateContext context,
@Override
public void initializeFor(@NotNull final UpdateContext updateContext,
- @NotNull final RowSet updateIndex,
+ @NotNull final RowSet updateRowSet,
@NotNull final UpdateBy.UpdateType type) {
((Context)updateContext).currentUpdateType = type;
((Context)updateContext).curVal = QueryConstants.NULL_CHAR;
@@ -249,10 +248,10 @@ public boolean canProcessNormalUpdate(@NotNull UpdateContext context) {
// region Addition
@Override
public void addChunk(@NotNull final UpdateContext updateContext,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk values,
- long bucketPosition) {
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk values,
+ long bucketPosition) {
final Context ctx = (Context) updateContext;
if (ctx.canProcessDirectly) {
doAddChunk(ctx, inputKeys, values, bucketPosition);
@@ -298,34 +297,34 @@ public void resetForReprocess(@NotNull final UpdateContext context,
@Override
- public void resetForReprocess(@NotNull final UpdateContext ctx,
- @NotNull final RowSet bucketIndex,
- final long bucketPosition,
- final long firstUnmodifiedKey) {
+ public void resetForReprocessBucketed(@NotNull final UpdateContext ctx,
+ @NotNull final RowSet bucketIndex,
+ final long bucketPosition,
+ final long firstUnmodifiedKey) {
final char previousVal = firstUnmodifiedKey == NULL_ROW_KEY ? QueryConstants.NULL_CHAR : outputSource.getChar(firstUnmodifiedKey);
bucketLastVal.set(bucketPosition, previousVal);
}
@Override
public void reprocessChunk(@NotNull final UpdateContext updateContext,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk valuesChunk,
- @NotNull final RowSet postUpdateSourceIndex) {
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk valuesChunk,
+ @NotNull final RowSet postUpdateSourceIndex) {
final Context ctx = (Context) updateContext;
doAddChunk(ctx, inputKeys, valuesChunk, 0);
ctx.getModifiedBuilder().appendRowSequence(inputKeys);
}
@Override
- public void reprocessChunk(@NotNull UpdateContext updateContext,
- @NotNull final RowSequence chunkOk,
- @NotNull final Chunk values,
- @NotNull final LongChunk<? extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk runStartPositions,
- @NotNull final IntChunk runLengths) {
- addChunk(updateContext, values, keyChunk, bucketPositions, runStartPositions, runLengths);
+ public void reprocessChunkBucketed(@NotNull UpdateContext updateContext,
+ @NotNull final RowSequence chunkOk,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk<? extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk runStartPositions,
+ @NotNull final IntChunk runLengths) {
+ addChunkBucketed(updateContext, values, keyChunk, bucketPositions, runStartPositions, runLengths);
((Context)updateContext).getModifiedBuilder().appendRowSequence(chunkOk);
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseDoubleUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseDoubleUpdateByOperator.java
index 3a0b7e6d278..5850d36c7e3 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseDoubleUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseDoubleUpdateByOperator.java
@@ -18,7 +18,6 @@
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.UpdateBy;
import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
-import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.DoubleArraySource;
import io.deephaven.engine.table.impl.sources.DoubleSparseArraySource;
import io.deephaven.engine.table.impl.sources.WritableRedirectedColumnSource;
@@ -238,10 +237,10 @@ public boolean canProcessNormalUpdate(@NotNull UpdateContext context) {
// region Addition
@Override
public void addChunk(@NotNull final UpdateContext updateContext,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk values,
- long bucketPosition) {
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk values,
+ long bucketPosition) {
final Context ctx = (Context) updateContext;
if (ctx.canProcessDirectly) {
doAddChunk(ctx, inputKeys, values, bucketPosition);
@@ -278,34 +277,34 @@ public void resetForReprocess(@NotNull final UpdateContext context,
@Override
- public void resetForReprocess(@NotNull final UpdateContext ctx,
- @NotNull final RowSet bucketRowSet,
- final long bucketPosition,
- final long firstUnmodifiedKey) {
+ public void resetForReprocessBucketed(@NotNull final UpdateContext ctx,
+ @NotNull final RowSet bucketRowSet,
+ final long bucketPosition,
+ final long firstUnmodifiedKey) {
final double previousVal = firstUnmodifiedKey == NULL_ROW_KEY ? NULL_DOUBLE : outputSource.getDouble(firstUnmodifiedKey);
bucketLastVal.set(bucketPosition, previousVal);
}
@Override
public void reprocessChunk(@NotNull final UpdateContext updateContext,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk valuesChunk,
- @NotNull final RowSet postUpdateSourceRowSet) {
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk valuesChunk,
+ @NotNull final RowSet postUpdateSourceRowSet) {
final Context ctx = (Context) updateContext;
doAddChunk(ctx, inputKeys, valuesChunk, 0);
ctx.getModifiedBuilder().appendRowSequence(inputKeys);
}
@Override
- public void reprocessChunk(@NotNull UpdateContext updateContext,
- @NotNull final RowSequence chunkOk,
- @NotNull final Chunk values,
- @NotNull final LongChunk<? extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk runStartPositions,
- @NotNull final IntChunk runLengths) {
- addChunk(updateContext, values, keyChunk, bucketPositions, runStartPositions, runLengths);
+ public void reprocessChunkBucketed(@NotNull UpdateContext updateContext,
+ @NotNull final RowSequence chunkOk,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk<? extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk runStartPositions,
+ @NotNull final IntChunk runLengths) {
+ addChunkBucketed(updateContext, values, keyChunk, bucketPositions, runStartPositions, runLengths);
((Context)updateContext).getModifiedBuilder().appendRowSequence(chunkOk);
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseFloatUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseFloatUpdateByOperator.java
index 8e3dc96020d..6654c974852 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseFloatUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseFloatUpdateByOperator.java
@@ -13,7 +13,6 @@
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.UpdateBy;
import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
-import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.FloatArraySource;
import io.deephaven.engine.table.impl.sources.FloatSparseArraySource;
import io.deephaven.engine.table.impl.sources.WritableRedirectedColumnSource;
@@ -233,10 +232,10 @@ public boolean canProcessNormalUpdate(@NotNull UpdateContext context) {
// region Addition
@Override
public void addChunk(@NotNull final UpdateContext updateContext,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk values,
- long bucketPosition) {
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk values,
+ long bucketPosition) {
final Context ctx = (Context) updateContext;
if (ctx.canProcessDirectly) {
doAddChunk(ctx, inputKeys, values, bucketPosition);
@@ -273,34 +272,34 @@ public void resetForReprocess(@NotNull final UpdateContext context,
@Override
- public void resetForReprocess(@NotNull final UpdateContext ctx,
- @NotNull final RowSet bucketRowSet,
- final long bucketPosition,
- final long firstUnmodifiedKey) {
+ public void resetForReprocessBucketed(@NotNull final UpdateContext ctx,
+ @NotNull final RowSet bucketRowSet,
+ final long bucketPosition,
+ final long firstUnmodifiedKey) {
final float previousVal = firstUnmodifiedKey == NULL_ROW_KEY ? NULL_FLOAT : outputSource.getFloat(firstUnmodifiedKey);
bucketLastVal.set(bucketPosition, previousVal);
}
@Override
public void reprocessChunk(@NotNull final UpdateContext updateContext,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk valuesChunk,
- @NotNull final RowSet postUpdateSourceRowSet) {
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk valuesChunk,
+ @NotNull final RowSet postUpdateSourceRowSet) {
final Context ctx = (Context) updateContext;
doAddChunk(ctx, inputKeys, valuesChunk, 0);
ctx.getModifiedBuilder().appendRowSequence(inputKeys);
}
@Override
- public void reprocessChunk(@NotNull UpdateContext updateContext,
- @NotNull final RowSequence chunkOk,
- @NotNull final Chunk values,
- @NotNull final LongChunk<? extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk runStartPositions,
- @NotNull final IntChunk runLengths) {
- addChunk(updateContext, values, keyChunk, bucketPositions, runStartPositions, runLengths);
+ public void reprocessChunkBucketed(@NotNull UpdateContext updateContext,
+ @NotNull final RowSequence chunkOk,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk<? extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk runStartPositions,
+ @NotNull final IntChunk runLengths) {
+ addChunkBucketed(updateContext, values, keyChunk, bucketPositions, runStartPositions, runLengths);
((Context)updateContext).getModifiedBuilder().appendRowSequence(chunkOk);
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseIntUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseIntUpdateByOperator.java
index 4b5c875a0f2..7f8ffa847a7 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseIntUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseIntUpdateByOperator.java
@@ -18,7 +18,6 @@
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.UpdateBy;
import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
-import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.*;
import io.deephaven.engine.table.impl.util.RowRedirection;
import io.deephaven.engine.table.impl.util.SizedSafeCloseable;
@@ -191,7 +190,7 @@ public void initializeForUpdate(@NotNull final UpdateContext context,
@Override
public void initializeFor(@NotNull final UpdateContext updateContext,
- @NotNull final RowSet updateIndex,
+ @NotNull final RowSet updateRowSet,
@NotNull final UpdateBy.UpdateType type) {
((Context)updateContext).currentUpdateType = type;
((Context)updateContext).curVal = QueryConstants.NULL_INT;
@@ -254,10 +253,10 @@ public boolean canProcessNormalUpdate(@NotNull UpdateContext context) {
// region Addition
@Override
public void addChunk(@NotNull final UpdateContext updateContext,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk values,
- long bucketPosition) {
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk values,
+ long bucketPosition) {
final Context ctx = (Context) updateContext;
if (ctx.canProcessDirectly) {
doAddChunk(ctx, inputKeys, values, bucketPosition);
@@ -303,34 +302,34 @@ public void resetForReprocess(@NotNull final UpdateContext context,
@Override
- public void resetForReprocess(@NotNull final UpdateContext ctx,
- @NotNull final RowSet bucketIndex,
- final long bucketPosition,
- final long firstUnmodifiedKey) {
+ public void resetForReprocessBucketed(@NotNull final UpdateContext ctx,
+ @NotNull final RowSet bucketIndex,
+ final long bucketPosition,
+ final long firstUnmodifiedKey) {
final int previousVal = firstUnmodifiedKey == NULL_ROW_KEY ? QueryConstants.NULL_INT : outputSource.getInt(firstUnmodifiedKey);
bucketLastVal.set(bucketPosition, previousVal);
}
@Override
public void reprocessChunk(@NotNull final UpdateContext updateContext,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk valuesChunk,
- @NotNull final RowSet postUpdateSourceIndex) {
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk valuesChunk,
+ @NotNull final RowSet postUpdateSourceIndex) {
final Context ctx = (Context) updateContext;
doAddChunk(ctx, inputKeys, valuesChunk, 0);
ctx.getModifiedBuilder().appendRowSequence(inputKeys);
}
@Override
- public void reprocessChunk(@NotNull UpdateContext updateContext,
- @NotNull final RowSequence chunkOk,
- @NotNull final Chunk values,
- @NotNull final LongChunk<? extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk runStartPositions,
- @NotNull final IntChunk runLengths) {
- addChunk(updateContext, values, keyChunk, bucketPositions, runStartPositions, runLengths);
+ public void reprocessChunkBucketed(@NotNull UpdateContext updateContext,
+ @NotNull final RowSequence chunkOk,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk<? extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk runStartPositions,
+ @NotNull final IntChunk runLengths) {
+ addChunkBucketed(updateContext, values, keyChunk, bucketPositions, runStartPositions, runLengths);
((Context)updateContext).getModifiedBuilder().appendRowSequence(chunkOk);
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseLongUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseLongUpdateByOperator.java
index d1b13b1f1ec..db99a87c21e 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseLongUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseLongUpdateByOperator.java
@@ -18,7 +18,6 @@
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.UpdateBy;
import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
-import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.*;
import io.deephaven.engine.table.impl.util.RowRedirection;
import io.deephaven.engine.table.impl.util.SizedSafeCloseable;
@@ -191,7 +190,7 @@ public void initializeForUpdate(@NotNull final UpdateContext context,
@Override
public void initializeFor(@NotNull final UpdateContext updateContext,
- @NotNull final RowSet updateIndex,
+ @NotNull final RowSet updateRowSet,
@NotNull final UpdateBy.UpdateType type) {
((Context)updateContext).currentUpdateType = type;
((Context)updateContext).curVal = QueryConstants.NULL_LONG;
@@ -254,10 +253,10 @@ public boolean canProcessNormalUpdate(@NotNull UpdateContext context) {
// region Addition
@Override
public void addChunk(@NotNull final UpdateContext updateContext,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk values,
- long bucketPosition) {
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk values,
+ long bucketPosition) {
final Context ctx = (Context) updateContext;
if (ctx.canProcessDirectly) {
doAddChunk(ctx, inputKeys, values, bucketPosition);
@@ -303,34 +302,34 @@ public void resetForReprocess(@NotNull final UpdateContext context,
@Override
- public void resetForReprocess(@NotNull final UpdateContext ctx,
- @NotNull final RowSet bucketIndex,
- final long bucketPosition,
- final long firstUnmodifiedKey) {
+ public void resetForReprocessBucketed(@NotNull final UpdateContext ctx,
+ @NotNull final RowSet bucketIndex,
+ final long bucketPosition,
+ final long firstUnmodifiedKey) {
final long previousVal = firstUnmodifiedKey == NULL_ROW_KEY ? QueryConstants.NULL_LONG : outputSource.getLong(firstUnmodifiedKey);
bucketLastVal.set(bucketPosition, previousVal);
}
@Override
public void reprocessChunk(@NotNull final UpdateContext updateContext,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk valuesChunk,
- @NotNull final RowSet postUpdateSourceIndex) {
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk valuesChunk,
+ @NotNull final RowSet postUpdateSourceIndex) {
final Context ctx = (Context) updateContext;
doAddChunk(ctx, inputKeys, valuesChunk, 0);
ctx.getModifiedBuilder().appendRowSequence(inputKeys);
}
@Override
- public void reprocessChunk(@NotNull UpdateContext updateContext,
- @NotNull final RowSequence chunkOk,
- @NotNull final Chunk values,
- @NotNull final LongChunk<? extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk runStartPositions,
- @NotNull final IntChunk runLengths) {
- addChunk(updateContext, values, keyChunk, bucketPositions, runStartPositions, runLengths);
+ public void reprocessChunkBucketed(@NotNull UpdateContext updateContext,
+ @NotNull final RowSequence chunkOk,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk<? extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk runStartPositions,
+ @NotNull final IntChunk runLengths) {
+ addChunkBucketed(updateContext, values, keyChunk, bucketPositions, runStartPositions, runLengths);
((Context)updateContext).getModifiedBuilder().appendRowSequence(chunkOk);
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseObjectBinaryOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseObjectBinaryOperator.java
index d960dc4cd27..687bb30c7e4 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseObjectBinaryOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseObjectBinaryOperator.java
@@ -24,12 +24,12 @@ public BaseObjectBinaryOperator(@NotNull final Class type,
// region Addition
@Override
- public void addChunk(@NotNull final UpdateContext context,
- @NotNull final Chunk values,
- @NotNull final LongChunk<? extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk startPositions,
- @NotNull final IntChunk runLengths) {
+ public void addChunkBucketed(@NotNull final UpdateContext context,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk<? extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk startPositions,
+ @NotNull final IntChunk runLengths) {
final ObjectChunk asObject = values.asObjectChunk();
//noinspection unchecked
final Context ctx = (Context) context;
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseObjectUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseObjectUpdateByOperator.java
index e9a9c6fe1f5..affaeeb8d29 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseObjectUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseObjectUpdateByOperator.java
@@ -18,7 +18,6 @@
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.UpdateBy;
import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
-import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.*;
import io.deephaven.engine.table.impl.util.RowRedirection;
import io.deephaven.engine.table.impl.util.SizedSafeCloseable;
@@ -194,7 +193,7 @@ public void initializeForUpdate(@NotNull final UpdateContext context,
@Override
public void initializeFor(@NotNull final UpdateContext updateContext,
- @NotNull final RowSet updateIndex,
+ @NotNull final RowSet updateRowSet,
@NotNull final UpdateBy.UpdateType type) {
((Context)updateContext).currentUpdateType = type;
((Context)updateContext).curVal = null;
@@ -257,10 +256,10 @@ public boolean canProcessNormalUpdate(@NotNull UpdateContext context) {
// region Addition
@Override
public void addChunk(@NotNull final UpdateContext updateContext,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk values,
- long bucketPosition) {
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk values,
+ long bucketPosition) {
final Context ctx = (Context) updateContext;
if (ctx.canProcessDirectly) {
doAddChunk(ctx, inputKeys, values, bucketPosition);
@@ -306,34 +305,34 @@ public void resetForReprocess(@NotNull final UpdateContext context,
@Override
- public void resetForReprocess(@NotNull final UpdateContext ctx,
- @NotNull final RowSet bucketIndex,
- final long bucketPosition,
- final long firstUnmodifiedKey) {
+ public void resetForReprocessBucketed(@NotNull final UpdateContext ctx,
+ @NotNull final RowSet bucketIndex,
+ final long bucketPosition,
+ final long firstUnmodifiedKey) {
final T previousVal = firstUnmodifiedKey == NULL_ROW_KEY ? null : outputSource.get(firstUnmodifiedKey);
bucketLastVal.set(bucketPosition, previousVal);
}
@Override
public void reprocessChunk(@NotNull final UpdateContext updateContext,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk valuesChunk,
- @NotNull final RowSet postUpdateSourceIndex) {
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk valuesChunk,
+ @NotNull final RowSet postUpdateSourceIndex) {
final Context ctx = (Context) updateContext;
doAddChunk(ctx, inputKeys, valuesChunk, 0);
ctx.getModifiedBuilder().appendRowSequence(inputKeys);
}
@Override
- public void reprocessChunk(@NotNull UpdateContext updateContext,
- @NotNull final RowSequence chunkOk,
- @NotNull final Chunk values,
- @NotNull final LongChunk<? extends RowKeys> keyChunk,
- @NotNull final IntChunk bucketPositions,
- @NotNull final IntChunk runStartPositions,
- @NotNull final IntChunk runLengths) {
- addChunk(updateContext, values, keyChunk, bucketPositions, runStartPositions, runLengths);
+ public void reprocessChunkBucketed(@NotNull UpdateContext updateContext,
+ @NotNull final RowSequence chunkOk,
+ @NotNull final Chunk values,
+ @NotNull final LongChunk<? extends RowKeys> keyChunk,
+ @NotNull final IntChunk bucketPositions,
+ @NotNull final IntChunk runStartPositions,
+ @NotNull final IntChunk runLengths) {
+ addChunkBucketed(updateContext, values, keyChunk, bucketPositions, runStartPositions, runLengths);
((Context)updateContext).getModifiedBuilder().appendRowSequence(chunkOk);
}
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseShortUpdateByOperator.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseShortUpdateByOperator.java
index 233124ffbee..169d51bbfc9 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseShortUpdateByOperator.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/updateby/internal/BaseShortUpdateByOperator.java
@@ -18,7 +18,6 @@
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.UpdateBy;
import io.deephaven.engine.table.impl.UpdateByCumulativeOperator;
-import io.deephaven.engine.table.impl.UpdateByOperator;
import io.deephaven.engine.table.impl.sources.*;
import io.deephaven.engine.table.impl.util.RowRedirection;
import io.deephaven.engine.table.impl.util.SizedSafeCloseable;
@@ -191,7 +190,7 @@ public void initializeForUpdate(@NotNull final UpdateContext context,
@Override
public void initializeFor(@NotNull final UpdateContext updateContext,
- @NotNull final RowSet updateIndex,
+ @NotNull final RowSet updateRowSet,
@NotNull final UpdateBy.UpdateType type) {
((Context)updateContext).currentUpdateType = type;
((Context)updateContext).curVal = QueryConstants.NULL_SHORT;
@@ -254,10 +253,10 @@ public boolean canProcessNormalUpdate(@NotNull UpdateContext context) {
// region Addition
@Override
public void addChunk(@NotNull final UpdateContext updateContext,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk values,
- long bucketPosition) {
+ @NotNull final RowSequence inputKeys,
+ @Nullable final LongChunk keyChunk,
+ @NotNull final Chunk values,
+ long bucketPosition) {
final Context ctx = (Context) updateContext;
if (ctx.canProcessDirectly) {
doAddChunk(ctx, inputKeys, values, bucketPosition);
@@ -303,34 +302,34 @@ public void resetForReprocess(@NotNull final UpdateContext context,
@Override
- public void resetForReprocess(@NotNull final UpdateContext ctx,
- @NotNull final RowSet bucketIndex,
- final long bucketPosition,
- final long firstUnmodifiedKey) {
+ public void resetForReprocessBucketed(@NotNull final UpdateContext ctx,
+ @NotNull final RowSet bucketIndex,
+ final long bucketPosition,
+ final long firstUnmodifiedKey) {
final short previousVal = firstUnmodifiedKey == NULL_ROW_KEY ? QueryConstants.NULL_SHORT : outputSource.getShort(firstUnmodifiedKey);
bucketLastVal.set(bucketPosition, previousVal);
}
@Override
public void reprocessChunk(@NotNull final UpdateContext updateContext,
- @NotNull final RowSequence inputKeys,
- @Nullable final LongChunk keyChunk,
- @NotNull final Chunk