Skip to content

Commit

Permalink
Clean up context usage in NanosBaseTime*ArraySource (#4225)
Browse files Browse the repository at this point in the history
  • Loading branch information
rcaudy authored Jul 25, 2023
1 parent 24282e4 commit bbc87f2
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.engine.table.ChunkSource;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.SharedContext;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.WritableSourceWithPrepareForParallelPopulation;
import io.deephaven.engine.table.impl.util.ShiftData;
Expand Down Expand Up @@ -103,7 +104,14 @@ public void prepareForParallelPopulation(RowSequence rowSequence) {

// region Chunking
@Override
public void fillChunk(@NotNull ChunkSource.FillContext context, @NotNull WritableChunk<? super Values> dest,
public FillContext makeFillContext(final int chunkCapacity, final SharedContext sharedContext) {
return nanoSource.makeFillContext(chunkCapacity, sharedContext);
}

@Override
public void fillChunk(
@NotNull ChunkSource.FillContext context,
@NotNull WritableChunk<? super Values> dest,
@NotNull RowSequence rowSequence) {
nanoSource.fillChunk(context, dest, rowSequence, this::makeValue);
}
Expand All @@ -113,21 +121,7 @@ public void fillPrevChunk(
@NotNull ColumnSource.FillContext context,
@NotNull WritableChunk<? super Values> destination,
@NotNull RowSequence rowSequence) {
if (rowSequence.getAverageRunLengthEstimate() < USE_RANGES_AVERAGE_RUN_LENGTH) {
nanoSource.fillSparsePrevChunk(destination, rowSequence, this::makeValue);
} else {
nanoSource.fillPrevChunk(context, destination, rowSequence, this::makeValue);
}
}

@Override
public Chunk<Values> getChunk(@NotNull GetContext context, @NotNull RowSequence rowSequence) {
return getChunkByFilling(context, rowSequence);
}

@Override
public Chunk<Values> getPrevChunk(@NotNull GetContext context, @NotNull RowSequence rowSequence) {
return getPrevChunkByFilling(context, rowSequence);
nanoSource.fillPrevChunk(context, destination, rowSequence, this::makeValue);
}

@Override
Expand All @@ -151,6 +145,23 @@ public void fillPrevChunkUnordered(
nanoSource.fillSparsePrevChunkUnordered(dest, keys, this::makeValue);
}

@Override
public FillFromContext makeFillFromContext(final int chunkCapacity) {
return nanoSource.makeFillFromContext(chunkCapacity);
}

@Override
public void fillFromChunk(
@NotNull FillFromContext context,
@NotNull Chunk<? extends Values> src,
@NotNull RowSequence rowSequence) {
if (rowSequence.getAverageRunLengthEstimate() < USE_RANGES_AVERAGE_RUN_LENGTH) {
nanoSource.fillFromChunkByKeys(rowSequence, src, this::toNanos);
} else {
nanoSource.fillFromChunkByRanges(rowSequence, src, this::toNanos);
}
}

@Override
public void fillFromChunkUnordered(
@NotNull FillFromContext context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.chunkattributes.RowKeys;
import io.deephaven.engine.table.ColumnSource;
import io.deephaven.engine.table.SharedContext;
import io.deephaven.engine.table.WritableColumnSource;
import io.deephaven.engine.table.WritableSourceWithPrepareForParallelPopulation;
import io.deephaven.engine.table.impl.util.ShiftData;
Expand Down Expand Up @@ -114,15 +115,24 @@ public void setImmutable() {

// region Chunking
@Override
public Chunk<Values> getChunk(@NotNull GetContext context, @NotNull RowSequence rowSequence) {
return getChunkByFilling(context, rowSequence);
public FillContext makeFillContext(final int chunkCapacity, final SharedContext sharedContext) {
return nanoSource.makeFillContext(chunkCapacity, sharedContext);
}

@Override
public Chunk<Values> getPrevChunk(@NotNull GetContext context, @NotNull RowSequence rowSequence) {
return getPrevChunkByFilling(context, rowSequence);
public void fillChunk(
@NotNull FillContext context,
@NotNull WritableChunk<? super Values> destination,
@NotNull RowSequence rowSequence) {
if (rowSequence.getAverageRunLengthEstimate() < USE_RANGES_AVERAGE_RUN_LENGTH) {
nanoSource.fillByKeys(destination, rowSequence, this::makeValue);
} else {
nanoSource.fillByRanges(destination, rowSequence, this::makeValue);
}
}

// TODO (https://github.com/deephaven/deephaven-core/issues/4224): Override fillPrevChunk when suitable

@Override
public boolean providesFillUnordered() {
return true;
Expand All @@ -144,7 +154,14 @@ public void fillPrevChunkUnordered(
nanoSource.fillPrevByUnRowSequence(dest, keys, this::makeValue);
}

public void fillFromChunk(@NotNull FillFromContext context, @NotNull Chunk<? extends Values> src,
@Override
public FillFromContext makeFillFromContext(final int chunkCapacity) {
return nanoSource.makeFillFromContext(chunkCapacity);
}

public void fillFromChunk(
@NotNull FillFromContext context,
@NotNull Chunk<? extends Values> src,
@NotNull RowSequence rowSequence) {
if (rowSequence.getAverageRunLengthEstimate() < USE_RANGES_AVERAGE_RUN_LENGTH) {
nanoSource.fillFromChunkByKeys(rowSequence, src, this::toNanos);
Expand All @@ -154,21 +171,13 @@ public void fillFromChunk(@NotNull FillFromContext context, @NotNull Chunk<? ext
}

@Override
public void fillFromChunkUnordered(@NotNull FillFromContext context, @NotNull Chunk<? extends Values> src,
public void fillFromChunkUnordered(
@NotNull FillFromContext context,
@NotNull Chunk<? extends Values> src,
@NotNull LongChunk<RowKeys> keys) {
nanoSource.fillFromChunkUnordered(context, src, keys, this::toNanos);
}

@Override
public void fillChunk(@NotNull FillContext context, @NotNull WritableChunk<? super Values> dest,
@NotNull RowSequence rowSequence) {
if (rowSequence.getAverageRunLengthEstimate() < USE_RANGES_AVERAGE_RUN_LENGTH) {
nanoSource.fillByKeys(dest, rowSequence, this::makeValue);
} else {
nanoSource.fillByRanges(dest, rowSequence, this::makeValue);
}
}

@Override
public void setNull(RowSequence rowSequence) {
nanoSource.setNull(rowSequence);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
*/
package io.deephaven.engine.table.impl.select;

import io.deephaven.chunk.WritableLongChunk;
import io.deephaven.chunk.WritableObjectChunk;
import io.deephaven.chunk.LongChunk;
import io.deephaven.chunk.ObjectChunk;
import io.deephaven.chunk.attributes.Values;
import io.deephaven.engine.rowset.RowSet;
import io.deephaven.engine.rowset.RowSetFactory;
Expand Down Expand Up @@ -96,6 +96,8 @@ private QueryTable makeObjectTable(
cols.put("I", iSource);
cols.put("ZDT", zdtSource);

cols.values().forEach(ColumnSource::startTrackingPrevValues);

return new QueryTable(RowSetFactory.flat(ROW_COUNT).toTracking(), cols);
}

Expand All @@ -119,6 +121,8 @@ private QueryTable makeTable(
cols.put("I", iSource);
cols.put("ZDT", zdtSource);

cols.values().forEach(ColumnSource::startTrackingPrevValues);

return new QueryTable(RowSetFactory.flat(ROW_COUNT).toTracking(), cols);
}

Expand Down Expand Up @@ -170,7 +174,7 @@ private void testReinterpretLong(final Table initial, boolean isSorted, boolean
assertEquals(DateTimeUtils.epochNanos(baseZDT) + tOff, table.getColumnSource(zdtColName).getLong(key));
}

// Repeat the same comparisons, but actuate fillChunk instead
// Repeat the same comparisons, but actuate getChunk instead
reinterpLongChunkCheck(table.getColumnSource(lColName), table.getRowSet(), isSorted, baseLongTime);
reinterpLongChunkCheck(table.getColumnSource(iColName), table.getRowSet(), isSorted,
DateTimeUtils.epochNanos(baseInstant));
Expand All @@ -184,13 +188,16 @@ private void testReinterpretLong(final Table initial, boolean isSorted, boolean

private void reinterpLongChunkCheck(final ColumnSource<Long> cs, RowSet rowSet, final boolean isSorted,
final long baseNanos) {
try (final ChunkSource.FillContext fc = cs.makeFillContext(64);
final WritableLongChunk<Values> chunk = WritableLongChunk.makeWritableChunk(64)) {
cs.fillChunk(fc, chunk, rowSet);

for (int ii = 0; ii < chunk.size(); ii++) {
final long tOff = computeTimeDiff(ii, isSorted);
assertEquals(baseNanos + tOff, chunk.get(ii));
try (final ChunkSource.GetContext gc = cs.makeGetContext(64)) {
for (final boolean usePrev : new boolean[] {false, true}) {
final LongChunk<? extends Values> chunk = usePrev
? cs.getPrevChunk(gc, rowSet).asLongChunk()
: cs.getChunk(gc, rowSet).asLongChunk();

for (int ii = 0; ii < chunk.size(); ii++) {
final long tOff = computeTimeDiff(ii, isSorted);
assertEquals(baseNanos + tOff, chunk.get(ii));
}
}
}
}
Expand Down Expand Up @@ -255,7 +262,7 @@ private <T> void doReinterpretTestBasic(final Table initial,
extraCheck.accept((T) table.getColumnSource(zdtColName).get(key));
}

// Repeat the same comparisons, but actuate fillChunk instead
// Repeat the same comparisons, but actuate getChunk instead
reinterpBasicChunkCheck(table.getColumnSource(lColName), table.getRowSet(), toNanoFunc, isSorted,
baseLongTime, extraCheck);
reinterpBasicChunkCheck(table.getColumnSource(iColName), table.getRowSet(), toNanoFunc, isSorted,
Expand All @@ -272,14 +279,17 @@ private <T> void doReinterpretTestBasic(final Table initial,
private <T> void reinterpBasicChunkCheck(final ColumnSource<T> cs, final RowSet rowSet,
final Function<T, Long> toNanoFunc, final boolean isSorted, final long baseNanos,
final Consumer<T> extraCheck) {
try (final ChunkSource.FillContext fc = cs.makeFillContext(64);
final WritableObjectChunk<T, Values> chunk = WritableObjectChunk.makeWritableChunk(64)) {
cs.fillChunk(fc, chunk, rowSet);

for (int ii = 0; ii < chunk.size(); ii++) {
final long tOff = computeTimeDiff(ii, isSorted);
assertEquals(baseNanos + tOff, (long) toNanoFunc.apply(chunk.get(ii)));
extraCheck.accept(chunk.get(ii));
try (final ChunkSource.GetContext gc = cs.makeGetContext(64)) {
for (final boolean usePrev : new boolean[] {false, true}) {
final ObjectChunk<T, ? extends Values> chunk = usePrev
? cs.getPrevChunk(gc, rowSet).asObjectChunk()
: cs.getChunk(gc, rowSet).asObjectChunk();

for (int ii = 0; ii < chunk.size(); ii++) {
final long tOff = computeTimeDiff(ii, isSorted);
assertEquals(baseNanos + tOff, (long) toNanoFunc.apply(chunk.get(ii)));
extraCheck.accept(chunk.get(ii));
}
}
}
}
Expand Down Expand Up @@ -317,12 +327,15 @@ public void testReinterpretZdt() {

private <T> void reinterpWrappedChunkCheck(final ColumnSource<T> cs, RowSet rowSet, final boolean isSorted,
final BiFunction<Integer, Boolean, T> expectedSupplier) {
try (final ChunkSource.FillContext fc = cs.makeFillContext(64);
final WritableObjectChunk<T, Values> chunk = WritableObjectChunk.makeWritableChunk(64)) {
cs.fillChunk(fc, chunk, rowSet);

for (int ii = 0; ii < chunk.size(); ii++) {
assertEquals(expectedSupplier.apply(ii, isSorted), chunk.get(ii));
try (final ChunkSource.GetContext gc = cs.makeGetContext(64)) {
for (final boolean usePrev : new boolean[] {false, true}) {
final ObjectChunk<T, ? extends Values> chunk = usePrev
? cs.getPrevChunk(gc, rowSet).asObjectChunk()
: cs.getChunk(gc, rowSet).asObjectChunk();

for (int ii = 0; ii < chunk.size(); ii++) {
assertEquals(expectedSupplier.apply(ii, isSorted), chunk.get(ii));
}
}
}
}
Expand Down

0 comments on commit bbc87f2

Please sign in to comment.