Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UpdateBy config changes in support of gRPC #2633

Merged
merged 2 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,12 @@ private class BucketedContext implements SafeCloseable {
BucketedContext(@NotNull final TableUpdate upstream,
@NotNull final ModifiedColumnSet keyModifiedColumnSet,
@Nullable final ModifiedColumnSet[] inputModifiedColumnSets) {
final int updateSize = UpdateSizeCalculator.chunkSize(upstream, control.chunkCapacity());
final int updateSize = UpdateSizeCalculator.chunkSize(upstream, control.chunkCapacityOrDefault());

this.inputChunkPopulated = new boolean[operators.length];
this.keysModified = upstream.modifiedColumnSet().containsAny(keyModifiedColumnSet);
this.chunkSize = UpdateSizeCalculator.chunkSize(updateSize, upstream.shifted(), control.chunkCapacity());
this.chunkSize =
UpdateSizeCalculator.chunkSize(updateSize, upstream.shifted(), control.chunkCapacityOrDefault());
this.opAffected = new boolean[operators.length];
// noinspection unchecked
this.fillContexts = new SizedSafeCloseable[operators.length];
Expand Down Expand Up @@ -722,7 +723,7 @@ public void processBucketedUpdates() {
}

try (final RowSequence.Iterator okIt = modifiedBucketIndex.getRowSequenceIterator()) {
final int newChunkSize = (int) Math.min(control.chunkCapacity(), modifiedBucketIndex.size());
final int newChunkSize = (int) Math.min(control.chunkCapacityOrDefault(), modifiedBucketIndex.size());
setChunkSize(newChunkSize);
initializeFor(modifiedBucketIndex, UpdateType.Reprocess);

Expand Down Expand Up @@ -957,7 +958,7 @@ private class GroupedContext implements SafeCloseable {
final WritableChunk<Values>[] postWorkingChunks;

GroupedContext(final TableUpdate upstream) {
this.chunkSize = Math.min((int) source.size(), control.chunkCapacity());
this.chunkSize = Math.min((int) source.size(), control.chunkCapacityOrDefault());
this.inputChunkPopulated = new boolean[operators.length];
this.fillContexts = new ChunkSource.FillContext[operators.length];
this.opContext = new UpdateByOperator.UpdateContext[operators.length];
Expand Down Expand Up @@ -1068,20 +1069,20 @@ private BucketedUpdateBy(@NotNull final UpdateByOperator[] operators,
this.keySources = keySources;

if (source.isRefreshing() && !source.isAddOnly()) {
final int hashTableSize = control.initialHashTableSize();
slotTracker = new UpdateBySlotTracker(control.chunkCapacity());
final int hashTableSize = control.initialHashTableSizeOrDefault();
slotTracker = new UpdateBySlotTracker(control.chunkCapacityOrDefault());
this.hashTable = new IncrementalUpdateByStateManager(keySources,
hashTableSize,
control.maximumLoadFactor(),
control.targetLoadFactor());
control.maximumLoadFactorOrDefault(),
control.targetLoadFactorOrDefault());
} else {
slotTracker = null;
if (!useGrouping) {
final int hashTableSize = control.initialHashTableSize();
final int hashTableSize = control.initialHashTableSizeOrDefault();
this.hashTable = new AddOnlyUpdateByStateManager(keySources,
hashTableSize,
control.maximumLoadFactor(),
control.targetLoadFactor());
control.maximumLoadFactorOrDefault(),
control.targetLoadFactorOrDefault());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ public static Table updateBy(@NotNull final QueryTable source,
@NotNull final UpdateByControl control) {

WritableRowRedirection rowRedirection = null;
if (control.useRedirection()) {
if (control.useRedirectionOrDefault()) {
if (!source.isRefreshing()) {
if (!source.isFlat() && SparseConstants.sparseStructureExceedsOverhead(source.getRowSet(),
control.maxStaticSparseMemoryOverhead())) {
control.maxStaticSparseMemoryOverheadOrDefault())) {
rowRedirection = new InverseRowRedirectionImpl(source.getRowSet());
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,28 +225,28 @@ private UpdateByOperator makeEmaOperator(@NotNull final MatchPair pair,
final long timeScaleUnits = ema.timeScale().timescaleUnits();

if (csType == byte.class || csType == Byte.class) {
return new ByteEMAOperator(pair, affectingColumns, ema.control(), recorder, timeScaleUnits,
return new ByteEMAOperator(pair, affectingColumns, ema.controlOrDefault(), recorder, timeScaleUnits,
columnSource, rowRedirection);
} else if (csType == short.class || csType == Short.class) {
return new ShortEMAOperator(pair, affectingColumns, ema.control(), recorder, timeScaleUnits,
return new ShortEMAOperator(pair, affectingColumns, ema.controlOrDefault(), recorder, timeScaleUnits,
columnSource, rowRedirection);
} else if (csType == int.class || csType == Integer.class) {
return new IntEMAOperator(pair, affectingColumns, ema.control(), recorder, timeScaleUnits,
return new IntEMAOperator(pair, affectingColumns, ema.controlOrDefault(), recorder, timeScaleUnits,
columnSource, rowRedirection);
} else if (csType == long.class || csType == Long.class) {
return new LongEMAOperator(pair, affectingColumns, ema.control(), recorder, timeScaleUnits,
return new LongEMAOperator(pair, affectingColumns, ema.controlOrDefault(), recorder, timeScaleUnits,
columnSource, rowRedirection);
} else if (csType == float.class || csType == Float.class) {
return new FloatEMAOperator(pair, affectingColumns, ema.control(), recorder, timeScaleUnits,
return new FloatEMAOperator(pair, affectingColumns, ema.controlOrDefault(), recorder, timeScaleUnits,
columnSource, rowRedirection);
} else if (csType == double.class || csType == Double.class) {
return new DoubleEMAOperator(pair, affectingColumns, ema.control(), recorder,
return new DoubleEMAOperator(pair, affectingColumns, ema.controlOrDefault(), recorder,
timeScaleUnits, columnSource, rowRedirection);
} else if (csType == BigDecimal.class) {
return new BigDecimalEMAOperator(pair, affectingColumns, ema.control(), recorder,
return new BigDecimalEMAOperator(pair, affectingColumns, ema.controlOrDefault(), recorder,
timeScaleUnits, columnSource, rowRedirection);
} else if (csType == BigInteger.class) {
return new BigIntegerEMAOperator(pair, affectingColumns, ema.control(), recorder,
return new BigIntegerEMAOperator(pair, affectingColumns, ema.controlOrDefault(), recorder,
timeScaleUnits, columnSource, rowRedirection);
}

Expand Down Expand Up @@ -285,7 +285,7 @@ private UpdateByOperator makeCumProdOperator(MatchPair fc, TableWithDefaults sou
} else if (csType == double.class || csType == Double.class) {
return new DoubleCumProdOperator(fc, rowRedirection);
} else if (csType == BigDecimal.class) {
return new BigDecimalCumProdOperator(fc, rowRedirection, control.mathContext());
return new BigDecimalCumProdOperator(fc, rowRedirection, control.mathContextOrDefault());
} else if (csType == BigInteger.class) {
return new BigIntegerCumProdOperator(fc, rowRedirection);
}
Expand Down Expand Up @@ -333,7 +333,7 @@ private UpdateByOperator makeCumSumOperator(MatchPair fc, TableWithDefaults sour
} else if (csType == double.class || csType == Double.class) {
return new DoubleCumSumOperator(fc, rowRedirection);
} else if (csType == BigDecimal.class) {
return new BigDecimalCumSumOperator(fc, rowRedirection, control.mathContext());
return new BigDecimalCumSumOperator(fc, rowRedirection, control.mathContextOrDefault());
} else if (csType == BigInteger.class) {
return new BigIntegerCumSumOperator(fc, rowRedirection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,10 @@ private class UpdateContext implements SafeCloseable {
UpdateContext(@NotNull final TableUpdate upstream,
@Nullable final ModifiedColumnSet[] inputModifiedColumnSets,
final boolean isInitializeStep) {
final int updateSize = UpdateSizeCalculator.chunkSize(upstream, control.chunkCapacity());
final int updateSize = UpdateSizeCalculator.chunkSize(upstream, control.chunkCapacityOrDefault());

this.chunkSize = UpdateSizeCalculator.chunkSize(updateSize, upstream.shifted(), control.chunkCapacity());
this.chunkSize =
UpdateSizeCalculator.chunkSize(updateSize, upstream.shifted(), control.chunkCapacityOrDefault());
this.opAffected = new boolean[operators.length];
// noinspection unchecked
this.fillContexts = new SizedSafeCloseable[operators.length];
Expand Down Expand Up @@ -413,7 +414,7 @@ private void reprocessRows(RowSetShiftData shifted) {
final RowSet sourceRowSet = source.getRowSet();
try (final RowSet indexToReprocess =
sourceRowSet.subSetByKeyRange(smallestModifiedKey, sourceRowSet.lastRowKey())) {
final int newChunkSize = (int) Math.min(control.chunkCapacity(), indexToReprocess.size());
final int newChunkSize = (int) Math.min(control.chunkCapacityOrDefault(), indexToReprocess.size());
setChunkSize(newChunkSize);

final long keyBefore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,25 +288,25 @@ void handleBadData(@NotNull final EmaContext ctx,
final boolean isNullTime) {
boolean doReset = false;
if (isNull) {
if (control.onNullValue() == BadDataBehavior.Throw) {
if (control.onNullValueOrDefault() == BadDataBehavior.THROW) {
throw new TableDataException("Encountered null value during EMA processing");
}
doReset = control.onNullValue() == BadDataBehavior.Reset;
doReset = control.onNullValueOrDefault() == BadDataBehavior.RESET;
} else if (isNan) {
if (control.onNanValue() == BadDataBehavior.Throw) {
if (control.onNanValueOrDefault() == BadDataBehavior.THROW) {
throw new TableDataException("Encountered NaN value during EMA processing");
} else if (control.onNanValue() == BadDataBehavior.Poison) {
} else if (control.onNanValueOrDefault() == BadDataBehavior.POISON) {
ctx.curVal = Double.NaN;
} else {
doReset = control.onNanValue() == BadDataBehavior.Reset;
doReset = control.onNanValueOrDefault() == BadDataBehavior.RESET;
}
}

if (isNullTime) {
if (control.onNullTime() == BadDataBehavior.Throw) {
if (control.onNullTimeOrDefault() == BadDataBehavior.THROW) {
throw new TableDataException("Encountered null timestamp during EMA processing");
}
doReset = control.onNullTime() == BadDataBehavior.Reset;
doReset = control.onNullTimeOrDefault() == BadDataBehavior.RESET;
}

if (doReset) {
Expand All @@ -318,15 +318,15 @@ void handleBadData(@NotNull final EmaContext ctx,
void handleBadTime(@NotNull final EmaContext ctx, final long dt) {
boolean doReset = false;
if (dt == 0) {
if (control.onZeroDeltaTime() == BadDataBehavior.Throw) {
if (control.onZeroDeltaTimeOrDefault() == BadDataBehavior.THROW) {
throw new TableDataException("Encountered zero delta time during EMA processing");
}
doReset = control.onZeroDeltaTime() == BadDataBehavior.Reset;
doReset = control.onZeroDeltaTimeOrDefault() == BadDataBehavior.RESET;
} else if (dt < 0) {
if (control.onNegativeDeltaTime() == BadDataBehavior.Throw) {
if (control.onNegativeDeltaTimeOrDefault() == BadDataBehavior.THROW) {
throw new TableDataException("Encountered negative delta time during EMA processing");
}
doReset = control.onNegativeDeltaTime() == BadDataBehavior.Reset;
doReset = control.onNegativeDeltaTimeOrDefault() == BadDataBehavior.RESET;
}

if (doReset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,9 @@ void computeWithTicks(final EmaContext ctx,
if (ctx.curVal == null) {
ctx.curVal = input;
} else {
ctx.curVal = ctx.curVal.multiply(ctx.alpha, control.bigValueContext())
.add(input.multiply(ctx.oneMinusAlpha, control.bigValueContext()),
control.bigValueContext());
ctx.curVal = ctx.curVal.multiply(ctx.alpha, control.bigValueContextOrDefault())
.add(input.multiply(ctx.oneMinusAlpha, control.bigValueContextOrDefault()),
control.bigValueContextOrDefault());
}
}

Expand Down Expand Up @@ -90,11 +90,11 @@ void computeWithTime(final EmaContext ctx,
handleBadTime(ctx, dt);
} else {
ctx.alpha = BigDecimal.valueOf(Math.exp(-dt / timeScaleUnits));
ctx.curVal = ctx.curVal.multiply(ctx.alpha, control.bigValueContext())
ctx.curVal = ctx.curVal.multiply(ctx.alpha, control.bigValueContextOrDefault())
.add(input.multiply(
BigDecimal.ONE.subtract(ctx.alpha, control.bigValueContext()),
control.bigValueContext()),
control.bigValueContext());
BigDecimal.ONE.subtract(ctx.alpha, control.bigValueContextOrDefault()),
control.bigValueContextOrDefault()),
control.bigValueContextOrDefault());
ctx.lastStamp = timestamp;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ void computeWithTicks(final EmaContext ctx,
if(input == null) {
handleBadData(ctx, true, false);
} else {
final BigDecimal decimalInput = new BigDecimal(input, control.bigValueContext());
final BigDecimal decimalInput = new BigDecimal(input, control.bigValueContextOrDefault());
if(ctx.curVal == null) {
ctx.curVal = decimalInput;
} else {
ctx.curVal = ctx.curVal.multiply(ctx.alpha, control.bigValueContext())
ctx.curVal = ctx.curVal.multiply(ctx.alpha, control.bigValueContextOrDefault())
.add(decimalInput.multiply(
BigDecimal.ONE.subtract(ctx.alpha, control.bigValueContext()),
control.bigValueContext()),
control.bigValueContext());
BigDecimal.ONE.subtract(ctx.alpha, control.bigValueContextOrDefault()),
control.bigValueContextOrDefault()),
control.bigValueContextOrDefault());
}
}

Expand All @@ -86,7 +86,7 @@ void computeWithTime(final EmaContext ctx,
if(isNull || isNullTime) {
handleBadData(ctx, isNull, isNullTime);
} else {
final BigDecimal decimalInput = new BigDecimal(input, control.bigValueContext());
final BigDecimal decimalInput = new BigDecimal(input, control.bigValueContextOrDefault());
if(ctx.curVal == null) {
ctx.curVal = decimalInput;
ctx.lastStamp = timestamp;
Expand All @@ -96,9 +96,9 @@ void computeWithTime(final EmaContext ctx,
handleBadTime(ctx, dt);
} else {
ctx.alpha = BigDecimal.valueOf(Math.exp(-dt / timeScaleUnits));
ctx.curVal = ctx.curVal.multiply(ctx.alpha, control.bigValueContext())
.add(decimalInput.multiply(BigDecimal.ONE.subtract(ctx.alpha, control.bigValueContext()), control.bigValueContext()),
control.bigValueContext());
ctx.curVal = ctx.curVal.multiply(ctx.alpha, control.bigValueContextOrDefault())
.add(decimalInput.multiply(BigDecimal.ONE.subtract(ctx.alpha, control.bigValueContextOrDefault()), control.bigValueContextOrDefault()),
control.bigValueContextOrDefault());
ctx.lastStamp = timestamp;
}
}
Expand Down
Loading