Skip to content

Commit

Permalink
UpdateBy config changes in support of gRPC (#2633)
Browse files Browse the repository at this point in the history
  • Loading branch information
devinrsmith authored Jul 11, 2022
1 parent 9a160ed commit c4415bc
Show file tree
Hide file tree
Showing 18 changed files with 383 additions and 246 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,12 @@ private class BucketedContext implements SafeCloseable {
BucketedContext(@NotNull final TableUpdate upstream,
@NotNull final ModifiedColumnSet keyModifiedColumnSet,
@Nullable final ModifiedColumnSet[] inputModifiedColumnSets) {
final int updateSize = UpdateSizeCalculator.chunkSize(upstream, control.chunkCapacity());
final int updateSize = UpdateSizeCalculator.chunkSize(upstream, control.chunkCapacityOrDefault());

this.inputChunkPopulated = new boolean[operators.length];
this.keysModified = upstream.modifiedColumnSet().containsAny(keyModifiedColumnSet);
this.chunkSize = UpdateSizeCalculator.chunkSize(updateSize, upstream.shifted(), control.chunkCapacity());
this.chunkSize =
UpdateSizeCalculator.chunkSize(updateSize, upstream.shifted(), control.chunkCapacityOrDefault());
this.opAffected = new boolean[operators.length];
// noinspection unchecked
this.fillContexts = new SizedSafeCloseable[operators.length];
Expand Down Expand Up @@ -722,7 +723,7 @@ public void processBucketedUpdates() {
}

try (final RowSequence.Iterator okIt = modifiedBucketIndex.getRowSequenceIterator()) {
final int newChunkSize = (int) Math.min(control.chunkCapacity(), modifiedBucketIndex.size());
final int newChunkSize = (int) Math.min(control.chunkCapacityOrDefault(), modifiedBucketIndex.size());
setChunkSize(newChunkSize);
initializeFor(modifiedBucketIndex, UpdateType.Reprocess);

Expand Down Expand Up @@ -957,7 +958,7 @@ private class GroupedContext implements SafeCloseable {
final WritableChunk<Values>[] postWorkingChunks;

GroupedContext(final TableUpdate upstream) {
this.chunkSize = Math.min((int) source.size(), control.chunkCapacity());
this.chunkSize = Math.min((int) source.size(), control.chunkCapacityOrDefault());
this.inputChunkPopulated = new boolean[operators.length];
this.fillContexts = new ChunkSource.FillContext[operators.length];
this.opContext = new UpdateByOperator.UpdateContext[operators.length];
Expand Down Expand Up @@ -1068,20 +1069,20 @@ private BucketedUpdateBy(@NotNull final UpdateByOperator[] operators,
this.keySources = keySources;

if (source.isRefreshing() && !source.isAddOnly()) {
final int hashTableSize = control.initialHashTableSize();
slotTracker = new UpdateBySlotTracker(control.chunkCapacity());
final int hashTableSize = control.initialHashTableSizeOrDefault();
slotTracker = new UpdateBySlotTracker(control.chunkCapacityOrDefault());
this.hashTable = new IncrementalUpdateByStateManager(keySources,
hashTableSize,
control.maximumLoadFactor(),
control.targetLoadFactor());
control.maximumLoadFactorOrDefault(),
control.targetLoadFactorOrDefault());
} else {
slotTracker = null;
if (!useGrouping) {
final int hashTableSize = control.initialHashTableSize();
final int hashTableSize = control.initialHashTableSizeOrDefault();
this.hashTable = new AddOnlyUpdateByStateManager(keySources,
hashTableSize,
control.maximumLoadFactor(),
control.targetLoadFactor());
control.maximumLoadFactorOrDefault(),
control.targetLoadFactorOrDefault());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ public static Table updateBy(@NotNull final QueryTable source,
@NotNull final UpdateByControl control) {

WritableRowRedirection rowRedirection = null;
if (control.useRedirection()) {
if (control.useRedirectionOrDefault()) {
if (!source.isRefreshing()) {
if (!source.isFlat() && SparseConstants.sparseStructureExceedsOverhead(source.getRowSet(),
control.maxStaticSparseMemoryOverhead())) {
control.maxStaticSparseMemoryOverheadOrDefault())) {
rowRedirection = new InverseRowRedirectionImpl(source.getRowSet());
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,28 +225,28 @@ private UpdateByOperator makeEmaOperator(@NotNull final MatchPair pair,
final long timeScaleUnits = ema.timeScale().timescaleUnits();

if (csType == byte.class || csType == Byte.class) {
return new ByteEMAOperator(pair, affectingColumns, ema.control(), recorder, timeScaleUnits,
return new ByteEMAOperator(pair, affectingColumns, ema.controlOrDefault(), recorder, timeScaleUnits,
columnSource, rowRedirection);
} else if (csType == short.class || csType == Short.class) {
return new ShortEMAOperator(pair, affectingColumns, ema.control(), recorder, timeScaleUnits,
return new ShortEMAOperator(pair, affectingColumns, ema.controlOrDefault(), recorder, timeScaleUnits,
columnSource, rowRedirection);
} else if (csType == int.class || csType == Integer.class) {
return new IntEMAOperator(pair, affectingColumns, ema.control(), recorder, timeScaleUnits,
return new IntEMAOperator(pair, affectingColumns, ema.controlOrDefault(), recorder, timeScaleUnits,
columnSource, rowRedirection);
} else if (csType == long.class || csType == Long.class) {
return new LongEMAOperator(pair, affectingColumns, ema.control(), recorder, timeScaleUnits,
return new LongEMAOperator(pair, affectingColumns, ema.controlOrDefault(), recorder, timeScaleUnits,
columnSource, rowRedirection);
} else if (csType == float.class || csType == Float.class) {
return new FloatEMAOperator(pair, affectingColumns, ema.control(), recorder, timeScaleUnits,
return new FloatEMAOperator(pair, affectingColumns, ema.controlOrDefault(), recorder, timeScaleUnits,
columnSource, rowRedirection);
} else if (csType == double.class || csType == Double.class) {
return new DoubleEMAOperator(pair, affectingColumns, ema.control(), recorder,
return new DoubleEMAOperator(pair, affectingColumns, ema.controlOrDefault(), recorder,
timeScaleUnits, columnSource, rowRedirection);
} else if (csType == BigDecimal.class) {
return new BigDecimalEMAOperator(pair, affectingColumns, ema.control(), recorder,
return new BigDecimalEMAOperator(pair, affectingColumns, ema.controlOrDefault(), recorder,
timeScaleUnits, columnSource, rowRedirection);
} else if (csType == BigInteger.class) {
return new BigIntegerEMAOperator(pair, affectingColumns, ema.control(), recorder,
return new BigIntegerEMAOperator(pair, affectingColumns, ema.controlOrDefault(), recorder,
timeScaleUnits, columnSource, rowRedirection);
}

Expand Down Expand Up @@ -285,7 +285,7 @@ private UpdateByOperator makeCumProdOperator(MatchPair fc, TableWithDefaults sou
} else if (csType == double.class || csType == Double.class) {
return new DoubleCumProdOperator(fc, rowRedirection);
} else if (csType == BigDecimal.class) {
return new BigDecimalCumProdOperator(fc, rowRedirection, control.mathContext());
return new BigDecimalCumProdOperator(fc, rowRedirection, control.mathContextOrDefault());
} else if (csType == BigInteger.class) {
return new BigIntegerCumProdOperator(fc, rowRedirection);
}
Expand Down Expand Up @@ -333,7 +333,7 @@ private UpdateByOperator makeCumSumOperator(MatchPair fc, TableWithDefaults sour
} else if (csType == double.class || csType == Double.class) {
return new DoubleCumSumOperator(fc, rowRedirection);
} else if (csType == BigDecimal.class) {
return new BigDecimalCumSumOperator(fc, rowRedirection, control.mathContext());
return new BigDecimalCumSumOperator(fc, rowRedirection, control.mathContextOrDefault());
} else if (csType == BigInteger.class) {
return new BigIntegerCumSumOperator(fc, rowRedirection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ private class UpdateContext implements SafeCloseable {
UpdateContext(@NotNull final TableUpdate upstream,
@Nullable final ModifiedColumnSet[] inputModifiedColumnSets,
final boolean isInitializeStep) {
final int updateSize = UpdateSizeCalculator.chunkSize(upstream, control.chunkCapacity());
final int updateSize = UpdateSizeCalculator.chunkSize(upstream, control.chunkCapacityOrDefault());

this.chunkSize = UpdateSizeCalculator.chunkSize(updateSize, upstream.shifted(), control.chunkCapacity());
this.chunkSize =
UpdateSizeCalculator.chunkSize(updateSize, upstream.shifted(), control.chunkCapacityOrDefault());
this.opAffected = new boolean[operators.length];
// noinspection unchecked
this.fillContexts = new SizedSafeCloseable[operators.length];
Expand Down Expand Up @@ -413,7 +414,7 @@ private void reprocessRows(RowSetShiftData shifted) {
final RowSet sourceRowSet = source.getRowSet();
try (final RowSet indexToReprocess =
sourceRowSet.subSetByKeyRange(smallestModifiedKey, sourceRowSet.lastRowKey())) {
final int newChunkSize = (int) Math.min(control.chunkCapacity(), indexToReprocess.size());
final int newChunkSize = (int) Math.min(control.chunkCapacityOrDefault(), indexToReprocess.size());
setChunkSize(newChunkSize);

final long keyBefore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,25 +288,25 @@ void handleBadData(@NotNull final EmaContext ctx,
final boolean isNullTime) {
boolean doReset = false;
if (isNull) {
if (control.onNullValue() == BadDataBehavior.Throw) {
if (control.onNullValueOrDefault() == BadDataBehavior.THROW) {
throw new TableDataException("Encountered null value during EMA processing");
}
doReset = control.onNullValue() == BadDataBehavior.Reset;
doReset = control.onNullValueOrDefault() == BadDataBehavior.RESET;
} else if (isNan) {
if (control.onNanValue() == BadDataBehavior.Throw) {
if (control.onNanValueOrDefault() == BadDataBehavior.THROW) {
throw new TableDataException("Encountered NaN value during EMA processing");
} else if (control.onNanValue() == BadDataBehavior.Poison) {
} else if (control.onNanValueOrDefault() == BadDataBehavior.POISON) {
ctx.curVal = Double.NaN;
} else {
doReset = control.onNanValue() == BadDataBehavior.Reset;
doReset = control.onNanValueOrDefault() == BadDataBehavior.RESET;
}
}

if (isNullTime) {
if (control.onNullTime() == BadDataBehavior.Throw) {
if (control.onNullTimeOrDefault() == BadDataBehavior.THROW) {
throw new TableDataException("Encountered null timestamp during EMA processing");
}
doReset = control.onNullTime() == BadDataBehavior.Reset;
doReset = control.onNullTimeOrDefault() == BadDataBehavior.RESET;
}

if (doReset) {
Expand All @@ -318,15 +318,15 @@ void handleBadData(@NotNull final EmaContext ctx,
void handleBadTime(@NotNull final EmaContext ctx, final long dt) {
boolean doReset = false;
if (dt == 0) {
if (control.onZeroDeltaTime() == BadDataBehavior.Throw) {
if (control.onZeroDeltaTimeOrDefault() == BadDataBehavior.THROW) {
throw new TableDataException("Encountered zero delta time during EMA processing");
}
doReset = control.onZeroDeltaTime() == BadDataBehavior.Reset;
doReset = control.onZeroDeltaTimeOrDefault() == BadDataBehavior.RESET;
} else if (dt < 0) {
if (control.onNegativeDeltaTime() == BadDataBehavior.Throw) {
if (control.onNegativeDeltaTimeOrDefault() == BadDataBehavior.THROW) {
throw new TableDataException("Encountered negative delta time during EMA processing");
}
doReset = control.onNegativeDeltaTime() == BadDataBehavior.Reset;
doReset = control.onNegativeDeltaTimeOrDefault() == BadDataBehavior.RESET;
}

if (doReset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ void computeWithTicks(final EmaContext ctx,
if (ctx.curVal == null) {
ctx.curVal = input;
} else {
ctx.curVal = ctx.curVal.multiply(ctx.alpha, control.bigValueContext())
.add(input.multiply(ctx.oneMinusAlpha, control.bigValueContext()),
control.bigValueContext());
ctx.curVal = ctx.curVal.multiply(ctx.alpha, control.bigValueContextOrDefault())
.add(input.multiply(ctx.oneMinusAlpha, control.bigValueContextOrDefault()),
control.bigValueContextOrDefault());
}
}

Expand Down Expand Up @@ -90,11 +90,11 @@ void computeWithTime(final EmaContext ctx,
handleBadTime(ctx, dt);
} else {
ctx.alpha = BigDecimal.valueOf(Math.exp(-dt / timeScaleUnits));
ctx.curVal = ctx.curVal.multiply(ctx.alpha, control.bigValueContext())
ctx.curVal = ctx.curVal.multiply(ctx.alpha, control.bigValueContextOrDefault())
.add(input.multiply(
BigDecimal.ONE.subtract(ctx.alpha, control.bigValueContext()),
control.bigValueContext()),
control.bigValueContext());
BigDecimal.ONE.subtract(ctx.alpha, control.bigValueContextOrDefault()),
control.bigValueContextOrDefault()),
control.bigValueContextOrDefault());
ctx.lastStamp = timestamp;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ void computeWithTicks(final EmaContext ctx,
if(input == null) {
handleBadData(ctx, true, false);
} else {
final BigDecimal decimalInput = new BigDecimal(input, control.bigValueContext());
final BigDecimal decimalInput = new BigDecimal(input, control.bigValueContextOrDefault());
if(ctx.curVal == null) {
ctx.curVal = decimalInput;
} else {
ctx.curVal = ctx.curVal.multiply(ctx.alpha, control.bigValueContext())
ctx.curVal = ctx.curVal.multiply(ctx.alpha, control.bigValueContextOrDefault())
.add(decimalInput.multiply(
BigDecimal.ONE.subtract(ctx.alpha, control.bigValueContext()),
control.bigValueContext()),
control.bigValueContext());
BigDecimal.ONE.subtract(ctx.alpha, control.bigValueContextOrDefault()),
control.bigValueContextOrDefault()),
control.bigValueContextOrDefault());
}
}

Expand All @@ -86,7 +86,7 @@ void computeWithTime(final EmaContext ctx,
if(isNull || isNullTime) {
handleBadData(ctx, isNull, isNullTime);
} else {
final BigDecimal decimalInput = new BigDecimal(input, control.bigValueContext());
final BigDecimal decimalInput = new BigDecimal(input, control.bigValueContextOrDefault());
if(ctx.curVal == null) {
ctx.curVal = decimalInput;
ctx.lastStamp = timestamp;
Expand All @@ -96,9 +96,9 @@ void computeWithTime(final EmaContext ctx,
handleBadTime(ctx, dt);
} else {
ctx.alpha = BigDecimal.valueOf(Math.exp(-dt / timeScaleUnits));
ctx.curVal = ctx.curVal.multiply(ctx.alpha, control.bigValueContext())
.add(decimalInput.multiply(BigDecimal.ONE.subtract(ctx.alpha, control.bigValueContext()), control.bigValueContext()),
control.bigValueContext());
ctx.curVal = ctx.curVal.multiply(ctx.alpha, control.bigValueContextOrDefault())
.add(decimalInput.multiply(BigDecimal.ONE.subtract(ctx.alpha, control.bigValueContextOrDefault()), control.bigValueContextOrDefault()),
control.bigValueContextOrDefault());
ctx.lastStamp = timestamp;
}
}
Expand Down
Loading

0 comments on commit c4415bc

Please sign in to comment.